Главная страница

Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


Скачать 7.59 Mb.
НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Дата21.06.2022
Размер7.59 Mb.
Формат файлаpdf
Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
ТипДокументы
#609074
страница9 из 39
1   ...   5   6   7   8   9   10   11   12   ...   39
67
Хотя.API.производителей.очень.простой,.внутри.производителя,.«под.капотом»,.
при.отправке.данных.происходит.немного.больше.действий..Основные.этапы.от- правки.данных.в.Kafka.демонстрирует.рис..3.1.
Рис. 3.1. Высокоуровневый обзор компонентов производителей Kafka
Для.генерации.сообщений.для.Kafka.нам.понадобится.сначала.создать.объект.
ProducerRecord
,.включающий.тему,.в.которую.мы.собираемся.отправить.запись,.
и.значение..При.желании.можно.задать.также.ключ.и.раздел..После.отправки.
объекта.
ProducerRecord
.он.прежде.всего.сериализует.объекты.ключа.и.значения.
в.байтовые.массивы.для.отправки.по.сети.

68 Глава 3 • Производители Kafka: запись сообщений в Kafka
Далее.данные.попадают.в.объект.
Partitioner
..Если.в.
ProducerRecord
.был.указан.
раздел,.объект.
Partitioner
.ничего.не.делает.и.просто.возвращает.указанный.
раздел..Если.же.нет,.он.выбирает.раздел,.обычно.в.соответствии.с.ключом.из.
ProducerRecord
..Если.раздел.выбран,.производитель.будет.знать,.в.какую.тему.
и.раздел.должна.попасть.запись..После.этого.он.помещает.эту.запись.в.пакет.запи- сей,.предназначенных.для.отправки.в.соответствующие.тему.и.раздел..За.отправку.
этих.пакетов.записей.соответствующему.брокеру.Kafka.отвечает.отдельный.поток.
выполнения.
После.получения.сообщений.брокер.отправляет.ответ..В.случае.успешной.записи.
сообщений.в.Kafka.будет.возвращен.объект.
RecordMetadata
,.содержащий.тему,.раз- дел.и.смещение.записи.в.разделе..Если.брокеру.не.удалось.записать.сообщения,.он.
вернет.сообщение.об.ошибке..При.получении.сообщения.об.ошибке.производитель.
может.попробовать.отправить.сообщение.еще.несколько.раз,.прежде.чем.оставит.
эти.попытки.и.вернет.ошибку.
Создание производителя Kafka
Первый.шаг.записи.сообщений.в.Kafka.—.создание.объекта.производителя.со.свой- ствами,.которые.вы.хотели.бы.передать.производителю..У.производителей.Kafka.
есть.три.обязательных.свойства:
‰
‰ bootstrap.servers
.—.список.пар.
host:port
.брокеров,.используемых.произ- водителем.для.первоначального.соединения.с.кластером.Kafka..Он.не.обязан.
включать.все.брокеры,.поскольку.производитель.может.получить.дополнитель- ную.информацию.после.начального.соединения..Но.рекомендуется.включить.
в.него.хотя.бы.два.брокера,.чтобы.производитель.мог.подключиться.к.кластеру.
при.сбое.одного.из.них;
‰
‰ key.serializer
.—.имя.класса,.применяемого.для.сериализации.ключей.запи- сей,.генерируемых.для.отправки.в.Kafka..Брокеры.Kafka.ожидают,.что.в.каче- стве.ключей.и.значений.сообщений.используются.байтовые.массивы..Однако.
интерфейс.производителя.позволяет.отправлять.в.качестве.ключа.и.значе- ния.(посредством.использования.параметризованных.типов).любой.объект..
Это.сильно.повышает.удобочитаемость.кода,.но.производителю.при.этом.
нужно.знать,.как.преобразовать.эти.объекты.в.байтовые.массивы..Значением.
свойства.
key.serializer
.должно.быть.имя.класса,.реализующего.интерфейс org.apache.kafka.common.serialization.Serializer
..С.помощью.этого.класса.
производитель.сериализует.объект.ключа.в.байтовый.массив..Пакет.программ.
клиента.Kafka.включает.классы.
ByteArraySerializer
.(почти.ничего.не.делает),.
StringSerializer
.и.
IntegerSerializer
,.так.что.при.использовании.обычных.
типов.данных.нет.необходимости.реализовывать.свои.сериализаторы..Задать.
значение.свойства.
key.serializer
.необходимо,.даже.если.вы.планируете.от- правлять.только.значения;

