Главная страница

курсовая_v1. Курсовая работа Необходимо


Скачать 2.93 Mb.
НазваниеКурсовая работа Необходимо
Дата30.03.2022
Размер2.93 Mb.
Формат файлаpdf
Имя файлакурсовая_v1.pdf
ТипКурсовая
#429949
страница3 из 6
1   2   3   4   5   6

Как сериализовать данные с помощью aiohttp.PAYLOAD_REGISTRY
from
types
import
MappingProxyType
from
typing
import
Mapping
from
aiohttp
import
PAYLOAD_REGISTRY, JsonPayload
from
aiohttp.web
import
run_app, Application, Response, View
PAYLOAD_REGISTRY.register(JsonPayload, (Mapping, MappingProxyType))
class
SomeView
(View)
:
async
def
get
(self)
:
return
Response(body={
'hello'
:
'world'
}) app = Application() app.router.add_route(
'*'
,
'/hello'
, SomeView) run_app(app)
Важно понимать, что метод json_response
, как и aiohttp.JsonPayload
, используют стандартный json.dumps
, который не умеет сериализовать сложные типы данных, например datetime.date или asyncpg.Record
(
asyncpg возвращает записи из БД в виде экземпляров этого класса). Более того, одни сложные объекты могут содержать другие: в одной записи из БД может быть поле типа datetime.date
Разработчики Python предусмотрели эту проблему: метод json.dumps позволяет с помощью аргумента default указать функцию, которая вызывается, когда необходимо сериализовать незнакомый объект. Ожидается, что функция
приведет незнакомый объект к типу, который умеет сериализовать модуль json.
Как расширить JsonPayload для сериализации произвольных объектов
import
json
from
datetime
import
date
from
functools
import
partial, singledispatch
from
typing
import
Any
from
aiohttp.payload
import
JsonPayload
as
BaseJsonPayload
from
aiohttp.typedefs
import
JSONEncoder
@singledispatch
def
convert
(value)
:
raise
NotImplementedError(
f'Unserializable value:
{value!r}
'
)
@convert.register(Record)
def
convert_asyncpg_record
(value: Record)
:
"""
Позволяет автоматически сериализовать результаты запроса, возвращаемые asyncpg
"""
return
dict(value)
@convert.register(date)
def
convert_date
(value: date)
:
"""
В проекте объект date возвращается только в одном случае — если необходимо отобразить дату рождения. Для отображения даты рождения должен использоваться формат ДД.ММ.ГГГГ
"""
return
value.strftime(
'%d.%m.%Y'
) dumps = partial(json.dumps, default=convert)
class
JsonPayload
(BaseJsonPayload)
:
def
__init__
(self, value: Any, encoding: str =
'utf-8'
, content_type: str =
'application/json'
, dumps: JSONEncoder = dumps,
*args: Any,
**kwargs: Any)
->
None
: super().__init__(value, encoding, content_type, dumps, *args,
**kwargs)
Обработчики aiohttp позволяет реализовать обработчики асинхронными функциями и классами.
Классы более расширяемы: во-первых, код, относящийся к одному обработчику, можно разместить в одном месте, а во вторых, классы позволяют использовать наследование для избавления от дублирования кода (например, каждому
обработчику требуется соединение с базой данных).
Базовый класс обработчика
from
aiohttp.web_urldispatcher
import
View
from
asyncpgsa
import
PG
class
BaseView
(View)
:
URL_PATH: str
@property
def
pg
(self)
-> PG:
return
self.request.app[
'pg'
]
Так как один большой файл читать сложно, нужно разнести обработчики по файлам. Маленькие файлы поощряют слабую связность, а если, например, есть кольцевые импорты внутри хэндлеров — значит, возможно, что-то не так с композицией сущностей.
POST /imports
На вход обработчик получает json с данными о жителях. Максимально
допустимый размер запроса в aiohttp регулируется опцией client_max_size и по умолчанию равен 2 МБ
. При превышении лимита aiohttp вернет HTTP-ответ со статусом 413: Request Entity Too Large Error.
В то же время корректный json c максимально длинными строчками и цифрами будет весить

63 мегабайта, поэтому ограничения на размер запроса необходимо расширить.
Далее, необходимо проверить и десериализовать данные. Если они некорректные, нужно вернуть HTTP-ответ
400: Bad Request.
Потребуются две схемы
Marhsmallow. Первая, CitizenSchema, проверяет данные каждого отдельного жителя, а также десериализует строку с днем рождения в объект datetime.date:

Тип данных, формат и наличие всех обязательных полей;

Отсутствие незнакомых полей;

Дата рождения должна быть указана в формате
DD.MM.YYYY и не может иметь значения из будущего;

Список родственников каждого жителя должен содержать уникальные существующие в этой выгрузке идентификаторы жителей.
Вторая схема,
ImportSchema, проверяет выгрузку в целом:
• citizen_id каждого жителя в рамках выгрузки должен быть уникален;


Родственные связи должны быть двусторонними (если у жителя #1 в списке родственников указан житель #2, то и у жителя #2 должен быть родственник
#1).
Если данные корректные, их необходимо добавить в БД с новым уникальным import_id.
Для добавления данных потребуется выполнить несколько запросов в разные таблицы. Чтобы в БД не осталось частично добавленных данных в случае возникновения ошибки или исключения (например, при отключении клиента, который не получил ответ полностью, aiohttp бросит исколючение
CancelledError
), необходимо использовать транзакцию.
Добавлять данные в таблицы необходимо частями, так как в одном запросе к
PostgreSQL может быть не более 32 767 аргументов. В таблице citizens 9 полей. Соответственно, за 1 запрос в эту таблицу можно вставить только 32 767 /
9 = 3640 строк, а в одной выгрузке может быть до 10 000 жителей.
GET /imports/$import_id/citizens
Обработчик возвращает всех жителей для выгрузки с указанным import_id
. Если указанная выгрузка не существует, необходимо вернуть HTTP-ответ 404: Not
Found. Это поведение выглядит общим для обработчиков, которым требуется существующая выгрузка, поэтому я вынес код проверки в отдельный класс.
Базовый класс для обработчиков с выгрузками
from
aiohttp.web_exceptions
import
HTTPNotFound
from
sqlalchemy
import
select, exists
from
analyzer.db.schema
import
imports_table
class
BaseImportView
(BaseView)
:
@property
def
import_id
(self)
:
return
int(self.request.match_info.get(
'import_id'
))
async
def
check_import_exists
(self)
: query = select([ exists().where(imports_table.c.import_id == self.import_id)
])
if
not
await
self.pg.fetchval(query):
raise
HTTPNotFound()
Чтобы получить список родственников для каждого жителя, потребуется выполнить
LEFT JOIN
из таблицы citizens в таблицу relations
, агрегируя поле relations.relative_id с группировкой по import_id и citizen_id
Если у жителя нет родственников, то
LEFT JOIN
вернет для него в поле relations.relative_id значение
NULL
и в результате агрегации список родственников будет выглядеть как
[NULL]
Чтобы исправить это некорректное значение, я воспользовался функцией array_remove

БД хранит дату в формате
YYYY-MM-DD
, а нам нужен формат
DD.MM.YYYY
Технически форматировать дату можно либо SQL-запросом, либо на стороне
Python в момент сериализации ответа с json.dumps
(asyncpg возвращает значение поля birth_date как экземпляр класса datetime.date
).
Я выбрал сериализацию на стороне Python, учитывая, что birth_date
— единственный объект datetime.date в проекте с единым форматом (см. раздел
«Сериализация данных»
).
Несмотря на то, что в обработчике выполняется два запроса (проверка на существование выгрузки и запрос на получение списка жителей), использовать
транзакцию необязательно. По умолчанию PostgreSQL использует уровень изоляции
READ COMMITTED
и даже в рамках одной транзакции будут видны все изменения других, успешно завершенных транзакций (добавление новых строк, изменение существующих).
Самая большая выгрузка в текстовом представлении может занимать 63 мегабайта — это достаточно много, особенно учитывая, что одновременно может прийти несколько запросов на получение данных. Есть достаточно интересный способ получать данные из БД с помощью
курсора
и отправлять их клиенту
по частям.
Для этого нам потребуется реализовать два объекта:
1. Объект
SelectQuery типа
AsyncIterable
, возвращающий записи из базы данных. При первом обращении подключается к базе, открывает транзакцию и создает курсор, при дальнейшей итерации возвращает записи из БД. Возвращается обработчиком.
Код SelectQuery
from
collections
import
AsyncIterable
from
asyncpgsa.transactionmanager
import
ConnectionTransactionContextManager
from
sqlalchemy.sql
import
Select
class
SelectQuery
(AsyncIterable)
:
"""
Используется, чтобы отправлять данные из PostgreSQL клиенту сразу после получения, по частям, без буфферизации всех данных """
PREFETCH =
500
__slots__ = (
'query'
,
'transaction_ctx'
,
'prefetch'
,
'timeout'
)
def
__init__
(self, query: Select, transaction_ctx: ConnectionTransactionContextManager, prefetch: int = None,
timeout: float = None)
: self.query = query self.transaction_ctx = transaction_ctx self.prefetch = prefetch
or
self.PREFETCH self.timeout = timeout
async
def
__aiter__
(self)
:
async
with
self.transaction_ctx
as
conn: cursor = conn.cursor(self.query, prefetch=self.prefetch, timeout=self.timeout)
async
for
row
in
cursor:
yield
row
2. Сериализатор
AsyncGenJSONListPayload
, который умеет итерироваться по асинхронным генераторам, сериализовать данные из асинхронного генератора в JSON и отправлять данные клиентам по частям.
Регистрируется в aiohttp.PAYLOAD_REGISTRY
как сериализатор объектов
AsyncIterable
Код AsyncGenJSONListPayload
import
json
from
functools
import
partial
from
aiohttp
import
Payload
# Функция, умеющая сериализовать в JSON объекты asyncpg.Record и datetime.date dumps = partial(json.dumps, default=convert, ensure_ascii=
False
)
class
AsyncGenJSONListPayload
(Payload)
:
"""
Итерируется по объектам AsyncIterable, частями сериализует данные из них в JSON и отправляет клиенту """
def
__init__
(self, value, encoding: str =
'utf-8'
, content_type: str =
'application/json'
, root_object: str =
'data'
,
*args, **kwargs)
: self.root_object = root_object super().__init__(value, content_type=content_type, encoding=encoding,
*args, **kwargs)
async
def
write
(self, writer)
:
# Начало объекта
await
writer.write(
(
'{"%s":['
% self.root_object).encode(self._encoding)
) first =
True
async
for
row
in
self._value:
# Перед первой строчкой запятая не нужнаа
if
not
first:
await
writer.write(
b','
)
else
: first =
False

