Главная страница

Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


Скачать 7.59 Mb.
НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Дата21.06.2022
Размер7.59 Mb.
Формат файлаpdf
Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
ТипДокументы
#609074
страница12 из 39
1   ...   8   9   10   11   12   13   14   15   ...   39
Как происходит распределение разделов по брокерам
Когда потребитель хочет присоединиться к группе, он отправляет координатору группы запрос JoinGroup. Первый присоединившийся к группе потребитель стано- вится ведущим потребителем группы. Ведущий получает от координатора группы список всех потребителей группы, которые недавно отправляли контрольные сигналы, а значит, функционируют нормально, и отвечает за назначение потре- бителям подмножеств разделов. Для определения того, за какие разделы какой потребитель должен отвечать, используется реализация класса PartitionAssignor.
В Kafka есть две стратегии назначения разделов, которые мы подробнее обсудим в посвященном настройке разделе. После распределения разделов ведущий по- требитель группы отправляет список назначений координатору группы, который пересылает эту информацию потребителям. Каждый потребитель знает только о назначенных ему разделах. Единственный клиентский процесс, обладающий полным списком потребителей группы и назначенных им разделов, — ведущий группы. Эта процедура повторяется при каждой перебалансировке.

94 Глава 4 • Потребители Kafka: чтение данных из Kafka
Создание потребителя Kafka
Первый.шаг.к.получению.записей.—.создание.экземпляра.класса.
KafkaConsumer
Создание.экземпляра.класса.
KafkaConsumer
.очень.похоже.на.создание.экземпляра.
KafkaProducer
.—.необходимо.просто.создать.экземпляр.Java-класса.
Properties
,.
содержащий.свойства,.которые.вы.хотели.бы.передать.потребителю..Далее.в.этой.
главе.мы.подробнее.обсудим.все.свойства..Для.начала.нам.понадобятся.лишь.три.
обязательных:.
bootstrap.servers
,.
key.deserializer
.и.
value.deserializer
Свойство.
bootstrap.servers
.представляет.собой.строку.подключения.к.кластеру.
Kafka..Оно.используется.так.же,.как.и.в.
KafkaProducer
.(за.подробностями.можете.
обратиться.к.главе.3)..Два.других.свойства,.
key.deserializer
.и.
value.deserializer
,.
схожи.с.сериализаторами.для.производителей,.но.вместо.классов,.преобразующих.
Java-объекты.в.байтовые.массивы,.необходимо.задать.классы,.преобразующие.
байтовые.массивы.в.Java-объекты.
Есть.и.четвертое.свойство,.
group.id
,.которое,.строго.говоря,.не.является.обяза- тельным,.но.мы.пока.сделаем.вид,.что.является..Это.свойство.задает.группу.по- требителей,.к.которой.относится.экземпляр.
KafkaConsumer
..Хотя.можно.создавать.
и.потребителей,.не.принадлежащих.ни.к.одной.группе,.в.большей.части.данной.
главы.будем.предполагать,.что.все.потребители.состоят.в.группах.
Следующий.фрагмент.кода.демонстрирует.создание.объекта.
KafkaConsumer
:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumerString>(props);
Бо'льшая.часть.этого.кода.вам.уже.знакома,.если.вы.читали.главу.3,.посвященную.
созданию.производителей..Мы.считаем,.что.как.ключ,.так.и.значение.читаемых.
нами.записей.представляют.собой.объекты.
String
..Единственное.новое.свойство.
тут.
group.id
.—.название.группы,.к.которой.принадлежит.потребитель.
Подписка на темы
Следующий.шаг.после.создания.потребителя.—.подписка.его.на.одну.тему.или.
несколько..Метод.
subcribe()
.всего.лишь.требует.передачи.в.качестве.параметра.
списка.тем,.так.что.использовать.его.довольно.просто:
consumer.subscribe(Collections.singletonList("customerCountries"));