Создание производителя Kafka 69
‰
‰ value.serializer
.—.имя.класса,.используемого.для.сериализации.значений.
записей,.генерируемых.для.отправки.в.Kafka..Аналогично.тому,.как.значение.
свойства.
key.serializer
.соответствует.классу,.с.помощью.которого.произ- водитель.сериализует.объект.ключа.сообщения.в.байтовый.массив,.значение.
свойства.
value.serializer
.должно.быть.равно.имени.класса,.служащего.для.
сериализации.объекта.значения.сообщения.
Следующий.фрагмент.кода.демонстрирует.создание.нового.производителя.с.по- мощью.задания.лишь.обязательных.параметров.и.использования.значений.по.умол- чанию.для.всего.остального:
private Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer(kafkaProps);
.Начинаем.с.объекта.
Properties
.Поскольку.мы.планируем.использовать.строки.для.ключа.и.значения.сообще- ния,.воспользуемся.встроенным.типом.
StringSerializer
.Создаем.новый.производитель,.задавая.подходящие.типы.ключа.и.значения.
и.передавая.в.конструктор.объект.
Properties
При.таком.простом.интерфейсе.понятно,.что.управляют.поведением.производителя.
в.основном.заданием.соответствующих.параметров.конфигурации..В.документации.
Apache.Kafka.описаны.все.параметры.конфигурации..Кроме.того,.мы.рассмотрим.
самые.важные.из.них.далее.в.этой.главе.
После.создания.экземпляра.производителя.можно.приступать.к.отправке.сообще- ний..Существует.три.основных.метода.отправки.сообщений.
‰
‰
Сделать и забыть. Отправляем.сообщение.на.сервер,.после.чего.особо.не.вол- нуемся,.дошло.оно.или.нет..Обычно.оно.доходит.успешно,.поскольку.Kafka.
отличается.высокой.доступностью,.а.производитель.повторяет.отправку.со- общений.автоматически..Однако.часть.сообщений.при.использовании.этого.
метода.теряется.
‰
‰
Синхронная отправка. При.отправке.сообщения.метод.
send()
.возвращает.объ- ект-фьючерс.(объект.класса.Future),.после.чего.можно.воспользоваться.мето- дом.
get()
.для.организации.ожидания.и.выяснения.того,.успешно.ли.прошла.
отправка.
‰
‰
Асинхронная отправка. Методу.
send()
.передается.функция.обратного.вызова,.
которая.вызывается.при.получении.ответа.от.брокера.Kafka.

70 Глава 3 • Производители Kafka: запись сообщений в Kafka
В.примерах.в.дальнейшем.мы.увидим,.как.отправлять.сообщения.с.помощью.
данных.методов.и.как.обрабатывать.различные.типы.возникающих.при.этом.оши- бок.
Хотя.все.примеры.в.этой.главе.однопоточны,.объект.производителя.может.ис- пользоваться.для.отправки.сообщений.несколькими.потоками..Лучше.всего.начать.
с.одного.производителя.и.одного.потока..Чтобы.повысить.пропускную.способ- ность,.можно.будет.добавить.дополнительные.потоки,.использующие.тот.же.про- изводитель..А.когда.этот.метод.перестанет.приносить.плоды.—.добавить.в.прило- жение.дополнительные.производители.для.еще.большего.повышения.пропускной.
способности.
Отправка сообщения в Kafka
Вот.простейший.способ.отправки.сообщения:
ProducerRecord record =
new ProducerRecord<>("CustomerCountry", "Precision Products",
"France"); try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
.Производитель.получает.на.входе.объекты.
ProducerRecord
,.так.что.начнем.с.соз- дания.такого.объекта..У.класса.
ProducerRecord
.есть.несколько.конструкторов,.
которые.обсудим.позднее..В.данном.случае.мы.имеем.дело.с.конструктором,.
принимающим.на.входе.строковое.значение.—.название.темы,.в.которую.
отправляются.данные,.и.отсылаемые.в.Kafka.ключ.и.значение.(тоже.стро- ки)..Типы.ключа.и.значения.должны.соответствовать.объектам.
serializer и.
producer
.Для.отправки.объекта.типа.
ProducerRecord
.используем.метод.
send()
.объекта.
producer
..Как.мы.уже.видели.в.схеме.архитектуры.производителя.(см..рис..3.1),.
сообщение.помещается.в.буфер.и.отправляется.брокеру.в.отдельном.потоке..
Метод.
send()
.возвращает.объект.класса.Future.языка.Java.(
http://www.bit.ly/2rG7Cg6
),.
включающий.
RecordMetadata
,.но.поскольку.мы.просто.игнорируем.возвращае- мое.значение,.то.никак.не.можем.узнать,.успешно.ли.было.отправлено.сообще- ние..Такой.способ.отправки.сообщений.можно.использовать,.только.если.по- терять.сообщение.вполне.допустимо.
.Хотя.ошибки,.возможные.при.отправке.сообщений.брокерам.Kafka.или.воз- никающие.в.самих.брокерах,.игнорируются,.при.появлении.в.производителе.
ошибки.перед.отправкой.сообщения.в.Kafka.вполне.может.быть.сгенерировано.
исключение..Это.может.быть.исключение.
SerializationException
.при.неудач-

