Главная страница
Навигация по странице:

  • Рис. 11.9.

  • Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


    Скачать 7.59 Mb.
    НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
    Дата21.06.2022
    Размер7.59 Mb.
    Формат файлаpdf
    Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
    ТипДокументы
    #609074
    страница37 из 39
    1   ...   31   32   33   34   35   36   37   38   39
    293
    Чтобы.обеспечить.хорошую.производительность.и.масштабирование,.необходимо.
    кэшировать.информацию.из.базы.данных.в.приложении.потоковой.обработки..
    Однако.управление.кэшем.может.оказаться.непростой.задачей:.как.предотвратить.
    устаревание.информации.в.нем?.Если.слишком.часто.обновлять.события,.то.на- грузка.на.базу.данных.все.равно.будет.большой.и.кэш.особо.не.поможет..Если.же.
    получать.новые.события.слишком.редко,.то.потоковая.обработка.будет.выполнять- ся.на.основе.устаревшей.информации.
    Но.если.бы.мы.смогли.захватывать.все.происходящие.с.таблицей.базы.данных.из- менения.в.поток.событий,.то.можно.было.бы.организовать.прослушивание.этого.
    потока.заданием,.которое.выполняет.потоковую.обработку,.и.обновлять.кэш.в.со- ответствии.с.событиями.изменения.базы.данных..Процесс.захвата.вносимых.в.базу.
    данных.изменений.в.виде.событий.потока.данных.носит.название.CDC..Разработ- чики,.использующие.Kafka.Connect,.обнаружат.там.множество.коннекторов,.пред- назначенных.для.выполнения.CDC.и.преобразования.таблиц.базы.данных.в.поток.
    событий.изменения..Благодаря.этому.вы.сможете.хранить.свою.собственную.копию.
    таблицы,.обновляя.ее.соответствующим.образом.при.получении.уведомления.
    о.каждом.событии.изменения.базы.данных.(рис..11.7).
    Рис. 11.7. Топология соединения таблицы и потока событий, благодаря которой нет необходимости использовать внешний источник данных при потоковой обработке

    294 Глава 11 • Потоковая обработка
    Далее.при.получении.событий.переходов.пользователей.по.ссылкам.вы.сможете.
    найти.
    user_id
    .в.локальном.кэше.и.выполнить.обогащение.события..Благодаря.
    применению.локального.кэша.такое.решение.масштабируется.намного.лучше.
    и.не.оказывает.негативного.влияния.на.базу.данных.и.другие.использующие.ее.
    приложения.
    Мы.будем.называть.этот.вариант.соединением потока данных с таблицей.(stream- table.join),.поскольку.один.из.потоков.отражает.изменения.в.кэшируемой.локально.
    таблице.
    Соединение потоков
    Иногда.бывает.нужно.соединить.два.потока.событий,.а.не.поток.с.таблицей..Благо- даря.чему.поток.данных.становится.настоящим?.Как.вы.помните.из.обсуждения.
    в.начале.главы,.потоки.неограниченны..При.использовании.потока.для.представле- ния.таблицы.можно.смело.проигнорировать.большую.часть.исторической.инфор- мации.из.него,.поскольку.нас.в.этом.случае.интересует.только.текущее.состояние..
    Но.соединение.двух.потоков.данных.означает.соединение.полной.истории.событий.
    и.поиск.соответствий.событий.из.одного.потока.событиям.из.другого,.относя- щимся.к.тем.же.временным.окнам.и.с.такими.же.ключами..Поэтому.соединение.
    потоков.называют.также.оконным соединением.(windowed-join).
    Например,.имеется.один.поток.данных.с.поисковыми.запросами,.которые.поль- зователи.вводили.на.нашем.веб-сайте,.а.второй.—.со.сделанными.ими.щелчками.
    мышью.на.ссылках,.в.том.числе.на.результатах.запросов..Нужно.найти.соответ- ствия.поисковых.запросов.результатам,.на.которых.щелкнули.пользователи,.чтобы.
    выяснить,.какие.результаты.наиболее.популярны.при.каком.запросе..Разумеется,.
    нам.хотелось.бы.найти.соответствия.результатов.по.ключевым.словам,.но.только.
    соответствия.в.пределах.определенного.временно'го.окна..Мы.предполагаем,.что.
    пользователь.выполняет.щелчок.на.результате.поиска.в.течение.нескольких.секунд.
    после.ввода.запроса.в.поисковую.систему..Так.что.имеет.смысл.использовать.для.
    каждого.потока.окна.небольшие,.длиной.в.несколько.секунд,.и.искать.соответствие.
    результатов.для.каждого.из.них.(рис..11.8).
    В.библиотеке.Kafka.Streams.это.работает.следующим.образом:.оба.потока.данных,.
    запросы.и.щелчки.на.ссылках.секционируются.по.одним.и.тем.же.ключам,.которые.
    представляют.собой.и.ключи.соединения..При.этом.все.события.щелчков.от.пользо- вателя.user_id:42.попадают.в.раздел.5.темы.событий.щелчков,.а.все.события.поиска.
    для.user_id:42.—.в.раздел.5.темы.событий.поиска..После.этого.Kafka.Streams.обе- спечивает.назначение.раздела.5.обеих.тем.одной.задаче.Kafka,.так.что.этой.задаче.
    оказываются.доступны.все.соответствующие.события.для.пользователя.user_id:42..
    Она.хранит.во.встроенном.кэше.RocksDB.временно'е.окно.соединения.для.обеих.
    тем.и.благодаря.этому.может.выполнить.соединение.

    Паттерны проектирования потоковой обработки 295
    Рис. 11.8. Соединение двух потоков событий. В таких случаях всегда применяется временнóе окно
    Внеочередные события
    Обработка.событий,.поступивших.в.поток.несвоевременно,.—.непростая.задача.
    не.только.в.потоковой.обработке,.но.и.в.традиционных.ETL-системах..Внеочеред- ные.события.—.довольно.часто.встречающееся.обыденное.явление.в.сценариях.
    Интернета.вещей.(IoT).(рис..11.9)..Например,.мобильное.устройство.может.по- терять.сигнал.Wi-Fi.на.несколько.часов.и.отправить.данные.за.это.время.после.
    восстановления.соединения..Подобное.случается.и.при.мониторинге.сетевого.
    оборудования.(сбойный.сетевой.коммутатор.не.отправляет.диагностических.сиг- налов.о.своем.состоянии,.пока.не.будет.починен).или.в.машиностроении.(печально.
    известны.своей.нестабильностью.сетевые.соединения.на.фабриках,.особенно.в.раз- вивающихся.странах).
    Рис. 11.9. Внеочередные события

    296 Глава 11 • Потоковая обработка
    Наши.потоковые.приложения.должны.корректно.работать.при.подобных.сцена- риях..Обычно.это.означает,.что.такое.приложение.должно:
    ‰
    ‰
    распознать.несвоевременное.поступление.события,.для.чего.прочитать.время.
    события.и.определить,.что.оно.меньше.текущего;
    ‰
    ‰
    определиться.с.интервалом.времени,.в.течение.которого.оно.будет.пытаться.син- хронизировать.внеочередные.события..Скажем,.при.задержке.в.три.часа.событие.
    можно.синхронизировать,.а.события.трехнедельной.давности.можно.отбросить;
    ‰
    ‰
    обладать.достаточными.возможностями.для.синхронизации.данного.события..
    Именно.в.этом.и.состоит.основное.различие.между.потоковыми.приложениями.
    и.пакетными.заданиями..Если.несколько.событий.поступили.после.завершения.
    ежедневного.пакетного.задания,.можно.просто.запустить.вчерашнее.задание.
    повторно.и.обновить.события..В.случае.же.потоковой.обработки.возможности.
    запустить.вчерашнее.задание.повторно.нет.—.один.и.тот.же.непрерывный.про- цесс.должен.обрабатывать.как.старые,.так.и.новые.события;
    ‰
    ‰
    иметь.возможность.обновить.результаты..Если.результаты.потоковой.обработки.
    записываются.в.базу.данных,.для.их.обновления.достаточно.команды.
    put
    .или.
    update
    ..В.случае.отправки.потоковым.приложением.результатов.по.электронной.
    почте.выполнение.обновлений.может.оказаться.более.сложной.задачей.
    В.нескольких.фреймворках.потоковой.обработки,.в.том.числе.Dataflow.компании.
    Google.и.Kafka.Streams,.есть.встроенная.поддержка.независимого.от.времени.об- работки.(основного.времени).представления.времени,.а.также.возможность.обра- ботки.событий,.время.которых.больше.или.меньше.текущего.основного.времени..
    Обычно.для.этого.в.локальном.состоянии.хранятся.несколько.доступных.для.
    обновления.окон.агрегирования,.причем.разработчики.могут.настраивать.проме- жуток.времени,.в.течение.которого.они.доступны.для.обновления..Конечно,.чем.
    больше.этот.промежуток,.тем.больше.памяти.необходимо.для.хранения.локального.
    состояния.
    API.Kafka.Streams.всегда.записывает.результаты.агрегирования.в.темы.результатов..
    Обычно.они.представляют.собой.сжатые.темы,.то.есть.для.каждого.ключа.сохра- няется.только.последнее.значение..При.необходимости.обновления.результатов.
    окна.агрегирования.вследствие.поступления.запоздавшего.события.Kafka.Streams.
    просто.записывает.новый.результат.для.данного.окна.агрегирования,.перезаписы- вая.предыдущий.
    Повторная обработка
    Последний.из.важных.паттернов.—.обработка.событий..Существует.два.его.вари- анта.
    ‰
    ‰
    У.нас.появилась.новая.версия.приложения.потоковой.обработки,.и.нужно.ор- ганизовать.обработку.этой.версией.того.же.потока.событий,.который.обрабаты- вает.старая,.получить.новый.поток.результатов,.не.замещающий.первой.версии,.

    Kafka Streams в примерах 297
    сравнить.две.версии.результатов.и.в.какой-то.момент.перевести.клиентов.на.
    использование.новых.результатов.вместо.существующих.
    ‰
    ‰
    В.существующее.приложение.потоковой.обработки.вкралась.программная.
    ошибка..Мы.ее.исправили.и.хотели.бы.заново.обработать.поток.событий.и.вы- числить.новые.результаты.
    Первый.сценарий.основан.на.том,.что.Apache.Kafka.в.течение.длительного.времени.
    хранит.потоки.событий.целиком.в.масштабируемом.хранилище.данных..Это.зна- чит,.что.для.работы.двух.версий.приложения.потоковой.обработки,.записывающих.
    два.потока.результатов,.достаточно.выполнить.следующее.
    ‰
    ‰
    Развернуть.новую.версию.приложения.в.качестве.новой.группы.потребителей.
    ‰
    ‰
    Настроить.новую.версию.так,.чтобы.она.начала.обработку.с.первого.смещения.
    исходных.тем.(а.значит,.у.нее.была.своя.копия.всех.событий.из.входных.по- токов).
    ‰
    ‰
    Продолжить.работу.нового.приложения.и.переключить.клиентские.приложе- ния.на.новый.поток.результатов.после.того,.как.новая.версия.выполняющего.
    обработку.задания.наверстает.отставание.
    Второй.сценарий.сложнее.—.он.требует.перенастроить.существующее.приложение.
    так,.чтобы.начать.обработку.с.начала.входных.потоков.данных,.сбросить.локальное.
    состояние.(чтобы.не.смешались.результаты,.полученные.от.двух.версий.приложе- ния).и,.возможно,.очистить.предыдущий.выходной.поток..Хотя.в.составе.библи- отеки.Kafka.Streams.есть.утилита.для.сброса.состояния.приложения.потоковой.
    обработки,.мы.рекомендуем.использовать.первый.вариант.во.всех.случаях,.когда.
    есть.ресурсы.для.запуска.двух.копий.приложения.и.генерирования.двух.потоков.
    результатов..Первый.метод.намного.безопаснее.—.он.позволяет.переключаться.
    между.несколькими.версиями.и.сравнивать.их.результаты.без.риска.потерять.
    критически.важные.данные.или.внести.ошибки.в.процессе.очистки.
    Kafka Streams в примерах
    Приведем.несколько.примеров.использования.API.фреймворка.Apache.Kafka.
    Streams,.чтобы.продемонстрировать.реализацию.рассмотренных.паттернов.на.
    практике..Мы.берем.именно.этот.конкретный.API.по.причине.простоты.его.при- менения,.а.также.потому.что.он.поставляется.вместе.с.уже.имеющимся.у.вас.Apache.
    Kafka..Важно.помнить,.что.эти.паттерны.можно.реализовать.в.любом.фреймворке.
    потоковой.обработки.или.библиотеке.—.сами.паттерны.универсальны,.конкретны.
    только.примеры.
    В.Apache.Kafka.есть.два.потоковых.API.—.низкоуровневый.Processor.API.и.высо- коуровневый.Streams.DSL..Мы.воспользуемся.Kafka.Streams.DSL..Он.позволяет.
    задавать.приложение.потоковой.обработки.путем.описания.последовательности.
    преобразований.событий.потока..Преобразования.могут.быть.простыми,.например,.
    фильтрами,.или.сложными,.например,.соединениями.потоков..Низкоуровневый.

    298 Глава 11 • Потоковая обработка
    API.позволяет.создавать.собственные.преобразования,.но,.как.вы.увидите,.это.
    редко.используется.на.практике.
    Создание.приложения,.задействующего.API.DSL,.всегда.начинается.с.формирова- ния.с.помощью.StreamBuilder.топологии.обработки.—.ориентированного.ацикли- ческого.графа.(DAG).преобразований,.применяемых.ко.всем.событиям.потоков..
    Затем.на.основе.топологии.создается.исполняемый.объект.
    KafkaStreams
    ..При.за- пуске.объекта.
    KafkaStreams
    .создается.несколько.потоков.выполнения,.каждый.
    из.которых.использует.топологию.обработки.к.событиям.из.потоков..Обработка.
    завершается.по.закрытии.объекта.
    KafkaStreams
    Мы.рассмотрим.несколько.примеров.использования.Kafka.Streams.для.реализации.
    некоторых.из.обсуждавшихся.ранее.паттернов.проектирования..Для.демонстрации.
    паттерна.отображения/свертки.и.простых.сводных.показателей.воспользуемся.
    простым.примером.с.подсчетом.слов..Затем.перейдем.к.примеру.с.вычислением.
    различных.сводных.статистических.показателей.для.рынка.ценных.бумаг,.ко- торый.позволит.продемонстрировать.сводные.показатели.по.временным.окнам..
    И.наконец,.проиллюстрируем.соединение.потоков.на.примере.обогащения.потока.
    переходов.по.ссылкам.
    Подсчет количества слов
    Вкратце.рассмотрим.сокращенный.вариант.примера.подсчета.слов.для.Kafka.
    Streams..Полный.пример.вы.можете.найти.на.GitHub.(
    http://www.bit.ly/2ri00gj
    ).
    Прежде.всего.при.создании.приложения.потоковой.обработки.необходимо.настроить.
    Kafka.Streams..У.него.есть.множество.параметров,.которые.мы.не.станем.тут.обсуж- дать,.так.как.их.описание.можно.найти.в.документации.(
    http://www.bit.ly/2t7obPU
    )..
    Кроме.того,.можно.настроить.встроенные.в.Kafka.Streams.производитель.и.по- требитель,.добавив.любые.нужные.настройки.производителя.или.потребителя.
    в.объект.Properties:
    public class WordCountExample {
    public static void main(String[] args) throws Exception{
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG,
    "wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
    "localhost:9092"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
    Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
    Serdes.String().getClass().getName());
    .У.каждого.приложения.Kafka.должен.быть.свой.идентификатор.приложения..
    Он.используется.для.координации.действий.экземпляров.приложения,.а.также.

    Kafka Streams в примерах 299
    именования.внутренних.локальных.хранилищ.и.относящихся.к.ним.тем..Среди.
    приложений.Kafka.Streams,.работающих.в.пределах.одного.кластера,.иденти- фикаторы.не.должны.повторяться.
    .Приложения.Kafka.Streams.всегда.читают.данные.из.тем.Kafka.и.записывают.
    результаты.в.темы.Kafka..Как.мы.увидим.далее,.приложения.Kafka.Streams.так- же.применяют.Kafka.для.координации.своих.действий..Так.что.лучше.указать.
    приложению,.где.искать.Kafka.
    .Приложение.должно.выполнять.сериализацию.и.десериализацию.при.чтении.
    и.записи.данных,.поэтому.мы.указываем.классы,.наследующие.интерфейс.
    Serde для.использования.по.умолчанию.
    Задав.настройки,.можно.перейти.к.построению.топологии.потоков:
    KStreamBuilder builder = new KStreamBuilder();
    KStream source =
    builder.stream("wordcount-input");
    final Pattern pattern = Pattern.compile("\\W+");
    KStream counts = source.flatMapValues(value->
    Arrays.asList(pattern.split(value.toLowerCase())))
    .map((key, value) -> new KeyValue Object>(value, value))
    .filter((key, value) -> (!value.equals("the")))
    .groupByKey()
    .count("CountStore").mapValues(value->
    Long.toString(value)).toStream(); counts.to("wordcount-output");
    .Создаем.объект.класса.
    KStreamBuilder
    .и.приступаем.к.описанию.потока,.пере- давая.название.входной.темы.
    .Все.читаемые.из.темы-производителя.события.представляют.собой.строки.слов..
    Мы.разбиваем.их.с.помощью.регулярного.выражения.на.последовательности.
    отдельных.слов..Затем.вставляем.каждое.из.слов.(значение.записи.для.какого- либо.события).в.ключ.записи.этого.события.для.дальнейшего.использования.
    в.операции.группировки.
    .Отфильтровываем.слово.the.просто.для.демонстрации.того,.как.просто.это.
    делать.
    .И.группируем.по.ключу,.получая.наборы.событий.для.каждого.уникального.
    слова.
    .Подсчитываем.количество.событий.в.каждом.наборе..Заносим.результаты.
    в.значение.типа.
    Long
    ..Преобразуем.его.в.
    String
    .для.большей.удобочитаемости.
    результатов.
    .Осталось.только.записать.результаты.обратно.в.Kafka.

    300 Глава 11 • Потоковая обработка
    После.описания.последовательности.выполняемых.приложением.преобразований.
    нам.осталось.лишь.запустить.его:
    KafkaStreams streams = new KafkaStreams(builder, props); streams.start();
    // Обычно потоковое приложение работает постоянно,
    // в данном же примере мы запустим его на некоторое время,
    // а затем остановим, поскольку входные данные не бесконечны.
    Thread.sleep(5000L);
    streams.close();
    }
    }
    .Описываем.объект.
    KafkaStreams
    .на.основе.нашей.топологии.и.заданных.нами.
    свойств.
    .Запускаем.Kafka.Streams.
    .Через.некоторое.время.останавливаем.
    Вот.и.все!.Понадобилось.всего.несколько.строк,.чтобы.реализовать.паттерн.об- работки.отдельных.событий.(мы.выполнили.отображение,.а.затем.фильтрацию.
    событий)..Мы.заново.разделили.данные,.добавив.оператор.
    group-by
    ,.после.чего.
    на.основе.простого.локального.состояния.подсчитали.число.записей,.в.которых.
    каждое.уникальное.слово.является.ключом,.то.есть.количество.вхождений.каждого.
    из.слов.
    Теперь.мы.рекомендуем.запустить.полный.вариант.примера..Инструкции.по.вы- полнению.полного.примера.можно.найти.в.файле.
    README
    .репозитория.на.GitHub.
    (
    http://www.bit.ly/2sOXzUN
    ).
    Отметим,.что.для.запуска.всего.примера.не.требуется.ничего.устанавливать,.кроме.
    самой.Apache.Kafka..Вы.могли.наблюдать.подобное.при.использовании.Spark,.на- пример,.в.локальном режиме.(Local.Mode)..Основное.различие:.если.во.входной.теме.
    несколько.разделов,.можно.путем.запуска.нескольких.экземпляров.приложения.
    WordCount
    .(подобно.запуску.приложения.в.нескольких.различных.вкладках.терми- нала).создать.свой.первый.кластер.обработки.Kafka.Streams..Экземпляры.приложе- ния.
    WordCount
    .при.этом.смогут.взаимодействовать.друг.с.другом.и.согласовывать.
    свою.работу..Один.из.главных.порогов.вхождения.для.Spark.—.то,.что.использовать.
    его.в.локальном.режиме.очень.просто,.но.для.эксплуатации.кластера.необходимо.
    установить.YARN.или.Mesos,.после.чего.установить.Spark.на.всех.машинах,.а.затем.
    разобраться.с.запуском.приложения.на.кластере..В.случае.же.API.Kafka.Streams,.
    можно.просто.запустить.несколько.экземпляров.приложения.—.и.кластер.готов..
    Одно.и.то.же.приложение.работает.на.машине.разработчика.и.при.промышленной.
    эксплуатации.

    Kafka Streams в примерах 301
    Сводные показатели фондовой биржи
    Следующий.пример.сложнее.—.мы.прочитаем.поток.событий.биржевых.операций,.
    включающий.символы.акций,.цену.и.величину.предложения..В.биржевых.операци- ях.цена предложения.(ask.price).—.это.то,.сколько.просит.за.акции.продавец,.а.цена
    заявки.(bid.price).—.то,.что.готов.заплатить.покупатель..Величина.предложения.
    (ask.size).—.число.акций,.которое.продавец.согласен.продать.по.данной.цене..
    Для.упрощения.примера.мы.полностью.проигнорируем.заявки..Не.станем.также.
    включать.в.данные.метки.даты/времени,.вместо.этого.воспользуемся.временем.
    события,.передаваемым.производителем.Kafka.
    Затем.мы.создадим.выходные.потоки.данных,.содержащие.несколько.оконных.
    сводных.показателей:
    ‰
    ‰
    наилучшую,.то.есть.минимальную.цену.предложения.для.каждого.пятисекунд- ного.окна;
    ‰
    ‰
    число.сделок.для.каждого.пятисекундного.окна;
    ‰
    ‰
    среднюю.цену.предложения.для.каждого.пятисекундного.окна.
    Все.сводные.показатели.будут.обновляться.каждую.секунду.
    Для.простоты.предположим,.что.на.бирже.торгуется.лишь.десять.символов.акций..
    Настройки.очень.похожи.на.те,.которые.мы.ранее.использовали.в.примере.с.под- счетом.слов.на.странице:
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stockstat");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
    Constants.BROKER);
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
    Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
    TradeSerde.class.getName());
    Основное.отличие.—.в.использовании.классов.
    Serde
    ..В.примере.подсчета.количе- ства.слов.на.странице.как.для.ключей,.так.и.для.значений.применялся.строковый.
    тип.данных,.а.следовательно,.в.качестве.сериализатора.и.десериализатора.для.обоих.
    задействовался.метод.
    Serdes.String()
    ..В.данном.же.примере.ключ.—.по-прежнему.
    строка,.но.значение.представляет.собой.объект.класса.
    Trade
    ,.содержащий.символ.
    акции,.цену.и.величину.предложения..Для.сериализации.и.десериализации.дан- ного.объекта,.а.также.нескольких.других.объектов,.применяемых.в.этом.неболь- шом.приложении,.воспользуемся.библиотекой.Gson.от.компании.Google,.чтобы.
    генерировать.сериализатор.и.десериализатор.JSON.на.основе.Java-объекта..Затем.
    создадим.небольшой.адаптер,.формирующий.из.них.объект.
    Serde
    ..Объект.
    Serde создаем.следующим.образом:
    static public final class TradeSerde extends WrapperSerde {
    public TradeSerde() {

    1   ...   31   32   33   34   35   36   37   38   39


    написать администратору сайта