Цикл опроса 95
.Просто.создаем.список,.содержащий.один.элемент,.—.название.темы custo- merCountries
Можно.также.вызвать.метод.
subcribe()
.с.регулярным.выражением.в.качестве.
параметра..Это.выражение.может.соответствовать.нескольким.названиям.тем,.
так.что.при.создании.новой.темы.с.подходящим.под.это.регулярное.выражение.
названием.практически.тотчас.же.будет.выполнена.перебалансировка,.а.потре- бители.начнут.получать.данные.из.новой.темы..Это.удобно.для.приложений,.
которым.требуется.получать.данные.из.нескольких.тем.и.обрабатывать.данные,.
содержащиеся.в.них..Подписка.на.несколько.тем.с.помощью.регулярного.выра- жения.чаще.всего.используется.в.приложениях,.реплицирующих.данные.между.
Kafka.и.другой.системой.
Для.подписки.на.все.темы.
test
.можно.выполнить.следующий.вызов:
consumer.subscribe(Pattern.compile("test.*"));
Цикл опроса
В.самой.основе.API.потребителей.лежит.простой.цикл.опроса.сервера..После.
подписки.потребителя.на.тему.цикл.опроса.осуществляет.координацию.и.пере- балансировку.разделов,.отправляет.контрольные.сигналы.и.извлекает.данные..
Разработчику.предоставляется.чистый.API,.просто.возвращающий.доступные.
данные.из.соответствующих.разделов..Основной.код.потребителя.выглядит.сле- дующим.образом:
try {
while (true) {
ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records)
{
log.debug("topic = %s, partition = %d, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
} finally {
consumer.close();
}

96 Глава 4 • Потребители Kafka: чтение данных из Kafka
.Разумеется,.это.бесконечный.цикл..Потребители.обычно.представляют.собой.
работающие.в.течение.длительного.времени.приложения,.непрерывно.опра- шивающие.Kafka.на.предмет.дополнительных.данных..Далее.в.этой.главе.мы.
покажем,.как.можно.аккуратно.выйти.из.цикла.и.закрыть.потребитель.
.Это.важнейшая.строка.кода.в.данной.главе..Как.акулы.должны.непрерывно.
двигаться,.чтобы.не.погибнуть,.потребители.должны.опрашивать.Kafka,.ина- че.их.сочтут.неработающими,.а.разделы,.откуда.они.получают.данные,.будут.
переданы.другим.потребителям.группы..Передаваемый.нами.в.метод.
poll()
параметр.представляет.собой.длительность.ожидания.и.определяет,.сколько.
времени.будет.длиться.блокировка.в.случае.недоступности.данных.в.буфере.
потребителя..Если.этот.параметр.равен.
0
,.возврат.из.метода.
poll()
.произойдет.
немедленно,.в.противном.случае.он.будет.ожидать.поступления.данных.от.
брокера.указанное.число.миллисекунд..Значение.обычно.определяется.потреб- ностью.приложения.в.скорости.откликов,.то.есть.тем,.насколько.быстро.нужно.
вернуть.управление.выполняющему.опрос.потоку.
.Метод.
poll()
.возвращает.список.записей,.каждая.из.которых.содержит.тему.
и.раздел,.из.которого.она.поступила,.смещение.записи.в.разделы.и,.конечно,.
ключ.и.значение.записи..Обычно.по.списку.проходят.в.цикле,.и.записи.обраба- тываются.по.отдельности.
.Обработка.обычно.заканчивается.записью.результата.в.хранилище.данных.или.
обновлением.сохраненной.записи..Цель.состоит.в.ведении.текущего.списка.
покупателей.из.каждого.округа,.так.что.мы.обновляем.хеш-таблицу.и.выводим.
результат.в.виде.JSON..В.более.реалистичном.примере.результаты.обновлений.
сохранялись.бы.в.хранилище.данных.
.Всегда.закрывайте.(выполняйте.операцию.
close()
).потребитель.перед.за- вершением.его.выполнения..Эта.операция.приводит.к.закрытию.сетевых.под- ключений.и.сокетов..Она.также.инициирует.немедленную.перебалансировку,.
не.дожидаясь.обнаружения.координатором.группы.того,.что.потребитель.пере- стал.отправлять.контрольные.сигналы.и,.вероятно,.более.не.функционирует..
Это.заняло.бы.намного.больше.времени.и,.следовательно,.увеличило.период,.
в.течение.которого.потребители.не.могут.получать.сообщения.из.некоторого.
подмножества.разделов.
Цикл.
poll
.не.только.получает.данные..При.первом.вызове.метода.
poll()
.для.
нового.потребителя.он.отвечает.за.поиск.координатора.группы,.присоединение.
потребителя.к.группе.и.назначение.ему.разделов..Перебалансировка.в.случае.ее.
запуска.также.выполняется.в.цикле.опроса..И.конечно,.в.цикле.опроса.отправля- ются.контрольные.сигналы,.подтверждающие.функционирование.потребителей..
Поэтому.необходимо.гарантировать.быстроту.и.эффективность.всего.кода,.вы- полняемого.между.итерациями.

Настройка потребителей 97
Потокобезопасность
Несколько потребителей, относящихся к одной группе, не могут работать в одном потоке, и несколько потоков не могут безопасно использовать один потребитель.
Железное правило: один потребитель на один поток. Для работы нескольких по- требителей в одной группе в одном приложении необходимо выполнять каждый из них в отдельном потоке. Не помешает обернуть логику потребителя в отдельный объект и воспользоваться объектом типа ExecutorService языка Java для запуска нескольких потоков, каждый — со своим потребителем. В блоге Confluent вы можете найти руководство (bit.ly/2tfVu6O) по выполнению этих действий.
Настройка потребителей
До.сих.пор.мы.сосредотачивались.на.изучении.API.потребителей,.но.рассмотрели.
лишь.несколько.параметров.настройки.—.обязательные.параметры.
bootstrap.ser- vers
,.
group.id
,.
key.deserializer
.и.
value.deserializer
..Все.настройки.потребителей.
описаны.в.документации.Apache.Kafka.(
http://kafka.apache.org/documentation.html#new- consumerconfigs
)..Значения.по.умолчанию.большинства.параметров.вполне.разумны.
и.не.требуют.изменения,.но.некоторые.могут.серьезно.повлиять.на.производитель- ность.и.доступность.потребителей..Рассмотрим.наиболее.важные.из.них.
fetch.min.bytes
Это.свойство.позволяет.потребителю.задавать.минимальный.объем.данных,.полу- чаемых.от.брокера.при.извлечении.записей..Если.брокеру.поступает.запрос.на.за- писи.от.потребителя,.но.новые.записи.оказываются.на.несколько.байт.меньше,.чем.
значение.
fetch.min.bytes
,.брокер.будет.ждать.до.тех.пор,.пока.не.появятся.новые.
сообщения,.прежде.чем.отправлять.записи.потребителю..Это.снижает.нагрузку.как.
на.потребитель,.так.и.на.брокер,.ведь.им.приходится.обрабатывать.меньше.переме- щаемых.туда.и.обратно.сообщений.при.небольшом.объеме.новых.действий.в.темах.
или.в.часы.пониженной.активности..При.слишком.активном.использовании.CPU.
при.небольшом.количестве.доступных.данных.или.для.снижения.нагрузки.на.бро- керы.при.большом.числе.потребителей.лучше.повысить.значение.этого.параметра.
по.сравнению.с.принятым.по.умолчанию.
fetch.max.wait.ms
Задавая.параметр.
fetch.min.bytes
,.вы.сообщаете.Kafka.о.необходимости.подождать.
до.того.момента,.когда.у.него.будет.достаточно.данных.для.отправки,.прежде.чем.
отвечать.потребителю..Параметр.
fetch.max.wait.ms
.позволяет.управлять.тем,.сколь- ко.именно.ждать..По.умолчанию.Kafka.ждет.500.мс..Это.приводит.к.дополнительной.
задержке.до.500.мс.при.отсутствии.достаточного.объема.поступающих.в.тему.Kafka.

98 Глава 4 • Потребители Kafka: чтение данных из Kafka данных..Если.нужно.ограничить.потенциальную.задержку.(обычно.из-за.соглаше- ний.об.уровне.предоставления.услуг,.определяющих.максимальную.задержку.
приложения),.можно.задать.меньшее.значение.параметра.
fetch.max.wait.ms
Если.установить.для.
fetch.max.wait.ms
.100.мс,.а.для fetch.min.bytes
.—.1.Мбайт,.
Kafka.отправит.данные.в.ответ.на.запрос.потребителя,.или.когда.объем.возвраща- емых.данных.достигнет.1.Мбайт,.или.по.истечении.100.мс.в.зависимости.от.того,.
что.произойдет.первым.
max.partition.fetch.bytes
Это.свойство.определяет.максимальное.число.байт,.возвращаемых.сервером.из.
расчета.на.один.раздел..Значение.по.умолчанию.равно.1.Мбайт..Это.значит,.что.
при.возврате.методом.
KafkaConsumer.poll()
.объекта.
ConsumerRecords
.объект.записи.
будет.занимать.не.более.
max.partition.fetch.bytes
.на.каждый.назначенный.потре- бителю.раздел..Так.что.если.в.теме.20.разделов,.а.потребителей.—.5,.каждому.из.них.
понадобится.4.Мбайт.доступной.для.объекта.
ConsumerRecords
.памяти..На.практике.
обычно.приходится.выделять.больше.памяти,.поскольку.в.случае.отказа.потре- бителя.каждому.из.оставшихся.придется.взять.на.себя.работу.с.бо'льшим.числом.
разделов..Значение.
max.partition.fetch.bytes
.должно.быть.больше.максимально.
приемлемого.для.брокера.размера.сообщения.(который.задается.параметром.
max.message.size
.конфигурации.брокера),.иначе.брокер.может.выдать.сообщение,.
которое.потребитель.не.сможет.прочитать,.в.результате.чего.зависнет..Еще.один.
важный.нюанс.—.количество.времени,.которое.нужно.потребителю.для.обработки.
данных..Как.вы.помните,.потребитель.должен.вызывать.метод.
poll()
.достаточно.
часто,.чтобы.не.истекло.время.ожидания.и.не.произошла.перебалансировка..Если.
возвращаемый.при.отдельном.вызове.метода.
poll()
.объем.данных.очень.велик,.по- требителю.может.потребоваться.слишком.много.времени.на.обработку,.из-за.чего.
он.не.успеет.начать.следующую.итерацию.цикла.опроса.вовремя.и.время.ожидания.
истечет..В.этом.случае.можно.или.уменьшить.значение.
max.partition.fetch.bytes
,.
или.увеличить.время.ожидания.сеанса.
session.timeout.ms
По.умолчанию.потребитель.может.находиться.вне.связи.с.брокерами.и.продолжать.
считаться.работающим.не.более.3.с..Если.потребитель.не.отправляет.контрольный.
сигнал.координатору.группы.в.течение.промежутка.времени,.большего,.чем.опре- делено.параметром.
session.timeout.ms
,.он.считается.отказавшим.и.координатор.
группы.инициирует.перебалансировку.группы.потребителей.с.назначением.раз- делов.отказавшего.потребителя.другим.потребителям.группы..Этот.параметр.
тесно.связан.с.параметром.
heartbeat.interval.ms
,.который.задает.частоту.отправки.
методом.
poll()
.объекта.
KafkaConsumer
.контрольных.сигналов.координатору.груп- пы,.в.то.время.как.параметр.
session.timeout.ms
.задает.время,.в.течение.которого.
потребитель.может.обходиться.без.отправки.контрольного.сигнала..Следовательно,.

Настройка потребителей 99
эти.два.параметра.обычно.меняют.одновременно.—.значение.
heartbeat.interval.ms должно.быть.меньше.значения.
session.timeout.ms
.(обычно.составляет.треть.от.
него)..Так,.если.значение.
heartbeat.interval.ms
.составляет.3.с,.то.
session.time- out.ms
.следует.задать.равным.1.с..Значение.
session.timeout.ms
.меньшее,.чем.за- дано.по.умолчанию,.позволяет.группам.потребителей.быстрее.обнаруживать.отказы.
потребителей.и.восстанавливаться.после.них..В.то.же.время.оно.может.стать.при- чиной.нежелательной.перебалансировки.из-за.более.длительной.итерации.цикла.
опроса.или.сборки.мусора..Задание.более.высокого.значения.
session.timeout.ms снизит.вероятность.случайной.перебалансировки,.но.и.для.обнаружения.настоя- щего.сбоя.в.этом.случае.потребуется.больше.времени.
auto.offset.reset
Этот.параметр.определяет.поведение.потребителя.при.начале.чтения.раздела,.для.
которого.у.него.зафиксированное.смещение.отсутствует.или.стало.некорректным.
(например,.вследствие.слишком.продолжительного.бездействия.потребителя,.
приведшего.к.удалению.записи.с.этим.смещением.с.брокера)..Значение.по.умол- чанию.—.latest.(«самое.позднее»)..Это.значит,.что.в.отсутствие.корректного.сме- щения.потребитель.начинает.читать.самые.свежие.записи.(сделанные.после.на- чала.его.работы)..Альтернативное.значение.—.earliest.(«самое.раннее»),.при.
котором.в.отсутствие.корректного.смещения.потребитель.читает.все.данные.из.
раздела.с.начала.
enable.auto.commit
Далее.в.этой.главе.мы.обсудим.различные.варианты.фиксации.смещений..Данный.
параметр.определяет,.будет.ли.потребитель.фиксировать.смещения.автоматически,.
и.по.умолчанию.равен.
true
..Если.вы.предпочитаете.контролировать,.когда.фикси- руются.смещения,.установите.для.него.значение.
false
..Это.нужно.для.того,.чтобы.
уменьшить.степень.дублирования.и.избежать.пропущенных.данных..При.значении.
true
.этого.параметра.имеет.смысл.задать.также.частоту.фиксации.смещений.с.по- мощью.параметра.
auto.commit.interval.ms partition.assignment.strategy
Мы.уже.знаем,.что.разделы.распределяются.по.потребителям.в.группе..Класс.
PartitionAssignor
.определяет.(при.заданных.потребителях.и.темах,.на.которые.они.
подписаны),.какие.разделы.к.какому.потребителю.будут.относиться..По.умолчанию.
в.Kafka.есть.две.стратегии.распределения.
‰
‰
Диапазонная (Range). Каждому.потребителю.присваиваются.последовательные.
подмножества.разделов.из.тем,.на.которые.он.подписан..Так.что.если.потребите- ли.C1.и.C2.подписаны.на.темы.T1.и.T2,.оба.по.три.раздела,.то.потребителю.C1.

100 Глава 4 • Потребители Kafka: чтение данных из Kafka будут.назначены.разделы.0.и.1.из.тем.T1.и.T2,.а.C2.—.раздел.2.из.тем.T1.и.T2..
Поскольку.в.каждой.из.тем.нечетное.количество.разделов,.а.распределение.
разделов.по.потребителям.выполняется.для.каждой.темы.отдельно,.у.первого.
потребителя.окажется.больше.разделов,.чем.у.второго..Подобное.происходит.
всегда,.когда.используется.диапазонная.стратегия.распределения,.а.количе- ство.потребителей.не.делится.нацело.на.количество.разделов.в.каждой.теме.
‰
‰
Циклическая (RoundRobin). Все.разделы.от.всех.подписанных.тем.распределяют- ся.по.потребителям.последовательно,.один.за.другим..Если.бы.описанные.ранее.
потребители.C1.и.C2.использовали.циклическое.распределение,.C1.были.бы.
назначены.разделы.0.и.2.из.темы.T1.и.раздел.1.из.темы.T2,.а.C2.были.бы.на- значены.раздел.1.из.темы.T1.и.разделы.0.и.2.из.темы.T2..Когда.все.потребители.
подписаны.на.одни.и.те.же.темы.(очень.распространенный.сценарий),.цикличе- ское.распределение.дает.одинаковое.количество.разделов.у.всех.потребителей.
(или,.в.крайнем.случае,.различие.в.1.раздел).
Параметр.
partition.assignment.strategy
.позволяет.выбирать.стратегию.распре- деления.разделов..Стратегия.по.умолчанию.—.
org.apache.kafka.clients.consu- mer.Ran geAssignor
,.реализующая.описанную.ранее.диапазонную.стратегию..
Ее.можно.заменить.стратегией.
org.apache.kafka.clients.consumer.RoundRo- binAssignor
..В.качестве.более.продвинутого.решения.можете.реализовать.собствен- ную.стратегию.распределения,.при.этом.параметр.
partition.assignment.strategy должен.указывать.на.имя.вашего.класса.
client.id
Значение.этого.параметра.может.быть.любой.строкой..В.дальнейшем.его.будут.
использовать.брокеры.для.идентификации.отправленных.клиентом.сообщений..
Используется.при.журналировании.и.для.показателей,.а.также.при.задании.
квот.
max.poll.records
Этот.параметр.задает.максимальное.число.записей,.возвращаемое.при.одном.вы- зове.метода.
poll()
..Он.удобен.для.управления.объемом.данных,.обрабатываемым.
приложением.в.цикле.опроса.
receive.buffer.bytes и send.buffer.bytes
Это.размеры.TCP-буферов.отправки.и.получения,.используемых.сокетами.при.
записи.и.чтении.данных..Если.значение.этих.параметров.равно.
–1
,.будут.использо- ваться.значения.по.умолчанию.операционной.системы..Рекомендуется.повышать.
их.в.случае,.когда.производители.или.потребители.взаимодействуют.с.брокерами.
из.другого.ЦОД,.поскольку.подобные.сетевые.подключения.обычно.характеризу- ются.более.длительной.задержкой.и.низкой.пропускной.способностью.сети.

Фиксация и смещения
1   ...   8   9   10   11   12   13   14   15   ...   39


написать администратору сайта