Отправка сообщения в Kafka 71
ной.сериализации.сообщения,.
BufferExhaustedException
.или.
TimeoutException при.переполнении.буфера,.
InterruptException
.при.сбое.отправляющего.по- тока.
Синхронная отправка сообщения
Вот.простейший.способ.синхронной.отправки.сообщения:
ProducerRecord record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
.Используем.метод.
Future.get()
.для.ожидания.ответа.от.Kafka..Этот.метод.
генерирует.исключение.в.случае.неудачи.отправки.записи.в.Kafka..При.отсут- ствии.ошибок.мы.получим.объект.
RecordMetadata
,.из.которого.можно.узнать.
смещение,.соответствующее.записанному.сообщению.
.Если.перед.отправкой.данных.в.Kafka.или.во.время.нее.возникли.ошибки,.если.
брокер.Kafka.вернул.сообщение.об.ошибке,.исключающие.повтор.отправки.
или.лимит.на.повторы.исчерпан,.нас.будет.ожидать.исключение..В.этом.случае.
просто.выводим.информацию.о.нем.
В.классе.
KafkaProducer
.существует.два.типа.ошибок..Первый.тип.—.ошибки,.ко- торые.можно.исправить,.отправив.сообщение.повторно.(retriable)..Например,.так.
можно.исправить.ошибку.соединения,.поскольку.через.некоторое.время.оно.спо- собно.восстановиться..Ошибка.«отсутствует.ведущий.узел».может.быть.исправлена.
выбором.нового.ведущего.узла.для.раздела..Можно.настроить.
KafkaProducer
.так,.
чтобы.при.таких.ошибках.отправка.повторялась.автоматически..Код.приложения.
будет.получать.исключения.подобного.типа.только.тогда,.когда.лимит.на.повторы.
уже.исчерпан,.а.ошибка.все.еще.не.исправлена..Некоторые.ошибки.невозможно.
исправить.повторной.отправкой.сообщения..Такова,.например,.ошибка.«сообщение.
слишком.велико»..В.подобных.случаях.
KafkaProducer
.не.станет.пытаться.повторить.
отправку,.а.вернет.исключение.сразу.же.
Асинхронная отправка сообщения
Пусть.время.прохождения.сообщения.между.нашим.приложением.и.кластером.
Kafka.и.обратно.равно.10.мс..Если.мы.будем.ждать.ответа.после.отправки.каждого.
сообщения,.отправка.100.сообщений.займет.около.1.секунды..В.то.же.время,.если.
не.ожидать.ответов,.отправка.всех.сообщений.не.займет.практически.никакого.
времени..В.большинстве.случаев.ответ.и.не.требуется.—.Kafka.возвращает.тему,.

