Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
302 Глава 11 • Потоковая обработка super(new JsonSerializer new JsonDeserializer } } Ничего.особенного,.но.не.забудьте,.что.для.каждого.объекта,.который.вы.хотели.бы. хранить.в.Kafka.—.входного,.выходного,.а.в.некоторых.случаях.и.объектов.для. промежуточных.результатов,.—.вам.понадобится.передать.объект.класса. Serde Для.упрощения.рекомендуем.генерировать.объекты. Serde .с.помощью.таких.про- ектов,.как.GSon,.Avro,.Protobufs.и.т..п. Теперь,.когда.настройка.завершена,.можно.заняться.топологией: KStream .aggregate(TradeStats::new, (k, v, tradestats) -> tradestats.add(v), TimeWindows.of(5000).advanceBy(1000), new TradeStatsSerde(), "trade-stats-store") .toStream((key, value) -> new TickerWindow(key.key(), key.window().start())) .mapValues((trade) -> trade.computeAvgPrice()); stats.to(new TickerWindowSerde(), new TradeStatsSerde(), "stockstats-output"); .Мы.начинаем.с.чтения.событий.из.входной.темы.и.выполнения.операции. groupByKey() ..Несмотря.на.название,.эта.операция.ничего.не.группирует.. Она.обеспечивает.разделение.потока.событий.на.разделы.по.ключам.записей.. А.поскольку.мы.записываем.данные.в.тему.с.ключами.и.не.меняем.последние. до.вызова. groupByKey() ,.то.данные.остаются.разделенными.по.ключам,.так.что. этот.метод.в.нашем.случае.ничего.не.делает. .Обеспечив.правильное.секционирование,.мы.приступаем.к.оконному.агреги- рованию..Выполнение.метода. aggregate .приведет.к.разбивке.потока.данных.на. перекрывающиеся.окна.(пятисекундные.окна.с.обновлением.каждую.секунду). с.последующим.применением.агрегирующего.метода.ко.всем.событиям.каждого. окна..Первый.параметр.этого.метода.представляет.собой.новый.объект,.в.ко- торый.будут.помещены.результаты.агрегирования,.—.в.нашем.случае.объект. класса. Tradestats ..Он.был.создан.в.качестве.вместилища.всех.интересующих. нас.сводных.показателей.по.каждому.временному.окну:.минимальной.цены,. средней.цены.и.числа.сделок. .Далее.мы.указываем.метод.для.собственно.агрегирования.записей.—.в.данном. случае.метод. add .объекта. Tradestats .используется.для.обновления.значений. минимальной.цены,.числа.сделок.и.итоговых.цен.по.окну.при.поступлении. новой.записи. .Задаем.окно,.в.данном.случае.пятисекундное.(5000.мс),.перемещающееся.впе- ред.раз.в.секунду. Kafka Streams в примерах 303 .Далее.создаем.объект. Serde .для.сериализации.и.десериализации.результатов. агрегирования.(объект. TradeStats ). .Как.уже.упоминалось.в.разделе.«Паттерны.проектирования.потоковой.обработ- ки»,.оконное.агрегирование.требует.хранения.состояния.и.локального.хранили- ща,.в.котором.его.можно.хранить..Последний.параметр.метода.агрегирования. как.раз.и.представляет.собой.название.хранилища.состояния..Им.может.быть. любое.уникальное.название. .Результаты.агрегирования.представляют.собой.таблицу.с.символом.акции,.вре- менным.окном.в.качестве.первичного.ключа.и.результатом.агрегирования.в.ка- честве.значения..Мы.возвращаем.таблицу.обратно.в.поток.событий.и.заменяем. ключ,.в.котором.содержится.определение.временного.окна.нашим.собственным. ключом,.содержащим.только.символ.акции.и.начальное.время.окна..Этот.метод. toStream .преобразует.таблицу.в.поток,.а.также.преобразует.ключ.в.объект.типа. TickerWindow .Последний.шаг.—.обновление.средней.цены..В.настоящий.момент.результаты. агрегирования.включают.сумму.цен.и.число.сделок..Мы.проходим.по.этим.за- писям.и.вычисляем.на.основе.существующих.сводных.показателей.среднюю. цену,.которую.затем.можно.будет.включить.в.выходной.поток. .И.наконец,.записываем.результаты.в.поток. stockstats-output После.описания.потока.выполнения.можно.воспользоваться.им.для.генерации. и.выполнения.объекта. KafkaStreams .подобно.тому,.как.мы.поступили.в.разделе. «Подсчет.количества.слов». Этот.пример.демонстрирует.возможности.выполнения.операций.оконного.агреги- рования.над.потоком.данных.—.вероятно,.самый.часто.встречающийся.сценарий. использования.потоковой.обработки..Стоит.отметить,.как.просто.хранить.локаль- ное.состояние.агрегирования.—.достаточно.объекта. Serde .и.названия.хранилища. состояния..Тем.не.менее.это.приложение.способно.масштабироваться.на.много.эк- земпляров.и.автоматически.восстанавливаться.после.сбоев.отдельных.экземпляров. посредством.делегирования.обработки.части.разделов.одному.из.продолжающих. работать.экземпляров..Мы.подробнее.рассмотрим.эту.процедуру.в.разделе.«Kafka. Streams:.обзор.архитектуры».далее. Как.обычно,.вы.можете.найти.полный.пример,.включая.инструкции.по.запуску,.на. GitHub.( http://www.bit.ly/2r6BLm1 ). Обогащение потока событий перехода по ссылкам Последний.пример.будет.посвящен.демонстрации.соединений.потоков.путем. обогащения.потока.событий.перехода.по.ссылкам.на.сайте..Мы.сгенерируем.по- ток.имитационных.щелчков.по.ссылкам,.поток.обновлений.таблицы.базы.данных. с.фиктивными.профилями,.а.также.поток.операций.поиска.в.Сети..Затем.соединим. 304 Глава 11 • Потоковая обработка все.три.потока,.чтобы.получить.полный.обзор.деятельности.всех.пользователей.. Что.искали.пользователи?.По.каким.результатам.поиска.они.переходили?.Меня- ли.ли.они.список.интересов.в.своих.пользовательских.профилях?.Подобные.соеди- нения.позволяют.получить.массу.информации.для.анализа..На.такой.информации. часто.основаны.рекомендации.товаров:.если.пользователь.искал.велосипеды,.щел- кал.по.ссылкам.для.слова.Trek,.значит,.ему.интересны.велосипедные.путешествия,. так.что.можно.рекламировать.ему.велосипеды.Trek,.шлемы.и.велотуры.в.экзоти- ческие.места,.например,.штат.Небраска. Поскольку.настройка.приложения.такая.же,.как.в.предыдущих.примерах,.про- пустим.этот.этап.и.сразу.перейдем.к.топологии.соединения.нескольких.потоков: KStream builder.stream(Serdes.Integer(), new PageViewSerde(), Constants.PAGE_VIEW_TOPIC); KStream builder.stream(Serdes.Integer(), new SearchSerde(), Constants.SEARCH_TOPIC); KTable builder.table(Serdes.Integer(), new ProfileSerde(), Constants.USER_PROFILE_TOPIC, "profile-store"); KStream (page, profile) -> new UserActivity(profile.getUserID(), profile.getUserName(), profile.getZipcode(), profile.getInterests(), "", page.getPage())); KStream viewsWithProfile.leftJoin(searches, (userActivity, search) -> userActivity.updateSearch(search.getSearchTerms()), JoinWindows.of(1000), Serdes.Integer(), new UserActivitySerde(), new SearchSerde()); .Прежде.всего.мы.создаем.объекты.потоков.для.двух.потоков,.которые.собира- емся.объединять,.—.переходов.по.ссылкам.и.операций.поиска. .Создаем.также.таблицу.типа. KTable .для.профилей.пользователей.. KTable .пред- ставляет.собой.локальный.кэш,.обновляемый.посредством.потока.изменений. .Далее.мы.обогащаем.поток.переходов.по.ссылкам.информацией.о.профилях. пользователей,.соединяя.поток.событий.с.таблицей.профилей..При.соединении. потока.данных.с.таблицей.каждое.событие.в.потоке.получает.информацию.из. закэшированной.копии.таблицы.профилей..Мы.выполняем.левое.внешнее. соединение,.так.что.в.результаты.попадут.переходы.по.ссылкам,.для.которых. выполнивший.их.пользователь.неизвестен. .Это.и.есть.метод,.выполняющий.соединение,.—.он.принимает.на.входе.два. значения,.одно.из.потока,.а.второе.из.записи,.и.возвращает.третье.значение.. В.отличие.от.баз.данных,.разработчик.должен.сам.решить,.как.эти.два.значения. будут.объединены.в.единый.результат..В.данном.случае.мы.создали.один.объект. Kafka Streams: обзор архитектуры 305 activity ,.содержащий.как.информацию.о.пользователе,.так.и.просмотренную. им.страницу. .Далее.необходимо.объединить.информацию.о.переходах.по.ссылкам.с.инфор- мацией.о.выполненных.соответствующим.пользователем.операциях.поиска.. Соединение.остается.левым,.но.теперь.соединяются.два.потока,.а.не.поток. с.таблицей. .Это.и.есть.метод,.выполняющий.соединение,.—.мы.просто.добавляем.ключевые. слова.поиска.ко.всем.соответствующим.просмотрам.страниц. .А.вот.это.самое.интересное.—.соединение потока с потоком.представляет.собой. соединение.с.временным.окном..Соединение.всех.переходов.по.ссылкам.с.ин- формацией.об.операциях.поиска.особого.смысла.не.имеет.—.необходимо.соеди- нить.каждую.операцию.поиска.с.соответствующими.переходами.по.ссылкам,. то.есть.щелчками.по.ссылкам,.выполненными.в.течение.короткого.промежутка. времени.после.поиска..Так.что.мы.зададим.размер.окна.соединения,.равный. одной.секунде..То.есть.будем.считать.подходящими.щелчки.на.ссылках,.выпол- ненные.в.течение.не.более.чем.одной.секунды.после.поиска,.и.соответствующие. поисковые.ключевые.слова.будут.включаться.в.запись.о.действиях.пользова- теля,.содержащую.информацию.о.переходе.по.ссылке,.и.профиль.пользователя.. Благодаря.этому.появится.возможность.провести.полный.анализ.операций. поиска.и.их.результатов. После.завершения.описания.последовательности.операций.можно.воспользовать- ся.ею.для.генерирования.и.выполнения.объекта. KafkaStreams .подобно.тому,.как. мы.поступили.в.разделе.«Подсчет.количества.слов». Этот.пример.демонстрирует,.что.в.потоковой.обработке.возможны.два.различных. паттерна.соединений..Один.относится.к.соединению.потока.с.таблицей.для.обога- щения.всех.событий.потока.информацией.из.таблицы..Он.напоминает.соединение. таблицы.фактов.с.измерением.при.выполнении.запросов.к.складу.данных..Второй. паттерн.относится.к.соединению.двух.потоков.на.основе.временного.окна..Эта.опе- рация.встречается.только.в.сфере.потоковой.обработки. Как.обычно,.вы.можете.найти.полный.пример,.включая.инструкции.по.запуску,.на. GitHub.( http://www.bit.ly/2sq096i ). Kafka Streams: обзор архитектуры Примеры.из.предыдущего.раздела.демонстрируют.использование.API.Kafka. Streams.для.реализации.нескольких.широко.известных.паттернов.проектирования. потоковой.обработки..Но.чтобы.лучше.понять,.как.библиотека.Kafka.Streams.на. самом.деле.работает.и.масштабируется,.необходимо.«заглянуть.под.капот».и.разо- браться.с.некоторыми.базовыми.принципами.архитектуры.этого.API. 306 Глава 11 • Потоковая обработка Построение топологии Любое.потоковое.приложение.реализует.и.выполняет.по.крайней.мере.одну.топо- логию..Топология,.называемая.в.других.фреймворках.потоковой.обработки.также. DAG.(directed.acyclic.graph.—.ориентированный.ациклический.граф),.представ- ляет.собой.набор.операций.и.преобразований,.через.которые.проходят.все.события. на.пути.от.входных.данных.до.результатов..На.рис..11.10.показана.топология.для. примера.с.подсчетом.количества.слов. Рис. 11.10. Топология для примера подсчета числа слов с помощью потоковой обработки Даже.у.простых.приложений.топология.нетривиальна..Она.состоит.из.узлов.об- работки.—.узлов.графа.топологии.(на.схеме.они.представлены.овалами)..Боль- шинство.узлов.обработки.реализуют.операции.над.данными.—.фильтрацию,.ото- бражение,.агрегирование.и.т..п..Существуют.также.узлы.обработки.—.источники,. потребляющие.данные.из.тем.и.передающих.их.дальше,.а.также.узлы.обработ- ки.—.приемники,.получающие.данные.из.предыдущих.узлов.обработки.и.генери- рующие.их.в.тему..Топология.всегда.начинается.с.одного.или.нескольких.узлов. обработки.—.производителей.и.заканчивается.одним.или.несколькими.узлами. обработки.—.приемниками. Масштабирование топологии Kafka.Streams.масштабируется.за.счет.того,.что.внутри.одного.экземпляра.прило- жения.могут.работать.несколько.потоков.выполнения,.а.также.благодаря.балан- сировке.нагрузки.между.распределенными.экземплярами.приложения..Можно. Kafka Streams: обзор архитектуры 307 запустить.приложение.Kafka.Streams.на.одной.машине.в.многопоточном.режиме. или.на.нескольких.машинах.—.в.любом.случае.обработкой.данных.будут.занимать- ся.все.активные.потоки.выполнения.приложения. Движок.Streams.распараллеливает.выполнение.топологии,.разбивая.ее.на.задачи.. Число.задач.определяется.движком.Streams.и.зависит.от.количества.разделов. в.обрабатываемых.приложением.темах..Каждая.задача.отвечает.за.какое-то.под- множество.разделов:.она.подписывается.на.эти.разделы.и.читает.из.них.события.. Для.каждого.прочитанного.события.задача.выполняет.по.порядку.все.подходящие. для.этого.раздела.шаги.обработки,.после.чего.записывает.результаты.в.приемник.. Эти.задачи.—.базовая.единица.параллелизма.в.Kafka.Streams,.поскольку.любую. задачу.можно.выполнять.независимо.от.остальных.(рис..11.11). Рис. 11.11. Две задачи, реализующие одну топологию, — по одной для каждого раздела входной темы У.разработчика.приложения.есть.возможность.выбрать.число.потоков.выполнения. для.каждого.экземпляра.приложения..При.доступности.нескольких.потоков.вы- полнения.каждый.из.них.будет.выполнять.часть.создаваемых.приложением.задач.. Если.несколько.экземпляров.приложения.работают.на.нескольких.серверах,.то. в.каждом.потоке.на.каждом.сервере.будут.выполняться.различные.задачи..Имен- но.таким.образом.масштабируются.потоковые.приложения:.задач.будет.столько,. сколько.имеется.разделов.в.обрабатываемых.темах..Если.нужно.повысить.скорость. обработки,.увеличьте.число.потоков.выполнения..Если.на.сервере.заканчиваются. ресурсы,.запустите.еще.один.экземпляр.приложения.на.другом.сервере..Kafka. автоматически.координирует.работу.—.каждой.задаче.будет.назначаться.свое.под- множество.разделов,.события.из.которых.она.будет.обрабатывать.независимо.от. других.задач,.поддерживая.собственное.локальное.состояние.с.соответствующими. сводными.показателями,.если.этого.требует.топология.(рис..11.12). 308 Глава 11 • Потоковая обработка Рис. 11.12. Задачи потоковой обработки могут выполняться в нескольких потоках и на нескольких серверах Возможно,.вы.обратили.внимание.на.то,.что.для.шага.обработки.иногда.требу- ются.результаты.из.нескольких.разделов,.вследствие.чего.между.задачами.могут. возникать.зависимости..Например,.при.соединении.двух.потоков.данных.(как. в.примере.из.раздела.«Обогащение.потока.событий.перехода.по.ссылкам».ранее. в.этой.главе).для.получения.результата.понадобятся.данные.из.раздела.каждого.из. потоков..Фреймворк.Kafka.Streams.решает.эту.проблему.за.счет.назначения.всех. необходимых.для.одного.соединения.разделов.одной.задаче,.так.что.задачи.могут. Kafka Streams: обзор архитектуры 309 читать.данные.из.всех.нужных.разделов.и.выполнять.соединение.независимо.друг. от.друга..Именно.поэтому.для.Kafka.Streams.требуется,.чтобы.во.всех.участвующих. в.операции.соединения.темах.было.одинаковое.количество.разделов.и.чтобы.они. были.секционированы.по.ключу.соединения. Еще.один.пример.возникновения.зависимостей.между.задачами.—.случай,.когда. для.приложения.требуется.повторное.секционирование..Например,.в.примере. с.потоком.событий.переходов.ключ.всех.событий.—.идентификатор.пользователя.. Но.что.если.нам.понадобится.сгенерировать.сводные.показатели.по.страницам?. Или.по.почтовому.индексу?.В.подобном.случае.придется.повторно.разделить. данные.по.почтовому.индексу.и.выполнить.их.агрегирование.на.основе.новых. разделов..Если.задача.1.обрабатывает.данные.из.раздела.1.и.доходит.до.узла.обра- ботки,.который.секционирует.данные.повторно.(операция. groupBy ),.понадобится. перетасовка.(shuffle),.а.значит,.придется.отправлять.события.другим.задачам.для. обработки..В.отличие.от.других.фреймворков.потоковой.обработки.Kafka.Streams. выполняет.повторное.разделение.путем.записи.событий.в.новую.тему.с.новыми. ключами.и.разделами..Далее.иной.набор.задач.читает.эти.события.из.новой.темы. и.продолжает.обработку..Шаг.повторного.разделения.разбивает.топологию.на.две. субтопологии,.каждая.со.своими.задачами..Второй.набор.задач.зависит.от.первого,. поскольку.обрабатывает.результаты.первой.субтопологии..Однако.первый.и.второй. наборы.задач.все.же.можно.запускать.независимо.друг.от.друга.и.параллельно,. поскольку.первый.записывает.данные.в.тему.с.одной.скоростью,.а.второй.читает. и.обрабатывает.данные.оттуда.—.с.другой..Между.их.задачами.нет.никакого.вза- имодействия.и.разделения.ресурсов,.и.они.не.обязаны.работать.в.одних.потоках. выполнения.на.серверах..Это.одна.из.самых.полезных.черт.Kafka.—.снижение. количества.зависимостей.между.различными.частями.конвейера.(рис..11.13). Рис. 11.13. Два набора задач, обрабатывающих события, с темой для повторного разбиения на разделы событий между ними 310 Глава 11 • Потоковая обработка Как пережить отказ Та.же.модель,.которая.позволяет.масштабировать.приложение,.дает.возможность. изящно.справляться.с.отказами..Получить.доступ.к.Kafka.легко,.следовательно,. сохраняемые.в.ней.данные.также.высокодоступны..Так.что.приложение.в.случае. сбоя.и.необходимости.перезапуска.может.узнать.из.Kafka.свою.последнюю.пози- цию.в.потоке.и.продолжить.обработку.с.последнего.зафиксированного.ею.перед. сбоем.смещения..Отметим,.что.в.случае.утраты.хранилища.локального.состояния. (например,.при.необходимости.замены.сервера,.на.котором.оно.находилось).пото- ковое.приложение.всегда.может.создать.его.заново.на.основе.хранящегося.в.Kafka. журнала.изменений. Фреймворк.Kafka.Streams.также.использует.предоставляемую.Kafka.координацию. потребителей.с.целью.обеспечения.высокой.доступности.для.задач..Если.задача. завершилась.неудачей,.но.есть.другие.активные.потоки.или.экземпляры.потокового. приложения,.ее.можно.перезапустить.в.одном.из.доступных.потоков..Это.напоми- нает.то,.как.группа.потребителей.справляется.с.отказом.одного.из.потребителей. группы.посредством.переназначения.его.разделов.одному.из.оставшихся.потре- бителей. Сценарии использования потоковой обработки На.протяжении.данной.главы.мы.изучали.потоковую.обработку.—.от.основных.по- нятий.и.паттернов.до.конкретных.примеров.применения.Kafka.Streams..На.данном. этапе.имеет.смысл.взглянуть.на.часто.встречающиеся.сценарии.использования.по- токовой.обработки..Как.объяснялось.в.начале.главы,.потоковая.обработка,.она.же. непрерывная.обработка,.полезна.в.тех.случаях,.когда.нужно.обрабатывать.события. одно.за.другим,.а.не.ждать.часами.следующего.пакета,.но.когда.ответ.также.не.ожидает- ся.в.течение.миллисекунд..Все.это.справедливо,.но.слишком.расплывчато..Рассмотрим. несколько.настоящих.задач,.решаемых.с.помощью.потоковой.обработки. Обслуживание клиентов. Допустим,.вы.только.что.забронировали.комнату. в.большой.сети.отелей.и.ожидаете.получения.подтверждения.по.электронной. почте.и.платежной.квитанции..Через.несколько.минут.после.бронирования,. когда.подтверждение.все.еще.не.прибыло,.вы.звоните.в.отдел.по.обслуживанию. клиентов.для.подтверждения.брони..Представьте,.что.менеджер.по.обслужива- нию.клиентов.говорит.вам:.«Я.не.вижу.заказа.в.системе,.но.пакетное.задание,. загружающее.данные.из.системы.бронирования.в.систему.отеля.и.систему. обслуживания.клиентов,.выполняется.только.раз.в.сутки,.так.что.перезвоните. завтра,.пожалуйста..Подтверждение.по.электронной.почте.придет.вам.в.тече- ние.2–3.рабочих.дней»..Создается.впечатление,.что.сервис.здесь.не.слишком. хорош,.но.у.меня.не.раз.случались.такие.разговоры.с.отелями.из.крупных.сетей.. Желательно,.чтобы.обновленные.данные.о.бронировании.в.течение.нескольких. секунд.или.минут.после.него.получала.каждая.система.в.сети.отелей,.в.том. |