Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
Не забывайте о часовых поясах Работая с временем, важно не забывать о часовых поясах. Весь конвейер дан- ных должен работать с единым часовым поясом, иначе результаты потоковых операций окажутся запутанными, а зачастую бессмысленными. Если вам нужно работать с потоками данных в различных часовых поясах, убедитесь, что можете преобразовать события к одному часовому поясу, прежде чем выполнять операции с временными окнами. Часто это означает необходимость сохранения часового пояса в самой записи. Состояние До.тех.пор.пока.требуется.обрабатывать.события.по.отдельности,.потоковая.об- работка.—.вещь.очень.простая..Например,.для.простого.чтения.потока.транзакций. о.покупках.в.интернет-магазине.из.Kafka,.поиска.среди.них.транзакции.на.сумму. более.10.000.долларов.и.отправки.сообщения.электронной.почты.о.них.соответ- ствующему.торговцу.достаточно.нескольких.строк.кода.с.использованием.потре- бителя.Kafka.и.SMTP-библиотеки. 284 Глава 11 • Потоковая обработка Наиболее.интересной.потоковая.обработка.становится.при.необходимости.вы- полнения.операций.с.несколькими.событиями:.подсчета.числа.событий.по.типам,. вычисления.скользящих.средних,.объединения.двух.потоков.данных.для.обогащения. потока.информации.и.т..д..В.подобных.случаях.недостаточно.рассматривать.события. по.отдельности..Необходимо.отслеживать.дополнительную.информацию,.напри- мер,.сколько.событий.каждого.типа.встретилось.нам.за.час,.хранить.список.всех. требующих.объединения.событий,.сумм,.средних.значений.и.т..д..Мы.будем.называть. эту.информацию,.хранимую.дополнительно.к.событиям,.состоянием.(state). Заманчиво.было.бы.хранить.состояние.в.локальных.переменных.приложения.по- токовой.обработки,.например,.хранить.скользящие.средние.в.простой.хэш-таблице.. Однако.такой.подход.к.хранению.состояния.при.потоковой.обработке.ненадежен,. поскольку.при.останове.приложения.потоковой.обработки.состояние.сбрасыва- ется,.что.приводит.к.изменению.результатов..Обычно.это.нежелательно,.так.что. не.забывайте.сохранять.последнее.состояние.и.восстанавливать.его.при.запуске. приложения. В.потоковой.обработке.используются.несколько.типов.состояния. Локальное (внутреннее) состояние. Состояние,.доступное.только.конкретному. экземпляру.приложения.потоковой.обработки..Обычно.хранится.и.контроли- руется.встроенной.базой.данных.в.оперативной.памяти,.работающей.внутри. приложения..Преимущество.локального.состояния.—.исключительная.быстрота. работы.с.ним..Недостаток.—.ваши.возможности.ограничены.объемом.доступной. памяти..В.результате.многие.паттерны.проектирования.в.сфере.потоковой.об- работки.нацелены.на.разбиение.данных.на.субпотоки,.допускающие.обработку. при.ограниченном.размере.локального.состояния. Внешнее состояние. Состояние,.хранимое.во.внешнем.хранилище.данных,. обычно.NoSQL-системе.наподобие.Cassandra..Преимущества.внешнего.состо- яния.—.практически.полное.отсутствие.ограничений.размера.и.возможность. доступа.к.нему.из.различных.экземпляров.приложения.или.даже.различных. приложений..Недостатки.—.повышение.времени.задержки.и.привносимая.еще. одной.системой.дополнительная.сложность..Большинство.приложений.по- токовой.обработки.стараются.избегать.работы.с.внешним.хранилищем.или.по. крайней.мере.ограничивать.накладные.расходы.из-за.задержки.за.счет.кэши- рования.информации.в.локальном.состоянии.и.взаимодействовать.с.внешним. хранилищем.как.можно.реже. Таблично-потоковый дуализм Все.знают,.что.такое.таблица.базы.данных..Таблица.—.это.набор.записей,.идентифи- цируемых.по.первичному.ключу.и.содержащих.набор.заданных.схемой.атрибутов.. Записи.таблицы.изменяемые,.то.есть.в.таблицах.разрешены.операции.обновления. и.удаления..С.помощью.запроса.к.таблице.можно.узнать.состояние.данных.на.кон- Основные понятия потоковой обработки 285 кретный.момент.времени..Например,.при.запросе.к.таблице. CUSTOMERS_CONTACTS базы.данных.мы.ожидаем,.что.получим.подробные.актуальные.контактные.данные. всех.наших.покупателей..Если.речь.не.идет.о.специально.созданной.«историче- ской».таблице,.то.предыдущих.контактных.данных.в.ней.не.будет. В.отличие.от.таблиц,.в.потоках.содержится.история.изменений..Потоки.пред- ставляют.собой.последовательность.событий,.в.которой.каждое.событие.яв- ляется.причиной.изменения.данных..Из.этого.описания.очевидно,.что.потоки. и.таблицы.—.две.стороны.одной.монеты:.мир.непрерывно.меняется,.и.иногда. нас.интересуют.вызвавшие.изменения.события,.а.иногда.—.текущее.состояние.. Возможности.систем,.которые.позволяют.перемещаться.между.двумя.пред- ставлениями.данных,.шире.возможностей.систем,.поддерживающих.лишь.одно. представление. Для.преобразования.потока.в.таблицу.необходимо.фиксировать.вызывающие.ее. модификацию.события..Следует.сохранить.все.события. insert ,. update .и. delete в.таблице..Большинство.СУБД.с.этой.целью.предоставляют.утилиты.для.захвата. изменений.данных.(change.data.capture,.CDC)..Кроме.того,.существует.множество. коннекторов.Kafka.для.конвейерной.передачи.этих.изменений.в.Kafka.и.дальней- шей.их.потоковой.обработки. Для.преобразования.потока.данных.в.таблицу.необходимо.применить.все.содер- жащиеся.в.этом.потоке.изменения..Этот.процесс.называется.материализацией. (materializing).потока.данных..Создается.таблица.в.оперативной.памяти,.внутрен- нем.хранилище.состояний.или.внешней.базе.данных,.после.чего.мы.проходим.по. всем.событиям.из.потока.данных,.от.начала.до.конца,.изменяя.состояние.по.мере. продвижения..По.окончании.у.нас.будет.пригодная.для.использования.таблица,. отражающая.состояние.на.конкретный.момент.времени. Допустим,.у.нас.есть.магазин,.продающий.обувь..Потоковое.представление.рознич- ных.продаж.может.представлять.собой.поток.следующих.событий. «Прибыла.партия.красных,.синих.и.зеленых.туфель». «Проданы.синие.туфли». «Проданы.красные.туфли». «Покупатель.вернул.синие.туфли». «Проданы.зеленые.туфли». Чтобы.узнать,.что.находится.на.складе.в.настоящий.момент.или.сколько.денег.мы. уже.заработали,.необходимо.материализовать.представление.(рис..11.1)..Для.того. чтобы.увидеть,.насколько.оживленно.идет.торговля,.можно.посмотреть.на.поток. данных.в.целом.и.узнать,.что.было.выполнено.пять.транзакций..Возможно,.нам. захочется.также.выяснить,.почему.вернули.синие.туфли. 286 Глава 11 • Потоковая обработка Рис. 11.1. Материализация изменений товарных остатков Временные окна Большинство.операций.над.потоками.данных.—.оконные,.то.есть.оперирующие. над.временными.интервалами:.скользящие.средние,.самые.продаваемые.товары. за.неделю,.99-й.процентиль.нагрузки.на.систему.и.т..д..Операции.объединения. двух.потоков.данных.также.носят.оконный.характер.—.при.этом.объединяются. события,.произошедшие.в.один.промежуток.времени..Очень.немногие.люди.оста- навливаются.хоть.на.секунду,.чтобы.задуматься,.какой.именно.тип.временного. окна.им.требуется..Например,.при.вычислении.скользящих.средних.необходимо. знать.следующее. Размер.окна:.нужно.вычислить.среднее.значение.по.всем.событиям.из.каждого. пятиминутного.окна?.Каждого.15-минутного.окна?.Или.за.целый.день?.Чем. больше.окно,.тем.лучше.сглаживание,.но.и.больше.отставание.—.чтобы.заметить. увеличение.цены,.понадобится.больше.времени,.чем.при.меньшем.окне. Насколько.часто.окно.сдвигается.(интервал опережения,.advance.interval):. обновлять.ли.пятиминутные.средние.значения.каждую.минуту,.секунду.или. при.каждом.поступлении.нового.события?.Окно,.размер.которого.равен.его. интервалу опережения,.иногда.называют.«кувыркающимся» (tumbling.window).. Окно,.которое.перемещается.при.каждой.новой.записи,.иногда.называют.скольз- ящим (sliding.window). В.течение.какого.времени.сохраняется.возможность.обновления.окна?.Допустим,. что.пятиминутное.скользящее.среднее.подсчитывается.для.окна.00:00–00:05.. Паттерны проектирования потоковой обработки 287 А.через.час.мы.получаем.еще.несколько.результатов,.полученных.в.00:02..Обнов- лять.ли.результаты.для.периода.00:00–00:05?.Или.что.было,.то.прошло?.Опти- мально.было.бы.задавать.определенный.промежуток.времени,.в.течение.которо- го.события.могут.добавляться.к.соответствующему.временно'му.срезу..Например,. если.они.наступили.не.позднее.чем.через.четыре.часа,.необходимо.пересчитать. и.обновить.результаты..Если.же.позже,.то.их.можно.игнорировать. Можно.выравнивать.окна.по.показаниям.часов,.то.есть.первым.срезом.пятими- нутного.окна,.перемещающегося.каждую.минуту,.будет.00:00–00:05,.а.вторым.—. 00:01–00:06..Или.можно.не.выравнивать,.а.просто.начинать.окно.с.момента.запуска. приложения,.так.что.первым.срезом.будет,.например,.03:17–03:22..Скользящие. окна.никогда.не.выравниваются,.потому.что.перемещаются.при.каждой.новой. запи.си..Различия.между.этими.типами.окон.показаны.на.рис..11.2. Рис. 11.2. «Кувыркающиеся» и «прыгающие» окна Паттерны проектирования потоковой обработки Между.собой.различаются.все.системы.потоковой.обработки,.от.простых.сочета- ний.потребителя,.логики.обработки.и.производителя.до.таких.сложных.кластеров,. как.Spark.Streaming.с.его.библиотеками.машинного.обучения,.включая.множество. промежуточных.вариантов..Но.существуют.базовые.паттерны.проектирования,. разработанные.для.удовлетворения.часто.встречающихся.требований.архитектур. потоковой.обработки..Рассмотрим.несколько.широко.известных.паттернов.и.по- кажем.примеры.их.применения. 288 Глава 11 • Потоковая обработка Обработка событий по отдельности Простейший.паттерн.потоковой.обработки.—.обработка.каждого.события.по.от- дельности..Он.известен.также.под.названием.паттерна.отображения/фильтрации,. поскольку.часто.используется.для.фильтрации.ненужных.событий.из.потока.или. преобразования.событий..(Термин.«отображение».(map).ведет.начало.от.паттерна. отображения/свертки.(map/reduce),.в.котором.на.этапе.отображения.события.пре- образуются,.после.чего.агрегируются.на.этапе.свертки.) В.этом.паттерне.приложение.потоковой.обработки.читает.события.из.потока,. модифицирует.каждое.из.них,.после.чего.генерирует.события.в.другой.поток.. В.качестве.примера.можно.привести.приложение,.читающее.журнальные.сообще- ния.из.потока.данных.и.записывающее.события. ERROR .в.поток.с.максимальным. приоритетом,.а.остальные.—.в.поток.с.минимальным.приоритетом..Еще.один.при- мер.—.приложение,.читающее.события.из.потока.данных.и.меняющее.их.формат. с.JSON.на.Avro..Подобные.приложения.могут.не.хранить.внутри.себя.состояние,. поскольку.события.могут.обрабатываться.по.отдельности..Это.значит,.что.вос- становление.после.сбоев.или.балансировка.нагрузки.чрезвычайно.упрощаются,. ведь.восстанавливать.состояние.не.нужно,.можно.просто.делегировать.обработку. событий.другому.экземпляру.приложения. Для.этого.паттерна.вполне.достаточно.простого.производителя.и.потребителя. (рис..11.3). Рис. 11.3. Топология обработки событий по отдельности Обработка с использованием локального состояния Для.большинства.приложений.потоковой.обработки.важную.роль.играет.агре- гирование.информации,.особенно.по.временным.окнам..Примером.этого.может. служить.поиск.минимальной.и.максимальной.цены.акций.для.каждого.дня.торгов. и.вычисление.скользящего.среднего. Подобное.агрегирование.требует.сохранения.состояния.потока.данных..В.нашем. примере.для.вычисления.минимальной.и.средней.цены.акций.за.день.необходимо. Паттерны проектирования потоковой обработки 289 хранить.встречавшиеся.до.сих.пор.минимальное.и.максимальное.значения.и.срав- нивать.с.ними.каждое.новое.значение.из.потока.данных. Для.этого.можно.использовать.локальное.(не.разделяемое).состояние,.посколь- ку.все.операции.в.примере.представляют.собой.агрегирование.типа.group by,.то. есть.производящееся.по.каждому.символу.акции,.а.не.по.рынку.акций.в.целом.. Чтобы.гарантировать.запись.событий.с.одним.символом.акции.в.один.раздел,.вос- пользуемся.объектом. Partitioner .Kafka..Далее.каждый.экземпляр.приложения. получит.все.события.из.назначенных.ему.разделов.(это.гарантирует.потребитель. Kafka)..Это.значит,.что.каждый.экземпляр.приложения.может.хранить.состояние. для.подмножества.символов.акций,.записанных.в.соответствующие.ему.разделы. (рис..11.4). Рис. 11.4. Топология обработки событий с применением локального состояния Приложения.потоковой.обработки.существенно.усложняются.при.наличии.у.при- ложения.локального.состояния,.так.как.возникает.несколько.проблем,.которые. должно.решить.такое.приложение. Использование памяти. Локальное.состояние.должно.помещаться.в.доступной. экземпляру.приложения.оперативной.памяти. Сохраняемость. Необходимо.гарантировать,.что.состояние.не.будет.утрачено. при.останове.экземпляров.приложения.и.есть.возможность.восстановить.его. при.повторном.их.запуске.или.замене.на.другой.экземпляр..С.этим.отлично. 290 Глава 11 • Потоковая обработка справляется.библиотека.Kafka.Streams.—.локальное.состояние.сохраняется. в.оперативной.памяти.с.помощью.встроенной.базы.RocksDB,.сохраняющей. также.данные.на.диск.для.быстрого.восстановления.после.перезапуска..Но.все. изменения.в.локальном.состоянии.отправляются.и.в.тему.Kafka..В.случае. останова.узла.потока.данных.локальное.состояние.не.утрачивается.—.его. можно.легко.восстановить.путем.повторного.чтения.событий.из.темы.Kafka.. Например,.если.локальное.состояние.содержало.текущий.минимум.для.акций. IBM.=.167,19,.оно.сохраняется.в.Kafka,.так.что.позднее.можно.будет.повторно. заполнить.локальный.кэш.на.основе.этих.данных..Kafka.применяет.для.тем.сжа- тие.журналов,.чтобы.гарантировать,.что.они.не.будут.расти.до.бесконечности,. и.иметь.возможность.восстановить.состояние. Перераспределение..Иногда.разделы.переназначаются.другому.потребителю.. При.этом.экземпляр,.у.которого.«отобрали».раздел,.должен.сохранить.по- следнее.рабочее.состояние,.а.экземпляр,.получивший.раздел,.—.восстановить. нужное.состояние. Фреймворки.потоковой.обработки.в.разной.степени.обеспечивают.разработчикам. возможность.администрирования.нужного.им.локального.состояния..Если.вашему. приложению.требуется.хранить.локальное.состояние,.проверьте,.какие.гарантии. обеспечивает.используемый.фреймворк..В.конце.главы.мы.приведем.краткое.срав- нительное.руководство.по.ним,.но,.как.всем.известно,.программное.обеспечение. меняется.очень.быстро,.особенно.фреймворки.потоковой.обработки. Многоэтапная обработка/повторное разделение на разделы Локальное.состояние.—.отличная.вещь,.если.требуется.агрегирование.типа.group by..Но.что.если.результаты.должны.задействовать.всю.доступную.информацию?. Например,.допустим,.что.нужно.каждый.день.публиковать.10.самых.быстро.расту- щих.ценных.бумаг.—.10.ценных.бумаг,.стоимость.которых.сильнее.всего.выросла.за. день.торгов.(с.открытия.торгов.до.закрытия.биржи)..Конечно,.никаких.локальных. действий.на.отдельном.экземпляре.приложения.не.будет.для.этого.достаточно,. поскольку.все.10.нужных.ценных.бумаг.могут.находиться.на.разделах,.относя- щихся.к.другим.экземплярам..Нам.понадобится.двухэтапный.подход..Сначала. нужно.вычислить.ежедневный.рост/падение.цены.для.каждого.из.символов.акций.. Это.можно.сделать.на.каждом.из.отдельных.экземпляров.с.помощью.локального.со- стояния..Затем.следует.записать.результаты.в.новую.тему.из.одного.раздела..Далее. отдельный.экземпляр.приложения.читает.этот.раздел.и.находит.там.10.наиболее. быстро.растущих.в.цене.акций..Вторая.тема,.содержащая.лишь.сводные.показатели. по.символам.акций,.очевидно,.будет.намного.меньше.(как.и.объем.ее.трафика),. чем.темы,.содержащие.саму.информацию.о.сделках,.а.значит,.ее.сможет.обрабо- тать.один.экземпляр.приложения..Иногда.для.получения.результата.необходимо. больше.шагов.(рис..11.5). Паттерны проектирования потоковой обработки 291 Рис. 11.5. Топология, включающая как использование локального состояния, так и повторное секци онирование 292 Глава 11 • Потоковая обработка Такая.разновидность.многоэтапной.обработки.хорошо.знакома.тем,.кому.случалось. писать.код.отображения/свертки,.в.котором.часто.приходится.прибегать.к.несколь- ким.этапам.свертки..Если.вы.хотя.бы.раз.писали.такой.код,.то.помните,.что.для. каждого.этапа.свертки.необходимо.отдельное.приложение..В.отличие.от.MapReduce,. при.использовании.большинства.фреймворков.потоковой.обработки.можно.вклю- чить.все.этапы.в.одно.приложение,.в.то.время.как.фреймворк.возьмет.на.себя.рас- пределение.выполнения.этапов.по.экземплярам.или.исполнителям.приложения. Обработка с применением внешнего справочника: соединение потока данных с таблицей Иногда.для.потоковой.обработки.необходима.интеграция.с.внешним.по.отношению. к.потоку.производителем.данных..Это.нужно,.например,.для.проверки.соответствия. транзакций.набору.хранимых.в.базе.данных.правил.или.для.обогащения.данных. о.маршрутах.перемещения.по.веб-сайту.пользователей.информацией.о.них. Очевидный.вариант.использования.внешнего.справочника.для.обогащения.дан- ных.выглядит.примерно.так:.при.каждом.встреченном.в.потоке.событии.перехода. пользователя.по.ссылке.находить.соответствующего.пользователя.в.базе.данных. профилей.и.записывать.в.другую.тему.событие,.включающее.первоначальный. щелчок.на.ссылке.плюс.возраст.и.пол.пользователя.(рис..11.6). Рис. 11.6. Потоковая обработка с использованием внешнего источника данных Проблема.с.этим.напрашивающимся.вариантом.состоит.в.том,.что.внешний.спра- вочник.существенно.увеличивает.время.обработки.каждой.записи.—.обычно.на. 5–15.мс..Во.многих.случаях.это.недопустимо..Зачастую.неприемлема.и.возни- кающая.дополнительная.нагрузка.на.внешнее.хранилище.—.системы.потоковой. обработки.обычно.способны.обрабатывать.100–500.тысяч.событий.в.секунду,. а.базы.данных,.вероятно,.лишь.10.тысяч.событий.в.секунду.при.сносной.произво- дительности..Хотелось.бы.найти.решение,.которое.бы.масштабировалось.лучше. |