await
writer.write(dumps(row).encode(self._encoding))
# Конец объекта
await
writer.write(
b']}'
)
Далее, в обработчике можно будет создать объект
SelectQuery
, передать ему SQL запрос и функцию для открытия транзакции и вернуть его в
Response body
:
Код обработчика
# analyzer/api/handlers/citizens.py
from
aiohttp.web_response
import
Response
from
aiohttp_apispec
import
docs, response_schema
from
analyzer.api.schema
import
CitizensResponseSchema
from
analyzer.db.schema
import
citizens_table
as
citizens_t
from
analyzer.utils.pg
import
SelectQuery
from
.query
import
CITIZENS_QUERY
from
.base
import
BaseImportView
class
CitizensView
(BaseImportView)
:
URL_PATH = r'/imports/{import_id:\d+}/citizens'
@docs(summary='Отобразить жителей для указанной выгрузки')
@response_schema(CitizensResponseSchema())
async
def
get
(self)
:
await
self.check_import_exists() query = CITIZENS_QUERY.where( citizens_t.c.import_id == self.import_id
) body = SelectQuery(query, self.pg.transaction())
return
Response(body=body) aiohttp обнаружит в реестре aiohttp.PAYLOAD_REGISTRY
зарегистрированный сериализатор
AsyncGenJSONListPayload для объектов типа
AsyncIterable
. Затем сериализатор будет итерироваться по объекту
SelectQuery и отправлять данные клиенту. При первом обращении объект
SelectQuery получает соединение к БД, открывает транзакцию и создает курсор, при дальнейшей итерации будет получать данные из БД курсором и возвращать их построчно.
Этот подход позволяет не выделять память на весь объем данных при каждом запросе, но у него есть особенность: приложение не сможет вернуть клиенту соответствующий HTTP-статус, если возникнет ошибка (ведь клиенту уже был отправлен HTTP-статус, заголовки, и пишутся данные).
При возникновении исключения не остается ничего, кроме как разорвать соединение. Исключение, конечно, можно залогировать, но клиент не сможет понять, какая именно ошибка произошла.
С другой стороны, похожая ситуация может возникнуть, даже если обработчик
получит все данные из БД, но при передаче данных клиенту моргнет сеть — от этого никто не застрахован.
PATCH /imports/$import_id/citizens/$citizen_id
Обработчик получает на вход идентификатор выгрузки import_id
, жителя citizen_id
, а также json с новыми данными о жителе. В случае обращения к несуществующей выгрузке или жителю необходимо вернуть HTTP-ответ
404:
Not Found
Переданные клиентом данные требуется проверить и десериализовать. Если они некорректные — необходимо вернуть HTTP-ответ
400: Bad Request
. Я реализовал Marshmallow-схему
PatchCitizenSchema
, которая проверяет:

