Главная страница
Навигация по странице:

  • Автономный режим

  • Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


    Скачать 7.59 Mb.
    НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
    Дата21.06.2022
    Размер7.59 Mb.
    Формат файлаpdf
    Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
    ТипДокументы
    #609074
    страница21 из 39
    1   ...   17   18   19   20   21   22   23   24   ...   39
    167
    Связывание и быстрота адаптации
    Одна.из.важнейших.задач.конвейеров.данных.—.расцепление.источников.и.при- емников.данных..Случайное.связывание.может.возникнуть.множеством.спо- собов.
    ‰
    ‰
    Узкоспециализированные конвейеры..Некоторые.компании.создают.по.отдельному.
    конвейеру.для.каждой.пары.приложений,.которые.нужно.связать..Например,.
    они.используют.Logstash,.чтобы.выгрузить.журналы.в.Elasticsearch,.Flume,.
    чтобы.выгрузить.журналы.в.HDFS,.GoldenGate.для.передачи.данных.из.Oracle.
    в.HDFS,.Informatica.для.переброски.данных.из.MySQL.и.XML-файлов.в.Oracle.
    и.т..д..Такая.практика.приводит.к.сильному.связыванию.конвейера.данных.
    с.конкретными.конечными.точками.и.образует.мешанину.из.точек.интеграции,.
    требующую.немалых.затрат.труда.для.развертывания,.сопровождения.и.мо- ниторинга..Из-за.этого.возрастают.затраты.на.внедрение.новых.технологий.
    и.усложняются.инновации,.ведь.для.каждой.новой.появляющейся.в.компании.
    системы.приходится.создавать.дополнительные.конвейеры.
    ‰
    ‰
    Потери метаданных..Если.конвейер.данных.не.сохраняет.метаданные.схемы.
    и.не.позволяет.ей.эволюционировать,.производящее.данные.программное.
    обеспечение.окажется.в.конечном.итоге.сильно.связанным.с.программным.
    обеспечением,.их.использующим..Без.информации.о.схеме.каждый.из.этих.про- граммных.продуктов.должен.будет.содержать.информацию.о.способе.разбора.
    данных.и.их.интерпретации..Если.данные.движутся.из.Oracle.в.HDFS.и.адми- нистратор.базы.данных.добавил.в.Oracle.новое.поле,.не.сохранив.информацию.
    о.схеме.и.не.разрешив.ей.эволюционировать,.то.всем.разработчикам.придется.
    одновременно.модифицировать.свои.приложения..В.противном.случае.все.при- ложения,.читающие.из.HDFS.данные,.перестанут.работать..Оба.эти.варианта.
    отнюдь.не.означают,.что.адаптация.будет.быстрой..Если.конвейер.поддерживает.
    эволюцию.схемы,.то.все.команды.разработчиков.могут.менять.свои.приложения.
    независимо.друг.от.друга,.не.волнуясь,.что.далее.по.конвейеру.что-то.перестанет.
    работать.
    ‰
    ‰
    Чрезмерная обработка..Как.мы.уже.упоминали.при.обсуждении.преобразований.
    данных,.определенная.обработка.данных.—.неотъемлемое.свойство.конвейеров..
    В.конце.концов,.данные.перемещаются.между.разными.системами,.в.которых.
    используются.разные.форматы.данных.и.поддерживаются.различные.сцена- рии..Однако.чрезмерная.обработка.ограничивает.располагающиеся.далее.по.
    конвейеру.системы.решениями,.принятыми.при.создании.конвейера:.о.том,.
    какие.поля.сохранять,.как.агрегировать.данные.и.т..д..Часто.из-за.этого.кон- вейер.постоянно.изменяется.по.мере.смены.требований.от.приложений,.рас- полагающихся.далее.по.конвейеру,.что.неэффективно,.небезопасно.и.плохо.
    соответствует.концепции.быстрой.адаптации..Чтобы.адаптация.была.быстрой,.
    стоит.сохранить.как.можно.больше.необработанных.данных.и.разрешить.рас- полагающимся.далее.по.конвейеру.приложениям.самим.решать,.как.их.обра- батывать.и.агрегировать.

    168 Глава 7 • Создание конвейеров данных
    Когда использовать Kafka Connect, а когда клиенты-производители и клиенты-потребители
    При.записи.данных.в.Kafka.или.чтении.из.нее.можно.задействовать.традиционные.
    клиент-производитель.и.клиент-потребитель,.как.описано.в.главах.3.и.4,.или.вос- пользоваться.API.Kafka.Connect.и.коннекторами,.как.мы.покажем.далее..Прежде.
    чем.углубиться.в.нюансы.Kafka.Connect,.задумаемся.о.том,.когда.каждую.из.этих.
    возможностей.применить.
    Как.мы.уже.видели,.клиенты.Kafka.представляют.собой.клиенты,.встраиваемые.
    в.ваше.же.приложение..Благодаря.этому.приложение.может.читать.данные.из.Kafka.
    и.записывать.данные.в.нее..Используйте.клиенты.Kafka.тогда,.когда.у.вас.есть.воз- можность.модифицировать.код.приложения,.к.которому.вы.хотите.подключиться,.
    и.когда.вы.хотели.бы.поместить.данные.в.Kafka.или.извлечь.их.из.нее.
    Kafka.Connect.же.вы.будете.задействовать.для.подключения.Kafka.к.хранилищам.
    данных,.созданным.не.вами,.код.которых.вы.не.можете.или.не.должны.менять..
    Kafka.Connect.применяется.для.извлечения.данных.из.внешнего.хранилища.
    данных.в.Kafka.или.помещения.данных.из.нее.во.внешнее.хранилище..Если.для.
    хранилищ.уже.существует.коннектор,.Kafka.Connect.могут.использовать.и.не.
    программисты,.а.простые.пользователи,.которым.необходимо.только.настроить.
    коннекторы.
    Если.же.нужно.подключить.Kafka.к.хранилищу.данных,.для.которого.еще.не.су- ществует.коннектора,.можно.написать.приложение,.задействующее.или.клиенты.
    Kafka,.или.Kafka.Connect..Рекомендуется.задействовать.Connect,.поскольку.он.
    предоставляет.такие.готовые.возможности,.как.управление.настройками,.хранение.
    смещений,.распараллеливание,.обработка.ошибок,.поддержка.различных.типов.
    данных.и.стандартные.REST.API.для.управления.коннекторами..Кажется,.что.
    написать.маленькое.приложение.для.подключения.Kafka.к.хранилищу.данных.
    очень.просто,.но.вам.придется.учесть.много.мелких.нюансов,.относящихся.к.типам.
    данных.и.настройкам,.так.что.задача.окажется.не.такой.уж.простой..Kafka.Connect.
    берет.большую.часть.этих.забот.на.себя,.благодаря.чему.вы.можете.сосредоточиться.
    на.перемещении.данных.во.внешние.хранилища.и.обратно.
    Kafka Connect
    Фреймворк.Kafka.Connect.—.часть.Apache.Kafka,.обеспечивающая.масштабируе- мый.и.гибкий.способ.перемещения.данных.между.Kafka.и.другими.хранилищами.
    данных..Он.предоставляет.API.и.среду.выполнения.для.разработки.и.запуска.пла-
    гинов-коннекторов.(connector.plugins).—.исполняемых.Kafka.Connect.библиотек,.
    отвечающих.за.перемещение.данных..Kafka.Connect.выполняется.в.виде.кластера.
    процессов-исполнителей.(worker.processes)..Необходимо.установить.плагины-

    Kafka Connect 169
    коннекторы.на.исполнителях,.после.чего.воспользоваться.API.REST.для.настройки.
    коннекторов,.выполняемых.с.определенными.конфигурациями,.и.управления.ими..
    Коннекторы.запускают.дополнительные.задачи.(tasks).для.параллельного.пере- мещения.больших.объемов.данных.и.эффективного.использования.доступных.ре- сурсов.рабочих.узлов..Задачам.коннектора.источника.необходимо.лишь.прочитать.
    данные.из.системы-источника.и.передать.объекты.данных.коннектора.процессам- исполнителям..Задачи.коннектора.приемника.получают.объекты.данных.коннекто- ра.от.исполнителей.и.отвечают.за.их.запись.в.целевую.информационную.систему..
    Для.обеспечения.хранения.этих.объектов.данных.в.Kafka.в.различных.форматах.
    Kafka.Connect.использует.преобразователи формата (convertors).—.поддержка.
    формата.JSON.встроена.в.Apache.Kafka,.а.реестр.схем.Confluent.предоставляет.
    преобразователи.форматов.Avro..Благодаря.этому.пользователи.могут.выбирать.
    формат.хранения.данных.в.Kafka.независимо.от.задействованных.коннекторов.
    В.этой.главе.мы,.разумеется,.не.можем.обсудить.все.нюансы.Kafka.Connect.и.мно- жества.его.коннекторов..Это.потребовало.бы.отдельной.книги..Однако.сделаем.
    обзор.Kafka.Connect.и.того,.как.он.применяется,.а.также.укажем,.где.искать.до- полнительную.справочную.информацию.
    Запуск Connect
    Kafka.Connect.поставляется.вместе.с.Apache.Kafka,.так.что.установить.его.отдельно.
    не.требуется..Для.промышленной.эксплуатации,.особенно.если.вы.собираетесь.
    использовать.Connect.для.перемещения.больших.объемов.данных.или.запускать.
    большое.число.коннекторов,.желательно.установить.Kafka.Connect.на.отдельном.
    сервере..В.этом.случае.установите.Apache.Kafka.на.все.машины,.запустив.на.части.
    серверов.брокеры,.а.на.других.серверах.—.Connect.
    Запуск.исполнителя.Kafka.Connect.напоминает.запуск.брокера..Нужно.просто.
    вызвать.сценарий.запуска,.передав.ему.файл.с.параметрами:
    bin/connect-distributed.sh config/connect-distributed.properties
    Вот.несколько.основных.настроек.исполнителей.Connect.
    ‰
    ‰ bootstrap.servers:
    .—.список.брокеров.Kafka,.с.которыми.будет.работать.
    Connect..Коннекторы.будут.передавать.данные.в.эти.брокеры.или.из.них..
    Указывать.в.этом.списке.все.брокеры.кластера.не.нужно,.но.рекомендуется.
    хотя.бы.три.
    ‰
    ‰ group.id:
    .—.все.исполнители.с.одним.идентификатором.группы.образуют.один.
    кластер.Connect..Запущенный.на.этом.кластере.коннектор.может.оказаться.за- пущенным.на.любом.из.исполнителей.кластера,.как.и.его.задачи.
    ‰
    ‰ key.converter
    .и.
    value.converter:
    .—.Connect.может.работать.с.несколькими.
    форматами.данных,.хранимых.в.Kafka..Эти.две.настройки.задают.преобразователь.
    формата.для.ключа.и.значения.сообщения,.сохраняемого.в.Kafka..По.умолчанию.

    170 Глава 7 • Создание конвейеров данных используется.формат.JSON.с.включенным.в.Apache.Kafka.преобразователем.
    JSONConverter..Можно.также.установить.их.равными.
    AvroConverter
    .—.это.со- ставная.часть.реестра.схем.Confluent.
    У.некоторых.преобразователей.формата.есть.особые.параметры.конфигурации..
    Например,.сообщения.в.формате.JSON.могут.включать.или.не.включать.схему..
    Чтобы.конкретизировать.эту.возможность,.нужно.установить.параметр.
    key.con- verter.schema.enable=true
    .или.
    false
    .соответственно..Аналогичную.настройку.
    можно.выполнить.для.преобразователя.значений,.установив.параметр.
    value.conver- ter.schema.enable
    .в.
    true
    .или.
    false
    ..Сообщения.Avro.также.содержат.схему,.но.
    необходимо.задать.местоположение.реестра.схем.с.помощью.свойств.
    key.con- verter.schema.registry.url
    .и.
    value.converter.schema.registry.url
    Настройки.
    rest.host.name
    .и.
    rest.port
    ..Для.настройки.и.контроля.коннекторов.
    обычно.используется.API.REST.или.Kafka.Connect..При.необходимости.вы.можете.
    задать.конкретный.порт.для.API.REST.
    Настроив.исполнителей,.убедитесь,.что.ваш.кластер.работает,.с.помощью.API.
    REST:
    gwen$ curl http://localhost:8083/
    {"version":"0.10.1.0-SNAPSHOT","commit":"561f45d747cd2a8c"}
    В.результате.обращения.к.корневому.URI.REST.должна.быть.возвращена.текущая.
    версия..Мы.работаем.с.предварительным.выпуском.Kafka.0.10.1.0..Можно.также.
    просмотреть.доступные.плагины.коннекторов:
    gwen$ curl http://localhost:8083/connector-plugins
    [{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"},
    {"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"}]
    У.нас.запущена.чистая.Apache.Kafka,.так.что.доступны.только.плагины.коннекто- ров.для.файлового.источника.и.файлового.приемника.
    Взглянем.теперь.на.настройку.и.использование.этих.образцов.коннекторов,.после.
    чего.перейдем.к.более.сложным.примерам,.требующим.настройки.внешних.инфор- мационных.систем,.к.которым.будет.осуществляться.подключение.
    Автономный режим
    Отметим, что у Kafka Connect имеется автономный режим. Он схож с распреде- ленным режимом — нужно просто запустить сценарий bin/connect-standalone.sh вместо bin/connect-distributed.sh. Файл настроек коннекторов можно передать в командной строке, а не через API REST. В этом режиме все коннекторы и задачи выполняются на одном автономном исполнителе. Connect в автономном режиме обычно удобнее использовать для разработки и отладки, а также в случаях, когда коннекторы и задачи должны выполняться на конкретной машине (напри- мер, коннектор syslog прослушивает порт, так что вам нужно знать, на каких машинах он работает).

    Kafka Connect 171
    Пример коннектора: файловый источник и файловый приемник
    Здесь.мы.воспользуемся.файловыми.коннекторами.и.преобразователем.формата.
    для.JSON,.включенными.в.состав.Apache.Kafka..Чтобы.следить.за.ходом.рассуж- дений,.убедитесь,.что.у.вас.установлены.и.запущены.ZooKeeper.и.Kafka.
    Для.начала.запустите.распределенный.исполнитель.Connect..При.промышленной.
    эксплуатации.должны.работать.хотя.бы.два.или.три.таких.исполнителя,.чтобы.обе- спечить.высокую.доступность..В.данном.примере.он.будет.один:
    bin/connect-distributed.sh config/connect-distributed.properties &
    Теперь.можно.запустить.файловый.источник..В.качестве.примера.настроим.его.на.
    чтение.файла.конфигурации.Kafka.—.фактически.передадим.конфигурацию.Kafka.
    в.тему.Kafka:
    echo '{"name":"load-kafka-config", "config":{"connector.class":"FileStreamSource",
    "file":"config/server.properties","topic":"kafka-config-topic"}}' |
    curl -X POST -d @- http://localhost:8083/connectors --header
    "contentType:application/json"
    {"name":"load-kafka-config","config":{"connector.class":"FileStreamSource",
    "file":"config/server.properties","topic":"kafka-config-topic","name":"load-kafka- config"},"tasks":[]}
    Для.создания.коннектора.мы.написали.JSON-текст,.включающий.название.коннек- тора.—.
    load-kafka-config
    .—.и.ассоциативный.массив.его.настроек,.включающий.
    класс.коннектора,.загружаемый.файл.и.тему,.в.которую.мы.хотим.его.загрузить.
    Воспользуемся.консольным.потребителем.Kafka.для.проверки.загрузки.настроек.
    в.тему:
    gwen$ bin/kafka-console-consumer.sh --new-consumer
    --bootstrap-server=localhost:9092 --topic kafka-config-topic --from-beginning
    Если.все.прошло.успешно,.вы.увидите.что-то.вроде:
    {"schema":{"type":"string","optional":false},"payload":"# Licensed to the
    Apache Software Foundation (ASF) under one or more"}

    {"schema":{"type":"string","optional":false},"pay- load":"############################# Server Basics
    #############################"}
    {"schema":{"type":"string","optional":false},"payload":""}
    {"schema":{"type":"string","optional":false},"payload":"# The id of the broker.
    This must be set to a unique integer for each broker."}
    {"schema":{"type":"string","optional":false},"payload":"broker.id=0"}
    {"schema":{"type":"string","optional":false},"payload":""}


    172 Глава 7 • Создание конвейеров данных
    Фактически.это.содержимое.файла.
    config/server.properties
    ,.преобразованного.
    нашим.коннектором.построчно.в.формат.JSON.и.помещенного.в.
    kafka-config- topic
    ..Обратите.внимание.на.то,.что.по.умолчанию.преобразователь.формата.JSON.
    вставляет.схему.в.каждую.запись..В.данном.случае.схема.очень.проста.—.всего.один.
    столбец.
    payload
    .типа.
    string
    ,.содержащий.по.одной.строке.из.файла.для.каждой.
    записи.
    А.сейчас.воспользуемся.преобразователем.формата.файлового.приемника.для.
    сброса.содержимого.этой.темы.в.файл..Итоговый.файл.должен.оказаться.точно.
    таким.же,.как.и.исходный.
    config/server.properties
    ,.поскольку.преобразователь.
    JSON.преобразует.записи.в.формате.JSON.в.обычные.текстовые.строки:
    echo '{"name":"dump-kafka-config", "config":
    {"connector.class":"FileStreamSink","file":"copy-of-server- properties","topics":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
    {"name":"dump-kafka-config","config":
    {"connector.class":"FileStreamSink","file":"copy-of-server- properties","topics":"kafka-config-topic","name":"dump-kafka-config"},"tasks":
    []}
    Обратите.внимание.на.отличие.от.исходных.настроек:.сейчас.мы.используем.класс.
    FileStreamSink
    ,.а.не.
    FileStreamSource
    ..У.нас.по-прежнему.есть.свойство.
    file
    ,.но.
    теперь.оно.указывает.на.целевой.файл,.а.не.на.источник.записей,.и.вместо.
    topic мы.указываем.
    topics
    ..Внимание,.множественное.число!.С.помощью.приемника.
    можно.записывать.несколько.тем.в.один.файл,.в.то.время.как.источник.позволяет.
    записывать.только.в.одну.тему.
    В.случае.успешного.выполнения.вы.получите.файл.
    copy-of-server-properties
    ,.
    совершенно.идентичный.файлу.
    config/server.properties
    ,.на.основе.которого.мы.
    заполняли.
    kafka-config-topic
    Удалить.коннектор.можно.с.помощью.команды:
    curl -X DELETE http://localhost:8083/connectors/dump-kafka-config
    Если.вы.заглянете.в.журнал.исполнителя.Connect.после.удаления.коннектора,.то.
    обнаружите,.что.все.остальные.коннекторы.перезапускают.задания..Это.происхо- дит.для.перераспределения.оставшихся.задач.между.исполнителями.и.обеспечения.
    равномерной.нагрузки.после.удаления.коннектора.
    Пример коннектора: из MySQL в Elasticsearch
    Теперь,.когда.заработал.простой.пример,.пора.заняться.чем-то.более.полез- ным..Возьмем.таблицу.MySQL,.отправим.ее.в.тему.Kafka,.загрузим.оттуда.
    в.Elasticsearch.и.проиндексируем.ее.содержимое.

    Kafka Connect 173
    Мы.выполняем.эксперименты.на.MacBook..Для.установки.MySQL.и.Elasticsearch.
    достаточно.выполнить.команды:
    brew install mysql brew install elasticsearch
    Следующий.шаг.—.проверить.наличие.коннекторов..Если.вы.работаете.с.Confluent.
    OpenSource,.то.они.уже.должны.быть.установлены.как.часть.платформы..В.про- тивном.случае.можно.выполнить.сборку.коннекторов.из.кода.в.GitHub.
    1.. Перейдите.по.адресу.
    https://github.com/confluentinc/kafka-connect-elasticsearch
    2.. Клонируйте.этот.репозиторий.
    3.. Выполните.команду.
    mvn install
    .для.сборки.проекта.
    4.. Повторите.эти.действия.для.коннектора.JDBC.(
    https://github.com/confluentinc/
    kafka-connect-jdbc
    ).
    Теперь.скопируйте.JAR-файлы,.появившиеся.в.результате.сборки.в.подкаталогах.
    target
    .каталогов,.в.которых.выполнялась.сборка.коннекторов,.в.место,.соответ- ствующее.пути.к.классам.Kafka.Connect:
    gwen$ mkdir libs gwen$ cp ../kafka-connect-jdbc/target/kafka-connect-jdbc-3.1.0-SNAPSHOT.jar libs/
    gwen$ cp ../kafka-connect-elasticsearch/target/kafka-connect- elasticsearch-3.2.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/*
    libs/
    Если.исполнители.Kafka.Connect.еще.не.запущены,.сделайте.это.и.проверьте,.что.
    новые.плагины.коннекторов.перечислены.в.списке:
    gwen$ bin/connect-distributed.sh config/connect-distributed.properties &
    gwen$ curl http://localhost:8083/connector-plugins
    [{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"},
    {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"},
    {"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"},
    {"class":"io.confluent.connect.jdbc.JdbcSourceConnector"}]
    Как.видите,.в.кластере.теперь.доступны.новые.плагины.коннекторов..JDBC- источнику.требуется.драйвер.MySQL.для.работы.с.MySQL..Мы.скачали.JDBC- драйвер.для.MySQL.с.сайта.Oracle,.разархивировали.его.и.скопировали.файл.
    mysql-connector-java-5.1.40-bin.jar
    .в.каталог.
    libs/
    .при.копировании.коннек- торов.
    Следующий.шаг.—.создание.таблицы.в.базе.данных.MySQL,.которую.потом.можно.
    будет.передать.в.Kafka.с.помощью.JDBC-коннектора:
    gwen$ mysql.server restart mysql> create database test;

    1   ...   17   18   19   20   21   22   23   24   ...   39


    написать администратору сайта