Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
174 Глава 7 • Создание конвейеров данных Query OK, 1 row affected (0.00 sec) mysql> use test; Database changed mysql> create table login (username varchar(30), login_time datetime); Query OK, 0 rows affected (0.02 sec) mysql> insert into login values ('gwenshap', now()); Query OK, 1 row affected (0.01 sec) mysql> insert into login values ('tpalino', now()); Query OK, 1 row affected (0.00 sec) mysql> commit; Query OK, 0 rows affected (0.01 sec) Как.видите,.мы.создали.базу.данных.и.таблицу.и.вставили.в.нее.несколько.строк. для.примера. Следующий.шаг.—.настройка.коннектора.JDBC-источника..Можно.прочитать. о.возможностях.настройки.в.документации,.а.можно.воспользоваться.API.REST. для.их.выяснения: gwen$ curl -X PUT -d "{}" localhost:8083/connector-plugins/JdbcSourceConnector/ config/validate --header "content-Type:application/json" | python -m json.tool { "configs": [ { "definition": { "default_value": "", "dependents": [], "display_name": "Timestamp Column Name", "documentation": "The name of the timestamp column to use to detect new or modified rows. This column may not be nullable.", "group": "Mode", "importance": "MEDIUM", "name": "timestamp.column.name", "order": 3, "required": false, "type": "STRING", "width": "MEDIUM" }, По.сути,.мы.запросили.у.API.REST.проверку.настроек.коннектора,.передав.их. пустой.перечень..В.качестве.ответа.получили.JSON-описание.всех.доступных.на- строек.(для.повышения.удобочитаемости.результат.пропустили.через.Python). Теперь.приступим.к.созданию.и.настройке.JDBC-коннектора: echo '{"name":"mysql-login-connector", "config":{"connector.class":"JdbcSource- Connector","connection.url":"jdbc:mysql://127.0.0.1:3306/test? user=root", Kafka Connect 175 "mode":"timestamp","table.whitelist":"login","validate.non.null":false,"timestamp. column.name":"login_time","topic.prefix":"mysql."}}' | curl -X POST -d @- http:// localhost:8083/connectors --header "content-Type:application/json" {"name":"mysql-login-connector","config":{"connector.class":"JdbcSourceConnector", "connection.url":"jdbc:mysql://127.0.0.1:3306/test? user=root","mode":"timestamp","table.whitelist":"login","validate. non.null":"false","timestamp.column.name":"login_time","topic. prefix":"mysql.","name":"mysql-login-connector"},"tasks":[]} Прочитаем.данные.из.темы. mysql.login ,.чтобы.убедиться,.что.она.работает: gwen$ bin/kafka-console-consumer.sh --new --bootstrap-server=localhost:9092 -- topic mysql.login --from-beginning {"schema":{"type":"struct","fields": [{"type":"string","optional":true,"field":"username"}, {"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Time- stamp","version":1,"field":"login_time"}],"optional":false,"name":"login"}, "payload":{"username":"gwenshap","login_time":1476423962000}} {"schema":{"type":"struct","fields": [{"type":"string","optional":true,"field":"username"}, {"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Time- stamp","version":1,"field":"login_time"}],"optional":false,"name":"login"}, "payload":{"username":"tpalino","login_time":1476423981000}} Если.вы.не.увидите.никаких.данных.или.получите.сообщение,.гласящее,.что.темы. не.существует,.поищите.в.журналах.Connect.следующие.ошибки: [2016-10-16 19:39:40,482] ERROR Error while starting connector mysql-login- connector (org.apache.kafka.connect.runtime.WorkerConnector:108) org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Access denied for user 'root;'@'localhost' (using password: NO) at io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector. java:78) С.первого.раза.обычно.не.получается.задать.правильную.строку.соединения.. Среди.возможных.проблем.также.отсутствие.драйвера.по.пути.к.классам.или.прав. на.чтение.таблицы. Обратите.внимание.на.то,.что.вставленные.во.время.работы.коннектора.в.таблицу. login .дополнительные.строки.должны.сразу.же.отразиться.в.теме. mysql.login Передача.данных.из.MySQL.в.Kafka.полезна.и.сама.по.себе,.но.пойдем.еще.дальше. и.запишем.эти.данные.в.Elasticsearch. Во-первых,.запустим.Elasticsearch.и.проверим.его.работу,.обратившись.к.соответ- ствующему.локальному.порту: gwen$ elasticsearch & gwen$ curl http://localhost:9200/ { 176 Глава 7 • Создание конвейеров данных "name" : "Hammerhead", "cluster_name" : "elasticsearch_gwen", "cluster_uuid" : "42D5GrxOQFebf83DYgNl-g", "version" : { "number" : "2.4.1", "build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16", "build_timestamp" : "2016-09-27T18:57:55Z", "build_snapshot" : false, "lucene_version" : "5.5.2" }, "tagline" : "You Know, for Search" } Теперь.запустим.коннектор: echo '{"name":"elastic-login-connector", "config":{"connector.class": "ElasticsearchSinkConnector","connection.url":"http://localhost:9200","type. name":"mysql-data","topics":"mysql.login","key.ignore":true}}' | curl -X POST -d @- http://localhost:8083/connectors --header "contentType:application/json" {"name":"elastic-login-connector","config":{"connector.class": "ElasticsearchSinkConnector","connection.url":"http://localhost:9200","type.name": "mysqldata","topics":"mysql.login","key.ignore":"true","name":"elastic-login- connector"},"tasks":[{"connector":"elastic-login-connector","task":0}]} Здесь.есть.несколько.настроек,.которые.требуют.пояснений.. connection.url .—. просто.URL.локального.сервера.Elasticsearch,.который.мы.настроили.ранее.. Каждая.тема.в.Kafka.по.умолчанию.становится.отдельным.индексом.Elasticsearch. (остается.название.темы)..Внутри.темы.необходимо.описать.тип.записываемых. данных..Мы.считаем,.что.тип.у.всех.событий.в.теме.одинаков,.так.что.просто. «зашиваем».его:. type.name=mysql-data ..Записываем.в.Elasticsearch.только.одну. тему.—. mysql.login ..При.описании.таблицы.в.MySQL.мы.не.задали.для.нее.пер- вичного.ключа..В.результате.ключи.событий.в.Kafka.не.определены..А.поскольку. у.событий.в.Kafka.нет.ключей,.необходимо.сообщить.коннектору.Elasticsearch,. чтобы.в.качестве.ключей.событий.он.использовал.название.темы,.идентифика- торы.разделов.и.смещения. Проверим,.что.индекс.с.данными.таблицы. mysql.login .создан: gwen$ curl 'localhost:9200/_cat/indices?v' health status index pri rep docs.count docs.deleted store.size pri.store.size yellow open mysql.login 5 1 3 0 10.7kb 10.7kb Если.индекс.не.найден,.поищите.ошибки.в.журнале.исполнителя.Connect..Зачастую. причиной.ошибок.становится.отсутствие.параметров.или.библиотек..Если.все.в.по- рядке,.можем.поискать.в.индексе.наши.записи: gwen$ curl -s -X "GET" "http://localhost:9200/mysql.login/_search?pretty=true" { Kafka Connect 177 "took" : 29, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 3, "max_score" : 1.0, "hits" : [ { "_index" : "mysql.login", "_type" : "mysql-data", "_id" : "mysql.login+0+1", "_score" : 1.0, "_source" : { "username" : "tpalino", "login_time" : 1476423981000 } }, { "_index" : "mysql.login", "_type" : "mysql-data", "_id" : "mysql.login+0+2", "_score" : 1.0, "_source" : { "username" : "nnarkede", "login_time" : 1476672246000 } }, { "_index" : "mysql.login", "_type" : "mysql-data", "_id" : "mysql.login+0+0", "_score" : 1.0, "_source" : { "username" : "gwenshap", "login_time" : 1476423962000 } } ] } } Если.добавить.новые.записи.в.таблицу.в.MySQL,.они.автоматически.появятся. в.теме. mysql.login .в.Kafka.и.соответствующем.индексе.Elasticsearch. Теперь,.посмотрев.на.сборку.и.установку.JDBC-источников.и.приемников.Ela- sticsearch,.мы.сможем.собрать.и.использовать.любую.пару.подходящих.для.нашего. сценария.коннекторов..Confluent.поддерживает.список.всех.известных.коннекторов. ( http://www.confluent.io/product/connectors/), .включая.поддерживаемые.как.различными. компаниями,.так.и.сообществом.разработчиков..Можете.выбрать.из.списка.любой. коннектор,.какой.вам.только.хочется.попробовать,.собрать.его.из.кода.в.GitHub. репозитории,.настроить.—.прочитав.документацию.или.получив.настройки.из.API. REST.—.и.запустить.его.на.своем.кластере.исполнителей.Connect. 178 Глава 7 • Создание конвейеров данных Создание своих собственных коннекторов API коннекторов общедоступен, так что каждый может создать новый коннектор. На самом деле именно так большинство их в Connector Hub и появилось — люди создавали коннекторы и сообщали нам о них. Так что мы призываем вас написать собственный коннектор, если хранилище данных, к которому вам нужно подклю- читься, в Connector Hub отсутствует. Можете даже представить его сообществу разработчиков, чтобы другие люди его увидели и смогли использовать. Обсуж- дение всех нюансов создания коннектора выходит за рамки данной главы, но вы можете узнать о них из официальной документации (http://docs.confluent.io/3.0.1/ connect/devguide.html). Мы также рекомендуем рассмотреть уже существующие коннекторы в качестве образцов и, возможно, начать с применения шаблона maven (http://www.bit.ly/2sc9E9q). Мы всегда будем рады вашим вопросам и прось- бам о помощи, а также демонстрации новых коннекторов в почтовой рассылке сообщества разработчиков Apache Kafka (users@kafka.apache.org). Взглянем на Connect поближе Чтобы.понять,.как.работает.Connect,.необходимо.разобраться.с.тремя.основными. его.понятиями.и.их.взаимодействием.друг.с.другом..Как.мы.уже.объясняли.и.де- монстрировали.на.примерах,.для.использования.Connect.вам.нужен.работающий. кластер.исполнителей.и.потребуется.запускать/останавливать.коннекторы..Еще. один.нюанс,.в.который.мы.ранее.особо.не.углублялись,.—.обработка.данных.пре- образователями.форматов.—.компонентами,.преобразующими.строки.MySQL. в.записи.JSON,.заносимые.в.Kafka.коннектором. Заглянем.в.каждую.из.систем.чуть.глубже.и.разберемся,.как.они.взаимодействуют. друг.с.другом. Коннекторы и задачи Плагины.коннекторов.реализуют.API.коннекторов,.состоящее.из.двух.частей. Коннекторы..Отвечают.за.выполнение.трех.важных.вещей: y определение.числа.задач.для.коннектора; y разбиение.работы.по.копированию.данных.между.задачами; y получение.от.исполнителей.настроек.для.задач.и.передачу.их.далее. Например,.коннектор.JDBC-источника.подключается.к.базе.данных,.находит. таблицы.для.копирования.и.на.основе.этой.информации.определяет,.сколько. требуется.задач,.выбирая.меньшее.из.значений.параметра. max.tasks .и.числа. таблиц..После.этого.он.генерирует.конфигурацию.для.каждой.из.задач.на.основе. своих.настроек.(например,.параметра. connection.url ).и.списка.таблиц,.которые. должны.будут.копировать.все.задачи..Метод. taskConfigs() .возвращает.список. ассоциативных.массивов,.то.есть.настроек.для.каждой.из.запускаемых.задач.. Исполнители.отвечают.за.дальнейший.запуск.задач.и.передачу.каждой.из.них. Kafka Connect 179 ее.индивидуальных.настроек,.на.основе.которых.она.должна.будет.скопировать. уникальный.набор.таблиц.из.базы.данных..Отметим,.что.коннектор.при.запуске. посредством.API.REST.может.быть.запущен.на.любом.узле,.а.значит,.и.запуска- емые.им.задачи.тоже.могут.выполняться.на.любом.из.узлов. Задачи..Отвечают.за.получение.данных.из.Kafka.и.вставку.туда.данных..Испол- нители.инициализируют.все.задачи.путем.передачи.контекста..Контекст.ис- точника.включает.объект,.предназначенный.для.хранения.задачей.смещений. записей.источника.(например,.в.файловом.коннекторе.смещения.представля- ют.собой.позиции.в.файле;.в.коннекторе.JDBC-источника.они.могут.быть. значениями.первичного.ключа.таблицы)..Контекст.для.коннектора.приемни- ка.включает.методы,.с.помощью.которых.он.может.контролировать.получае- мые.из.Kafka.записи..Они.используются,.в.частности,.для.приостановки.об- ратного.потока.данных,.а.также.повторения.отправки.и.сохранения.смещений. во.внешнем.хранилище.для.обеспечения.строго.однократной.доставки..После. инициализации.задания.запускаются.с.объектом. Properties ,.содержащим.на- стройки,.созданные.для.данной.задачи.коннектором..После.запуска.задачи. источника.опрашивают.внешнюю.систему.и.возвращают.списки.записей,.от- правляемые.исполнителем.брокерам.Kafka..Задачи.приемника.получают.за- писи.из.Kafka.через.исполнитель.и.отвечают.за.отправку.этих.записей.во.внеш- нюю.систему. Исполнители Процессы-исполнители.Kafka.Connect.представляют.собой.процессы-контейнеры,. выполняющие.коннекторы.и.задачи..Они.отвечают.за.обработку.HTTP-запросов. с.описанием.коннекторов.и.их.настроек,.а.также.за.хранение.настроек.коннекто- ров,.запуск.коннекторов.и.их.задач,.включая.передачу.соответствующих.настроек.. В.случае.останова.или.аварийного.сбоя.процесса-исполнителя.кластер.узнает.об. этом.из.контрольных.сигналов.протокола.потребителей.Kafka,.и.он.переназначает. работающие.на.данном.исполнителе.коннекторы.и.задачи.оставшимся.исполни- телям..Другие.исполнители.тоже.заметят,.если.к.кластеру.Connect.присоединится. новый.исполнитель,.и.назначат.ему.коннекторы.или.задачи,.чтобы.равномерно. распределить.нагрузку.между.всеми.исполнителями. Исполнители.отвечают.также.за.автоматическую.фиксацию.смещений.для.коннек- торов.как.источника,.так.и.приемника.и.за.выполнение.повторов.в.случае.генера- ции.задачами.исключений..Чтобы.разобраться.в.том,.что.такое.исполнители,.лучше. всего.представить.себе,.что.коннекторы.и.задачи.отвечают.за.ту.часть.интеграции. данных,.которая.относится.к.их.перемещению,.а.исполнители.отвечают.за.API. REST,.управление.настройками,.надежность,.высокую.доступность,.масштабиро- вание.и.распределение.нагрузки. Если.сравнивать.с.классическими.API.потребителей/производителей,.то.главное. преимущество.API.Connect.состоит.именно.в.этом.разделении.обязанностей.. 180 Глава 7 • Создание конвейеров данных Опытные.разработчики.знают,.что.написание.кода.для.чтения.данных.из.Kafka. и.вставки.их.в.базу.данных.занимает,.наверное,.день.или.два..Но.если.вдобавок. нужно.отвечать.за.настройки,.ошибки,.API.REST,.мониторинг,.развертывание,. повышающее.и.понижающее.вертикальное.масштабирование,.а.также.обработку. сбоев,.то.реализация.всего.этого.может.занять.несколько.месяцев..При.воплощении. в.жизнь.копирования.данных.с.помощью.коннекторов.последние.подключаются. к.исполнителям,.которые.берут.на.себя.заботы.о.множестве.сложных.эксплуата- ционных.вопросов,.так.что.вам.не.придется.об.этом.беспокоиться. Преобразователи форматов и модель данных Connect Последний.фрагмент.пазла.API.Connect.—.модель.данных.коннектора.и.преоб- разователи.форматов..API.Kafka.Connect.включают.API.данных,.который.в.свою. очередь.включает.как.объекты.данных,.так.и.описывающие.эти.данные.схемы.. Например,.JDBC-источник.читает.столбец.из.базы.данных.и.формирует.объект. Connect. Schema .на.основе.типов.данных.столбцов,.которые.вернула.база.данных.. Для.каждого.столбца.сохраняются.имя.столбца.и.значение..Все.коннекторы.ис- точников.выполняют.схожие.функции.—.читают.события.из.системы.источника. и.генерируют.пары. Schema / Value ..У.коннекторов.приемников.функции.противо- положные.—.они.получают.пары. Schema / Value .и.используют.объекты. Schema .для. разбора.значений.и.вставки.их.в.целевую.систему. Хотя.коннекторы.источников.знают,.как.генерировать.объекты.на.основе.API. данных,.остается.актуальным.вопрос.о.сохранении.этих.объектов.в.Kafka.исполни- телями.Connect..Именно.тут.вносят.свой.вклад.преобразователи.форматов..Поль- зователи,.настраивая.исполнитель.(или.коннектор),.выбирают.преобразователь. формата,.который.будет.применяться.для.сохранения.данных.в.Kafka..В.настоящий. момент.в.этом.качестве.можно.использовать.Avro,.JSON.и.строки..Преобразователь. JSON.можно.настроить.так,.чтобы.он.включал.или.не.включал.схему.в.итоговую. запись,.таким.образом.можно.обеспечить.поддержку.как.структурированных,.так. и.полуструктурированных.данных..Получив.от.коннектора.запись.API.данных,. исполнитель.с.помощью.уже.настроенного.преобразователя.формата.преобразует. запись.в.объект.Avro,.JSON.или.строковое.значение,.после.чего.сохраняет.результат. в.Kafka. Противоположное.происходит.с.коннекторами.приемников..Исполнитель.Connect,. прочитав.запись.из.Kafka,.с.помощью.уже.настроенного.преобразователя.преоб- разует.запись.из.формата.Kafka.(Avro,.JSON.или.строковое.значение).в.запись.API. данных.Connect,.после.чего.передает.ее.коннектору.приемника,.вставляющему.ее. в.целевую.систему..Благодаря.этому.API.Connect.может.поддерживать.различ- ные.типы.хранимых.в.Kafka.данных.вне.зависимости.от.реализации.коннекторов. (то.есть.можно.использовать.любой.коннектор.для.любого.типа.записей,.был.бы. только.доступен.преобразователь.формата). Альтернативы Kafka Connect 181 Управление смещениями Управление.смещениями.—.один.из.удобных.сервисов,.предоставляемых.испол- нителями.коннекторам.(помимо.развертывания.и.управления.настройками.через. API.REST)..Суть.его.в.том,.что.коннекторам.необходимо.знать,.какие.данные.они. уже.обработали,.и.они.могут.воспользоваться.предоставляемыми.Kafka.API.для. хранения.информации.о.том,.какие.события.уже.обработаны. Для.коннекторов.источников.это.значит,.что.записи,.возвращаемые.коннектором. исполнителям.Connect,.включают.информацию.о.логическом.разделе.и.логиче- ском.смещении..Это.не.разделы.и.смещения.Kafka,.а.разделы.и.смещения.в.том. виде,.в.каком.они.нужны.в.системе.источника..Например,.в.файловом.источнике. раздел.может.быть.файлом,.а.смещение.—.номером.строки.или.символа.в.этом. файле..В.JDBC-источнике.раздел.может.быть.таблицей.базы.данных,.а.смеще- ние.—.идентификатором.записи.в.таблице..При.написании.коннектора.источника. нужно.принять.одно.из.важнейших.проектных.решений:.как.секционировать.дан- ные.в.системе.источника.и.как.отслеживать.смещения..Оно.влияет.на.возможный. уровень.параллелизма.коннектора,.а.также.вероятность.обеспечения.по.крайней. мере.однократной.или.строго.однократной.доставки. После.возвращения.коннектором.источника.списка.записей,.включающего.раз- делы.и.смещения.в.источнике.для.всех.записей,.исполнитель.отправляет.записи. брокерам.Kafka..Если.брокеры.сообщили,.что.получили.записи,.исполнитель. сохраняет.смещения.отправленных.в.Kafka.записей..Механизм.хранения.явля- ется.подключаемым,.обычно.это.тема.Kafka..Благодаря.этому.коннекторы.могут. начинать.обработку.событий.с.последнего.сохраненного.после.перезапуска.или. аварийного.сбоя.смещения. Последовательность.выполняемых.коннекторами.приемников.действий.аналогич- на.с.точностью.до.наоборот:.они.читают.записи.Kafka,.в.которых.уже.есть.иденти- фикаторы.темы,.раздела.и.смещения..Затем.вызывают.метод. put() .коннектора.для. сохранения.этих.записей.в.целевой.системе..В.случае.успешного.выполнения.этих. действий.они.фиксируют.переданные.коннектору.смещения.в.Kafka.с.помощью. обычных.методов.фиксации.потребителей. Отслеживание.смещений.самим.фреймворком.должно.облегчить.разработчикам. задачу.написания.коннекторов.и.до.определенной.степени.гарантировать.согласо- ванное.поведение.при.использовании.различных.коннекторов. Альтернативы Kafka Connect Мы.подробно.рассмотрели.API.Kafka.Connect..Хотя.нам.очень.понравились. их.удобство.и.надежность,.эти.API.—.не.единственный.метод.передачи.данных. в.Kafka.и.из.нее..Посмотрим,.какие.еще.варианты.существуют.и.как.их.обычно. применяют. |