Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
167 Связывание и быстрота адаптации Одна.из.важнейших.задач.конвейеров.данных.—.расцепление.источников.и.при- емников.данных..Случайное.связывание.может.возникнуть.множеством.спо- собов. Узкоспециализированные конвейеры..Некоторые.компании.создают.по.отдельному. конвейеру.для.каждой.пары.приложений,.которые.нужно.связать..Например,. они.используют.Logstash,.чтобы.выгрузить.журналы.в.Elasticsearch,.Flume,. чтобы.выгрузить.журналы.в.HDFS,.GoldenGate.для.передачи.данных.из.Oracle. в.HDFS,.Informatica.для.переброски.данных.из.MySQL.и.XML-файлов.в.Oracle. и.т..д..Такая.практика.приводит.к.сильному.связыванию.конвейера.данных. с.конкретными.конечными.точками.и.образует.мешанину.из.точек.интеграции,. требующую.немалых.затрат.труда.для.развертывания,.сопровождения.и.мо- ниторинга..Из-за.этого.возрастают.затраты.на.внедрение.новых.технологий. и.усложняются.инновации,.ведь.для.каждой.новой.появляющейся.в.компании. системы.приходится.создавать.дополнительные.конвейеры. Потери метаданных..Если.конвейер.данных.не.сохраняет.метаданные.схемы. и.не.позволяет.ей.эволюционировать,.производящее.данные.программное. обеспечение.окажется.в.конечном.итоге.сильно.связанным.с.программным. обеспечением,.их.использующим..Без.информации.о.схеме.каждый.из.этих.про- граммных.продуктов.должен.будет.содержать.информацию.о.способе.разбора. данных.и.их.интерпретации..Если.данные.движутся.из.Oracle.в.HDFS.и.адми- нистратор.базы.данных.добавил.в.Oracle.новое.поле,.не.сохранив.информацию. о.схеме.и.не.разрешив.ей.эволюционировать,.то.всем.разработчикам.придется. одновременно.модифицировать.свои.приложения..В.противном.случае.все.при- ложения,.читающие.из.HDFS.данные,.перестанут.работать..Оба.эти.варианта. отнюдь.не.означают,.что.адаптация.будет.быстрой..Если.конвейер.поддерживает. эволюцию.схемы,.то.все.команды.разработчиков.могут.менять.свои.приложения. независимо.друг.от.друга,.не.волнуясь,.что.далее.по.конвейеру.что-то.перестанет. работать. Чрезмерная обработка..Как.мы.уже.упоминали.при.обсуждении.преобразований. данных,.определенная.обработка.данных.—.неотъемлемое.свойство.конвейеров.. В.конце.концов,.данные.перемещаются.между.разными.системами,.в.которых. используются.разные.форматы.данных.и.поддерживаются.различные.сцена- рии..Однако.чрезмерная.обработка.ограничивает.располагающиеся.далее.по. конвейеру.системы.решениями,.принятыми.при.создании.конвейера:.о.том,. какие.поля.сохранять,.как.агрегировать.данные.и.т..д..Часто.из-за.этого.кон- вейер.постоянно.изменяется.по.мере.смены.требований.от.приложений,.рас- полагающихся.далее.по.конвейеру,.что.неэффективно,.небезопасно.и.плохо. соответствует.концепции.быстрой.адаптации..Чтобы.адаптация.была.быстрой,. стоит.сохранить.как.можно.больше.необработанных.данных.и.разрешить.рас- полагающимся.далее.по.конвейеру.приложениям.самим.решать,.как.их.обра- батывать.и.агрегировать. 168 Глава 7 • Создание конвейеров данных Когда использовать Kafka Connect, а когда клиенты-производители и клиенты-потребители При.записи.данных.в.Kafka.или.чтении.из.нее.можно.задействовать.традиционные. клиент-производитель.и.клиент-потребитель,.как.описано.в.главах.3.и.4,.или.вос- пользоваться.API.Kafka.Connect.и.коннекторами,.как.мы.покажем.далее..Прежде. чем.углубиться.в.нюансы.Kafka.Connect,.задумаемся.о.том,.когда.каждую.из.этих. возможностей.применить. Как.мы.уже.видели,.клиенты.Kafka.представляют.собой.клиенты,.встраиваемые. в.ваше.же.приложение..Благодаря.этому.приложение.может.читать.данные.из.Kafka. и.записывать.данные.в.нее..Используйте.клиенты.Kafka.тогда,.когда.у.вас.есть.воз- можность.модифицировать.код.приложения,.к.которому.вы.хотите.подключиться,. и.когда.вы.хотели.бы.поместить.данные.в.Kafka.или.извлечь.их.из.нее. Kafka.Connect.же.вы.будете.задействовать.для.подключения.Kafka.к.хранилищам. данных,.созданным.не.вами,.код.которых.вы.не.можете.или.не.должны.менять.. Kafka.Connect.применяется.для.извлечения.данных.из.внешнего.хранилища. данных.в.Kafka.или.помещения.данных.из.нее.во.внешнее.хранилище..Если.для. хранилищ.уже.существует.коннектор,.Kafka.Connect.могут.использовать.и.не. программисты,.а.простые.пользователи,.которым.необходимо.только.настроить. коннекторы. Если.же.нужно.подключить.Kafka.к.хранилищу.данных,.для.которого.еще.не.су- ществует.коннектора,.можно.написать.приложение,.задействующее.или.клиенты. Kafka,.или.Kafka.Connect..Рекомендуется.задействовать.Connect,.поскольку.он. предоставляет.такие.готовые.возможности,.как.управление.настройками,.хранение. смещений,.распараллеливание,.обработка.ошибок,.поддержка.различных.типов. данных.и.стандартные.REST.API.для.управления.коннекторами..Кажется,.что. написать.маленькое.приложение.для.подключения.Kafka.к.хранилищу.данных. очень.просто,.но.вам.придется.учесть.много.мелких.нюансов,.относящихся.к.типам. данных.и.настройкам,.так.что.задача.окажется.не.такой.уж.простой..Kafka.Connect. берет.большую.часть.этих.забот.на.себя,.благодаря.чему.вы.можете.сосредоточиться. на.перемещении.данных.во.внешние.хранилища.и.обратно. Kafka Connect Фреймворк.Kafka.Connect.—.часть.Apache.Kafka,.обеспечивающая.масштабируе- мый.и.гибкий.способ.перемещения.данных.между.Kafka.и.другими.хранилищами. данных..Он.предоставляет.API.и.среду.выполнения.для.разработки.и.запуска.пла- гинов-коннекторов.(connector.plugins).—.исполняемых.Kafka.Connect.библиотек,. отвечающих.за.перемещение.данных..Kafka.Connect.выполняется.в.виде.кластера. процессов-исполнителей.(worker.processes)..Необходимо.установить.плагины- Kafka Connect 169 коннекторы.на.исполнителях,.после.чего.воспользоваться.API.REST.для.настройки. коннекторов,.выполняемых.с.определенными.конфигурациями,.и.управления.ими.. Коннекторы.запускают.дополнительные.задачи.(tasks).для.параллельного.пере- мещения.больших.объемов.данных.и.эффективного.использования.доступных.ре- сурсов.рабочих.узлов..Задачам.коннектора.источника.необходимо.лишь.прочитать. данные.из.системы-источника.и.передать.объекты.данных.коннектора.процессам- исполнителям..Задачи.коннектора.приемника.получают.объекты.данных.коннекто- ра.от.исполнителей.и.отвечают.за.их.запись.в.целевую.информационную.систему.. Для.обеспечения.хранения.этих.объектов.данных.в.Kafka.в.различных.форматах. Kafka.Connect.использует.преобразователи формата (convertors).—.поддержка. формата.JSON.встроена.в.Apache.Kafka,.а.реестр.схем.Confluent.предоставляет. преобразователи.форматов.Avro..Благодаря.этому.пользователи.могут.выбирать. формат.хранения.данных.в.Kafka.независимо.от.задействованных.коннекторов. В.этой.главе.мы,.разумеется,.не.можем.обсудить.все.нюансы.Kafka.Connect.и.мно- жества.его.коннекторов..Это.потребовало.бы.отдельной.книги..Однако.сделаем. обзор.Kafka.Connect.и.того,.как.он.применяется,.а.также.укажем,.где.искать.до- полнительную.справочную.информацию. Запуск Connect Kafka.Connect.поставляется.вместе.с.Apache.Kafka,.так.что.установить.его.отдельно. не.требуется..Для.промышленной.эксплуатации,.особенно.если.вы.собираетесь. использовать.Connect.для.перемещения.больших.объемов.данных.или.запускать. большое.число.коннекторов,.желательно.установить.Kafka.Connect.на.отдельном. сервере..В.этом.случае.установите.Apache.Kafka.на.все.машины,.запустив.на.части. серверов.брокеры,.а.на.других.серверах.—.Connect. Запуск.исполнителя.Kafka.Connect.напоминает.запуск.брокера..Нужно.просто. вызвать.сценарий.запуска,.передав.ему.файл.с.параметрами: bin/connect-distributed.sh config/connect-distributed.properties Вот.несколько.основных.настроек.исполнителей.Connect. bootstrap.servers: .—.список.брокеров.Kafka,.с.которыми.будет.работать. Connect..Коннекторы.будут.передавать.данные.в.эти.брокеры.или.из.них.. Указывать.в.этом.списке.все.брокеры.кластера.не.нужно,.но.рекомендуется. хотя.бы.три. group.id: .—.все.исполнители.с.одним.идентификатором.группы.образуют.один. кластер.Connect..Запущенный.на.этом.кластере.коннектор.может.оказаться.за- пущенным.на.любом.из.исполнителей.кластера,.как.и.его.задачи. key.converter .и. value.converter: .—.Connect.может.работать.с.несколькими. форматами.данных,.хранимых.в.Kafka..Эти.две.настройки.задают.преобразователь. формата.для.ключа.и.значения.сообщения,.сохраняемого.в.Kafka..По.умолчанию. 170 Глава 7 • Создание конвейеров данных используется.формат.JSON.с.включенным.в.Apache.Kafka.преобразователем. JSONConverter..Можно.также.установить.их.равными. AvroConverter .—.это.со- ставная.часть.реестра.схем.Confluent. У.некоторых.преобразователей.формата.есть.особые.параметры.конфигурации.. Например,.сообщения.в.формате.JSON.могут.включать.или.не.включать.схему.. Чтобы.конкретизировать.эту.возможность,.нужно.установить.параметр. key.con- verter.schema.enable=true .или. false .соответственно..Аналогичную.настройку. можно.выполнить.для.преобразователя.значений,.установив.параметр. value.conver- ter.schema.enable .в. true .или. false ..Сообщения.Avro.также.содержат.схему,.но. необходимо.задать.местоположение.реестра.схем.с.помощью.свойств. key.con- verter.schema.registry.url .и. value.converter.schema.registry.url Настройки. rest.host.name .и. rest.port ..Для.настройки.и.контроля.коннекторов. обычно.используется.API.REST.или.Kafka.Connect..При.необходимости.вы.можете. задать.конкретный.порт.для.API.REST. Настроив.исполнителей,.убедитесь,.что.ваш.кластер.работает,.с.помощью.API. REST: gwen$ curl http://localhost:8083/ {"version":"0.10.1.0-SNAPSHOT","commit":"561f45d747cd2a8c"} В.результате.обращения.к.корневому.URI.REST.должна.быть.возвращена.текущая. версия..Мы.работаем.с.предварительным.выпуском.Kafka.0.10.1.0..Можно.также. просмотреть.доступные.плагины.коннекторов: gwen$ curl http://localhost:8083/connector-plugins [{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"}, {"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"}] У.нас.запущена.чистая.Apache.Kafka,.так.что.доступны.только.плагины.коннекто- ров.для.файлового.источника.и.файлового.приемника. Взглянем.теперь.на.настройку.и.использование.этих.образцов.коннекторов,.после. чего.перейдем.к.более.сложным.примерам,.требующим.настройки.внешних.инфор- мационных.систем,.к.которым.будет.осуществляться.подключение. Автономный режим Отметим, что у Kafka Connect имеется автономный режим. Он схож с распреде- ленным режимом — нужно просто запустить сценарий bin/connect-standalone.sh вместо bin/connect-distributed.sh. Файл настроек коннекторов можно передать в командной строке, а не через API REST. В этом режиме все коннекторы и задачи выполняются на одном автономном исполнителе. Connect в автономном режиме обычно удобнее использовать для разработки и отладки, а также в случаях, когда коннекторы и задачи должны выполняться на конкретной машине (напри- мер, коннектор syslog прослушивает порт, так что вам нужно знать, на каких машинах он работает). Kafka Connect 171 Пример коннектора: файловый источник и файловый приемник Здесь.мы.воспользуемся.файловыми.коннекторами.и.преобразователем.формата. для.JSON,.включенными.в.состав.Apache.Kafka..Чтобы.следить.за.ходом.рассуж- дений,.убедитесь,.что.у.вас.установлены.и.запущены.ZooKeeper.и.Kafka. Для.начала.запустите.распределенный.исполнитель.Connect..При.промышленной. эксплуатации.должны.работать.хотя.бы.два.или.три.таких.исполнителя,.чтобы.обе- спечить.высокую.доступность..В.данном.примере.он.будет.один: bin/connect-distributed.sh config/connect-distributed.properties & Теперь.можно.запустить.файловый.источник..В.качестве.примера.настроим.его.на. чтение.файла.конфигурации.Kafka.—.фактически.передадим.конфигурацию.Kafka. в.тему.Kafka: echo '{"name":"load-kafka-config", "config":{"connector.class":"FileStreamSource", "file":"config/server.properties","topic":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "contentType:application/json" {"name":"load-kafka-config","config":{"connector.class":"FileStreamSource", "file":"config/server.properties","topic":"kafka-config-topic","name":"load-kafka- config"},"tasks":[]} Для.создания.коннектора.мы.написали.JSON-текст,.включающий.название.коннек- тора.—. load-kafka-config .—.и.ассоциативный.массив.его.настроек,.включающий. класс.коннектора,.загружаемый.файл.и.тему,.в.которую.мы.хотим.его.загрузить. Воспользуемся.консольным.потребителем.Kafka.для.проверки.загрузки.настроек. в.тему: gwen$ bin/kafka-console-consumer.sh --new-consumer --bootstrap-server=localhost:9092 --topic kafka-config-topic --from-beginning Если.все.прошло.успешно,.вы.увидите.что-то.вроде: {"schema":{"type":"string","optional":false},"payload":"# Licensed to the Apache Software Foundation (ASF) under one or more"} {"schema":{"type":"string","optional":false},"pay- load":"############################# Server Basics #############################"} {"schema":{"type":"string","optional":false},"payload":""} {"schema":{"type":"string","optional":false},"payload":"# The id of the broker. This must be set to a unique integer for each broker."} {"schema":{"type":"string","optional":false},"payload":"broker.id=0"} {"schema":{"type":"string","optional":false},"payload":""} 172 Глава 7 • Создание конвейеров данных Фактически.это.содержимое.файла. config/server.properties ,.преобразованного. нашим.коннектором.построчно.в.формат.JSON.и.помещенного.в. kafka-config- topic ..Обратите.внимание.на.то,.что.по.умолчанию.преобразователь.формата.JSON. вставляет.схему.в.каждую.запись..В.данном.случае.схема.очень.проста.—.всего.один. столбец. payload .типа. string ,.содержащий.по.одной.строке.из.файла.для.каждой. записи. А.сейчас.воспользуемся.преобразователем.формата.файлового.приемника.для. сброса.содержимого.этой.темы.в.файл..Итоговый.файл.должен.оказаться.точно. таким.же,.как.и.исходный. config/server.properties ,.поскольку.преобразователь. JSON.преобразует.записи.в.формате.JSON.в.обычные.текстовые.строки: echo '{"name":"dump-kafka-config", "config": {"connector.class":"FileStreamSink","file":"copy-of-server- properties","topics":"kafka-config-topic"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json" {"name":"dump-kafka-config","config": {"connector.class":"FileStreamSink","file":"copy-of-server- properties","topics":"kafka-config-topic","name":"dump-kafka-config"},"tasks": []} Обратите.внимание.на.отличие.от.исходных.настроек:.сейчас.мы.используем.класс. FileStreamSink ,.а.не. FileStreamSource ..У.нас.по-прежнему.есть.свойство. file ,.но. теперь.оно.указывает.на.целевой.файл,.а.не.на.источник.записей,.и.вместо. topic мы.указываем. topics ..Внимание,.множественное.число!.С.помощью.приемника. можно.записывать.несколько.тем.в.один.файл,.в.то.время.как.источник.позволяет. записывать.только.в.одну.тему. В.случае.успешного.выполнения.вы.получите.файл. copy-of-server-properties ,. совершенно.идентичный.файлу. config/server.properties ,.на.основе.которого.мы. заполняли. kafka-config-topic Удалить.коннектор.можно.с.помощью.команды: curl -X DELETE http://localhost:8083/connectors/dump-kafka-config Если.вы.заглянете.в.журнал.исполнителя.Connect.после.удаления.коннектора,.то. обнаружите,.что.все.остальные.коннекторы.перезапускают.задания..Это.происхо- дит.для.перераспределения.оставшихся.задач.между.исполнителями.и.обеспечения. равномерной.нагрузки.после.удаления.коннектора. Пример коннектора: из MySQL в Elasticsearch Теперь,.когда.заработал.простой.пример,.пора.заняться.чем-то.более.полез- ным..Возьмем.таблицу.MySQL,.отправим.ее.в.тему.Kafka,.загрузим.оттуда. в.Elasticsearch.и.проиндексируем.ее.содержимое. Kafka Connect 173 Мы.выполняем.эксперименты.на.MacBook..Для.установки.MySQL.и.Elasticsearch. достаточно.выполнить.команды: brew install mysql brew install elasticsearch Следующий.шаг.—.проверить.наличие.коннекторов..Если.вы.работаете.с.Confluent. OpenSource,.то.они.уже.должны.быть.установлены.как.часть.платформы..В.про- тивном.случае.можно.выполнить.сборку.коннекторов.из.кода.в.GitHub. 1.. Перейдите.по.адресу. https://github.com/confluentinc/kafka-connect-elasticsearch 2.. Клонируйте.этот.репозиторий. 3.. Выполните.команду. mvn install .для.сборки.проекта. 4.. Повторите.эти.действия.для.коннектора.JDBC.( https://github.com/confluentinc/ kafka-connect-jdbc ). Теперь.скопируйте.JAR-файлы,.появившиеся.в.результате.сборки.в.подкаталогах. target .каталогов,.в.которых.выполнялась.сборка.коннекторов,.в.место,.соответ- ствующее.пути.к.классам.Kafka.Connect: gwen$ mkdir libs gwen$ cp ../kafka-connect-jdbc/target/kafka-connect-jdbc-3.1.0-SNAPSHOT.jar libs/ gwen$ cp ../kafka-connect-elasticsearch/target/kafka-connect- elasticsearch-3.2.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/* libs/ Если.исполнители.Kafka.Connect.еще.не.запущены,.сделайте.это.и.проверьте,.что. новые.плагины.коннекторов.перечислены.в.списке: gwen$ bin/connect-distributed.sh config/connect-distributed.properties & gwen$ curl http://localhost:8083/connector-plugins [{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector"}, {"class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"}, {"class":"org.apache.kafka.connect.file.FileStreamSinkConnector"}, {"class":"io.confluent.connect.jdbc.JdbcSourceConnector"}] Как.видите,.в.кластере.теперь.доступны.новые.плагины.коннекторов..JDBC- источнику.требуется.драйвер.MySQL.для.работы.с.MySQL..Мы.скачали.JDBC- драйвер.для.MySQL.с.сайта.Oracle,.разархивировали.его.и.скопировали.файл. mysql-connector-java-5.1.40-bin.jar .в.каталог. libs/ .при.копировании.коннек- торов. Следующий.шаг.—.создание.таблицы.в.базе.данных.MySQL,.которую.потом.можно. будет.передать.в.Kafka.с.помощью.JDBC-коннектора: gwen$ mysql.server restart mysql> create database test; |