Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
84 Глава 3 • Производители Kafka: запись сообщений в Kafka new ProducerRecord producer.send(data); } } .Мы.по-прежнему.используем.тот.же.класс. KafkaAvroSerializer .И.передаем.URI.того.же.реестра.схем. .Но.теперь.нам.приходится.указывать.схему.Avro,.поскольку.ее.уже.не.предо- ставляет.сгенерированный.Avro.объект. .Тип.объекта.теперь. GenericRecord ..Мы.инициализируем.его.своей.схемой. и.предназначенными.для.записи.данными. .Значение. ProducerRecord .представляет.собой.просто.объект. GenericRecord ,. содержащий.схему.и.данные..Сериализатор.будет.знать,.как.получить.из.этой. записи.схему.данных,.сохранить.ее.в.реестре.схем.и.сериализовать.данные.из. объекта. Разделы В.предыдущих.примерах.создаваемые.нами.объекты. ProducerRecord .включали.на- звание.темы,.ключ.и.значение..Сообщения.Kafka.представляют.собой.пары.«ключ/ значение»,.и.хотя.можно.создавать.объекты. ProducerRecord .только.с.темой.и.зна- чением.или.с.неопределенным.значением.по.умолчанию.для.ключа,.большинство. приложений.отправляют.записи.с.ключами..Ключи.служат.для.двух.целей:.они. представляют.собой.дополнительную.информацию,.сохраняемую.вместе.с.сообще- нием,.и.на.их.основе.определяется,.в.какой.раздел.темы.записывать.сообщение.. Все.сообщения.с.одинаковым.ключом.попадут.в.один.раздел..Это.значит,.что,.если. каждый.процесс.читает.лишь.часть.разделов.темы.(подробнее.об.этом.в.главе.4),. все.записи.для.конкретного.ключа.будет.читать.один.и.тот.же.процесс..Для.созда- ния.записи.типа.«ключ/значение».нужно.просто.создать.объект. ProducerRecord ,. вот.так: ProducerRecord new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA"); При.создании.сообщений.с.неопределенным.значением.ключа.можно.просто.его. не.указывать: ProducerRecord new ProducerRecord<>("CustomerCountry", "USA"); .В.этом.примере.ключ.будет.равен. null ,.что.может.указывать,.например,.на.то,. что.имя.покупателя.в.форме.опущено. Если.ключ.равен. null .и.используется.метод.секционирования.по.умолчанию,.то. запись.будет.отправлена.в.один.из.доступных.разделов.темы.случайным.образом.. Разделы 85 Для.балансировки.сообщений.по.разделам.при.этом.будет.использоваться.цикли- ческий.алгоритм. Если.же.ключ.присутствует.и.используется.метод.секционирования.по.умолчанию,. Kafka.вычислит.хеш-значение.ключа.с.помощью.собственного.алгоритма.хеширо- вания,.так.что.хеш-значения.не.изменятся.при.обновлении.Java,.и.отправит.сооб- щение.в.конкретный.раздел.на.основе.полученного.результата..А.поскольку.важно,. чтобы.ключи.всегда.соответствовали.одним.и.тем.же.разделам,.для.вычисления. соответствия.используются.все.разделы.темы,.не.только.доступные..Это.значит,. что,.если.конкретный.раздел.недоступен.на.момент.записи.в.него.данных,.будет. возвращена.ошибка..Это.происходит.довольно.редко,.как.вы.увидите.в.главе.6,. когда.мы.будем.обсуждать.репликацию.и.доступность.Kafka. Соответствие.ключей.разделам.остается.согласованным.лишь.до.тех.пор,.пока. число.разделов.в.теме.не.меняется..Так.что.пока.это.число.постоянно,.вы.можете. быть.уверены,.например,.что.относящиеся.к.пользователю.045189.записи.всегда. будут.записываться.в.раздел.34..Эта.особенность.открывает.дорогу.для.всех.видов. оптимизации.при.чтении.данных.из.разделов..Но.как.только.вы.добавите.в.тему. новые.разделы,.такое.поведение.больше.нельзя.будет.гарантировать:.старые.записи. останутся.в.разделе.34,.а.новые.могут.оказаться.записанными.в.другой.раздел..Если. ключи.секционирования.важны,.простейшим.решением.будет.создавать.темы.с.до- статочным.количеством.разделов.(в.главе.2.мы.приводили.соображения.по.поводу. выбора.оптимального.количества.разделов).и.никогда.не.добавлять.новые. Реализация пользовательской стратегии секционирования..До.сих.пор.мы.обсуж- дали.особенности.метода.секционирования.по.умолчанию,.используемого.чаще. всего..Однако.Kafka.не.ограничивает.вас.лишь.хеш-разделами,.и.иногда.появляются. веские.причины.секционировать.данные.иначе..Например,.представьте,.что.вы.—. B2B-поставщик,.а.ваш.крупнейший.покупатель.—.компания.Banana,.производящая. карманные.устройства..Допустим,.что.более.10.%.ваших.ежедневных.транзакций. приходится.на.этого.покупателя..При.использовании.хеш-секционирования,.при- нятого.по.умолчанию,.записи.Banana.будут.распределяться.в.тот.же.раздел,.что. и.записи.других.заказчиков,.в.результате.чего.один.из.разделов.окажется.намного. больше.других..Это.приведет.к.исчерпанию.места.на.серверах,.замедлению.об- работки.и.т..д..На.самом.деле.лучше.выделить.для.покупателя.Banana.отдельный. раздел,.после.чего.применить.хеш-секционирование.для.распределения.остальных. заказчиков.по.разделам. Вот.пример.пользовательского.объекта. Partitioner : import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.utils.Utils; public class BananaPartitioner implements Partitioner { 86 Глава 3 • Производители Kafka: запись сообщений в Kafka public void configure(Map Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if ((keyBytes == null) || (!(key instanceOf String))) throw new InvalidRecordException("We expect all messages to have customer name as key") if (((String) key).equals("Banana")) return numPartitions-1; // Banana всегда попадает в последний раздел // Другие записи распределяются по разделам путем хеширования return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) } public void close() {} } .Интерфейс.объекта.секционирования.включает.методы. configure ,. partition и. close ..Здесь.мы.реализовали.только.метод. partition ,.хотя.следовало.бы.пере- давать.имя.нашего.особого.заказчика.через.метод. configure ,.а.не.зашивать.его. в.код.метода. partition .Мы.ожидаем.только.строковые.ключи,.так.что.в.противном.случае.генерируем. исключение. Старые API производителей В.этой.главе.мы.обсудили.Java.клиент-производитель,.включенный.в.пакет. org.apa- che.kafka.clients ..Однако.в.Apache.Kafka.все.еще.содержатся 1 .два.более.старых. клиента,.написанных.на.языке.Scala.и.включенных.в.пакет. kafka.producer .(часть. базового.модуля.Kafka)..Эти.производители.называются. SyncProducer .(в.зависи- мости.от.значения.параметра. acks .он.может.ждать.подтверждения.сервером.полу- чения.каждого.сообщения.или.пакета.сообщений,.прежде.чем.отправлять.новые. сообщения).и. AsyncProducer .(формирует.пакеты.сообщений.в.фоновом.режиме,. отправляет.их.в.отдельном.потоке.и.не.предоставляет.клиенту.никакой.информа- ции.об.успехе.или.неудаче.отправки). Поскольку.текущий.производитель.поддерживает.оба.типа.поведения,.намного. более.надежен.и.предоставляет.разработчику.гораздо.бо'льшие.возможности.кон- 1. Начиная.с.версии.0.11.0,.они.считаются.устаревшими.и.оставлены.только.из.соображений. совместимости..—.Примеч. пер. Резюме 87 троля,.мы.не.будем.обсуждать.эти.старые.API..Если.вас.интересует.возможность. их.использования,.подумайте.еще.раз,.после.чего.обратитесь.к.документации.Kafka. за.дополнительной.информацией. Резюме Мы.начали.эту.главу.с.простого.примера.производителя.—.всего.десять.строк.кода,. отправляющих.события.в.Kafka..Расширили.его.за.счет.добавления.обработки. ошибок.и.опытов.с.синхронной.и.асинхронной.отправкой..Затем.изучили.наиболее. важные.конфигурационные.параметры.производителя.и.выяснили,.как.они.влияют. на.его.поведение..Мы.обсудили.сериализаторы,.которые.служат.для.управления. форматом.записываемых.в.Kafka.событий..Мы.подробно.рассмотрели.Avro.—.один. из.многих.способов.сериализации.событий,.часто.используемый.вместе.с.Kafka.. В.завершение.главы.обсудили.секционирование.в.Kafka.и.привели.пример.про- двинутой.методики.пользовательского.секционирования. Теперь,.разобравшись.с.записью.событий.в.Kafka,.в.главе.4.рассмотрим.все,.что. касается.чтения.событий.из.Kafka. 4 Потребители Kafka: чтение данных из Kafka Приложения,.читающие.данные.из.Kafka,.используют.объект. KafkaConsumer .для. подписки.на.темы.Kafka.и.получения.из.них.сообщений..Чтение.данных.из.Kafka. несколько.отличается.от.чтения.данных.из.других.систем.обмена.сообщениями:. некоторые.принципы.его.работы.и.идеи.весьма.оригинальны..Чтобы.научиться. использовать.API.потребителей,.необходимо.сначала.разобраться.с.этими.прин- ципами..Мы.начнем.с.пояснений.по.поводу.важнейших.из.них,.а.затем.рассмотрим. примеры,.демонстрирующие.различные.способы.использования.API.потребителей. для.реализации.приложений.с.разнообразными.требованиями. Принципы работы потребителей Kafka Чтобы.научиться.читать.данные.из.Kafka,.нужно.сначала.разобраться.с.концепция- ми.потребителей.и.групп.потребителей..Мы.рассмотрим.эти.понятия.в.следующих. разделах. Потребители и группы потребителей Представьте.себе.приложение,.читающее.данные.из.темы.Kafka,.проверяющее.их. и.записывающее.результаты.в.другое.хранилище.данных..Это.приложение.должно. будет.создать.объект-потребитель,.подписаться.на.соответствующую.тему.и.при- ступить.к.получению.сообщений,.их.проверке.и.записи.результатов..До.поры.до. времени.такая.схема.будет.работать,.но.что.если.производители.записывают.со- общения.в.тему.быстрее,.чем.приложение.может.их.проверить?.Если.чтение.и.об- работка.данных.ограничиваются.одним.потребителем,.приложение,.не.способное. справиться.с.темпом.поступления.сообщений,.будет.все.больше.и.больше.отставать.. Понятно,.что.в.такой.ситуации.нужно.масштабировать.получение.сообщений.из. тем..Необходимо,.чтобы.несколько.потребителей.могли.делить.между.собой.дан- ные.и.читать.из.одной.темы.подобно.тому,.как.несколько.производителей.могут. писать.в.одну.тему. Принципы работы потребителей Kafka 89 Потребители.Kafka.обычно.состоят.в.группе потребителей..Если.несколько.по- требителей.подписаны.на.одну.тему.и.относятся.к.одной.группе,.все.потребители. группы.будет.получать.сообщения.из.различных.подмножеств.разделов.группы. Рассмотрим.тему.T1.с.четырьмя.разделами..Предположим,.что.мы.создали.новый. потребитель.C1.—.единственный.потребитель.в.группе.G1.и.подписали.его.на. тему.T1..C1.будет.получать.все.сообщения.из.всех.четырех.разделов.темы.(рис..4.1). Рис. 4.1. Одна группа потребителей с четырьмя разделами Если.мы.добавим.в.группу.G1.еще.один.потребитель,.C2,.то.каждый.потребитель. будет.получать.сообщения.только.из.двух.разделов..Например,.сообщения.из.раз- делов.0.и.2.попадут.к.C1,.а.из.разделов.1.и.3.—.к.C2.(рис..4.2). Рис. 4.2. Четыре раздела разбиты по двум потребителям Если.бы.в.группе.G1.было.четыре.потребителя,.то.каждый.из.них.читал.бы.сообще- ния.из.своего.раздела.(рис..4.3). 90 Глава 4 • Потребители Kafka: чтение данных из Kafka Рис. 4.3. Четыре потребителя, по одному разделу каждый Если.же.в.одной.группе.с.одной.темой.будет.больше.потребителей,.чем.разделов,. часть.потребителей.будут.простаивать.и.вообще.не.получать.сообщений.(рис..4.4). Рис. 4.4. Потребителей больше, чем разделов, поэтому часть из них простаивает Основной.способ.масштабирования.получения.данных.из.Kafka.—.добавление. новых.потребителей.в.группу..Потребители.Kafka.часто.выполняют.операции.с.вы- соким.значением.задержки,.например,.запись.данных.в.базу.или.занимающие.много. времени.вычисления.с.ними..В.этих.случаях.отдельный.потребитель.неизбежно. будет.отставать.от.темпов.поступления.данных.в.тему,.и.разделение.нагрузки.пу- тем.добавления.новых.потребителей,.каждый.из.которых.отвечает.лишь.за.часть. разделов.и.сообщений,.—.основной.метод.масштабирования..Поэтому.имеет.смысл. Принципы работы потребителей Kafka 91 создавать.темы.с.большим.количеством.разделов,.ведь.это.дает.возможность.до- бавлять.новых.потребителей.при.возрастании.нагрузки..Помните,.что.нет.смысла. добавлять.столько.потребителей,.сколько.нужно,.чтобы.их.стало.больше,.чем.раз- делов.в.теме,.—.часть.из.них.будет.просто.простаивать..В.главе.2.мы.приводили. соображения.по.поводу.выбора.количества.разделов.в.теме. Помимо.добавления.потребителей.для.масштабирования.отдельного.приложения. широко.распространено.чтение.несколькими.приложениями.данных.из.одной. темы..На.самом.деле.одной.из.главных.задач.создания.Kafka.было.обеспечение. возможности.использования.записанных.в.темы.Kafka.данных.во.множестве. сценариев.в.организации..В.подобном.случае.хотелось.бы,.чтобы.каждое.из.при- ложений.получило.все.данные,.а.не.только.их.подмножество..А.чтобы.приложе- ние.получило.все.данные.из.темы,.у.нее.должна.быть.своя.группа.потребителей.. В.отличие.от.многих.традиционных.систем.обмена.сообщениями,.Kafka.масшта- бируется.до.очень.больших.количеств.потребителей.и.их.групп.без.снижения. производительности. Если.мы.добавим.в.предыдущем.примере.новую.группу.G2.с.одним.потребителем,. он.прочитает.все.сообщения.из.темы.T1.вне.зависимости.от.группы.G1..В.G2.мо- жет.быть.свыше.одного.потребителя,.подобно.тому.как.было.в.G1,.но.G2.все.равно. получит.все.сообщения.вне.зависимости.от.других.групп.потребителей.(рис..4.5). Рис. 4.5. Вторая группа потребителей тоже читает все сообщения 92 Глава 4 • Потребители Kafka: чтение данных из Kafka Резюмируем:.для.каждого.приложения,.которому.нужны.все.сообщения.из.одной. темы.или.нескольких,.создается.новая.группа.потребителей..В.существующую. группу.их.добавляют.при.необходимости.масштабирования.чтения.и.обработки. сообщений.из.тем,.так.что.до.каждого.дополнительного.потребителя.в.группе.до- ходит.только.подмножество.сообщений. Группы потребителей и перебалансировка разделов Как.мы.видели.в.предыдущем.разделе,.потребители.в.группе.делят.между.собой. разделы.тем,.на.которые.подписаны..Добавленный.в.группу.потребитель.начинает. получать.сообщения.из.разделов,.за.которые.ранее.отвечал.другой.потребитель.. То.же.самое.происходит,.если.потребитель.останавливается.или.аварийно.заверша- ет.работу:.он.покидает.группу,.а.разделы,.за.которые.он.ранее.отвечал,.обрабатыва- ются.одним.из.оставшихся.потребителей..Переназначение.разделов.потребителям. происходит.также.при.изменении.тем,.которые.читает.группа.(например,.при.до- бавлении.администратором.новых.разделов). Передача.раздела.от.одного.потребителя.другому.называется.перебалансировкой. (rebalance)..Перебалансировка.важна,.потому.что.обеспечивает.группе.потребителей. масштабируемость.и.высокую.доступность,.позволяя.легко.и.безопасно.добав- лять.и.удалять.потребителей,.но.при.обычных.обстоятельствах.она.нежелательна.. Во.время.перебалансировки.потребители.не.могут.получать.сообщения,.так.что. она.фактически.представляет.собой.краткий.интервал.недоступности.всей.группы. потребителей..Кроме.того,.при.передаче.разделов.от.одного.потребителя.другому. потребитель.утрачивает.свое.текущее.состояние:.если.он.кэшировал.какие-либо. данные,.ему.придется.обновить.кэши,.что.замедлит.приложение.на.время.восстанов- ления.состояния..В.этой.главе.мы.обсудим,.как.сделать.так,.чтобы.перебалансировка. не.представляла.опасности,.и.как.избежать.нежелательной.перебалансировки. Потребители.поддерживают.членство.в.группе.и.принадлежность.разделов.за.счет. отправки.назначенному.координатором группы.брокеру.Kafka.(для.разных.групп. потребителей.это.могут.быть.разные.брокеры).периодических.контрольных сигна- лов .(heartbeats)..До.тех.пор,.пока.потребитель.регулярно.отправляет.контрольные. сигналы,.он.считается.активным,.нормально.работающим.и.обрабатывающим.со- общения.с.относящихся.к.нему.разделов..Контрольные.сигналы.отправляются.во. время.опросов.(то.есть.при.извлечении.потребителем.записей).и.при.фиксации. полученных.им.записей. Если.потребитель.на.длительное.время.прекращает.отправку.контрольных.сигна- лов,.время.его.сеанса.истекает.и.координатор.группы.признает.его.неработающим. и.инициирует.перебалансировку..В.случае.аварийного.сбоя.потребителя.и.прекра- щения.обработки.им.сообщений.координатору.группы.хватит.нескольких.секунд. без.контрольных.сигналов,.чтобы.признать.его.неработающим.и.инициировать. перебалансировку..На.протяжении.этого.времени.никакие.сообщения.из.относя- щихся.к.данному.потребителю.разделов.обрабатываться.не.будут..Если.же.потре- битель.завершает.работу.штатным.образом,.он.извещает.координатора.группы.об. Принципы работы потребителей Kafka 93 этом,.и.координатор.группы.инициирует.перебалансировку.немедленно,.сокращая. тем.самым.перерыв.в.обработке..Далее.в.этой.главе.мы.обсудим.параметры.конфи- гурации,.управляющие.частотой.отправки.контрольных.сигналов.и.длительностью. времени.ожидания.сеанса,.и.их.настройку.подходящим.для.вас.образом. ИЗМЕНЕНИЯ ЛОГИКИ РАБОТЫ КОНТРОЛЬНЫХ СИГНАЛОВ В ПОСЛЕДНИХ ВЕРСИЯХ KAFKA В.версии.0.10.1.Kafka.появился.отдельный.поток.для.контрольных.сигналов,.отправ- ляющий.их.и.между.опросами..Он.дает.возможность.сделать.частоту.контрольных. сигналов.(а.значит,.и.время,.которое.нужно.группе.потребителей.для.обнаружения. аварийного. сбоя. потребителя. и. отсутствия. от. него. контрольных. сигналов). неза- висимой.от.частоты.опросов.(определяемой.временем,.необходимым.для.обработки. возвращаемых. брокерами. данных).. В. новых. версиях. Kafka. можно. задать. макси- мальную.длительность.работы.потребителя.без.опросов,.по.истечении.которой.его. признают.отказавшим.и.будет.инициирована.перебалансировка..Этот.параметр.ис- пользуется. для. предотвращения. динамической. взаимоблокировки. (livelock),. при. которой.не.происходит.аварийного.сбоя.приложения,.но.и.задача.не.выполняется.. Он.не.зависим.от.параметра.session.timeout.ms,.соответствующего.промежутку.вре- мени,.необходимому.для.обнаружения.отказа.потребителя.и.прекращения.отправки. контрольных.сигналов. Оставшаяся.часть.данной.главы.посвящена.обсуждению.некоторых.проблем.более. старых.версий.Kafka.и.тому,.как.разработчик.может.с.ними.справиться..Мы.обсудим. также,. что. делать. с. приложениями,. которые. обрабатывают. записи. слишком. долго.. Если.вы.работаете.с.версией.Kafka.0.10.1.или.более.новой,.эта.информация.окажется. для. вас. не. слишком. актуальной.. Если. при. использовании. одной. из. новых. версий. приходится. иметь. дело. с. записями,. обработка. которых. занимает. больше. времени,. просто. подберите. значение. параметра. max.poll.interval.ms,. подходящее. для. более. длительных.промежутков.между.опросами. |