Главная страница

Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


Скачать 7.59 Mb.
НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Дата21.06.2022
Размер7.59 Mb.
Формат файлаpdf
Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
ТипДокументы
#609074
страница22 из 39
1   ...   18   19   20   21   22   23   24   25   ...   39
174 Глава 7 • Создание конвейеров данных
Query OK, 1 row affected (0.00 sec)
mysql> use test;
Database changed mysql> create table login (username varchar(30), login_time datetime);
Query OK, 0 rows affected (0.02 sec)
mysql> insert into login values ('gwenshap', now());
Query OK, 1 row affected (0.01 sec)
mysql> insert into login values ('tpalino', now());
Query OK, 1 row affected (0.00 sec)
mysql> commit;
Query OK, 0 rows affected (0.01 sec)
Как.видите,.мы.создали.базу.данных.и.таблицу.и.вставили.в.нее.несколько.строк.
для.примера.
Следующий.шаг.—.настройка.коннектора.JDBC-источника..Можно.прочитать.
о.возможностях.настройки.в.документации,.а.можно.воспользоваться.API.REST.
для.их.выяснения:
gwen$ curl -X PUT -d "{}" localhost:8083/connector-plugins/JdbcSourceConnector/
config/validate --header "content-Type:application/json" | python -m json.tool
{
"configs": [
{
"definition": {
"default_value": "",
"dependents": [],
"display_name": "Timestamp Column Name",
"documentation": "The name of the timestamp column to use to detect new or modified rows. This column may not be nullable.",
"group": "Mode",
"importance": "MEDIUM",
"name": "timestamp.column.name",
"order": 3,
"required": false,
"type": "STRING",
"width": "MEDIUM"
},

По.сути,.мы.запросили.у.API.REST.проверку.настроек.коннектора,.передав.их.
пустой.перечень..В.качестве.ответа.получили.JSON-описание.всех.доступных.на- строек.(для.повышения.удобочитаемости.результат.пропустили.через.Python).
Теперь.приступим.к.созданию.и.настройке.JDBC-коннектора:
echo '{"name":"mysql-login-connector", "config":{"connector.class":"JdbcSource-
Connector","connection.url":"jdbc:mysql://127.0.0.1:3306/test? user=root",

Kafka Connect 175
"mode":"timestamp","table.whitelist":"login","validate.non.null":false,"timestamp.
column.name":"login_time","topic.prefix":"mysql."}}' | curl -X POST -d @- http://
localhost:8083/connectors --header "content-Type:application/json"
{"name":"mysql-login-connector","config":{"connector.class":"JdbcSourceConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/test?
user=root","mode":"timestamp","table.whitelist":"login","validate.
non.null":"false","timestamp.column.name":"login_time","topic.
prefix":"mysql.","name":"mysql-login-connector"},"tasks":[]}
Прочитаем.данные.из.темы.
mysql.login
,.чтобы.убедиться,.что.она.работает:
gwen$ bin/kafka-console-consumer.sh --new --bootstrap-server=localhost:9092 -- topic mysql.login --from-beginning

{"schema":{"type":"struct","fields":
[{"type":"string","optional":true,"field":"username"},
{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Time- stamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},
"payload":{"username":"gwenshap","login_time":1476423962000}}
{"schema":{"type":"struct","fields":
[{"type":"string","optional":true,"field":"username"},
{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Time- stamp","version":1,"field":"login_time"}],"optional":false,"name":"login"},
"payload":{"username":"tpalino","login_time":1476423981000}}
Если.вы.не.увидите.никаких.данных.или.получите.сообщение,.гласящее,.что.темы.
не.существует,.поищите.в.журналах.Connect.следующие.ошибки:
[2016-10-16 19:39:40,482] ERROR Error while starting connector mysql-login- connector (org.apache.kafka.connect.runtime.WorkerConnector:108)
org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Access denied for user 'root;'@'localhost' (using password: NO)
at io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.
java:78)
С.первого.раза.обычно.не.получается.задать.правильную.строку.соединения..
Среди.возможных.проблем.также.отсутствие.драйвера.по.пути.к.классам.или.прав.
на.чтение.таблицы.
Обратите.внимание.на.то,.что.вставленные.во.время.работы.коннектора.в.таблицу.
login
.дополнительные.строки.должны.сразу.же.отразиться.в.теме.
mysql.login
Передача.данных.из.MySQL.в.Kafka.полезна.и.сама.по.себе,.но.пойдем.еще.дальше.
и.запишем.эти.данные.в.Elasticsearch.
Во-первых,.запустим.Elasticsearch.и.проверим.его.работу,.обратившись.к.соответ- ствующему.локальному.порту:
gwen$ elasticsearch &
gwen$ curl http://localhost:9200/
{

176 Глава 7 • Создание конвейеров данных "name" : "Hammerhead",
"cluster_name" : "elasticsearch_gwen",
"cluster_uuid" : "42D5GrxOQFebf83DYgNl-g",
"version" : {
"number" : "2.4.1",
"build_hash" : "c67dc32e24162035d18d6fe1e952c4cbcbe79d16",
"build_timestamp" : "2016-09-27T18:57:55Z",
"build_snapshot" : false,
"lucene_version" : "5.5.2"
},
"tagline" : "You Know, for Search"
}
Теперь.запустим.коннектор:
echo '{"name":"elastic-login-connector", "config":{"connector.class":
"ElasticsearchSinkConnector","connection.url":"http://localhost:9200","type.
name":"mysql-data","topics":"mysql.login","key.ignore":true}}' |
curl -X POST -d @- http://localhost:8083/connectors --header
"contentType:application/json"
{"name":"elastic-login-connector","config":{"connector.class":
"ElasticsearchSinkConnector","connection.url":"http://localhost:9200","type.name":
"mysqldata","topics":"mysql.login","key.ignore":"true","name":"elastic-login- connector"},"tasks":[{"connector":"elastic-login-connector","task":0}]}
Здесь.есть.несколько.настроек,.которые.требуют.пояснений..
connection.url
.—.
просто.URL.локального.сервера.Elasticsearch,.который.мы.настроили.ранее..
Каждая.тема.в.Kafka.по.умолчанию.становится.отдельным.индексом.Elasticsearch.
(остается.название.темы)..Внутри.темы.необходимо.описать.тип.записываемых.
данных..Мы.считаем,.что.тип.у.всех.событий.в.теме.одинаков,.так.что.просто.
«зашиваем».его:.
type.name=mysql-data
..Записываем.в.Elasticsearch.только.одну.
тему.—.
mysql.login
..При.описании.таблицы.в.MySQL.мы.не.задали.для.нее.пер- вичного.ключа..В.результате.ключи.событий.в.Kafka.не.определены..А.поскольку.
у.событий.в.Kafka.нет.ключей,.необходимо.сообщить.коннектору.Elasticsearch,.
чтобы.в.качестве.ключей.событий.он.использовал.название.темы,.идентифика- торы.разделов.и.смещения.
Проверим,.что.индекс.с.данными.таблицы.
mysql.login
.создан:
gwen$ curl 'localhost:9200/_cat/indices?v'
health status index pri rep docs.count docs.deleted store.size pri.store.size yellow open mysql.login 5 1 3 0 10.7kb
10.7kb
Если.индекс.не.найден,.поищите.ошибки.в.журнале.исполнителя.Connect..Зачастую.
причиной.ошибок.становится.отсутствие.параметров.или.библиотек..Если.все.в.по- рядке,.можем.поискать.в.индексе.наши.записи:
gwen$ curl -s -X "GET" "http://localhost:9200/mysql.login/_search?pretty=true"
{

Kafka Connect 177
"took" : 29,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 1.0,
"hits" : [ {
"_index" : "mysql.login",
"_type" : "mysql-data",
"_id" : "mysql.login+0+1",
"_score" : 1.0,
"_source" : {
"username" : "tpalino",
"login_time" : 1476423981000
}
}, {
"_index" : "mysql.login",
"_type" : "mysql-data",
"_id" : "mysql.login+0+2",
"_score" : 1.0,
"_source" : {
"username" : "nnarkede",
"login_time" : 1476672246000
}
}, {
"_index" : "mysql.login",
"_type" : "mysql-data",
"_id" : "mysql.login+0+0",
"_score" : 1.0,
"_source" : {
"username" : "gwenshap",
"login_time" : 1476423962000
}
} ]
}
}
Если.добавить.новые.записи.в.таблицу.в.MySQL,.они.автоматически.появятся.
в.теме.
mysql.login
.в.Kafka.и.соответствующем.индексе.Elasticsearch.
Теперь,.посмотрев.на.сборку.и.установку.JDBC-источников.и.приемников.Ela- sticsearch,.мы.сможем.собрать.и.использовать.любую.пару.подходящих.для.нашего.
сценария.коннекторов..Confluent.поддерживает.список.всех.известных.коннекторов.
(
http://www.confluent.io/product/connectors/),
.включая.поддерживаемые.как.различными.
компаниями,.так.и.сообществом.разработчиков..Можете.выбрать.из.списка.любой.
коннектор,.какой.вам.только.хочется.попробовать,.собрать.его.из.кода.в.GitHub.
репозитории,.настроить.—.прочитав.документацию.или.получив.настройки.из.API.
REST.—.и.запустить.его.на.своем.кластере.исполнителей.Connect.

178 Глава 7 • Создание конвейеров данных
Создание своих собственных коннекторов
API коннекторов общедоступен, так что каждый может создать новый коннектор.
На самом деле именно так большинство их в Connector Hub и появилось — люди создавали коннекторы и сообщали нам о них. Так что мы призываем вас написать собственный коннектор, если хранилище данных, к которому вам нужно подклю- читься, в Connector Hub отсутствует. Можете даже представить его сообществу разработчиков, чтобы другие люди его увидели и смогли использовать. Обсуж- дение всех нюансов создания коннектора выходит за рамки данной главы, но вы можете узнать о них из официальной документации (http://docs.confluent.io/3.0.1/
connect/devguide.html). Мы также рекомендуем рассмотреть уже существующие коннекторы в качестве образцов и, возможно, начать с применения шаблона maven (http://www.bit.ly/2sc9E9q). Мы всегда будем рады вашим вопросам и прось- бам о помощи, а также демонстрации новых коннекторов в почтовой рассылке сообщества разработчиков Apache Kafka (users@kafka.apache.org).
Взглянем на Connect поближе
Чтобы.понять,.как.работает.Connect,.необходимо.разобраться.с.тремя.основными.
его.понятиями.и.их.взаимодействием.друг.с.другом..Как.мы.уже.объясняли.и.де- монстрировали.на.примерах,.для.использования.Connect.вам.нужен.работающий.
кластер.исполнителей.и.потребуется.запускать/останавливать.коннекторы..Еще.
один.нюанс,.в.который.мы.ранее.особо.не.углублялись,.—.обработка.данных.пре- образователями.форматов.—.компонентами,.преобразующими.строки.MySQL.
в.записи.JSON,.заносимые.в.Kafka.коннектором.
Заглянем.в.каждую.из.систем.чуть.глубже.и.разберемся,.как.они.взаимодействуют.
друг.с.другом.
Коннекторы и задачи
Плагины.коннекторов.реализуют.API.коннекторов,.состоящее.из.двух.частей.
‰
‰
Коннекторы..Отвечают.за.выполнение.трех.важных.вещей:
y определение.числа.задач.для.коннектора;
y разбиение.работы.по.копированию.данных.между.задачами;
y получение.от.исполнителей.настроек.для.задач.и.передачу.их.далее.
Например,.коннектор.JDBC-источника.подключается.к.базе.данных,.находит.
таблицы.для.копирования.и.на.основе.этой.информации.определяет,.сколько.
требуется.задач,.выбирая.меньшее.из.значений.параметра.
max.tasks
.и.числа.
таблиц..После.этого.он.генерирует.конфигурацию.для.каждой.из.задач.на.основе.
своих.настроек.(например,.параметра.
connection.url
).и.списка.таблиц,.которые.
должны.будут.копировать.все.задачи..Метод.
taskConfigs()
.возвращает.список.
ассоциативных.массивов,.то.есть.настроек.для.каждой.из.запускаемых.задач..
Исполнители.отвечают.за.дальнейший.запуск.задач.и.передачу.каждой.из.них.

Kafka Connect 179
ее.индивидуальных.настроек,.на.основе.которых.она.должна.будет.скопировать.
уникальный.набор.таблиц.из.базы.данных..Отметим,.что.коннектор.при.запуске.
посредством.API.REST.может.быть.запущен.на.любом.узле,.а.значит,.и.запуска- емые.им.задачи.тоже.могут.выполняться.на.любом.из.узлов.
‰
‰
Задачи..Отвечают.за.получение.данных.из.Kafka.и.вставку.туда.данных..Испол- нители.инициализируют.все.задачи.путем.передачи.контекста..Контекст.ис- точника.включает.объект,.предназначенный.для.хранения.задачей.смещений.
записей.источника.(например,.в.файловом.коннекторе.смещения.представля- ют.собой.позиции.в.файле;.в.коннекторе.JDBC-источника.они.могут.быть.
значениями.первичного.ключа.таблицы)..Контекст.для.коннектора.приемни- ка.включает.методы,.с.помощью.которых.он.может.контролировать.получае- мые.из.Kafka.записи..Они.используются,.в.частности,.для.приостановки.об- ратного.потока.данных,.а.также.повторения.отправки.и.сохранения.смещений.
во.внешнем.хранилище.для.обеспечения.строго.однократной.доставки..После.
инициализации.задания.запускаются.с.объектом.
Properties
,.содержащим.на- стройки,.созданные.для.данной.задачи.коннектором..После.запуска.задачи.
источника.опрашивают.внешнюю.систему.и.возвращают.списки.записей,.от- правляемые.исполнителем.брокерам.Kafka..Задачи.приемника.получают.за- писи.из.Kafka.через.исполнитель.и.отвечают.за.отправку.этих.записей.во.внеш- нюю.систему.
Исполнители
Процессы-исполнители.Kafka.Connect.представляют.собой.процессы-контейнеры,.
выполняющие.коннекторы.и.задачи..Они.отвечают.за.обработку.HTTP-запросов.
с.описанием.коннекторов.и.их.настроек,.а.также.за.хранение.настроек.коннекто- ров,.запуск.коннекторов.и.их.задач,.включая.передачу.соответствующих.настроек..
В.случае.останова.или.аварийного.сбоя.процесса-исполнителя.кластер.узнает.об.
этом.из.контрольных.сигналов.протокола.потребителей.Kafka,.и.он.переназначает.
работающие.на.данном.исполнителе.коннекторы.и.задачи.оставшимся.исполни- телям..Другие.исполнители.тоже.заметят,.если.к.кластеру.Connect.присоединится.
новый.исполнитель,.и.назначат.ему.коннекторы.или.задачи,.чтобы.равномерно.
распределить.нагрузку.между.всеми.исполнителями.
Исполнители.отвечают.также.за.автоматическую.фиксацию.смещений.для.коннек- торов.как.источника,.так.и.приемника.и.за.выполнение.повторов.в.случае.генера- ции.задачами.исключений..Чтобы.разобраться.в.том,.что.такое.исполнители,.лучше.
всего.представить.себе,.что.коннекторы.и.задачи.отвечают.за.ту.часть.интеграции.
данных,.которая.относится.к.их.перемещению,.а.исполнители.отвечают.за.API.
REST,.управление.настройками,.надежность,.высокую.доступность,.масштабиро- вание.и.распределение.нагрузки.
Если.сравнивать.с.классическими.API.потребителей/производителей,.то.главное.
преимущество.API.Connect.состоит.именно.в.этом.разделении.обязанностей..

180 Глава 7 • Создание конвейеров данных
Опытные.разработчики.знают,.что.написание.кода.для.чтения.данных.из.Kafka.
и.вставки.их.в.базу.данных.занимает,.наверное,.день.или.два..Но.если.вдобавок.
нужно.отвечать.за.настройки,.ошибки,.API.REST,.мониторинг,.развертывание,.
повышающее.и.понижающее.вертикальное.масштабирование,.а.также.обработку.
сбоев,.то.реализация.всего.этого.может.занять.несколько.месяцев..При.воплощении.
в.жизнь.копирования.данных.с.помощью.коннекторов.последние.подключаются.
к.исполнителям,.которые.берут.на.себя.заботы.о.множестве.сложных.эксплуата- ционных.вопросов,.так.что.вам.не.придется.об.этом.беспокоиться.
Преобразователи форматов и модель данных Connect
Последний.фрагмент.пазла.API.Connect.—.модель.данных.коннектора.и.преоб- разователи.форматов..API.Kafka.Connect.включают.API.данных,.который.в.свою.
очередь.включает.как.объекты.данных,.так.и.описывающие.эти.данные.схемы..
Например,.JDBC-источник.читает.столбец.из.базы.данных.и.формирует.объект.
Connect.
Schema
.на.основе.типов.данных.столбцов,.которые.вернула.база.данных..
Для.каждого.столбца.сохраняются.имя.столбца.и.значение..Все.коннекторы.ис- точников.выполняют.схожие.функции.—.читают.события.из.системы.источника.
и.генерируют.пары.
Schema
/
Value
..У.коннекторов.приемников.функции.противо- положные.—.они.получают.пары.
Schema
/
Value
.и.используют.объекты.
Schema
.для.
разбора.значений.и.вставки.их.в.целевую.систему.
Хотя.коннекторы.источников.знают,.как.генерировать.объекты.на.основе.API.
данных,.остается.актуальным.вопрос.о.сохранении.этих.объектов.в.Kafka.исполни- телями.Connect..Именно.тут.вносят.свой.вклад.преобразователи.форматов..Поль- зователи,.настраивая.исполнитель.(или.коннектор),.выбирают.преобразователь.
формата,.который.будет.применяться.для.сохранения.данных.в.Kafka..В.настоящий.
момент.в.этом.качестве.можно.использовать.Avro,.JSON.и.строки..Преобразователь.
JSON.можно.настроить.так,.чтобы.он.включал.или.не.включал.схему.в.итоговую.
запись,.таким.образом.можно.обеспечить.поддержку.как.структурированных,.так.
и.полуструктурированных.данных..Получив.от.коннектора.запись.API.данных,.
исполнитель.с.помощью.уже.настроенного.преобразователя.формата.преобразует.
запись.в.объект.Avro,.JSON.или.строковое.значение,.после.чего.сохраняет.результат.
в.Kafka.
Противоположное.происходит.с.коннекторами.приемников..Исполнитель.Connect,.
прочитав.запись.из.Kafka,.с.помощью.уже.настроенного.преобразователя.преоб- разует.запись.из.формата.Kafka.(Avro,.JSON.или.строковое.значение).в.запись.API.
данных.Connect,.после.чего.передает.ее.коннектору.приемника,.вставляющему.ее.
в.целевую.систему..Благодаря.этому.API.Connect.может.поддерживать.различ- ные.типы.хранимых.в.Kafka.данных.вне.зависимости.от.реализации.коннекторов.
(то.есть.можно.использовать.любой.коннектор.для.любого.типа.записей,.был.бы.
только.доступен.преобразователь.формата).

Альтернативы Kafka Connect 181
Управление смещениями
Управление.смещениями.—.один.из.удобных.сервисов,.предоставляемых.испол- нителями.коннекторам.(помимо.развертывания.и.управления.настройками.через.
API.REST)..Суть.его.в.том,.что.коннекторам.необходимо.знать,.какие.данные.они.
уже.обработали,.и.они.могут.воспользоваться.предоставляемыми.Kafka.API.для.
хранения.информации.о.том,.какие.события.уже.обработаны.
Для.коннекторов.источников.это.значит,.что.записи,.возвращаемые.коннектором.
исполнителям.Connect,.включают.информацию.о.логическом.разделе.и.логиче- ском.смещении..Это.не.разделы.и.смещения.Kafka,.а.разделы.и.смещения.в.том.
виде,.в.каком.они.нужны.в.системе.источника..Например,.в.файловом.источнике.
раздел.может.быть.файлом,.а.смещение.—.номером.строки.или.символа.в.этом.
файле..В.JDBC-источнике.раздел.может.быть.таблицей.базы.данных,.а.смеще- ние.—.идентификатором.записи.в.таблице..При.написании.коннектора.источника.
нужно.принять.одно.из.важнейших.проектных.решений:.как.секционировать.дан- ные.в.системе.источника.и.как.отслеживать.смещения..Оно.влияет.на.возможный.
уровень.параллелизма.коннектора,.а.также.вероятность.обеспечения.по.крайней.
мере.однократной.или.строго.однократной.доставки.
После.возвращения.коннектором.источника.списка.записей,.включающего.раз- делы.и.смещения.в.источнике.для.всех.записей,.исполнитель.отправляет.записи.
брокерам.Kafka..Если.брокеры.сообщили,.что.получили.записи,.исполнитель.
сохраняет.смещения.отправленных.в.Kafka.записей..Механизм.хранения.явля- ется.подключаемым,.обычно.это.тема.Kafka..Благодаря.этому.коннекторы.могут.
начинать.обработку.событий.с.последнего.сохраненного.после.перезапуска.или.
аварийного.сбоя.смещения.
Последовательность.выполняемых.коннекторами.приемников.действий.аналогич- на.с.точностью.до.наоборот:.они.читают.записи.Kafka,.в.которых.уже.есть.иденти- фикаторы.темы,.раздела.и.смещения..Затем.вызывают.метод.
put()
.коннектора.для.
сохранения.этих.записей.в.целевой.системе..В.случае.успешного.выполнения.этих.
действий.они.фиксируют.переданные.коннектору.смещения.в.Kafka.с.помощью.
обычных.методов.фиксации.потребителей.
Отслеживание.смещений.самим.фреймворком.должно.облегчить.разработчикам.
задачу.написания.коннекторов.и.до.определенной.степени.гарантировать.согласо- ванное.поведение.при.использовании.различных.коннекторов.
Альтернативы Kafka Connect
Мы.подробно.рассмотрели.API.Kafka.Connect..Хотя.нам.очень.понравились.
их.удобство.и.надежность,.эти.API.—.не.единственный.метод.передачи.данных.
в.Kafka.и.из.нее..Посмотрим,.какие.еще.варианты.существуют.и.как.их.обычно.
применяют.

1   ...   18   19   20   21   22   23   24   25   ...   39


написать администратору сайта