курсовая_v1. Курсовая работа Необходимо
Скачать 2.93 Mb.
|
from types import MappingProxyType from typing import Mapping from aiohttp import PAYLOAD_REGISTRY, JsonPayload from aiohttp.web import run_app, Application, Response, View PAYLOAD_REGISTRY.register(JsonPayload, (Mapping, MappingProxyType)) class SomeView (View) : async def get (self) : return Response(body={ 'hello' : 'world' }) app = Application() app.router.add_route( '*' , '/hello' , SomeView) run_app(app) Важно понимать, что метод json_response , как и aiohttp.JsonPayload , используют стандартный json.dumps , который не умеет сериализовать сложные типы данных, например datetime.date или asyncpg.Record ( asyncpg возвращает записи из БД в виде экземпляров этого класса). Более того, одни сложные объекты могут содержать другие: в одной записи из БД может быть поле типа datetime.date Разработчики Python предусмотрели эту проблему: метод json.dumps позволяет с помощью аргумента default указать функцию, которая вызывается, когда необходимо сериализовать незнакомый объект. Ожидается, что функция приведет незнакомый объект к типу, который умеет сериализовать модуль json. Как расширить JsonPayload для сериализации произвольных объектов import json from datetime import date from functools import partial, singledispatch from typing import Any from aiohttp.payload import JsonPayload as BaseJsonPayload from aiohttp.typedefs import JSONEncoder @singledispatch def convert (value) : raise NotImplementedError( f'Unserializable value: {value!r} ' ) @convert.register(Record) def convert_asyncpg_record (value: Record) : """ Позволяет автоматически сериализовать результаты запроса, возвращаемые asyncpg """ return dict(value) @convert.register(date) def convert_date (value: date) : """ В проекте объект date возвращается только в одном случае — если необходимо отобразить дату рождения. Для отображения даты рождения должен использоваться формат ДД.ММ.ГГГГ """ return value.strftime( '%d.%m.%Y' ) dumps = partial(json.dumps, default=convert) class JsonPayload (BaseJsonPayload) : def __init__ (self, value: Any, encoding: str = 'utf-8' , content_type: str = 'application/json' , dumps: JSONEncoder = dumps, *args: Any, **kwargs: Any) -> None : super().__init__(value, encoding, content_type, dumps, *args, **kwargs) Обработчики aiohttp позволяет реализовать обработчики асинхронными функциями и классами. Классы более расширяемы: во-первых, код, относящийся к одному обработчику, можно разместить в одном месте, а во вторых, классы позволяют использовать наследование для избавления от дублирования кода (например, каждому обработчику требуется соединение с базой данных). Базовый класс обработчика from aiohttp.web_urldispatcher import View from asyncpgsa import PG class BaseView (View) : URL_PATH: str @property def pg (self) -> PG: return self.request.app[ 'pg' ] Так как один большой файл читать сложно, нужно разнести обработчики по файлам. Маленькие файлы поощряют слабую связность, а если, например, есть кольцевые импорты внутри хэндлеров — значит, возможно, что-то не так с композицией сущностей. POST /imports На вход обработчик получает json с данными о жителях. Максимально допустимый размер запроса в aiohttp регулируется опцией client_max_size и по умолчанию равен 2 МБ . При превышении лимита aiohttp вернет HTTP-ответ со статусом 413: Request Entity Too Large Error. В то же время корректный json c максимально длинными строчками и цифрами будет весить 63 мегабайта, поэтому ограничения на размер запроса необходимо расширить. Далее, необходимо проверить и десериализовать данные. Если они некорректные, нужно вернуть HTTP-ответ 400: Bad Request. Потребуются две схемы Marhsmallow. Первая, CitizenSchema, проверяет данные каждого отдельного жителя, а также десериализует строку с днем рождения в объект datetime.date: • Тип данных, формат и наличие всех обязательных полей; • Отсутствие незнакомых полей; • Дата рождения должна быть указана в формате DD.MM.YYYY и не может иметь значения из будущего; • Список родственников каждого жителя должен содержать уникальные существующие в этой выгрузке идентификаторы жителей. Вторая схема, ImportSchema, проверяет выгрузку в целом: • citizen_id каждого жителя в рамках выгрузки должен быть уникален; • Родственные связи должны быть двусторонними (если у жителя #1 в списке родственников указан житель #2, то и у жителя #2 должен быть родственник #1). Если данные корректные, их необходимо добавить в БД с новым уникальным import_id. Для добавления данных потребуется выполнить несколько запросов в разные таблицы. Чтобы в БД не осталось частично добавленных данных в случае возникновения ошибки или исключения (например, при отключении клиента, который не получил ответ полностью, aiohttp бросит исколючение CancelledError ), необходимо использовать транзакцию. Добавлять данные в таблицы необходимо частями, так как в одном запросе к PostgreSQL может быть не более 32 767 аргументов. В таблице citizens 9 полей. Соответственно, за 1 запрос в эту таблицу можно вставить только 32 767 / 9 = 3640 строк, а в одной выгрузке может быть до 10 000 жителей. GET /imports/$import_id/citizens Обработчик возвращает всех жителей для выгрузки с указанным import_id . Если указанная выгрузка не существует, необходимо вернуть HTTP-ответ 404: Not Found. Это поведение выглядит общим для обработчиков, которым требуется существующая выгрузка, поэтому я вынес код проверки в отдельный класс. Базовый класс для обработчиков с выгрузками from aiohttp.web_exceptions import HTTPNotFound from sqlalchemy import select, exists from analyzer.db.schema import imports_table class BaseImportView (BaseView) : @property def import_id (self) : return int(self.request.match_info.get( 'import_id' )) async def check_import_exists (self) : query = select([ exists().where(imports_table.c.import_id == self.import_id) ]) if not await self.pg.fetchval(query): raise HTTPNotFound() Чтобы получить список родственников для каждого жителя, потребуется выполнить LEFT JOIN из таблицы citizens в таблицу relations , агрегируя поле relations.relative_id с группировкой по import_id и citizen_id Если у жителя нет родственников, то LEFT JOIN вернет для него в поле relations.relative_id значение NULL и в результате агрегации список родственников будет выглядеть как [NULL] Чтобы исправить это некорректное значение, я воспользовался функцией array_remove БД хранит дату в формате YYYY-MM-DD , а нам нужен формат DD.MM.YYYY Технически форматировать дату можно либо SQL-запросом, либо на стороне Python в момент сериализации ответа с json.dumps (asyncpg возвращает значение поля birth_date как экземпляр класса datetime.date ). Я выбрал сериализацию на стороне Python, учитывая, что birth_date — единственный объект datetime.date в проекте с единым форматом (см. раздел «Сериализация данных» ). Несмотря на то, что в обработчике выполняется два запроса (проверка на существование выгрузки и запрос на получение списка жителей), использовать транзакцию необязательно. По умолчанию PostgreSQL использует уровень изоляции READ COMMITTED и даже в рамках одной транзакции будут видны все изменения других, успешно завершенных транзакций (добавление новых строк, изменение существующих). Самая большая выгрузка в текстовом представлении может занимать 63 мегабайта — это достаточно много, особенно учитывая, что одновременно может прийти несколько запросов на получение данных. Есть достаточно интересный способ получать данные из БД с помощью курсора и отправлять их клиенту по частям. Для этого нам потребуется реализовать два объекта: 1. Объект SelectQuery типа AsyncIterable , возвращающий записи из базы данных. При первом обращении подключается к базе, открывает транзакцию и создает курсор, при дальнейшей итерации возвращает записи из БД. Возвращается обработчиком. Код SelectQuery from collections import AsyncIterable from asyncpgsa.transactionmanager import ConnectionTransactionContextManager from sqlalchemy.sql import Select class SelectQuery (AsyncIterable) : """ Используется, чтобы отправлять данные из PostgreSQL клиенту сразу после получения, по частям, без буфферизации всех данных """ PREFETCH = 500 __slots__ = ( 'query' , 'transaction_ctx' , 'prefetch' , 'timeout' ) def __init__ (self, query: Select, transaction_ctx: ConnectionTransactionContextManager, prefetch: int = None, timeout: float = None) : self.query = query self.transaction_ctx = transaction_ctx self.prefetch = prefetch or self.PREFETCH self.timeout = timeout async def __aiter__ (self) : async with self.transaction_ctx as conn: cursor = conn.cursor(self.query, prefetch=self.prefetch, timeout=self.timeout) async for row in cursor: yield row 2. Сериализатор AsyncGenJSONListPayload , который умеет итерироваться по асинхронным генераторам, сериализовать данные из асинхронного генератора в JSON и отправлять данные клиентам по частям. Регистрируется в aiohttp.PAYLOAD_REGISTRY как сериализатор объектов AsyncIterable Код AsyncGenJSONListPayload import json from functools import partial from aiohttp import Payload # Функция, умеющая сериализовать в JSON объекты asyncpg.Record и datetime.date dumps = partial(json.dumps, default=convert, ensure_ascii= False ) class AsyncGenJSONListPayload (Payload) : """ Итерируется по объектам AsyncIterable, частями сериализует данные из них в JSON и отправляет клиенту """ def __init__ (self, value, encoding: str = 'utf-8' , content_type: str = 'application/json' , root_object: str = 'data' , *args, **kwargs) : self.root_object = root_object super().__init__(value, content_type=content_type, encoding=encoding, *args, **kwargs) async def write (self, writer) : # Начало объекта await writer.write( ( '{"%s":[' % self.root_object).encode(self._encoding) ) first = True async for row in self._value: # Перед первой строчкой запятая не нужнаа if not first: await writer.write( b',' ) else : first = False await writer.write(dumps(row).encode(self._encoding)) # Конец объекта await writer.write( b']}' ) Далее, в обработчике можно будет создать объект SelectQuery , передать ему SQL запрос и функцию для открытия транзакции и вернуть его в Response body : Код обработчика # analyzer/api/handlers/citizens.py from aiohttp.web_response import Response from aiohttp_apispec import docs, response_schema from analyzer.api.schema import CitizensResponseSchema from analyzer.db.schema import citizens_table as citizens_t from analyzer.utils.pg import SelectQuery from .query import CITIZENS_QUERY from .base import BaseImportView class CitizensView (BaseImportView) : URL_PATH = r'/imports/{import_id:\d+}/citizens' @docs(summary='Отобразить жителей для указанной выгрузки') @response_schema(CitizensResponseSchema()) async def get (self) : await self.check_import_exists() query = CITIZENS_QUERY.where( citizens_t.c.import_id == self.import_id ) body = SelectQuery(query, self.pg.transaction()) return Response(body=body) aiohttp обнаружит в реестре aiohttp.PAYLOAD_REGISTRY зарегистрированный сериализатор AsyncGenJSONListPayload для объектов типа AsyncIterable . Затем сериализатор будет итерироваться по объекту SelectQuery и отправлять данные клиенту. При первом обращении объект SelectQuery получает соединение к БД, открывает транзакцию и создает курсор, при дальнейшей итерации будет получать данные из БД курсором и возвращать их построчно. Этот подход позволяет не выделять память на весь объем данных при каждом запросе, но у него есть особенность: приложение не сможет вернуть клиенту соответствующий HTTP-статус, если возникнет ошибка (ведь клиенту уже был отправлен HTTP-статус, заголовки, и пишутся данные). При возникновении исключения не остается ничего, кроме как разорвать соединение. Исключение, конечно, можно залогировать, но клиент не сможет понять, какая именно ошибка произошла. С другой стороны, похожая ситуация может возникнуть, даже если обработчик получит все данные из БД, но при передаче данных клиенту моргнет сеть — от этого никто не застрахован. PATCH /imports/$import_id/citizens/$citizen_id Обработчик получает на вход идентификатор выгрузки import_id , жителя citizen_id , а также json с новыми данными о жителе. В случае обращения к несуществующей выгрузке или жителю необходимо вернуть HTTP-ответ 404: Not Found Переданные клиентом данные требуется проверить и десериализовать. Если они некорректные — необходимо вернуть HTTP-ответ 400: Bad Request . Я реализовал Marshmallow-схему PatchCitizenSchema , которая проверяет: • Тип и формат данных для указанных полей. • Дату рождения. Она должна быть указана в формате DD.MM.YYYY и не может иметь значения из будущего. • Список родственников каждого жителя. Он должен иметь уникальные идентификаторы жителей Существование родственников, указанных в поле relatives , можно отдельно не проверять: при добавлении в таблицу relations несуществующего жителя PostgreSQL вернет ошибку ForeignKeyViolationError , которую можно обработать и вернуть HTTP-статус 400: Bad Request Какой статус возвращать, если клиент прислал некорректные данные для несуществующего жителя или выгрузки? Семантически правильнее проверять сначала существование выгрузки и жителя (если такого нет — возвращать 404: Not Found ) и только потом —корректные ли данные прислал клиент (если нет — возвращать 400: Bad Request ). На практике часто бывает дешевле сначала проверить данные, и только если они корректные, обращаться к базе. Оба варианта приемлемы, но я решил выбрать более дешевый второй вариант, так как в любом случае результат операции — ошибка, которая ни на что не влияет (клиент исправит данные и потом так же узнает, что житель не существует). Если данные корректные, необходимо обновить информацию о жителе в БД. В обработчике потребуется сделать несколько запросов к разным таблицам. Если возникнет ошибка или исключение, изменения в базе данных должны быть отменены, поэтому запросы необходимо выполнять в транзакции. Метод PATCH позволяет передавать лишь некоторые поля для изменяемого жителя. Обработчик необходимо написать таким образом, чтобы он не падал при обращении к данным, которые не указал клиент, а также не выполнял запросы к таблицам, данные в которых не изменились. Если клиент указал поле relatives , необходимо получить список существующих родственников. Если он изменился — определить, какие записи из таблицы relatives необходимо удалить, а какие добавить, чтобы привести базу данных в соответствие с запросом клиента. По умолчанию в PostgreSQL для изоляции транзакций используется уровень READ COMMITTED . Это означает, что в рамках текущей транзакции будут видны изменения существующих (а также добавления новых) записей других завершенных транзакций. Это может привести к состоянию гонки между конкурентными запросами. Предположим, существует выгрузка с жителями #1 , #2 , #3 , без родственных связей. Сервис получает два одновременных запроса на изменение жителя #1: {"relatives": [2]} и {"relatives": [3]} . aiohttp создаст два обработчика, которые одновременно получат текущее состояние жителя из PostgreSQL. Каждый обработчик не обнаружит ни одной родственной связи и примет решение добавить новую связь с указанным родственником. В результате у жителя #1 поле relatives равно [2,3] Такое поведение нельзя назвать очевидным. Есть два варианта ожидаемо решить исход гонки: выполнить только первый запрос, а для второго вернуть HTTP-ответ 409: Conflict (чтобы клиент повторил запрос), либо выполнить запросы по очереди (второй запрос будет обработан только после завершения первого). Первый вариант можно реализовать, включив режим изоляции SERIALIZABLE . Если во время обработки запроса кто-то уже успел изменить и закоммитить данные, будет брошено исключение, которое можно обработать и вернуть соответствующий HTTP-статус. Минус такого решения — большое число блокировок в PostgreSQL, SERIALIZABLE будет вызывать исключение, даже если конкурентные запросы меняют записи жителей из разных выгрузок. Также можно воспользоваться механизмом рекомендательных блокировок . Если получить такую блокировку по import_id , конкурентные запросы для разных выгрузок смогут выполняться параллельно. Для обработки конкурентных запросов в одной выгрузке можно реализовать поведение любого из вариантов: функция pg_try_advisory_xact_lock пытается получить блокировку и возвращает результат boolean немедленно (если блокировку получить не удалось — можно бросить исключение), а pg_advisory_xact_lock ожидает, пока ресурс не станет доступен для блокировки (в этом случае запросы выполнятся последовательно, я остановился на этом варианте). В итоге обработчик должен |