72 Глава 3 • Производители Kafka: запись сообщений в Kafka раздел.и.смещение.записи,.которые.обычно.не.нужны.отправляющему.приложе- нию..Однако.нам.нужно.знать,.удалась.ли.вообще.отправка.сообщения,.чтобы.
можно.было.сгенерировать.исключение,.зафиксировать.в.журнале.ошибку.или,.
возможно,.записать.сообщение.в.файл.ошибок.для.дальнейшего.анализа.
Для.асинхронной.отправки.сообщений.с.сохранением.возможности.обработки.
различных.сценариев.ошибок.производители.поддерживают.добавление.функции.
обратного.вызова.при.отправке.записи..Вот.пример.использования.функции.об- ратного.вызова:
private class DemoProducerCallback implements Callback {
@Override public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); producer.send(record, new DemoProducerCallback());
.Для.использования.функций.обратного.вызова.нам.понадобится.класс,.реали- зующий.интерфейс.
org.apache.kafka.clients.producer.Callback
,.включающий.
одну-единственную.функцию.
onCompletion()
.Если.Kafka.вернет.ошибку,.в.функцию.
onCompletion()
.попадет.непустое.ис- ключение..В.приведенном.коде.вся.его.обработка.заключается.в.выводе.ин- формации,.но.в.коде.для.промышленной.эксплуатации.функции.обработки.
исключений,.вероятно,.будут.более.ошибкоустойчивыми.
.Записи.остаются.такими.же,.как.и.раньше.
.И.мы.передаем.объект.
Callback
.при.отправке.записи.
Настройка производителей
До.сих.пор.мы.практически.не.сталкивались.с.конфигурационными.параметрами.
производителей.—.только.с.обязательным.URI.
bootstrap.servers
.и.сериализато- рами.
У.производителя.есть.множество.параметров.конфигурации,.большинство.из.ко- торых.описаны.в.документации.Apache.Kafka.(
http://kafka.apache.org/documentation.
html#producerconfigs
)..Значения.по.умолчанию.многих.из.них.вполне.разумны,.так.
что.нет.смысла.возиться.с.каждым.параметром..Однако.некоторые.из.этих.пара- метров.существенно.влияют.на.использование.памяти,.производительность.и.на- дежность.производителей..Рассмотрим.их.

Настройка производителей 73
acks
Параметр.
acks
.задает.число.получивших.сообщение.реплик.разделов,.требующе- еся.для.признания.производителем.операции.записи.успешной..Этот.параметр.
существенно.влияет.на.вероятность.потери.сообщений..Он.может.принимать.три.
значения.
‰
‰
При.
acks=0
.производитель.не.ждет.ответа.от.брокера,.чтобы.счесть.отправку.
сообщения.успешной..Это.значит,.что,.если.произойдет.сбой.и.брокер.не.полу- чит.сообщение,.производитель.об.этом.не.узнает.и.сообщение.будет.потеряно..
Но.поскольку.производитель.не.ждет.какого-либо.ответа.от.сервера,.то.скорость.
отправки.сообщений.ограничивается.лишь.возможностями.сети,.так.что.эта.на- стройка.позволяет.достичь.очень.высокой.пропускной.способности.
‰
‰
При.
acks=1
.производитель.получает.от.брокера.ответ.об.успешном.получении.
сразу.же,.как.только.ведущая.реплика.получит.сообщение..Если.сообщение.
не.удалось.записать.на.ведущий.узел.(например,.когда.этот.узел.потерпел.фа- тальный.сбой,.а.новый.еще.не.успели.выбрать),.производитель.получит.ответ.
об.ошибке.и.может.попытаться.вновь.отправить.сообщение,.предотвращая.тем.
самым.потенциальную.потерю.данных..Сообщение.все.равно.может.быть.поте- ряно,.если.ведущий.узел.потерпел.фатальный.сбой,.а.в.качестве.нового.ведущего.
узла.была.выбрана.реплика,.не.содержащая.этого.сообщения.(из-за.«нечистых».
выборов.ведущего.узла)..В.этом.случае.пропускная.способность.зависит.от.того,.
отправляем.мы.сообщения.синхронно.или.асинхронно..Ожидание.ответа.от.
сервера.клиентским.кодом.(посредством.вызова.метода.
get()
,.возвращаемого.
при.отправке.сообщения.объекта.
Future
),.очевидно,.существенно.повысит.
длительность.задержки.(по.крайней.мере.на.время.перемещения.данных.по.
сети.в.обе.стороны)..При.использовании.клиентом.функций.обратного.вызова.
задержка.будет.незаметна,.но.пропускная.способность.окажется.ограничена.
числом.передаваемых.в.данный.момент.сообщений,.то.есть.числом.сообщений,.
которое.производитель.может.отправить.до.получения.ответов.от.сервера.
‰
‰
При.
acks=all
.производителю.приходит.ответ.от.брокера.об.успешном.получе- нии.сообщения.после.того,.как.оно.дойдет.до.всех.синхронизируемых.реплик..
Это.самый.безопасный.режим,.поскольку.есть.уверенность,.что.сообщение.по- лучено.более.чем.одним.брокером.и.оно.не.пропадет.даже.в.случае.аварийного.
сбоя.(больше.информации.по.этому.вопросу.вы.найдете.в.главе.5)..Однако.
в.случае.
acks=1
.задержка.будет.еще.выше,.ведь.придется.ждать.получения.со- общения.более.чем.одним.брокером.
buffer.memory
Этот.параметр.задает.объем.памяти,.используемой.производителем.для.буфериза- ции.сообщений,.ожидающих.отправки.брокерам..Если.приложение.отправляет.со- общения.быстрее,.чем.они.могут.быть.доставлены.серверу,.у.производителя.может.

