Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
Как происходит распределение разделов по брокерам Когда потребитель хочет присоединиться к группе, он отправляет координатору группы запрос JoinGroup. Первый присоединившийся к группе потребитель стано- вится ведущим потребителем группы. Ведущий получает от координатора группы список всех потребителей группы, которые недавно отправляли контрольные сигналы, а значит, функционируют нормально, и отвечает за назначение потре- бителям подмножеств разделов. Для определения того, за какие разделы какой потребитель должен отвечать, используется реализация класса PartitionAssignor. В Kafka есть две стратегии назначения разделов, которые мы подробнее обсудим в посвященном настройке разделе. После распределения разделов ведущий по- требитель группы отправляет список назначений координатору группы, который пересылает эту информацию потребителям. Каждый потребитель знает только о назначенных ему разделах. Единственный клиентский процесс, обладающий полным списком потребителей группы и назначенных им разделов, — ведущий группы. Эта процедура повторяется при каждой перебалансировке. 94 Глава 4 • Потребители Kafka: чтение данных из Kafka Создание потребителя Kafka Первый.шаг.к.получению.записей.—.создание.экземпляра.класса. KafkaConsumer Создание.экземпляра.класса. KafkaConsumer .очень.похоже.на.создание.экземпляра. KafkaProducer .—.необходимо.просто.создать.экземпляр.Java-класса. Properties ,. содержащий.свойства,.которые.вы.хотели.бы.передать.потребителю..Далее.в.этой. главе.мы.подробнее.обсудим.все.свойства..Для.начала.нам.понадобятся.лишь.три. обязательных:. bootstrap.servers ,. key.deserializer .и. value.deserializer Свойство. bootstrap.servers .представляет.собой.строку.подключения.к.кластеру. Kafka..Оно.используется.так.же,.как.и.в. KafkaProducer .(за.подробностями.можете. обратиться.к.главе.3)..Два.других.свойства,. key.deserializer .и. value.deserializer ,. схожи.с.сериализаторами.для.производителей,.но.вместо.классов,.преобразующих. Java-объекты.в.байтовые.массивы,.необходимо.задать.классы,.преобразующие. байтовые.массивы.в.Java-объекты. Есть.и.четвертое.свойство,. group.id ,.которое,.строго.говоря,.не.является.обяза- тельным,.но.мы.пока.сделаем.вид,.что.является..Это.свойство.задает.группу.по- требителей,.к.которой.относится.экземпляр. KafkaConsumer ..Хотя.можно.создавать. и.потребителей,.не.принадлежащих.ни.к.одной.группе,.в.большей.части.данной. главы.будем.предполагать,.что.все.потребители.состоят.в.группах. Следующий.фрагмент.кода.демонстрирует.создание.объекта. KafkaConsumer : Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer Бо'льшая.часть.этого.кода.вам.уже.знакома,.если.вы.читали.главу.3,.посвященную. созданию.производителей..Мы.считаем,.что.как.ключ,.так.и.значение.читаемых. нами.записей.представляют.собой.объекты. String ..Единственное.новое.свойство. тут. group.id .—.название.группы,.к.которой.принадлежит.потребитель. Подписка на темы Следующий.шаг.после.создания.потребителя.—.подписка.его.на.одну.тему.или. несколько..Метод. subcribe() .всего.лишь.требует.передачи.в.качестве.параметра. списка.тем,.так.что.использовать.его.довольно.просто: consumer.subscribe(Collections.singletonList("customerCountries")); Цикл опроса 95 .Просто.создаем.список,.содержащий.один.элемент,.—.название.темы custo- merCountries Можно.также.вызвать.метод. subcribe() .с.регулярным.выражением.в.качестве. параметра..Это.выражение.может.соответствовать.нескольким.названиям.тем,. так.что.при.создании.новой.темы.с.подходящим.под.это.регулярное.выражение. названием.практически.тотчас.же.будет.выполнена.перебалансировка,.а.потре- бители.начнут.получать.данные.из.новой.темы..Это.удобно.для.приложений,. которым.требуется.получать.данные.из.нескольких.тем.и.обрабатывать.данные,. содержащиеся.в.них..Подписка.на.несколько.тем.с.помощью.регулярного.выра- жения.чаще.всего.используется.в.приложениях,.реплицирующих.данные.между. Kafka.и.другой.системой. Для.подписки.на.все.темы. test .можно.выполнить.следующий.вызов: consumer.subscribe(Pattern.compile("test.*")); Цикл опроса В.самой.основе.API.потребителей.лежит.простой.цикл.опроса.сервера..После. подписки.потребителя.на.тему.цикл.опроса.осуществляет.координацию.и.пере- балансировку.разделов,.отправляет.контрольные.сигналы.и.извлекает.данные.. Разработчику.предоставляется.чистый.API,.просто.возвращающий.доступные. данные.из.соответствующих.разделов..Основной.код.потребителя.выглядит.сле- дующим.образом: try { while (true) { ConsumerRecords { log.debug("topic = %s, partition = %d, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsValue(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1; } custCountryMap.put(record.value(), updatedCount) JSONObject json = new JSONObject(custCountryMap); System.out.println(json.toString(4)); } } } finally { consumer.close(); } 96 Глава 4 • Потребители Kafka: чтение данных из Kafka .Разумеется,.это.бесконечный.цикл..Потребители.обычно.представляют.собой. работающие.в.течение.длительного.времени.приложения,.непрерывно.опра- шивающие.Kafka.на.предмет.дополнительных.данных..Далее.в.этой.главе.мы. покажем,.как.можно.аккуратно.выйти.из.цикла.и.закрыть.потребитель. .Это.важнейшая.строка.кода.в.данной.главе..Как.акулы.должны.непрерывно. двигаться,.чтобы.не.погибнуть,.потребители.должны.опрашивать.Kafka,.ина- че.их.сочтут.неработающими,.а.разделы,.откуда.они.получают.данные,.будут. переданы.другим.потребителям.группы..Передаваемый.нами.в.метод. poll() параметр.представляет.собой.длительность.ожидания.и.определяет,.сколько. времени.будет.длиться.блокировка.в.случае.недоступности.данных.в.буфере. потребителя..Если.этот.параметр.равен. 0 ,.возврат.из.метода. poll() .произойдет. немедленно,.в.противном.случае.он.будет.ожидать.поступления.данных.от. брокера.указанное.число.миллисекунд..Значение.обычно.определяется.потреб- ностью.приложения.в.скорости.откликов,.то.есть.тем,.насколько.быстро.нужно. вернуть.управление.выполняющему.опрос.потоку. .Метод. poll() .возвращает.список.записей,.каждая.из.которых.содержит.тему. и.раздел,.из.которого.она.поступила,.смещение.записи.в.разделы.и,.конечно,. ключ.и.значение.записи..Обычно.по.списку.проходят.в.цикле,.и.записи.обраба- тываются.по.отдельности. .Обработка.обычно.заканчивается.записью.результата.в.хранилище.данных.или. обновлением.сохраненной.записи..Цель.состоит.в.ведении.текущего.списка. покупателей.из.каждого.округа,.так.что.мы.обновляем.хеш-таблицу.и.выводим. результат.в.виде.JSON..В.более.реалистичном.примере.результаты.обновлений. сохранялись.бы.в.хранилище.данных. .Всегда.закрывайте.(выполняйте.операцию. close() ).потребитель.перед.за- вершением.его.выполнения..Эта.операция.приводит.к.закрытию.сетевых.под- ключений.и.сокетов..Она.также.инициирует.немедленную.перебалансировку,. не.дожидаясь.обнаружения.координатором.группы.того,.что.потребитель.пере- стал.отправлять.контрольные.сигналы.и,.вероятно,.более.не.функционирует.. Это.заняло.бы.намного.больше.времени.и,.следовательно,.увеличило.период,. в.течение.которого.потребители.не.могут.получать.сообщения.из.некоторого. подмножества.разделов. Цикл. poll .не.только.получает.данные..При.первом.вызове.метода. poll() .для. нового.потребителя.он.отвечает.за.поиск.координатора.группы,.присоединение. потребителя.к.группе.и.назначение.ему.разделов..Перебалансировка.в.случае.ее. запуска.также.выполняется.в.цикле.опроса..И.конечно,.в.цикле.опроса.отправля- ются.контрольные.сигналы,.подтверждающие.функционирование.потребителей.. Поэтому.необходимо.гарантировать.быстроту.и.эффективность.всего.кода,.вы- полняемого.между.итерациями. Настройка потребителей 97 Потокобезопасность Несколько потребителей, относящихся к одной группе, не могут работать в одном потоке, и несколько потоков не могут безопасно использовать один потребитель. Железное правило: один потребитель на один поток. Для работы нескольких по- требителей в одной группе в одном приложении необходимо выполнять каждый из них в отдельном потоке. Не помешает обернуть логику потребителя в отдельный объект и воспользоваться объектом типа ExecutorService языка Java для запуска нескольких потоков, каждый — со своим потребителем. В блоге Confluent вы можете найти руководство (bit.ly/2tfVu6O) по выполнению этих действий. Настройка потребителей До.сих.пор.мы.сосредотачивались.на.изучении.API.потребителей,.но.рассмотрели. лишь.несколько.параметров.настройки.—.обязательные.параметры. bootstrap.ser- vers ,. group.id ,. key.deserializer .и. value.deserializer ..Все.настройки.потребителей. описаны.в.документации.Apache.Kafka.( http://kafka.apache.org/documentation.html#new- consumerconfigs )..Значения.по.умолчанию.большинства.параметров.вполне.разумны. и.не.требуют.изменения,.но.некоторые.могут.серьезно.повлиять.на.производитель- ность.и.доступность.потребителей..Рассмотрим.наиболее.важные.из.них. fetch.min.bytes Это.свойство.позволяет.потребителю.задавать.минимальный.объем.данных,.полу- чаемых.от.брокера.при.извлечении.записей..Если.брокеру.поступает.запрос.на.за- писи.от.потребителя,.но.новые.записи.оказываются.на.несколько.байт.меньше,.чем. значение. fetch.min.bytes ,.брокер.будет.ждать.до.тех.пор,.пока.не.появятся.новые. сообщения,.прежде.чем.отправлять.записи.потребителю..Это.снижает.нагрузку.как. на.потребитель,.так.и.на.брокер,.ведь.им.приходится.обрабатывать.меньше.переме- щаемых.туда.и.обратно.сообщений.при.небольшом.объеме.новых.действий.в.темах. или.в.часы.пониженной.активности..При.слишком.активном.использовании.CPU. при.небольшом.количестве.доступных.данных.или.для.снижения.нагрузки.на.бро- керы.при.большом.числе.потребителей.лучше.повысить.значение.этого.параметра. по.сравнению.с.принятым.по.умолчанию. fetch.max.wait.ms Задавая.параметр. fetch.min.bytes ,.вы.сообщаете.Kafka.о.необходимости.подождать. до.того.момента,.когда.у.него.будет.достаточно.данных.для.отправки,.прежде.чем. отвечать.потребителю..Параметр. fetch.max.wait.ms .позволяет.управлять.тем,.сколь- ко.именно.ждать..По.умолчанию.Kafka.ждет.500.мс..Это.приводит.к.дополнительной. задержке.до.500.мс.при.отсутствии.достаточного.объема.поступающих.в.тему.Kafka. 98 Глава 4 • Потребители Kafka: чтение данных из Kafka данных..Если.нужно.ограничить.потенциальную.задержку.(обычно.из-за.соглаше- ний.об.уровне.предоставления.услуг,.определяющих.максимальную.задержку. приложения),.можно.задать.меньшее.значение.параметра. fetch.max.wait.ms Если.установить.для. fetch.max.wait.ms .100.мс,.а.для fetch.min.bytes .—.1.Мбайт,. Kafka.отправит.данные.в.ответ.на.запрос.потребителя,.или.когда.объем.возвраща- емых.данных.достигнет.1.Мбайт,.или.по.истечении.100.мс.в.зависимости.от.того,. что.произойдет.первым. max.partition.fetch.bytes Это.свойство.определяет.максимальное.число.байт,.возвращаемых.сервером.из. расчета.на.один.раздел..Значение.по.умолчанию.равно.1.Мбайт..Это.значит,.что. при.возврате.методом. KafkaConsumer.poll() .объекта. ConsumerRecords .объект.записи. будет.занимать.не.более. max.partition.fetch.bytes .на.каждый.назначенный.потре- бителю.раздел..Так.что.если.в.теме.20.разделов,.а.потребителей.—.5,.каждому.из.них. понадобится.4.Мбайт.доступной.для.объекта. ConsumerRecords .памяти..На.практике. обычно.приходится.выделять.больше.памяти,.поскольку.в.случае.отказа.потре- бителя.каждому.из.оставшихся.придется.взять.на.себя.работу.с.бо'льшим.числом. разделов..Значение. max.partition.fetch.bytes .должно.быть.больше.максимально. приемлемого.для.брокера.размера.сообщения.(который.задается.параметром. max.message.size .конфигурации.брокера),.иначе.брокер.может.выдать.сообщение,. которое.потребитель.не.сможет.прочитать,.в.результате.чего.зависнет..Еще.один. важный.нюанс.—.количество.времени,.которое.нужно.потребителю.для.обработки. данных..Как.вы.помните,.потребитель.должен.вызывать.метод. poll() .достаточно. часто,.чтобы.не.истекло.время.ожидания.и.не.произошла.перебалансировка..Если. возвращаемый.при.отдельном.вызове.метода. poll() .объем.данных.очень.велик,.по- требителю.может.потребоваться.слишком.много.времени.на.обработку,.из-за.чего. он.не.успеет.начать.следующую.итерацию.цикла.опроса.вовремя.и.время.ожидания. истечет..В.этом.случае.можно.или.уменьшить.значение. max.partition.fetch.bytes ,. или.увеличить.время.ожидания.сеанса. session.timeout.ms По.умолчанию.потребитель.может.находиться.вне.связи.с.брокерами.и.продолжать. считаться.работающим.не.более.3.с..Если.потребитель.не.отправляет.контрольный. сигнал.координатору.группы.в.течение.промежутка.времени,.большего,.чем.опре- делено.параметром. session.timeout.ms ,.он.считается.отказавшим.и.координатор. группы.инициирует.перебалансировку.группы.потребителей.с.назначением.раз- делов.отказавшего.потребителя.другим.потребителям.группы..Этот.параметр. тесно.связан.с.параметром. heartbeat.interval.ms ,.который.задает.частоту.отправки. методом. poll() .объекта. KafkaConsumer .контрольных.сигналов.координатору.груп- пы,.в.то.время.как.параметр. session.timeout.ms .задает.время,.в.течение.которого. потребитель.может.обходиться.без.отправки.контрольного.сигнала..Следовательно,. Настройка потребителей 99 эти.два.параметра.обычно.меняют.одновременно.—.значение. heartbeat.interval.ms должно.быть.меньше.значения. session.timeout.ms .(обычно.составляет.треть.от. него)..Так,.если.значение. heartbeat.interval.ms .составляет.3.с,.то. session.time- out.ms .следует.задать.равным.1.с..Значение. session.timeout.ms .меньшее,.чем.за- дано.по.умолчанию,.позволяет.группам.потребителей.быстрее.обнаруживать.отказы. потребителей.и.восстанавливаться.после.них..В.то.же.время.оно.может.стать.при- чиной.нежелательной.перебалансировки.из-за.более.длительной.итерации.цикла. опроса.или.сборки.мусора..Задание.более.высокого.значения. session.timeout.ms снизит.вероятность.случайной.перебалансировки,.но.и.для.обнаружения.настоя- щего.сбоя.в.этом.случае.потребуется.больше.времени. auto.offset.reset Этот.параметр.определяет.поведение.потребителя.при.начале.чтения.раздела,.для. которого.у.него.зафиксированное.смещение.отсутствует.или.стало.некорректным. (например,.вследствие.слишком.продолжительного.бездействия.потребителя,. приведшего.к.удалению.записи.с.этим.смещением.с.брокера)..Значение.по.умол- чанию.—.latest.(«самое.позднее»)..Это.значит,.что.в.отсутствие.корректного.сме- щения.потребитель.начинает.читать.самые.свежие.записи.(сделанные.после.на- чала.его.работы)..Альтернативное.значение.—.earliest.(«самое.раннее»),.при. котором.в.отсутствие.корректного.смещения.потребитель.читает.все.данные.из. раздела.с.начала. enable.auto.commit Далее.в.этой.главе.мы.обсудим.различные.варианты.фиксации.смещений..Данный. параметр.определяет,.будет.ли.потребитель.фиксировать.смещения.автоматически,. и.по.умолчанию.равен. true ..Если.вы.предпочитаете.контролировать,.когда.фикси- руются.смещения,.установите.для.него.значение. false ..Это.нужно.для.того,.чтобы. уменьшить.степень.дублирования.и.избежать.пропущенных.данных..При.значении. true .этого.параметра.имеет.смысл.задать.также.частоту.фиксации.смещений.с.по- мощью.параметра. auto.commit.interval.ms partition.assignment.strategy Мы.уже.знаем,.что.разделы.распределяются.по.потребителям.в.группе..Класс. PartitionAssignor .определяет.(при.заданных.потребителях.и.темах,.на.которые.они. подписаны),.какие.разделы.к.какому.потребителю.будут.относиться..По.умолчанию. в.Kafka.есть.две.стратегии.распределения. Диапазонная (Range). Каждому.потребителю.присваиваются.последовательные. подмножества.разделов.из.тем,.на.которые.он.подписан..Так.что.если.потребите- ли.C1.и.C2.подписаны.на.темы.T1.и.T2,.оба.по.три.раздела,.то.потребителю.C1. 100 Глава 4 • Потребители Kafka: чтение данных из Kafka будут.назначены.разделы.0.и.1.из.тем.T1.и.T2,.а.C2.—.раздел.2.из.тем.T1.и.T2.. Поскольку.в.каждой.из.тем.нечетное.количество.разделов,.а.распределение. разделов.по.потребителям.выполняется.для.каждой.темы.отдельно,.у.первого. потребителя.окажется.больше.разделов,.чем.у.второго..Подобное.происходит. всегда,.когда.используется.диапазонная.стратегия.распределения,.а.количе- ство.потребителей.не.делится.нацело.на.количество.разделов.в.каждой.теме. Циклическая (RoundRobin). Все.разделы.от.всех.подписанных.тем.распределяют- ся.по.потребителям.последовательно,.один.за.другим..Если.бы.описанные.ранее. потребители.C1.и.C2.использовали.циклическое.распределение,.C1.были.бы. назначены.разделы.0.и.2.из.темы.T1.и.раздел.1.из.темы.T2,.а.C2.были.бы.на- значены.раздел.1.из.темы.T1.и.разделы.0.и.2.из.темы.T2..Когда.все.потребители. подписаны.на.одни.и.те.же.темы.(очень.распространенный.сценарий),.цикличе- ское.распределение.дает.одинаковое.количество.разделов.у.всех.потребителей. (или,.в.крайнем.случае,.различие.в.1.раздел). Параметр. partition.assignment.strategy .позволяет.выбирать.стратегию.распре- деления.разделов..Стратегия.по.умолчанию.—. org.apache.kafka.clients.consu- mer.Ran geAssignor ,.реализующая.описанную.ранее.диапазонную.стратегию.. Ее.можно.заменить.стратегией. org.apache.kafka.clients.consumer.RoundRo- binAssignor ..В.качестве.более.продвинутого.решения.можете.реализовать.собствен- ную.стратегию.распределения,.при.этом.параметр. partition.assignment.strategy должен.указывать.на.имя.вашего.класса. client.id Значение.этого.параметра.может.быть.любой.строкой..В.дальнейшем.его.будут. использовать.брокеры.для.идентификации.отправленных.клиентом.сообщений.. Используется.при.журналировании.и.для.показателей,.а.также.при.задании. квот. max.poll.records Этот.параметр.задает.максимальное.число.записей,.возвращаемое.при.одном.вы- зове.метода. poll() ..Он.удобен.для.управления.объемом.данных,.обрабатываемым. приложением.в.цикле.опроса. receive.buffer.bytes и send.buffer.bytes Это.размеры.TCP-буферов.отправки.и.получения,.используемых.сокетами.при. записи.и.чтении.данных..Если.значение.этих.параметров.равно. –1 ,.будут.использо- ваться.значения.по.умолчанию.операционной.системы..Рекомендуется.повышать. их.в.случае,.когда.производители.или.потребители.взаимодействуют.с.брокерами. из.другого.ЦОД,.поскольку.подобные.сетевые.подключения.обычно.характеризу- ются.более.длительной.задержкой.и.низкой.пропускной.способностью.сети. |