Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
67 Хотя.API.производителей.очень.простой,.внутри.производителя,.«под.капотом»,. при.отправке.данных.происходит.немного.больше.действий..Основные.этапы.от- правки.данных.в.Kafka.демонстрирует.рис..3.1. Рис. 3.1. Высокоуровневый обзор компонентов производителей Kafka Для.генерации.сообщений.для.Kafka.нам.понадобится.сначала.создать.объект. ProducerRecord ,.включающий.тему,.в.которую.мы.собираемся.отправить.запись,. и.значение..При.желании.можно.задать.также.ключ.и.раздел..После.отправки. объекта. ProducerRecord .он.прежде.всего.сериализует.объекты.ключа.и.значения. в.байтовые.массивы.для.отправки.по.сети. 68 Глава 3 • Производители Kafka: запись сообщений в Kafka Далее.данные.попадают.в.объект. Partitioner ..Если.в. ProducerRecord .был.указан. раздел,.объект. Partitioner .ничего.не.делает.и.просто.возвращает.указанный. раздел..Если.же.нет,.он.выбирает.раздел,.обычно.в.соответствии.с.ключом.из. ProducerRecord ..Если.раздел.выбран,.производитель.будет.знать,.в.какую.тему. и.раздел.должна.попасть.запись..После.этого.он.помещает.эту.запись.в.пакет.запи- сей,.предназначенных.для.отправки.в.соответствующие.тему.и.раздел..За.отправку. этих.пакетов.записей.соответствующему.брокеру.Kafka.отвечает.отдельный.поток. выполнения. После.получения.сообщений.брокер.отправляет.ответ..В.случае.успешной.записи. сообщений.в.Kafka.будет.возвращен.объект. RecordMetadata ,.содержащий.тему,.раз- дел.и.смещение.записи.в.разделе..Если.брокеру.не.удалось.записать.сообщения,.он. вернет.сообщение.об.ошибке..При.получении.сообщения.об.ошибке.производитель. может.попробовать.отправить.сообщение.еще.несколько.раз,.прежде.чем.оставит. эти.попытки.и.вернет.ошибку. Создание производителя Kafka Первый.шаг.записи.сообщений.в.Kafka.—.создание.объекта.производителя.со.свой- ствами,.которые.вы.хотели.бы.передать.производителю..У.производителей.Kafka. есть.три.обязательных.свойства: bootstrap.servers .—.список.пар. host:port .брокеров,.используемых.произ- водителем.для.первоначального.соединения.с.кластером.Kafka..Он.не.обязан. включать.все.брокеры,.поскольку.производитель.может.получить.дополнитель- ную.информацию.после.начального.соединения..Но.рекомендуется.включить. в.него.хотя.бы.два.брокера,.чтобы.производитель.мог.подключиться.к.кластеру. при.сбое.одного.из.них; key.serializer .—.имя.класса,.применяемого.для.сериализации.ключей.запи- сей,.генерируемых.для.отправки.в.Kafka..Брокеры.Kafka.ожидают,.что.в.каче- стве.ключей.и.значений.сообщений.используются.байтовые.массивы..Однако. интерфейс.производителя.позволяет.отправлять.в.качестве.ключа.и.значе- ния.(посредством.использования.параметризованных.типов).любой.объект.. Это.сильно.повышает.удобочитаемость.кода,.но.производителю.при.этом. нужно.знать,.как.преобразовать.эти.объекты.в.байтовые.массивы..Значением. свойства. key.serializer .должно.быть.имя.класса,.реализующего.интерфейс org.apache.kafka.common.serialization.Serializer ..С.помощью.этого.класса. производитель.сериализует.объект.ключа.в.байтовый.массив..Пакет.программ. клиента.Kafka.включает.классы. ByteArraySerializer .(почти.ничего.не.делает),. StringSerializer .и. IntegerSerializer ,.так.что.при.использовании.обычных. типов.данных.нет.необходимости.реализовывать.свои.сериализаторы..Задать. значение.свойства. key.serializer .необходимо,.даже.если.вы.планируете.от- правлять.только.значения; Создание производителя Kafka 69 value.serializer .—.имя.класса,.используемого.для.сериализации.значений. записей,.генерируемых.для.отправки.в.Kafka..Аналогично.тому,.как.значение. свойства. key.serializer .соответствует.классу,.с.помощью.которого.произ- водитель.сериализует.объект.ключа.сообщения.в.байтовый.массив,.значение. свойства. value.serializer .должно.быть.равно.имени.класса,.служащего.для. сериализации.объекта.значения.сообщения. Следующий.фрагмент.кода.демонстрирует.создание.нового.производителя.с.по- мощью.задания.лишь.обязательных.параметров.и.использования.значений.по.умол- чанию.для.всего.остального: private Properties kafkaProps = new Properties(); kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer .Начинаем.с.объекта. Properties .Поскольку.мы.планируем.использовать.строки.для.ключа.и.значения.сообще- ния,.воспользуемся.встроенным.типом. StringSerializer .Создаем.новый.производитель,.задавая.подходящие.типы.ключа.и.значения. и.передавая.в.конструктор.объект. Properties При.таком.простом.интерфейсе.понятно,.что.управляют.поведением.производителя. в.основном.заданием.соответствующих.параметров.конфигурации..В.документации. Apache.Kafka.описаны.все.параметры.конфигурации..Кроме.того,.мы.рассмотрим. самые.важные.из.них.далее.в.этой.главе. После.создания.экземпляра.производителя.можно.приступать.к.отправке.сообще- ний..Существует.три.основных.метода.отправки.сообщений. Сделать и забыть. Отправляем.сообщение.на.сервер,.после.чего.особо.не.вол- нуемся,.дошло.оно.или.нет..Обычно.оно.доходит.успешно,.поскольку.Kafka. отличается.высокой.доступностью,.а.производитель.повторяет.отправку.со- общений.автоматически..Однако.часть.сообщений.при.использовании.этого. метода.теряется. Синхронная отправка. При.отправке.сообщения.метод. send() .возвращает.объ- ект-фьючерс.(объект.класса.Future),.после.чего.можно.воспользоваться.мето- дом. get() .для.организации.ожидания.и.выяснения.того,.успешно.ли.прошла. отправка. Асинхронная отправка. Методу. send() .передается.функция.обратного.вызова,. которая.вызывается.при.получении.ответа.от.брокера.Kafka. 70 Глава 3 • Производители Kafka: запись сообщений в Kafka В.примерах.в.дальнейшем.мы.увидим,.как.отправлять.сообщения.с.помощью. данных.методов.и.как.обрабатывать.различные.типы.возникающих.при.этом.оши- бок. Хотя.все.примеры.в.этой.главе.однопоточны,.объект.производителя.может.ис- пользоваться.для.отправки.сообщений.несколькими.потоками..Лучше.всего.начать. с.одного.производителя.и.одного.потока..Чтобы.повысить.пропускную.способ- ность,.можно.будет.добавить.дополнительные.потоки,.использующие.тот.же.про- изводитель..А.когда.этот.метод.перестанет.приносить.плоды.—.добавить.в.прило- жение.дополнительные.производители.для.еще.большего.повышения.пропускной. способности. Отправка сообщения в Kafka Вот.простейший.способ.отправки.сообщения: ProducerRecord new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { producer.send(record); } catch (Exception e) { e.printStackTrace(); } .Производитель.получает.на.входе.объекты. ProducerRecord ,.так.что.начнем.с.соз- дания.такого.объекта..У.класса. ProducerRecord .есть.несколько.конструкторов,. которые.обсудим.позднее..В.данном.случае.мы.имеем.дело.с.конструктором,. принимающим.на.входе.строковое.значение.—.название.темы,.в.которую. отправляются.данные,.и.отсылаемые.в.Kafka.ключ.и.значение.(тоже.стро- ки)..Типы.ключа.и.значения.должны.соответствовать.объектам. serializer и. producer .Для.отправки.объекта.типа. ProducerRecord .используем.метод. send() .объекта. producer ..Как.мы.уже.видели.в.схеме.архитектуры.производителя.(см..рис..3.1),. сообщение.помещается.в.буфер.и.отправляется.брокеру.в.отдельном.потоке.. Метод. send() .возвращает.объект.класса.Future.языка.Java.( http://www.bit.ly/2rG7Cg6 ),. включающий. RecordMetadata ,.но.поскольку.мы.просто.игнорируем.возвращае- мое.значение,.то.никак.не.можем.узнать,.успешно.ли.было.отправлено.сообще- ние..Такой.способ.отправки.сообщений.можно.использовать,.только.если.по- терять.сообщение.вполне.допустимо. .Хотя.ошибки,.возможные.при.отправке.сообщений.брокерам.Kafka.или.воз- никающие.в.самих.брокерах,.игнорируются,.при.появлении.в.производителе. ошибки.перед.отправкой.сообщения.в.Kafka.вполне.может.быть.сгенерировано. исключение..Это.может.быть.исключение. SerializationException .при.неудач- Отправка сообщения в Kafka 71 ной.сериализации.сообщения,. BufferExhaustedException .или. TimeoutException при.переполнении.буфера,. InterruptException .при.сбое.отправляющего.по- тока. Синхронная отправка сообщения Вот.простейший.способ.синхронной.отправки.сообщения: ProducerRecord new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { producer.send(record).get(); } catch (Exception e) { e.printStackTrace(); } .Используем.метод. Future.get() .для.ожидания.ответа.от.Kafka..Этот.метод. генерирует.исключение.в.случае.неудачи.отправки.записи.в.Kafka..При.отсут- ствии.ошибок.мы.получим.объект. RecordMetadata ,.из.которого.можно.узнать. смещение,.соответствующее.записанному.сообщению. .Если.перед.отправкой.данных.в.Kafka.или.во.время.нее.возникли.ошибки,.если. брокер.Kafka.вернул.сообщение.об.ошибке,.исключающие.повтор.отправки. или.лимит.на.повторы.исчерпан,.нас.будет.ожидать.исключение..В.этом.случае. просто.выводим.информацию.о.нем. В.классе. KafkaProducer .существует.два.типа.ошибок..Первый.тип.—.ошибки,.ко- торые.можно.исправить,.отправив.сообщение.повторно.(retriable)..Например,.так. можно.исправить.ошибку.соединения,.поскольку.через.некоторое.время.оно.спо- собно.восстановиться..Ошибка.«отсутствует.ведущий.узел».может.быть.исправлена. выбором.нового.ведущего.узла.для.раздела..Можно.настроить. KafkaProducer .так,. чтобы.при.таких.ошибках.отправка.повторялась.автоматически..Код.приложения. будет.получать.исключения.подобного.типа.только.тогда,.когда.лимит.на.повторы. уже.исчерпан,.а.ошибка.все.еще.не.исправлена..Некоторые.ошибки.невозможно. исправить.повторной.отправкой.сообщения..Такова,.например,.ошибка.«сообщение. слишком.велико»..В.подобных.случаях. KafkaProducer .не.станет.пытаться.повторить. отправку,.а.вернет.исключение.сразу.же. Асинхронная отправка сообщения Пусть.время.прохождения.сообщения.между.нашим.приложением.и.кластером. Kafka.и.обратно.равно.10.мс..Если.мы.будем.ждать.ответа.после.отправки.каждого. сообщения,.отправка.100.сообщений.займет.около.1.секунды..В.то.же.время,.если. не.ожидать.ответов,.отправка.всех.сообщений.не.займет.практически.никакого. времени..В.большинстве.случаев.ответ.и.не.требуется.—.Kafka.возвращает.тему,. 72 Глава 3 • Производители Kafka: запись сообщений в Kafka раздел.и.смещение.записи,.которые.обычно.не.нужны.отправляющему.приложе- нию..Однако.нам.нужно.знать,.удалась.ли.вообще.отправка.сообщения,.чтобы. можно.было.сгенерировать.исключение,.зафиксировать.в.журнале.ошибку.или,. возможно,.записать.сообщение.в.файл.ошибок.для.дальнейшего.анализа. Для.асинхронной.отправки.сообщений.с.сохранением.возможности.обработки. различных.сценариев.ошибок.производители.поддерживают.добавление.функции. обратного.вызова.при.отправке.записи..Вот.пример.использования.функции.об- ратного.вызова: private class DemoProducerCallback implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { e.printStackTrace(); } } } ProducerRecord new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); producer.send(record, new DemoProducerCallback()); .Для.использования.функций.обратного.вызова.нам.понадобится.класс,.реали- зующий.интерфейс. org.apache.kafka.clients.producer.Callback ,.включающий. одну-единственную.функцию. onCompletion() .Если.Kafka.вернет.ошибку,.в.функцию. onCompletion() .попадет.непустое.ис- ключение..В.приведенном.коде.вся.его.обработка.заключается.в.выводе.ин- формации,.но.в.коде.для.промышленной.эксплуатации.функции.обработки. исключений,.вероятно,.будут.более.ошибкоустойчивыми. .Записи.остаются.такими.же,.как.и.раньше. .И.мы.передаем.объект. Callback .при.отправке.записи. Настройка производителей До.сих.пор.мы.практически.не.сталкивались.с.конфигурационными.параметрами. производителей.—.только.с.обязательным.URI. bootstrap.servers .и.сериализато- рами. У.производителя.есть.множество.параметров.конфигурации,.большинство.из.ко- торых.описаны.в.документации.Apache.Kafka.( http://kafka.apache.org/documentation. html#producerconfigs )..Значения.по.умолчанию.многих.из.них.вполне.разумны,.так. что.нет.смысла.возиться.с.каждым.параметром..Однако.некоторые.из.этих.пара- метров.существенно.влияют.на.использование.памяти,.производительность.и.на- дежность.производителей..Рассмотрим.их. Настройка производителей 73 acks Параметр. acks .задает.число.получивших.сообщение.реплик.разделов,.требующе- еся.для.признания.производителем.операции.записи.успешной..Этот.параметр. существенно.влияет.на.вероятность.потери.сообщений..Он.может.принимать.три. значения. При. acks=0 .производитель.не.ждет.ответа.от.брокера,.чтобы.счесть.отправку. сообщения.успешной..Это.значит,.что,.если.произойдет.сбой.и.брокер.не.полу- чит.сообщение,.производитель.об.этом.не.узнает.и.сообщение.будет.потеряно.. Но.поскольку.производитель.не.ждет.какого-либо.ответа.от.сервера,.то.скорость. отправки.сообщений.ограничивается.лишь.возможностями.сети,.так.что.эта.на- стройка.позволяет.достичь.очень.высокой.пропускной.способности. При. acks=1 .производитель.получает.от.брокера.ответ.об.успешном.получении. сразу.же,.как.только.ведущая.реплика.получит.сообщение..Если.сообщение. не.удалось.записать.на.ведущий.узел.(например,.когда.этот.узел.потерпел.фа- тальный.сбой,.а.новый.еще.не.успели.выбрать),.производитель.получит.ответ. об.ошибке.и.может.попытаться.вновь.отправить.сообщение,.предотвращая.тем. самым.потенциальную.потерю.данных..Сообщение.все.равно.может.быть.поте- ряно,.если.ведущий.узел.потерпел.фатальный.сбой,.а.в.качестве.нового.ведущего. узла.была.выбрана.реплика,.не.содержащая.этого.сообщения.(из-за.«нечистых». выборов.ведущего.узла)..В.этом.случае.пропускная.способность.зависит.от.того,. отправляем.мы.сообщения.синхронно.или.асинхронно..Ожидание.ответа.от. сервера.клиентским.кодом.(посредством.вызова.метода. get() ,.возвращаемого. при.отправке.сообщения.объекта. Future ),.очевидно,.существенно.повысит. длительность.задержки.(по.крайней.мере.на.время.перемещения.данных.по. сети.в.обе.стороны)..При.использовании.клиентом.функций.обратного.вызова. задержка.будет.незаметна,.но.пропускная.способность.окажется.ограничена. числом.передаваемых.в.данный.момент.сообщений,.то.есть.числом.сообщений,. которое.производитель.может.отправить.до.получения.ответов.от.сервера. При. acks=all .производителю.приходит.ответ.от.брокера.об.успешном.получе- нии.сообщения.после.того,.как.оно.дойдет.до.всех.синхронизируемых.реплик.. Это.самый.безопасный.режим,.поскольку.есть.уверенность,.что.сообщение.по- лучено.более.чем.одним.брокером.и.оно.не.пропадет.даже.в.случае.аварийного. сбоя.(больше.информации.по.этому.вопросу.вы.найдете.в.главе.5)..Однако. в.случае. acks=1 .задержка.будет.еще.выше,.ведь.придется.ждать.получения.со- общения.более.чем.одним.брокером. buffer.memory Этот.параметр.задает.объем.памяти,.используемой.производителем.для.буфериза- ции.сообщений,.ожидающих.отправки.брокерам..Если.приложение.отправляет.со- общения.быстрее,.чем.они.могут.быть.доставлены.серверу,.у.производителя.может. 74 Глава 3 • Производители Kafka: запись сообщений в Kafka исчерпаться.свободное.место.и.дополнительные.вызовы.метода. send() .приведут. или.к.блокировке,.или.генерации.исключения.в.зависимости.от.значения.параме- тра. block.on.buffer.full. .(В.версии.0.9.0.0.он.заменен.параметром. max.block.ms ,. предоставляющим.возможность.блокировки.на.заданное.время.с.последующей. генерацией.исключения.) compression.type По.умолчанию.сообщения.отправляются.в.несжатом.виде..Возможные.значения. этого.параметра.—. snappy ,. gzip .и. lz4 ,.при.которых.к.данным.применяются.соот- ветствующие.алгоритмы.сжатия.перед.отправкой.брокерам..Алгоритм.сжатия. snappy .был.разработан.компанией.Google.для.обеспечения.хорошей.степени.сжа- тия.при.низком.перерасходе.ресурсов.CPU.и.высокой.производительности.. Его.рекомендуется.использовать.в.случаях,.когда.важны.как.производительность,. так.и.пропускная.способность.сети..Алгоритм.сжатия. gzip .обычно.использует. больше.ресурсов.CPU.и.занимает.больше.времени,.но.обеспечивает.лучшую.сте- пень.сжатия..Поэтому.его.рекомендуется.использовать.в.случаях,.когда.пропуск- ная.способность.сети.ограничена..Благодаря.сжатию.можно.снизить.нагрузку.на. сеть.и.хранилище,.часто.являющиеся.узкими.местами.при.отправке.сообщений. в.Kafka. retries Производитель.получил.от.сервера.сообщение.об.ошибке..Возможно,.она.окажется. временной,.например,.отсутствует.ведущий.узел.для.раздела..В.таком.случае.зна- чение.параметра. retries .будет.определять.число.предпринятых.производителем. попыток.отправить.сообщение,.по.достижении.которого.он.прекратит.это.делать. и.известит.клиента.о.проблеме..По.умолчанию.производитель.ждет.100.мс.перед. следующей.попыткой,.но.это.поведение.можно.изменить.с.помощью.параметра. retry.backoff.ms ..Мы.рекомендуем.проверить,.сколько.времени.занимает.вос- становление.после.аварийного.сбоя.брокера.(то.есть.сколько.времени.займет.вы- бор.новых.ведущих.узлов.для.всех.разделов),.и.задать.такие.количество.повторов. и.задержку.между.ними,.чтобы.общее.время,.потраченное.на.повторы,.превышало. время.восстановления.кластера.Kafka.после.сбоя,.иначе.производитель.прекратит. попытки.повтора.слишком.рано..Не.при.всех.ошибках.производителю.имеет.смысл. пытаться.повторять.отправку..Некоторые.ошибки.носят.не.временный.характер,. и.при.их.возникновении.производитель.не.будет.пытаться.повторить.отправку. (например,.такова.ошибка.«сообщение.слишком.велико»)..В.целом,.поскольку. повторами.отправки.занимается.производитель,.нет.смысла.делать.это.в.коде. приложения..Лучше.сосредоточить.усилия.на.обработке.ошибок,.которые.нельзя. исправить.путем.повтора.отправки,.или.случаев,.когда.число.попыток.повтора.было. исчерпано.без.результата. |