74 Глава 3 • Производители Kafka: запись сообщений в Kafka исчерпаться.свободное.место.и.дополнительные.вызовы.метода.
send()
.приведут.
или.к.блокировке,.или.генерации.исключения.в.зависимости.от.значения.параме- тра.
block.on.buffer.full.
.(В.версии.0.9.0.0.он.заменен.параметром.
max.block.ms
,.
предоставляющим.возможность.блокировки.на.заданное.время.с.последующей.
генерацией.исключения.)
compression.type
По.умолчанию.сообщения.отправляются.в.несжатом.виде..Возможные.значения.
этого.параметра.—.
snappy
,.
gzip
.и.
lz4
,.при.которых.к.данным.применяются.соот- ветствующие.алгоритмы.сжатия.перед.отправкой.брокерам..Алгоритм.сжатия.
snappy
.был.разработан.компанией.Google.для.обеспечения.хорошей.степени.сжа- тия.при.низком.перерасходе.ресурсов.CPU.и.высокой.производительности..
Его.рекомендуется.использовать.в.случаях,.когда.важны.как.производительность,.
так.и.пропускная.способность.сети..Алгоритм.сжатия.
gzip
.обычно.использует.
больше.ресурсов.CPU.и.занимает.больше.времени,.но.обеспечивает.лучшую.сте- пень.сжатия..Поэтому.его.рекомендуется.использовать.в.случаях,.когда.пропуск- ная.способность.сети.ограничена..Благодаря.сжатию.можно.снизить.нагрузку.на.
сеть.и.хранилище,.часто.являющиеся.узкими.местами.при.отправке.сообщений.
в.Kafka.
retries
Производитель.получил.от.сервера.сообщение.об.ошибке..Возможно,.она.окажется.
временной,.например,.отсутствует.ведущий.узел.для.раздела..В.таком.случае.зна- чение.параметра.
retries
.будет.определять.число.предпринятых.производителем.
попыток.отправить.сообщение,.по.достижении.которого.он.прекратит.это.делать.
и.известит.клиента.о.проблеме..По.умолчанию.производитель.ждет.100.мс.перед.
следующей.попыткой,.но.это.поведение.можно.изменить.с.помощью.параметра.
retry.backoff.ms
..Мы.рекомендуем.проверить,.сколько.времени.занимает.вос- становление.после.аварийного.сбоя.брокера.(то.есть.сколько.времени.займет.вы- бор.новых.ведущих.узлов.для.всех.разделов),.и.задать.такие.количество.повторов.
и.задержку.между.ними,.чтобы.общее.время,.потраченное.на.повторы,.превышало.
время.восстановления.кластера.Kafka.после.сбоя,.иначе.производитель.прекратит.
попытки.повтора.слишком.рано..Не.при.всех.ошибках.производителю.имеет.смысл.
пытаться.повторять.отправку..Некоторые.ошибки.носят.не.временный.характер,.
и.при.их.возникновении.производитель.не.будет.пытаться.повторить.отправку.
(например,.такова.ошибка.«сообщение.слишком.велико»)..В.целом,.поскольку.
повторами.отправки.занимается.производитель,.нет.смысла.делать.это.в.коде.
приложения..Лучше.сосредоточить.усилия.на.обработке.ошибок,.которые.нельзя.
исправить.путем.повтора.отправки,.или.случаев,.когда.число.попыток.повтора.было.
исчерпано.без.результата.

Настройка производителей
1   ...   5   6   7   8   9   10   11   12   ...   39


написать администратору сайта