Главная страница
Навигация по странице:

  • Рис. 11.13.

  • Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


    Скачать 7.59 Mb.
    НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
    Дата21.06.2022
    Размер7.59 Mb.
    Формат файлаpdf
    Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
    ТипДокументы
    #609074
    страница38 из 39
    1   ...   31   32   33   34   35   36   37   38   39
    302 Глава 11 • Потоковая обработка super(new JsonSerializer(),
    new JsonDeserializer(Trade.class));
    }
    }
    Ничего.особенного,.но.не.забудьте,.что.для.каждого.объекта,.который.вы.хотели.бы.
    хранить.в.Kafka.—.входного,.выходного,.а.в.некоторых.случаях.и.объектов.для.
    промежуточных.результатов,.—.вам.понадобится.передать.объект.класса.
    Serde
    Для.упрощения.рекомендуем.генерировать.объекты.
    Serde
    .с.помощью.таких.про- ектов,.как.GSon,.Avro,.Protobufs.и.т..п.
    Теперь,.когда.настройка.завершена,.можно.заняться.топологией:
    KStream stats = source.groupByKey()
    .aggregate(TradeStats::new,
    (k, v, tradestats) -> tradestats.add(v),
    TimeWindows.of(5000).advanceBy(1000), new TradeStatsSerde(),
    "trade-stats-store")
    .toStream((key, value) -> new TickerWindow(key.key(),
    key.window().start()))
    .mapValues((trade) -> trade.computeAvgPrice()); stats.to(new TickerWindowSerde(), new TradeStatsSerde(),
    "stockstats-output");
    .Мы.начинаем.с.чтения.событий.из.входной.темы.и.выполнения.операции.
    groupByKey()
    ..Несмотря.на.название,.эта.операция.ничего.не.группирует..
    Она.обеспечивает.разделение.потока.событий.на.разделы.по.ключам.записей..
    А.поскольку.мы.записываем.данные.в.тему.с.ключами.и.не.меняем.последние.
    до.вызова.
    groupByKey()
    ,.то.данные.остаются.разделенными.по.ключам,.так.что.
    этот.метод.в.нашем.случае.ничего.не.делает.
    .Обеспечив.правильное.секционирование,.мы.приступаем.к.оконному.агреги- рованию..Выполнение.метода.
    aggregate
    .приведет.к.разбивке.потока.данных.на.
    перекрывающиеся.окна.(пятисекундные.окна.с.обновлением.каждую.секунду).
    с.последующим.применением.агрегирующего.метода.ко.всем.событиям.каждого.
    окна..Первый.параметр.этого.метода.представляет.собой.новый.объект,.в.ко- торый.будут.помещены.результаты.агрегирования,.—.в.нашем.случае.объект.
    класса.
    Tradestats
    ..Он.был.создан.в.качестве.вместилища.всех.интересующих.
    нас.сводных.показателей.по.каждому.временному.окну:.минимальной.цены,.
    средней.цены.и.числа.сделок.
    .Далее.мы.указываем.метод.для.собственно.агрегирования.записей.—.в.данном.
    случае.метод.
    add
    .объекта.
    Tradestats
    .используется.для.обновления.значений.
    минимальной.цены,.числа.сделок.и.итоговых.цен.по.окну.при.поступлении.
    новой.записи.
    .Задаем.окно,.в.данном.случае.пятисекундное.(5000.мс),.перемещающееся.впе- ред.раз.в.секунду.

    Kafka Streams в примерах 303
    .Далее.создаем.объект.
    Serde
    .для.сериализации.и.десериализации.результатов.
    агрегирования.(объект.
    TradeStats
    ).
    .Как.уже.упоминалось.в.разделе.«Паттерны.проектирования.потоковой.обработ- ки»,.оконное.агрегирование.требует.хранения.состояния.и.локального.хранили- ща,.в.котором.его.можно.хранить..Последний.параметр.метода.агрегирования.
    как.раз.и.представляет.собой.название.хранилища.состояния..Им.может.быть.
    любое.уникальное.название.
    .Результаты.агрегирования.представляют.собой.таблицу.с.символом.акции,.вре- менным.окном.в.качестве.первичного.ключа.и.результатом.агрегирования.в.ка- честве.значения..Мы.возвращаем.таблицу.обратно.в.поток.событий.и.заменяем.
    ключ,.в.котором.содержится.определение.временного.окна.нашим.собственным.
    ключом,.содержащим.только.символ.акции.и.начальное.время.окна..Этот.метод.
    toStream
    .преобразует.таблицу.в.поток,.а.также.преобразует.ключ.в.объект.типа.
    TickerWindow
    .Последний.шаг.—.обновление.средней.цены..В.настоящий.момент.результаты.
    агрегирования.включают.сумму.цен.и.число.сделок..Мы.проходим.по.этим.за- писям.и.вычисляем.на.основе.существующих.сводных.показателей.среднюю.
    цену,.которую.затем.можно.будет.включить.в.выходной.поток.
    .И.наконец,.записываем.результаты.в.поток.
    stockstats-output
    После.описания.потока.выполнения.можно.воспользоваться.им.для.генерации.
    и.выполнения.объекта.
    KafkaStreams
    .подобно.тому,.как.мы.поступили.в.разделе.
    «Подсчет.количества.слов».
    Этот.пример.демонстрирует.возможности.выполнения.операций.оконного.агреги- рования.над.потоком.данных.—.вероятно,.самый.часто.встречающийся.сценарий.
    использования.потоковой.обработки..Стоит.отметить,.как.просто.хранить.локаль- ное.состояние.агрегирования.—.достаточно.объекта.
    Serde
    .и.названия.хранилища.
    состояния..Тем.не.менее.это.приложение.способно.масштабироваться.на.много.эк- земпляров.и.автоматически.восстанавливаться.после.сбоев.отдельных.экземпляров.
    посредством.делегирования.обработки.части.разделов.одному.из.продолжающих.
    работать.экземпляров..Мы.подробнее.рассмотрим.эту.процедуру.в.разделе.«Kafka.
    Streams:.обзор.архитектуры».далее.
    Как.обычно,.вы.можете.найти.полный.пример,.включая.инструкции.по.запуску,.на.
    GitHub.(
    http://www.bit.ly/2r6BLm1
    ).
    Обогащение потока событий перехода по ссылкам
    Последний.пример.будет.посвящен.демонстрации.соединений.потоков.путем.
    обогащения.потока.событий.перехода.по.ссылкам.на.сайте..Мы.сгенерируем.по- ток.имитационных.щелчков.по.ссылкам,.поток.обновлений.таблицы.базы.данных.
    с.фиктивными.профилями,.а.также.поток.операций.поиска.в.Сети..Затем.соединим.

    304 Глава 11 • Потоковая обработка все.три.потока,.чтобы.получить.полный.обзор.деятельности.всех.пользователей..
    Что.искали.пользователи?.По.каким.результатам.поиска.они.переходили?.Меня- ли.ли.они.список.интересов.в.своих.пользовательских.профилях?.Подобные.соеди- нения.позволяют.получить.массу.информации.для.анализа..На.такой.информации.
    часто.основаны.рекомендации.товаров:.если.пользователь.искал.велосипеды,.щел- кал.по.ссылкам.для.слова.Trek,.значит,.ему.интересны.велосипедные.путешествия,.
    так.что.можно.рекламировать.ему.велосипеды.Trek,.шлемы.и.велотуры.в.экзоти- ческие.места,.например,.штат.Небраска.
    Поскольку.настройка.приложения.такая.же,.как.в.предыдущих.примерах,.про- пустим.этот.этап.и.сразу.перейдем.к.топологии.соединения.нескольких.потоков:
    KStream views =
    builder.stream(Serdes.Integer(),
    new PageViewSerde(), Constants.PAGE_VIEW_TOPIC);
    KStream searches =
    builder.stream(Serdes.Integer(), new SearchSerde(),
    Constants.SEARCH_TOPIC);
    KTable profiles =
    builder.table(Serdes.Integer(), new ProfileSerde(),
    Constants.USER_PROFILE_TOPIC, "profile-store");
    KStream viewsWithProfile = views.leftJoin(profiles,
    (page, profile) -> new UserActivity(profile.getUserID(),
    profile.getUserName(), profile.getZipcode(),
    profile.getInterests(), "", page.getPage()));
    KStream userActivityKStream =
    viewsWithProfile.leftJoin(searches,
    (userActivity, search) ->
    userActivity.updateSearch(search.getSearchTerms()),
    JoinWindows.of(1000), Serdes.Integer(),
    new UserActivitySerde(), new SearchSerde());
    .Прежде.всего.мы.создаем.объекты.потоков.для.двух.потоков,.которые.собира- емся.объединять,.—.переходов.по.ссылкам.и.операций.поиска.
    .Создаем.также.таблицу.типа.
    KTable
    .для.профилей.пользователей..
    KTable
    .пред- ставляет.собой.локальный.кэш,.обновляемый.посредством.потока.изменений.
    .Далее.мы.обогащаем.поток.переходов.по.ссылкам.информацией.о.профилях.
    пользователей,.соединяя.поток.событий.с.таблицей.профилей..При.соединении.
    потока.данных.с.таблицей.каждое.событие.в.потоке.получает.информацию.из.
    закэшированной.копии.таблицы.профилей..Мы.выполняем.левое.внешнее.
    соединение,.так.что.в.результаты.попадут.переходы.по.ссылкам,.для.которых.
    выполнивший.их.пользователь.неизвестен.
    .Это.и.есть.метод,.выполняющий.соединение,.—.он.принимает.на.входе.два.
    значения,.одно.из.потока,.а.второе.из.записи,.и.возвращает.третье.значение..
    В.отличие.от.баз.данных,.разработчик.должен.сам.решить,.как.эти.два.значения.
    будут.объединены.в.единый.результат..В.данном.случае.мы.создали.один.объект.

    Kafka Streams: обзор архитектуры 305
    activity
    ,.содержащий.как.информацию.о.пользователе,.так.и.просмотренную.
    им.страницу.
    .Далее.необходимо.объединить.информацию.о.переходах.по.ссылкам.с.инфор- мацией.о.выполненных.соответствующим.пользователем.операциях.поиска..
    Соединение.остается.левым,.но.теперь.соединяются.два.потока,.а.не.поток.
    с.таблицей.
    .Это.и.есть.метод,.выполняющий.соединение,.—.мы.просто.добавляем.ключевые.
    слова.поиска.ко.всем.соответствующим.просмотрам.страниц.
    .А.вот.это.самое.интересное.—.соединение потока с потоком.представляет.собой.
    соединение.с.временным.окном..Соединение.всех.переходов.по.ссылкам.с.ин- формацией.об.операциях.поиска.особого.смысла.не.имеет.—.необходимо.соеди- нить.каждую.операцию.поиска.с.соответствующими.переходами.по.ссылкам,.
    то.есть.щелчками.по.ссылкам,.выполненными.в.течение.короткого.промежутка.
    времени.после.поиска..Так.что.мы.зададим.размер.окна.соединения,.равный.
    одной.секунде..То.есть.будем.считать.подходящими.щелчки.на.ссылках,.выпол- ненные.в.течение.не.более.чем.одной.секунды.после.поиска,.и.соответствующие.
    поисковые.ключевые.слова.будут.включаться.в.запись.о.действиях.пользова- теля,.содержащую.информацию.о.переходе.по.ссылке,.и.профиль.пользователя..
    Благодаря.этому.появится.возможность.провести.полный.анализ.операций.
    поиска.и.их.результатов.
    После.завершения.описания.последовательности.операций.можно.воспользовать- ся.ею.для.генерирования.и.выполнения.объекта.
    KafkaStreams
    .подобно.тому,.как.
    мы.поступили.в.разделе.«Подсчет.количества.слов».
    Этот.пример.демонстрирует,.что.в.потоковой.обработке.возможны.два.различных.
    паттерна.соединений..Один.относится.к.соединению.потока.с.таблицей.для.обога- щения.всех.событий.потока.информацией.из.таблицы..Он.напоминает.соединение.
    таблицы.фактов.с.измерением.при.выполнении.запросов.к.складу.данных..Второй.
    паттерн.относится.к.соединению.двух.потоков.на.основе.временного.окна..Эта.опе- рация.встречается.только.в.сфере.потоковой.обработки.
    Как.обычно,.вы.можете.найти.полный.пример,.включая.инструкции.по.запуску,.на.
    GitHub.(
    http://www.bit.ly/2sq096i
    ).
    Kafka Streams: обзор архитектуры
    Примеры.из.предыдущего.раздела.демонстрируют.использование.API.Kafka.
    Streams.для.реализации.нескольких.широко.известных.паттернов.проектирования.
    потоковой.обработки..Но.чтобы.лучше.понять,.как.библиотека.Kafka.Streams.на.
    самом.деле.работает.и.масштабируется,.необходимо.«заглянуть.под.капот».и.разо- браться.с.некоторыми.базовыми.принципами.архитектуры.этого.API.

    306 Глава 11 • Потоковая обработка
    Построение топологии
    Любое.потоковое.приложение.реализует.и.выполняет.по.крайней.мере.одну.топо-
    логию..Топология,.называемая.в.других.фреймворках.потоковой.обработки.также.
    DAG.(directed.acyclic.graph.—.ориентированный.ациклический.граф),.представ- ляет.собой.набор.операций.и.преобразований,.через.которые.проходят.все.события.
    на.пути.от.входных.данных.до.результатов..На.рис..11.10.показана.топология.для.
    примера.с.подсчетом.количества.слов.
    Рис. 11.10. Топология для примера подсчета числа слов с помощью потоковой обработки
    Даже.у.простых.приложений.топология.нетривиальна..Она.состоит.из.узлов.об- работки.—.узлов.графа.топологии.(на.схеме.они.представлены.овалами)..Боль- шинство.узлов.обработки.реализуют.операции.над.данными.—.фильтрацию,.ото- бражение,.агрегирование.и.т..п..Существуют.также.узлы.обработки.—.источники,.
    потребляющие.данные.из.тем.и.передающих.их.дальше,.а.также.узлы.обработ- ки.—.приемники,.получающие.данные.из.предыдущих.узлов.обработки.и.генери- рующие.их.в.тему..Топология.всегда.начинается.с.одного.или.нескольких.узлов.
    обработки.—.производителей.и.заканчивается.одним.или.несколькими.узлами.
    обработки.—.приемниками.
    Масштабирование топологии
    Kafka.Streams.масштабируется.за.счет.того,.что.внутри.одного.экземпляра.прило- жения.могут.работать.несколько.потоков.выполнения,.а.также.благодаря.балан- сировке.нагрузки.между.распределенными.экземплярами.приложения..Можно.

    Kafka Streams: обзор архитектуры 307
    запустить.приложение.Kafka.Streams.на.одной.машине.в.многопоточном.режиме.
    или.на.нескольких.машинах.—.в.любом.случае.обработкой.данных.будут.занимать- ся.все.активные.потоки.выполнения.приложения.
    Движок.Streams.распараллеливает.выполнение.топологии,.разбивая.ее.на.задачи..
    Число.задач.определяется.движком.Streams.и.зависит.от.количества.разделов.
    в.обрабатываемых.приложением.темах..Каждая.задача.отвечает.за.какое-то.под- множество.разделов:.она.подписывается.на.эти.разделы.и.читает.из.них.события..
    Для.каждого.прочитанного.события.задача.выполняет.по.порядку.все.подходящие.
    для.этого.раздела.шаги.обработки,.после.чего.записывает.результаты.в.приемник..
    Эти.задачи.—.базовая.единица.параллелизма.в.Kafka.Streams,.поскольку.любую.
    задачу.можно.выполнять.независимо.от.остальных.(рис..11.11).
    Рис. 11.11. Две задачи, реализующие одну топологию, — по одной для каждого раздела входной темы
    У.разработчика.приложения.есть.возможность.выбрать.число.потоков.выполнения.
    для.каждого.экземпляра.приложения..При.доступности.нескольких.потоков.вы- полнения.каждый.из.них.будет.выполнять.часть.создаваемых.приложением.задач..
    Если.несколько.экземпляров.приложения.работают.на.нескольких.серверах,.то.
    в.каждом.потоке.на.каждом.сервере.будут.выполняться.различные.задачи..Имен- но.таким.образом.масштабируются.потоковые.приложения:.задач.будет.столько,.
    сколько.имеется.разделов.в.обрабатываемых.темах..Если.нужно.повысить.скорость.
    обработки,.увеличьте.число.потоков.выполнения..Если.на.сервере.заканчиваются.
    ресурсы,.запустите.еще.один.экземпляр.приложения.на.другом.сервере..Kafka.
    автоматически.координирует.работу.—.каждой.задаче.будет.назначаться.свое.под- множество.разделов,.события.из.которых.она.будет.обрабатывать.независимо.от.
    других.задач,.поддерживая.собственное.локальное.состояние.с.соответствующими.
    сводными.показателями,.если.этого.требует.топология.(рис..11.12).

    308 Глава 11 • Потоковая обработка
    Рис. 11.12. Задачи потоковой обработки могут выполняться в нескольких потоках и на нескольких серверах
    Возможно,.вы.обратили.внимание.на.то,.что.для.шага.обработки.иногда.требу- ются.результаты.из.нескольких.разделов,.вследствие.чего.между.задачами.могут.
    возникать.зависимости..Например,.при.соединении.двух.потоков.данных.(как.
    в.примере.из.раздела.«Обогащение.потока.событий.перехода.по.ссылкам».ранее.
    в.этой.главе).для.получения.результата.понадобятся.данные.из.раздела.каждого.из.
    потоков..Фреймворк.Kafka.Streams.решает.эту.проблему.за.счет.назначения.всех.
    необходимых.для.одного.соединения.разделов.одной.задаче,.так.что.задачи.могут.

    Kafka Streams: обзор архитектуры 309
    читать.данные.из.всех.нужных.разделов.и.выполнять.соединение.независимо.друг.
    от.друга..Именно.поэтому.для.Kafka.Streams.требуется,.чтобы.во.всех.участвующих.
    в.операции.соединения.темах.было.одинаковое.количество.разделов.и.чтобы.они.
    были.секционированы.по.ключу.соединения.
    Еще.один.пример.возникновения.зависимостей.между.задачами.—.случай,.когда.
    для.приложения.требуется.повторное.секционирование..Например,.в.примере.
    с.потоком.событий.переходов.ключ.всех.событий.—.идентификатор.пользователя..
    Но.что.если.нам.понадобится.сгенерировать.сводные.показатели.по.страницам?.
    Или.по.почтовому.индексу?.В.подобном.случае.придется.повторно.разделить.
    данные.по.почтовому.индексу.и.выполнить.их.агрегирование.на.основе.новых.
    разделов..Если.задача.1.обрабатывает.данные.из.раздела.1.и.доходит.до.узла.обра- ботки,.который.секционирует.данные.повторно.(операция.
    groupBy
    ),.понадобится.
    перетасовка.(shuffle),.а.значит,.придется.отправлять.события.другим.задачам.для.
    обработки..В.отличие.от.других.фреймворков.потоковой.обработки.Kafka.Streams.
    выполняет.повторное.разделение.путем.записи.событий.в.новую.тему.с.новыми.
    ключами.и.разделами..Далее.иной.набор.задач.читает.эти.события.из.новой.темы.
    и.продолжает.обработку..Шаг.повторного.разделения.разбивает.топологию.на.две.
    субтопологии,.каждая.со.своими.задачами..Второй.набор.задач.зависит.от.первого,.
    поскольку.обрабатывает.результаты.первой.субтопологии..Однако.первый.и.второй.
    наборы.задач.все.же.можно.запускать.независимо.друг.от.друга.и.параллельно,.
    поскольку.первый.записывает.данные.в.тему.с.одной.скоростью,.а.второй.читает.
    и.обрабатывает.данные.оттуда.—.с.другой..Между.их.задачами.нет.никакого.вза- имодействия.и.разделения.ресурсов,.и.они.не.обязаны.работать.в.одних.потоках.
    выполнения.на.серверах..Это.одна.из.самых.полезных.черт.Kafka.—.снижение.
    количества.зависимостей.между.различными.частями.конвейера.(рис..11.13).
    Рис. 11.13. Два набора задач, обрабатывающих события, с темой для повторного разбиения на разделы событий между ними

    310 Глава 11 • Потоковая обработка
    Как пережить отказ
    Та.же.модель,.которая.позволяет.масштабировать.приложение,.дает.возможность.
    изящно.справляться.с.отказами..Получить.доступ.к.Kafka.легко,.следовательно,.
    сохраняемые.в.ней.данные.также.высокодоступны..Так.что.приложение.в.случае.
    сбоя.и.необходимости.перезапуска.может.узнать.из.Kafka.свою.последнюю.пози- цию.в.потоке.и.продолжить.обработку.с.последнего.зафиксированного.ею.перед.
    сбоем.смещения..Отметим,.что.в.случае.утраты.хранилища.локального.состояния.
    (например,.при.необходимости.замены.сервера,.на.котором.оно.находилось).пото- ковое.приложение.всегда.может.создать.его.заново.на.основе.хранящегося.в.Kafka.
    журнала.изменений.
    Фреймворк.Kafka.Streams.также.использует.предоставляемую.Kafka.координацию.
    потребителей.с.целью.обеспечения.высокой.доступности.для.задач..Если.задача.
    завершилась.неудачей,.но.есть.другие.активные.потоки.или.экземпляры.потокового.
    приложения,.ее.можно.перезапустить.в.одном.из.доступных.потоков..Это.напоми- нает.то,.как.группа.потребителей.справляется.с.отказом.одного.из.потребителей.
    группы.посредством.переназначения.его.разделов.одному.из.оставшихся.потре- бителей.
    Сценарии использования потоковой обработки
    На.протяжении.данной.главы.мы.изучали.потоковую.обработку.—.от.основных.по- нятий.и.паттернов.до.конкретных.примеров.применения.Kafka.Streams..На.данном.
    этапе.имеет.смысл.взглянуть.на.часто.встречающиеся.сценарии.использования.по- токовой.обработки..Как.объяснялось.в.начале.главы,.потоковая.обработка,.она.же.
    непрерывная.обработка,.полезна.в.тех.случаях,.когда.нужно.обрабатывать.события.
    одно.за.другим,.а.не.ждать.часами.следующего.пакета,.но.когда.ответ.также.не.ожидает- ся.в.течение.миллисекунд..Все.это.справедливо,.но.слишком.расплывчато..Рассмотрим.
    несколько.настоящих.задач,.решаемых.с.помощью.потоковой.обработки.
    ‰
    ‰
    Обслуживание клиентов. Допустим,.вы.только.что.забронировали.комнату.
    в.большой.сети.отелей.и.ожидаете.получения.подтверждения.по.электронной.
    почте.и.платежной.квитанции..Через.несколько.минут.после.бронирования,.
    когда.подтверждение.все.еще.не.прибыло,.вы.звоните.в.отдел.по.обслуживанию.
    клиентов.для.подтверждения.брони..Представьте,.что.менеджер.по.обслужива- нию.клиентов.говорит.вам:.«Я.не.вижу.заказа.в.системе,.но.пакетное.задание,.
    загружающее.данные.из.системы.бронирования.в.систему.отеля.и.систему.
    обслуживания.клиентов,.выполняется.только.раз.в.сутки,.так.что.перезвоните.
    завтра,.пожалуйста..Подтверждение.по.электронной.почте.придет.вам.в.тече- ние.2–3.рабочих.дней»..Создается.впечатление,.что.сервис.здесь.не.слишком.
    хорош,.но.у.меня.не.раз.случались.такие.разговоры.с.отелями.из.крупных.сетей..
    Желательно,.чтобы.обновленные.данные.о.бронировании.в.течение.нескольких.
    секунд.или.минут.после.него.получала.каждая.система.в.сети.отелей,.в.том.

    Сценарии использования потоковой обработки
    1   ...   31   32   33   34   35   36   37   38   39


    написать администратору сайта