Тип и формат данных для указанных полей.

Дату рождения. Она должна быть указана в формате
DD.MM.YYYY
и не может иметь значения из будущего.

Список родственников каждого жителя. Он должен иметь уникальные идентификаторы жителей
Существование родственников, указанных в поле relatives
, можно отдельно не проверять: при добавлении в таблицу relations несуществующего жителя
PostgreSQL вернет ошибку
ForeignKeyViolationError
, которую можно обработать и вернуть HTTP-статус
400: Bad Request
Какой статус возвращать, если клиент прислал некорректные данные для
несуществующего жителя или выгрузки? Семантически правильнее проверять сначала существование выгрузки и жителя (если такого нет — возвращать
404:
Not Found
) и только потом —корректные ли данные прислал клиент (если нет — возвращать
400: Bad Request
). На практике часто бывает дешевле сначала проверить данные, и только если они корректные, обращаться к базе.
Оба варианта приемлемы, но я решил выбрать более дешевый второй вариант, так как в любом случае результат операции — ошибка, которая ни на что не влияет (клиент исправит данные и потом так же узнает, что житель не существует).
Если данные корректные, необходимо обновить информацию о жителе в БД. В обработчике потребуется сделать несколько запросов к разным таблицам. Если возникнет ошибка или исключение, изменения в базе данных должны быть отменены, поэтому запросы необходимо выполнять в транзакции.
Метод
PATCH
позволяет передавать лишь некоторые поля для изменяемого жителя.
Обработчик необходимо написать таким образом, чтобы он не падал при обращении к данным, которые не указал клиент, а также не выполнял запросы к таблицам, данные в которых не изменились.
Если клиент указал поле relatives
, необходимо получить список существующих родственников. Если он изменился — определить, какие записи из
таблицы relatives необходимо удалить, а какие добавить, чтобы привести базу данных в соответствие с запросом клиента. По умолчанию в PostgreSQL для изоляции транзакций используется уровень READ COMMITTED
. Это означает, что в рамках текущей транзакции будут видны изменения существующих (а также добавления новых) записей других завершенных транзакций. Это может привести к состоянию гонки между конкурентными запросами.
Предположим, существует выгрузка с жителями
#1
,
#2
,
#3
, без родственных связей. Сервис получает два одновременных запроса на изменение жителя
#1:
{"relatives": [2]}
и
{"relatives": [3]}
. aiohttp создаст два обработчика, которые одновременно получат текущее состояние жителя из PostgreSQL.
Каждый обработчик не обнаружит ни одной родственной связи и примет решение добавить новую связь с указанным родственником. В результате у жителя #1 поле relatives равно
[2,3]
Такое поведение нельзя назвать очевидным. Есть два варианта ожидаемо решить исход гонки: выполнить только первый запрос, а для второго вернуть HTTP-ответ
409: Conflict
(чтобы клиент повторил запрос), либо выполнить запросы по очереди (второй запрос будет обработан только после завершения первого).
Первый вариант можно реализовать, включив режим изоляции
SERIALIZABLE
. Если во время обработки запроса кто-то уже успел изменить и закоммитить данные,
будет брошено исключение, которое можно обработать и вернуть соответствующий HTTP-статус.
Минус такого решения — большое число блокировок в
PostgreSQL,
SERIALIZABLE
будет вызывать исключение, даже если конкурентные запросы меняют записи жителей из разных выгрузок.
Также можно воспользоваться механизмом рекомендательных блокировок
. Если получить такую блокировку по import_id
, конкурентные запросы для разных выгрузок смогут выполняться параллельно.
Для обработки конкурентных запросов в одной выгрузке можно реализовать поведение любого из вариантов: функция pg_try_advisory_xact_lock пытается получить блокировку и возвращает результат boolean немедленно (если блокировку получить не удалось
— можно бросить исключение), а pg_advisory_xact_lock ожидает, пока ресурс не станет доступен для блокировки (в этом случае запросы выполнятся последовательно, я остановился на этом варианте).
В итоге обработчик должен
1   2   3   4   5   6


написать администратору сайта