Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
Пакетные переназначения Переназначения разделов серьезно влияют на производительность кластера, поскольку вызывают изменения согласованности страничного кэша памяти и ис- пользуют сеть и дисковые операции ввода/вывода. Будет отличной идеей ми- нимизировать эти эффекты за счет разбиения переназначений на множество мелких шагов. Изменение коэффициента репликации Утилита.переназначения.разделов.предоставляет.недокументированную.возмож- ность,.позволяющую.увеличивать/уменьшать.коэффициент.репликации.раздела.. Это.может.пригодиться.при.создании.раздела.с.неправильным.коэффициентом. репликации.(например,.если.на.момент.создания.темы.не.было.доступно.доста- точного.числа.брокеров)..Сделать.это.можно.посредством.создания.JSON-объекта. Управление разделами 231 в.том.же.формате,.который.применялся.на.этапе.переназначения.разделов.для.до- бавления/удаления.реплик.с.целью.задания.нужного.коэффициента.репликации.. Кластер.завершит.операцию.переназначения,.сохранив.новое.значение.коэффи- циента. Например,.рассмотрим.текущее.распределение.реплик.для.темы.my-topic.с.одним. разделом.и.коэффициентом.репликации,.равным.1: { "partitions": [ { "topic": "my-topic", "partition": 0, "replicas": [ 1 ] } ], "version": 1 } Если.передать.следующий.JSON-объект.на.этапе.выполнения.переназначений.раз- делов,.то.коэффициент.репликации.будет.увеличен.до.2: { "partitions": [ { "partition": 0, "replicas": [ 1, 2 ], "topic": "my-topic" } ], "version": 1 } Аналогично.уменьшить.коэффициент.репликации.раздела.можно,.передав.JSON- объект.с.сокращенным.списком.реплик. Сброс на диск сегментов журнала В.случае,.когда.необходимо.найти.сообщение.с.конкретным.содержимым,.напри- мер,.«отравленную.таблетку».—.сообщение,.которое.потребитель.не.может.об- работать,.можно.воспользоваться.вспомогательной.утилитой.для.декодирования. сегментов.журнала.раздела..Она.позволяет.просматривать.отдельные.сообщения,. не.потребляя.и.не.декодируя.их..В.качестве.аргумента.эта.утилита.принимает.раз- деленный.запятыми.список.файлов.сегментов.журнала.и.может.выводить.сводную. или.развернутую.информацию.по.сообщению. 232 Глава 9 • Администрирование Kafka Например,.декодируем.файл. 00000000000052368601.log .сегмента.журнала.и.вы- ведем.сводную.информацию.по.сообщениям: # kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.log Dumping 00000000000052368601.log Starting offset: 52368601 offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc: 1194341321 offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc: 278946641 offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc: 3767466431 offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc: 2444301359 Или.декодируем.файл. 00000000000052368601.log .сегмента.журнала.с.выводом.раз- вернутой.информации.по.сообщениям: # kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.log --print-data-log offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc: 1194341321 payload: test message 1 offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc: 278946641 payload: test message 2 offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc: 3767466431 payload: test message 3 offset: 52368606 position: 2299 NoTimestampType: -1 isvalid: true payloadsize: 932 magic: 0 compresscodec: GZIPCompressionCodec crc: 2444301359 payload: test message 4 Можно.также.использовать.эту.утилиту.для.проверки.файлов.индексов,.сопутству- ющих.сегментам.журналов..Индексы.применяются.для.поиска.сообщений.в.сег- менте.журнала,.их.порча.может.вызвать.ошибки.при.потреблении..Проверка.всегда. выполняется,.если.брокер.запускается.в.«нечистом».состоянии.(то.есть.не.был. остановлен.штатным.образом),.но.ее.можно.запустить.и.вручную..Существует.два. параметра.для.проверки.индексов,.используемых.в.зависимости.от.желаемой.тща- тельности.процесса..Параметр. --index-sanity-check .проверяет.только.пригодность. индекса.к.использованию,.а. --verify-index-only .—.наличие.неточностей.в.индексе. без.вывода.всех.его.записей. Управление разделами 233 Например,.проверим.файл.индекса.для.файла. 00000000000052368601.log .сегмента. журнала.на.отсутствие.повреждений: # kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000052368601.index,00000000000052368601.log --index-sanity-check Dumping 00000000000052368601.index 00000000000052368601.index passed sanity check. Dumping 00000000000052368601.log Starting offset: 52368601 offset: 52368601 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 661 magic: 0 compresscodec: GZIPCompressionCodec crc: 1194341321 offset: 52368603 position: 687 NoTimestampType: -1 isvalid: true payloadsize: 895 magic: 0 compresscodec: GZIPCompressionCodec crc: 278946641 offset: 52368604 position: 1608 NoTimestampType: -1 isvalid: true payloadsize: 665 magic: 0 compresscodec: GZIPCompressionCodec crc: 3767466431 Проверка реплик Репликация.разделов.функционирует.аналогично.обычному.клиенту.—.потребите- лю.Kafka:.ведомый.брокер.начинает.репликацию.с.самого.старого.смещения.и.пе- риодически.записывает.в.контрольные.точки.данные.о.текущем.смещении.на.диск.. При.останове.и.перезапуске.репликация.возобновляется.с.последней.контрольной. точки..Ранее.реплицированные.сегменты.журналов.могут.удаляться.с.брокера,. в.этом.случае.ведомый.брокер.не.будет.заполнять.промежутки. Чтобы.проверить.согласованность.всех.реплик.разделов.темы.в.кластере,.можно. воспользоваться.утилитой. kafka-replica-verification.sh ..Она.извлекает.сообще- ния.из.всех.реплик.заданного.набора.разделов.темы.и.проверяет.наличие.всех.со- общений.во.всех.репликах..Утилите.необходимо.передать.регулярное.выражение,. соответствующее.всем.темам,.которые.нужно.проверить..Если.его.не.указать,.то. будут.проверяться.все.темы..Необходимо.также.указать.явным.образом.список. брокеров.для.подключения. Осторожно: влияет на производительность кластера Утилита проверки реплик влияет на производительность кластера так же, как и перераспределение разделов, поскольку для проверки реплик читает все сме- щения, начиная с самого старого. Кроме того, она читает данные из всех реплик раздела параллельно, так что будьте осторожны при ее использовании. 234 Глава 9 • Администрирование Kafka Например,.проверим.реплики.для.тем,.название.которых.начинается.с.«my-»,.на. брокерах.1.и.2: # kafka-replica-verification.sh --broker-list kafka1.example.com:9092,kafka2.example.com:9092 --topic-white-list 'my-.*' 2016-11-23 18:42:08,838: verification process is started. 2016-11-23 18:42:38,789: max lag is 0 for partition [my-topic,7] at offset 53827844 among 10 partitions 2016-11-23 18:43:08,790: max lag is 0 for partition [my-topic,7] at offset 53827878 among 10 partitions Потребление и генерация В.ходе.работы.с.Apache.Kafka.вам.часто.придется.вручную.потреблять.сообщения. или.генерировать.образцы.сообщений.для.проверки.работы.ваших.приложений.. Для.этого.существуют.две.вспомогательные.утилиты:. kafka-console-consumer.sh и. kafka-console-producer.sh ..Они.представляют.собой.адаптеры.для.клиентских. библиотек.Java,.которые.обеспечивают.возможность.взаимодействия.с.темами. Kafka,.так.что.писать.для.этой.цели.отдельное.приложение.не.нужно. Конвейерная передача вывода в отдельное приложение Хотя существует возможность написания приложений-адаптеров для консольного производителя и потребителя (например, для чтения сообщений и конвейерной передачи их другому приложению на обработку), эти приложения часто ненадежны, так что лучше их избегать. Очень непросто наладить взаимодействие с консольным потребителем так, чтобы при этом сообщения не терялись. Аналогично, доступны не все возможности консольного производителя, так что отправлять данные долж- ным образом непросто. Лучше воспользоваться или непосредственно клиентскими библиотеками Java, или сторонними клиентскими библиотеками для других языков программирования, которые применяют протокол Kafka напрямую. Консольный потребитель Утилита. kafka-console-consumer.sh .позволяет.потреблять.сообщения.из.одной. или.нескольких.тем.кластера.Kafka..Сообщения.выводятся.в.стандартный.поток. вывода.и.разделяются.символом.новой.строки..По.умолчанию.выводятся.неформа- тированные.сообщения.(с.помощью. DefaultFormatter )..Обязательные.параметры. описаны.далее. Проверяйте версию утилиты Очень важно использовать потребитель той же версии, что и кластер Kafka. Кон- сольные потребители более старых версий могут повредить кластер из-за того, что взаимодействуют с ZooKeeper недопустимым образом. Потребление и генерация 235 Первый.из.обязательных.параметров.указывает,.применять.ли.потребитель.новой. версии,.включающий.ссылку.на.сам.кластер.Kafka..При.использовании.потребите- ля.старой.версии.для.этого.требуется.только.один.аргумент.—.параметр. --zookeeper с.последующей.строкой.подключения.для.кластера..В.приведенных.ранее.примерах. он.может.выглядеть.вот.так:. --zookeeper zoo1.example.com:2181/kafka-cluster В.случае.нового.потребителя.необходимо.задать.как.флаг. --new-consumer ,. так.и.параметр. --bootstrap-server .с.разделенным.запятыми.списком.брокеров. после.него,.например:. --bootstrap-server kafka1.examp le.com:9092,kafka2.examp- le.com:9092 Далее.необходимо.указать.потребляемые.темы..Для.этого.существует.три.параме- тра:. --topic ,. --whitelist .и. --blacklist ,.из.которых.можно.указать.только.один.. Параметр. --topic .задает.для.потребления.одну.конкретную.тему..После.каждого. из.параметров. --whitelist .и. --blacklist .должно.следовать.регулярное.выражение. (не.забудьте.экранировать.его.должным.образом,.чтобы.командная.строка.оболочки. не.изменила.его)..При.использовании.параметра. --whitelist .будут.потребляться. все.темы,.соответствующие.регулярному.выражению,.а.в.случае. --blacklist .—.все. темы,.кроме.соответствующих.регулярному.выражению. Например,.для.потребления.отдельной.темы.my-topic.с.помощью.потребителя. старой.версии.нужно.сделать.следующее: # kafka-console-consumer.sh --zookeeper zoo1.example.com:2181/kafka-cluster --topic my-topic sample message 1 sample message 2 ^CProcessed a total of 2 messages # Помимо.простейших.команд.командной.строки.можно.передавать.консольному. потребителю.и.все.обычные.параметры.конфигурации..Сделать.это.можно.двумя. способами.в.зависимости.от.количества.передаваемых.параметров..Первый.способ:. через.файл.конфигурации.потребителя,.задав.ключ. --consumer.config CONFIGFILE ,. где. CONFIGFILE .—.полный.путь.к.файлу.с.параметрами.конфигурации..Другой.спо- соб:.указать.параметры.в.командной.строке.с.одним.или.большим.количеством. аргументов.в.виде. --consumer-property KEY=VALUE ,.где. KEY .—.название.параметра,. а. VALUE .—.его.задаваемое.значение..Такой.способ.удобен.при.задании.таких.пара- метров.конфигурации,.как.идентификатор.группы.потребителей. Параметры командной строки, которые легко перепутать Как для консольного потребителя, так и для консольного производителя су- ществует параметр командной строки --property, который не следует путать с параметрами --consumer-property и --producer-property. Параметр --property ис- пользуется для передачи конфигурации только подпрограмме форматирования сообщений, а не самому клиенту. 236 Глава 9 • Администрирование Kafka Существует.несколько.часто.используемых.параметров.консольного.потребителя,. которые.не.мешает.знать: --formatter CLASSNAME .—.задает.используемый.при.декодировании.сообщений. класс.подпрограммы.форматирования.сообщений..По.умолчанию. kafka.to- ols.De faultMessageFormatter ; --from-beginning .—.потреблять.сообщения.из.заданной.(-ых).темы.(тем),.на- чиная.с.первого.смещения..В.противном.случае.потребление.начинается.с.по- следнего.смещения; --max-messages NUM .—.читать.не.более. NUM .сообщений,.прежде.чем.завершить. работу; --partition NUM .—.читать.только.из.раздела.с.идентификатором. NUM .(для.этого. требуется.новый.потребитель). Параметры подпрограммы форматирования сообщений Помимо.используемой.по.умолчанию.существуют.три.подпрограммы.форматиро- вания.сообщений: kafka.tools.LoggingMessageFormatter .—.выводит.сообщения.посредством. механизма.журналирования.вместо.стандартного.потока.вывода..Сообщения. выводятся.на.уровне.INFO.и.включают.метку.даты/времени,.ключ.и.значение; kafka.tools.ChecksumMessageFormatter .—.выводит.только.контрольные.суммы. сообщений; kafka.tools.NoOpMessageFormatter .—.потребляет.сообщения,.но.не.выводит.их. вообще. У. kafka.tools.DefaultMessageFormatter .есть.также.несколько.удобных.параметров,. которые.можно.задавать.с.помощью.параметра.командной.строки. --property : print.timestamp .—.установите.в. true .для.отображения.метки.даты/времени. каждого.из.сообщений.(при.наличии); print.key .—.установите.в. true .для.отображения.наряду.со.значением.ключа. сообщения; key.separator .—.задает.символ-разделитель,.выводимый.между.ключом.и.зна- чением.сообщения; line.separator .—.задает.символ-разделитель,.выводимый.между.сообщениями; key.deserializer .—.позволяет.задать.имя.класса,.используемого.для.десериа- лизации.ключа.сообщения.перед.выводом; value.deserializer .—.позволяет.задать.имя.класса,.используемого.для.десери- ализации.значения.сообщения.перед.выводом. Потребление и генерация 237 Классы.десериализации.должны.реализовывать.интерфейс. org.apache.kafka.com- mon.serialization.Deserializer ..Для.отображения.вывода.консольный.потреби- тель.вызывает.их.метод. toString ..Обычно.эти.десериализаторы.реализуют.в.виде. Java-класса,.размещаемого.на.пути.классов.для.консольного.потребителя.посред- ством.задания.значения.переменной.среды. CLASSPATH ,.перед.выполнением.сценария. kafka_console_consumer.sh Чтение тем смещений Иногда.бывает.удобно.посмотреть,.какие.смещения.для.групп.потребителей.кластера. были.зафиксированы..Возможно,.вам.захочется.узнать,.фиксировала.ли.смещения. конкретная.группа.потребителей.или.насколько.часто.это.происходило..Сделать.это. можно.посредством.чтения.специальной.внутренней.темы. __consumer_offsets .через. консольный.потребитель..Смещения.для.всех.потребителей.записываются.в.эту.тему. в.виде.сообщений..Для.декодирования.этих.сообщений.нужно.воспользоваться. классом. kafka.coordinator.GroupMetadataMana ger$OffsetsMessageFormatter 1 Например,.прочитаем.отдельное.сообщение.из.темы.смещений: # kafka-console-consumer.sh --zookeeper zoo1.example.com:2181/kafka-cluster --topic __consumer_offsets --formatter 'kafka.coordinator.GroupMetadataManager$OffsetsMessage Formatter' --max-messages 1 [my-group-name,my-topic,0]::[OffsetMetadata[481690879,NO_METADATA] ,CommitTime 1479708539051,ExpirationTime 1480313339051] Processed a total of 1 messages # Консольный производитель Аналогично.консольному.потребителю.утилита. kafka-console-producer.sh .может. использоваться.для.записи.сообщений.в.тему.Kafka.вашего.кластера..По.умолчанию. сообщения.читаются.по.одному.в.строке,.в.качестве.разделителя.ключа.и.значения. применяется.символ.табуляции.(если.он.отсутствует,.считается,.что.ключ.пустой). Изменение способа чтения строк Можно написать собственный класс построчного чтения для нестандартных операций. Этот класс, отвечающий за создание объекта ProducerRecord, должен расширять интерфейс kafka.common.MessageReader. Указать свой класс можно через командную строку с помощью параметра --line-reader. Убедитесь, что со- держащий ваш класс JAR-файл включен в путь классов. 1. Начиная.с.версии.0.11,.этот.класс.называется.kafka.coordinator.group.GroupMetadataMa- nager\$OffsetsMessageFormatter..—.Примеч. пер. 238 Глава 9 • Администрирование Kafka Консольный.производитель.требует.задания.как.минимум.двух.обязательных. аргументов..Один.из.них.—.параметр. --broker-list ,.в.котором.указывается.один. или.несколько.брокеров.в.виде.разделенного.запятыми.списка.элементов.вида. hostname:port ..Второй.обязательный.параметр.—. --topic ,.определяющий.тему,. для.которой.генерируются.сообщения..По.окончании.генерации.отправьте.символ. конца.файла.(EOF).для.закрытия.клиента. Например,.вот.так.можно.генерировать.два.сообщения.для.темы.my-topic: # kafka-console-producer.sh --broker-list kafka1.example.com:9092,kafka2.example.com:9092 --topic my-topic sample message 1 sample message 2 ^D # Как.и.в.случае.консольного.потребителя,.консольному.производителю.можно.пере- давать.любые.обычные.параметры.настройки..Сделать.это.можно.двумя.способами. в.зависимости.от.количества.передаваемых.параметров..Первый.способ:.через.файл. конфигурации.производителя.путем.задания.ключа. --producer.con fig CONFIGFILE ,. где. CONFIGFILE .—.полный.путь.к.файлу.с.параметрами.конфигурации..Другой.спо- соб:.указать.параметры.в.командной.строке.с.одним.или.несколькими.аргументами. в.виде. --producer-property KEY=VALUE ,.где. KEY .—.название.параметра,.а. VALUE .—.его. задаваемое.значение..Это.способ.удобен.при.задании.таких.параметров.произво- дителей,.как.настройки.пакетной.передачи.сообщений.(например,. linger.ms .или. batch.size ). У.консольного.производителя.имеется.множество.аргументов.командной.строки,. предназначенных.для.тонкой.настройки.его.поведения..Среди.наиболее.удобных: --key-serializer CLASSNAME .—.задает.используемый.при.сериализации.клю- чей.сообщений.класс.подпрограммы.кодирования..По.умолчанию. kafka.se- rializer.DefaultEncoder ; --value-serializer CLASSNAME .—.задает.используемый.при.сериализации.зна- чений.сообщений.класс.подпрограммы.кодирования..По.умолчанию. kafka.se- rializer.DefaultEncoder ; --compression-codec STRING .—.задает.используемый.при.генерации.сообще- ний.тип.сжатия..Возможные.значения.—. none ,. gzip ,. snappy .и. lz4 ..Значение.по. умолчанию. gzip ; --sync .—.генерирует.сообщения.синхронно,.ожидая.подтверждения.получения. каждого.из.них.перед.отправкой.следующего. |