Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
293 Чтобы.обеспечить.хорошую.производительность.и.масштабирование,.необходимо. кэшировать.информацию.из.базы.данных.в.приложении.потоковой.обработки.. Однако.управление.кэшем.может.оказаться.непростой.задачей:.как.предотвратить. устаревание.информации.в.нем?.Если.слишком.часто.обновлять.события,.то.на- грузка.на.базу.данных.все.равно.будет.большой.и.кэш.особо.не.поможет..Если.же. получать.новые.события.слишком.редко,.то.потоковая.обработка.будет.выполнять- ся.на.основе.устаревшей.информации. Но.если.бы.мы.смогли.захватывать.все.происходящие.с.таблицей.базы.данных.из- менения.в.поток.событий,.то.можно.было.бы.организовать.прослушивание.этого. потока.заданием,.которое.выполняет.потоковую.обработку,.и.обновлять.кэш.в.со- ответствии.с.событиями.изменения.базы.данных..Процесс.захвата.вносимых.в.базу. данных.изменений.в.виде.событий.потока.данных.носит.название.CDC..Разработ- чики,.использующие.Kafka.Connect,.обнаружат.там.множество.коннекторов,.пред- назначенных.для.выполнения.CDC.и.преобразования.таблиц.базы.данных.в.поток. событий.изменения..Благодаря.этому.вы.сможете.хранить.свою.собственную.копию. таблицы,.обновляя.ее.соответствующим.образом.при.получении.уведомления. о.каждом.событии.изменения.базы.данных.(рис..11.7). Рис. 11.7. Топология соединения таблицы и потока событий, благодаря которой нет необходимости использовать внешний источник данных при потоковой обработке 294 Глава 11 • Потоковая обработка Далее.при.получении.событий.переходов.пользователей.по.ссылкам.вы.сможете. найти. user_id .в.локальном.кэше.и.выполнить.обогащение.события..Благодаря. применению.локального.кэша.такое.решение.масштабируется.намного.лучше. и.не.оказывает.негативного.влияния.на.базу.данных.и.другие.использующие.ее. приложения. Мы.будем.называть.этот.вариант.соединением потока данных с таблицей.(stream- table.join),.поскольку.один.из.потоков.отражает.изменения.в.кэшируемой.локально. таблице. Соединение потоков Иногда.бывает.нужно.соединить.два.потока.событий,.а.не.поток.с.таблицей..Благо- даря.чему.поток.данных.становится.настоящим?.Как.вы.помните.из.обсуждения. в.начале.главы,.потоки.неограниченны..При.использовании.потока.для.представле- ния.таблицы.можно.смело.проигнорировать.большую.часть.исторической.инфор- мации.из.него,.поскольку.нас.в.этом.случае.интересует.только.текущее.состояние.. Но.соединение.двух.потоков.данных.означает.соединение.полной.истории.событий. и.поиск.соответствий.событий.из.одного.потока.событиям.из.другого,.относя- щимся.к.тем.же.временным.окнам.и.с.такими.же.ключами..Поэтому.соединение. потоков.называют.также.оконным соединением.(windowed-join). Например,.имеется.один.поток.данных.с.поисковыми.запросами,.которые.поль- зователи.вводили.на.нашем.веб-сайте,.а.второй.—.со.сделанными.ими.щелчками. мышью.на.ссылках,.в.том.числе.на.результатах.запросов..Нужно.найти.соответ- ствия.поисковых.запросов.результатам,.на.которых.щелкнули.пользователи,.чтобы. выяснить,.какие.результаты.наиболее.популярны.при.каком.запросе..Разумеется,. нам.хотелось.бы.найти.соответствия.результатов.по.ключевым.словам,.но.только. соответствия.в.пределах.определенного.временно'го.окна..Мы.предполагаем,.что. пользователь.выполняет.щелчок.на.результате.поиска.в.течение.нескольких.секунд. после.ввода.запроса.в.поисковую.систему..Так.что.имеет.смысл.использовать.для. каждого.потока.окна.небольшие,.длиной.в.несколько.секунд,.и.искать.соответствие. результатов.для.каждого.из.них.(рис..11.8). В.библиотеке.Kafka.Streams.это.работает.следующим.образом:.оба.потока.данных,. запросы.и.щелчки.на.ссылках.секционируются.по.одним.и.тем.же.ключам,.которые. представляют.собой.и.ключи.соединения..При.этом.все.события.щелчков.от.пользо- вателя.user_id:42.попадают.в.раздел.5.темы.событий.щелчков,.а.все.события.поиска. для.user_id:42.—.в.раздел.5.темы.событий.поиска..После.этого.Kafka.Streams.обе- спечивает.назначение.раздела.5.обеих.тем.одной.задаче.Kafka,.так.что.этой.задаче. оказываются.доступны.все.соответствующие.события.для.пользователя.user_id:42.. Она.хранит.во.встроенном.кэше.RocksDB.временно'е.окно.соединения.для.обеих. тем.и.благодаря.этому.может.выполнить.соединение. Паттерны проектирования потоковой обработки 295 Рис. 11.8. Соединение двух потоков событий. В таких случаях всегда применяется временнóе окно Внеочередные события Обработка.событий,.поступивших.в.поток.несвоевременно,.—.непростая.задача. не.только.в.потоковой.обработке,.но.и.в.традиционных.ETL-системах..Внеочеред- ные.события.—.довольно.часто.встречающееся.обыденное.явление.в.сценариях. Интернета.вещей.(IoT).(рис..11.9)..Например,.мобильное.устройство.может.по- терять.сигнал.Wi-Fi.на.несколько.часов.и.отправить.данные.за.это.время.после. восстановления.соединения..Подобное.случается.и.при.мониторинге.сетевого. оборудования.(сбойный.сетевой.коммутатор.не.отправляет.диагностических.сиг- налов.о.своем.состоянии,.пока.не.будет.починен).или.в.машиностроении.(печально. известны.своей.нестабильностью.сетевые.соединения.на.фабриках,.особенно.в.раз- вивающихся.странах). Рис. 11.9. Внеочередные события 296 Глава 11 • Потоковая обработка Наши.потоковые.приложения.должны.корректно.работать.при.подобных.сцена- риях..Обычно.это.означает,.что.такое.приложение.должно: распознать.несвоевременное.поступление.события,.для.чего.прочитать.время. события.и.определить,.что.оно.меньше.текущего; определиться.с.интервалом.времени,.в.течение.которого.оно.будет.пытаться.син- хронизировать.внеочередные.события..Скажем,.при.задержке.в.три.часа.событие. можно.синхронизировать,.а.события.трехнедельной.давности.можно.отбросить; обладать.достаточными.возможностями.для.синхронизации.данного.события.. Именно.в.этом.и.состоит.основное.различие.между.потоковыми.приложениями. и.пакетными.заданиями..Если.несколько.событий.поступили.после.завершения. ежедневного.пакетного.задания,.можно.просто.запустить.вчерашнее.задание. повторно.и.обновить.события..В.случае.же.потоковой.обработки.возможности. запустить.вчерашнее.задание.повторно.нет.—.один.и.тот.же.непрерывный.про- цесс.должен.обрабатывать.как.старые,.так.и.новые.события; иметь.возможность.обновить.результаты..Если.результаты.потоковой.обработки. записываются.в.базу.данных,.для.их.обновления.достаточно.команды. put .или. update ..В.случае.отправки.потоковым.приложением.результатов.по.электронной. почте.выполнение.обновлений.может.оказаться.более.сложной.задачей. В.нескольких.фреймворках.потоковой.обработки,.в.том.числе.Dataflow.компании. Google.и.Kafka.Streams,.есть.встроенная.поддержка.независимого.от.времени.об- работки.(основного.времени).представления.времени,.а.также.возможность.обра- ботки.событий,.время.которых.больше.или.меньше.текущего.основного.времени.. Обычно.для.этого.в.локальном.состоянии.хранятся.несколько.доступных.для. обновления.окон.агрегирования,.причем.разработчики.могут.настраивать.проме- жуток.времени,.в.течение.которого.они.доступны.для.обновления..Конечно,.чем. больше.этот.промежуток,.тем.больше.памяти.необходимо.для.хранения.локального. состояния. API.Kafka.Streams.всегда.записывает.результаты.агрегирования.в.темы.результатов.. Обычно.они.представляют.собой.сжатые.темы,.то.есть.для.каждого.ключа.сохра- няется.только.последнее.значение..При.необходимости.обновления.результатов. окна.агрегирования.вследствие.поступления.запоздавшего.события.Kafka.Streams. просто.записывает.новый.результат.для.данного.окна.агрегирования,.перезаписы- вая.предыдущий. Повторная обработка Последний.из.важных.паттернов.—.обработка.событий..Существует.два.его.вари- анта. У.нас.появилась.новая.версия.приложения.потоковой.обработки,.и.нужно.ор- ганизовать.обработку.этой.версией.того.же.потока.событий,.который.обрабаты- вает.старая,.получить.новый.поток.результатов,.не.замещающий.первой.версии,. Kafka Streams в примерах 297 сравнить.две.версии.результатов.и.в.какой-то.момент.перевести.клиентов.на. использование.новых.результатов.вместо.существующих. В.существующее.приложение.потоковой.обработки.вкралась.программная. ошибка..Мы.ее.исправили.и.хотели.бы.заново.обработать.поток.событий.и.вы- числить.новые.результаты. Первый.сценарий.основан.на.том,.что.Apache.Kafka.в.течение.длительного.времени. хранит.потоки.событий.целиком.в.масштабируемом.хранилище.данных..Это.зна- чит,.что.для.работы.двух.версий.приложения.потоковой.обработки,.записывающих. два.потока.результатов,.достаточно.выполнить.следующее. Развернуть.новую.версию.приложения.в.качестве.новой.группы.потребителей. Настроить.новую.версию.так,.чтобы.она.начала.обработку.с.первого.смещения. исходных.тем.(а.значит,.у.нее.была.своя.копия.всех.событий.из.входных.по- токов). Продолжить.работу.нового.приложения.и.переключить.клиентские.приложе- ния.на.новый.поток.результатов.после.того,.как.новая.версия.выполняющего. обработку.задания.наверстает.отставание. Второй.сценарий.сложнее.—.он.требует.перенастроить.существующее.приложение. так,.чтобы.начать.обработку.с.начала.входных.потоков.данных,.сбросить.локальное. состояние.(чтобы.не.смешались.результаты,.полученные.от.двух.версий.приложе- ния).и,.возможно,.очистить.предыдущий.выходной.поток..Хотя.в.составе.библи- отеки.Kafka.Streams.есть.утилита.для.сброса.состояния.приложения.потоковой. обработки,.мы.рекомендуем.использовать.первый.вариант.во.всех.случаях,.когда. есть.ресурсы.для.запуска.двух.копий.приложения.и.генерирования.двух.потоков. результатов..Первый.метод.намного.безопаснее.—.он.позволяет.переключаться. между.несколькими.версиями.и.сравнивать.их.результаты.без.риска.потерять. критически.важные.данные.или.внести.ошибки.в.процессе.очистки. Kafka Streams в примерах Приведем.несколько.примеров.использования.API.фреймворка.Apache.Kafka. Streams,.чтобы.продемонстрировать.реализацию.рассмотренных.паттернов.на. практике..Мы.берем.именно.этот.конкретный.API.по.причине.простоты.его.при- менения,.а.также.потому.что.он.поставляется.вместе.с.уже.имеющимся.у.вас.Apache. Kafka..Важно.помнить,.что.эти.паттерны.можно.реализовать.в.любом.фреймворке. потоковой.обработки.или.библиотеке.—.сами.паттерны.универсальны,.конкретны. только.примеры. В.Apache.Kafka.есть.два.потоковых.API.—.низкоуровневый.Processor.API.и.высо- коуровневый.Streams.DSL..Мы.воспользуемся.Kafka.Streams.DSL..Он.позволяет. задавать.приложение.потоковой.обработки.путем.описания.последовательности. преобразований.событий.потока..Преобразования.могут.быть.простыми,.например,. фильтрами,.или.сложными,.например,.соединениями.потоков..Низкоуровневый. 298 Глава 11 • Потоковая обработка API.позволяет.создавать.собственные.преобразования,.но,.как.вы.увидите,.это. редко.используется.на.практике. Создание.приложения,.задействующего.API.DSL,.всегда.начинается.с.формирова- ния.с.помощью.StreamBuilder.топологии.обработки.—.ориентированного.ацикли- ческого.графа.(DAG).преобразований,.применяемых.ко.всем.событиям.потоков.. Затем.на.основе.топологии.создается.исполняемый.объект. KafkaStreams ..При.за- пуске.объекта. KafkaStreams .создается.несколько.потоков.выполнения,.каждый. из.которых.использует.топологию.обработки.к.событиям.из.потоков..Обработка. завершается.по.закрытии.объекта. KafkaStreams Мы.рассмотрим.несколько.примеров.использования.Kafka.Streams.для.реализации. некоторых.из.обсуждавшихся.ранее.паттернов.проектирования..Для.демонстрации. паттерна.отображения/свертки.и.простых.сводных.показателей.воспользуемся. простым.примером.с.подсчетом.слов..Затем.перейдем.к.примеру.с.вычислением. различных.сводных.статистических.показателей.для.рынка.ценных.бумаг,.ко- торый.позволит.продемонстрировать.сводные.показатели.по.временным.окнам.. И.наконец,.проиллюстрируем.соединение.потоков.на.примере.обогащения.потока. переходов.по.ссылкам. Подсчет количества слов Вкратце.рассмотрим.сокращенный.вариант.примера.подсчета.слов.для.Kafka. Streams..Полный.пример.вы.можете.найти.на.GitHub.( http://www.bit.ly/2ri00gj ). Прежде.всего.при.создании.приложения.потоковой.обработки.необходимо.настроить. Kafka.Streams..У.него.есть.множество.параметров,.которые.мы.не.станем.тут.обсуж- дать,.так.как.их.описание.можно.найти.в.документации.( http://www.bit.ly/2t7obPU ).. Кроме.того,.можно.настроить.встроенные.в.Kafka.Streams.производитель.и.по- требитель,.добавив.любые.нужные.настройки.производителя.или.потребителя. в.объект.Properties: public class WordCountExample { public static void main(String[] args) throws Exception{ Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); .У.каждого.приложения.Kafka.должен.быть.свой.идентификатор.приложения.. Он.используется.для.координации.действий.экземпляров.приложения,.а.также. Kafka Streams в примерах 299 именования.внутренних.локальных.хранилищ.и.относящихся.к.ним.тем..Среди. приложений.Kafka.Streams,.работающих.в.пределах.одного.кластера,.иденти- фикаторы.не.должны.повторяться. .Приложения.Kafka.Streams.всегда.читают.данные.из.тем.Kafka.и.записывают. результаты.в.темы.Kafka..Как.мы.увидим.далее,.приложения.Kafka.Streams.так- же.применяют.Kafka.для.координации.своих.действий..Так.что.лучше.указать. приложению,.где.искать.Kafka. .Приложение.должно.выполнять.сериализацию.и.десериализацию.при.чтении. и.записи.данных,.поэтому.мы.указываем.классы,.наследующие.интерфейс. Serde для.использования.по.умолчанию. Задав.настройки,.можно.перейти.к.построению.топологии.потоков: KStreamBuilder builder = new KStreamBuilder(); KStream builder.stream("wordcount-input"); final Pattern pattern = Pattern.compile("\\W+"); KStream counts = source.flatMapValues(value-> Arrays.asList(pattern.split(value.toLowerCase()))) .map((key, value) -> new KeyValue 300 Глава 11 • Потоковая обработка После.описания.последовательности.выполняемых.приложением.преобразований. нам.осталось.лишь.запустить.его: KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); // Обычно потоковое приложение работает постоянно, // в данном же примере мы запустим его на некоторое время, // а затем остановим, поскольку входные данные не бесконечны. Thread.sleep(5000L); streams.close(); } } .Описываем.объект. KafkaStreams .на.основе.нашей.топологии.и.заданных.нами. свойств. .Запускаем.Kafka.Streams. .Через.некоторое.время.останавливаем. Вот.и.все!.Понадобилось.всего.несколько.строк,.чтобы.реализовать.паттерн.об- работки.отдельных.событий.(мы.выполнили.отображение,.а.затем.фильтрацию. событий)..Мы.заново.разделили.данные,.добавив.оператор. group-by ,.после.чего. на.основе.простого.локального.состояния.подсчитали.число.записей,.в.которых. каждое.уникальное.слово.является.ключом,.то.есть.количество.вхождений.каждого. из.слов. Теперь.мы.рекомендуем.запустить.полный.вариант.примера..Инструкции.по.вы- полнению.полного.примера.можно.найти.в.файле. README .репозитория.на.GitHub. ( http://www.bit.ly/2sOXzUN ). Отметим,.что.для.запуска.всего.примера.не.требуется.ничего.устанавливать,.кроме. самой.Apache.Kafka..Вы.могли.наблюдать.подобное.при.использовании.Spark,.на- пример,.в.локальном режиме.(Local.Mode)..Основное.различие:.если.во.входной.теме. несколько.разделов,.можно.путем.запуска.нескольких.экземпляров.приложения. WordCount .(подобно.запуску.приложения.в.нескольких.различных.вкладках.терми- нала).создать.свой.первый.кластер.обработки.Kafka.Streams..Экземпляры.приложе- ния. WordCount .при.этом.смогут.взаимодействовать.друг.с.другом.и.согласовывать. свою.работу..Один.из.главных.порогов.вхождения.для.Spark.—.то,.что.использовать. его.в.локальном.режиме.очень.просто,.но.для.эксплуатации.кластера.необходимо. установить.YARN.или.Mesos,.после.чего.установить.Spark.на.всех.машинах,.а.затем. разобраться.с.запуском.приложения.на.кластере..В.случае.же.API.Kafka.Streams,. можно.просто.запустить.несколько.экземпляров.приложения.—.и.кластер.готов.. Одно.и.то.же.приложение.работает.на.машине.разработчика.и.при.промышленной. эксплуатации. Kafka Streams в примерах 301 Сводные показатели фондовой биржи Следующий.пример.сложнее.—.мы.прочитаем.поток.событий.биржевых.операций,. включающий.символы.акций,.цену.и.величину.предложения..В.биржевых.операци- ях.цена предложения.(ask.price).—.это.то,.сколько.просит.за.акции.продавец,.а.цена заявки.(bid.price).—.то,.что.готов.заплатить.покупатель..Величина.предложения. (ask.size).—.число.акций,.которое.продавец.согласен.продать.по.данной.цене.. Для.упрощения.примера.мы.полностью.проигнорируем.заявки..Не.станем.также. включать.в.данные.метки.даты/времени,.вместо.этого.воспользуемся.временем. события,.передаваемым.производителем.Kafka. Затем.мы.создадим.выходные.потоки.данных,.содержащие.несколько.оконных. сводных.показателей: наилучшую,.то.есть.минимальную.цену.предложения.для.каждого.пятисекунд- ного.окна; число.сделок.для.каждого.пятисекундного.окна; среднюю.цену.предложения.для.каждого.пятисекундного.окна. Все.сводные.показатели.будут.обновляться.каждую.секунду. Для.простоты.предположим,.что.на.бирже.торгуется.лишь.десять.символов.акций.. Настройки.очень.похожи.на.те,.которые.мы.ранее.использовали.в.примере.с.под- счетом.слов.на.странице: Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stockstat"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.BROKER); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, TradeSerde.class.getName()); Основное.отличие.—.в.использовании.классов. Serde ..В.примере.подсчета.количе- ства.слов.на.странице.как.для.ключей,.так.и.для.значений.применялся.строковый. тип.данных,.а.следовательно,.в.качестве.сериализатора.и.десериализатора.для.обоих. задействовался.метод. Serdes.String() ..В.данном.же.примере.ключ.—.по-прежнему. строка,.но.значение.представляет.собой.объект.класса. Trade ,.содержащий.символ. акции,.цену.и.величину.предложения..Для.сериализации.и.десериализации.дан- ного.объекта,.а.также.нескольких.других.объектов,.применяемых.в.этом.неболь- шом.приложении,.воспользуемся.библиотекой.Gson.от.компании.Google,.чтобы. генерировать.сериализатор.и.десериализатор.JSON.на.основе.Java-объекта..Затем. создадим.небольшой.адаптер,.формирующий.из.них.объект. Serde ..Объект. Serde создаем.следующим.образом: static public final class TradeSerde extends WrapperSerde public TradeSerde() { |