Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
101 Фиксация и смещения Метод. poll() .при.вызове.возвращает.записанные.в.Kafka.данные,.еще.не.прочитан- ные.потребителями.из.нашей.группы..Это.означает.возможность.отследить,.какие. записи.были.прочитаны.потребителями.данной.группы..Как.уже.обсуждалось,. одна.из.отличительных.черт.Kafka.—.отсутствие.отслеживания.подтверждений. от.потребителей,.подобно.тому.как.это.делают.многие.JMS.системы.организации. очередей..Вместо.этого.потребители.могут.использовать.Kafka.для.отслеживания. их.позиции.(смещения).в.каждом.из.разделов. Мы.будем.называть.действие.по.обновлению.текущей.позиции.потребителя.в.раз- делы.фиксацией.(commit). Каким.образом.потребители.фиксируют.смещение?.Путем.отправки.в.специальную. тему. __consumer_offsets .сообщения,.содержащего.смещение.для.каждого.из.разде- лов..Это.ни.на.что.не.влияет.до.тех.пор,.пока.все.потребители.работают.нормально.. Однако.в.случае.аварийного.сбоя.потребителя.или.присоединения.к.группе.нового. потребителя.это.инициирует перебалансировку..После.перебалансировки.каждому. из.потребителей.может.быть.назначен.набор.разделов,.отличный.от.обрабаты- ваемого.им.ранее..Чтобы.знать,.с.какого.места.продолжать.работу,.потребитель. должен.прочитать.последнее.зафиксированное.смещение.для.каждого.из.разделов. и.продолжить.оттуда. Если.зафиксированное.смещение.меньше.смещения.последнего.обработанного. клиентом.сообщения,.то.расположенные.между.ними.сообщения.будут.обработаны. дважды.(рис..4.6). Рис. 4.6. Повторно обрабатываемые события Если.зафиксированное.смещение.превышает.смещение.последнего.фактически. обработанного.клиентом.события,.расположенные.в.этом.промежутке.сообщения. будут.пропущены.группой.потребителей.(рис..4.7). 102 Глава 4 • Потребители Kafka: чтение данных из Kafka Рис. 4.7. Пропущенные события между смещениями Очевидно,.что.организация.смещений.существенно.влияет.на.клиентское.прило- жение..API. KafkaConsumer .предоставляет.множество.способов.фиксации.смещений. Автоматическая фиксация Простейший.способ.фиксации.смещений.—.делегировать.эту.задачу.потребителю.. При.значении. true .параметра. enable.auto.commit .потребитель.каждые.5.с.будет. автоматически.фиксировать.максимальное.смещение,.возвращенное.клиенту.ме- тодом. poll() ..Пятисекундный.интервал.—.значение.по.умолчанию,.которое.можно. изменить.заданием.параметра. auto.commit.interval.ms ..В.основе.автоматической. фиксации.лежит.цикл.опроса..При.каждом.опросе.потребитель.проверяет,.не.вре- мя.ли.выполнить.фиксацию,.и,.если.да,.фиксирует.возвращенные.при.последнем. опросе.смещения. Прежде.чем.воспользоваться.этой.удобной.возможностью,.следует.четко.пред- ставить.себе.последствия. Учтите,.что.по.умолчанию.автоматическая.фиксация.происходит.каждые.5.с.. Представьте,.что.после.последней.фиксации.прошло.3.с.и.запустилась.переба- лансировка..После.перебалансировки.все.потребители.начнут.получать.данные. с.последнего.зафиксированного.смещения..В.этом.случае.«возраст».смещения.со- ставляет.3.с,.так.что.все.поступившие.в.течение.этих.3.с.события.будут.обработаны. два.раза..Можно.настроить.интервал.фиксации.так,.чтобы.она.выполнялась.чаще,. и.уменьшить.окно,.в.пределах.которого.записи.дублируются..Однако.полностью. устранить.дублирование.невозможно. При.включенной.автофиксации.вызов.метода. poll() .всегда.будет.фиксировать. последнее.смещение,.возвращенное.предыдущим.вызовом..Этот.метод.не.знает,. Фиксация и смещения 103 какие.события.были.обработаны,.так.что.очень.важно.всегда.обрабатывать.все. возвращенные.методом. poll() .события.до.того,.как.вызывать. poll() .снова.(как. и. poll() ,.метод. close() .также.автоматически.фиксирует.смещения)..Обычно.это. не.проблема,.но.будьте.внимательны.при.обработке.исключений.или.досрочном. выходе.из.цикла.опроса. Автоматическая.фиксация.удобна,.но.она.не.позволяет.разработчику.управлять. так,.чтобы.избежать.дублирования.сообщений. Фиксация текущего смещения Большинство.разработчиков.стараются.жестко.контролировать.моменты.фикса- ции.смещений.—.как.для.исключения.вероятности.пропуска.сообщений,.так.и.для. уменьшения.количества.дублирования.сообщений.при.перебалансировке..В.API. потребителей.есть.возможность.фиксировать.текущее.смещение.в.нужный.раз- работчику.приложения.момент.вместо.фиксации.по.таймеру. При.задании.параметра. auto.commit.offset=false .смещения.будут.фиксировать- ся.только.тогда,.когда.приложение.потребует.этого.явным.образом..Простейший. и.наиболее.надежный.API.фиксации.—. commitSync() ..Он.фиксирует.последнее.воз- вращенное.методом. poll() .смещение.и.сразу.же.после.этого.завершает.выполнение. процедуры,.генерируя.исключение.в.случае.сбоя.фиксации. Важно.помнить,.что. commitSync() .фиксирует.последнее.возвращенное.методом. poll() .смещение,.так.что.не.забудьте.вызвать. commitSync() .после.завершения. обработки.всех.записей.в.наборе,.или.вы.рискуете.пропустить.сообщения,.как. описывалось.ранее..При.запуске.перебалансировки.все.сообщения.с.начала.самого. недавнего.пакета.и.до.момента.начала.перебалансировки.окажутся.обработанными. дважды. Далее.показано,.как.использовать. commitSync() .для.фиксации.смещений.после. завершения.обработки.последнего.пакета.сообщений: while (true) { ConsumerRecords for (ConsumerRecord { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } try { consumer.commitSync(); } catch (CommitFailedException e) { log.error("commit failed", e) } } 104 Глава 4 • Потребители Kafka: чтение данных из Kafka .Допустим,.что.обработка.записи.состоит.в.выводе.ее.содержимого..Вероятно,. реальное.приложение.будет.делать.с.записями.намного.больше.—.модифициро- вать,.расширять,.агрегировать.и.отображать.их.на.инструментальной.панели.или. оповещать.пользователей.о.важных.событиях..Решать,.когда.обработка.записи. завершена,.следует.в.зависимости.от.конкретного.сценария.использования. .После.завершения.обработки.всех.записей.текущего.пакета.вызываем. com- mitSync() .для.фиксации.последнего.смещения,.прежде.чем.выполнять.опрос. для.получения.дополнительных.сообщений. .Метод. commitSync() .повторяет.фиксацию.до.тех.пор,.пока.не.возникнет.непо- правимая.ошибка,.которую.можно.разве.что.записать.в.журнал. Асинхронная фиксация Один.из.недостатков.фиксации.вручную.—.то,.что.приложение.оказывается.за- блокировано,.пока.брокер.не.ответит.на.запрос.фиксации..Это.ограничивает.про- пускную.способность.приложения..Повысить.ее.можно.за.счет.снижения.частоты. фиксации,.но.тем.самым.мы.повысили.бы.число.потенциальных.дубликатов,.воз- никающих.при.перебалансировке. Другой.вариант.—.использование.API.асинхронной.фиксации..Вместо.того.чтобы. ждать.ответа.брокера.на.запрос.фиксации,.просто.отправляем.запрос.и.продолжаем. работу: while (true) { ConsumerRecords for (ConsumerRecord { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(); } .Фиксируем.последнее.смещение.и.продолжаем.работу. Недостаток.этого.подхода:.в.то.время.как. commitSync() .будет.повторять.попытку. фиксации.до.тех.пор,.пока.она.не.завершится.успешно.или.не.возникнет.ошиб- ка,.которую.нельзя.исправить.путем.повтора,. commitAsync() .повторять.попытку. не.станет..Причина.такого.поведения.состоит.в.том,.что.на.момент.получения. commitAsync() .ответа.от.сервера.уже.может.быть.успешно.выполнена.более.позд- няя.фиксация..Представьте.себе,.что.мы.отправили.запрос.на.фиксацию.смещения. 2000..Из-за.временных.проблем.со.связью.брокер.не.получил.этого.запроса.и,.сле- довательно,.не.ответил..Тем.временем.мы.обработали.другой.пакет.и.успешно.за- фиксировали.смещение.3000..Если.теперь. commitAsync() .попытается.выполнить. неудавшуюся.предыдущую.фиксацию.смещения,.она.может.зафиксировать.смеще- Фиксация и смещения 105 ние.2000.после.обработки.и.фиксации.смещения.3000..В.случае.перебалансировки. это.приведет.к.дополнительным.дубликатам. Мы.упомянули.эту.проблему.и.важность.правильного.порядка.фиксаций,.посколь- ку. commitAsync() .позволяет.также.передать.функцию.обратного.вызова,.применяе- мую.при.ответе.брокера..Обратные.вызовы.часто.используют.для.журналирования. ошибок.фиксаций.или.их.подсчета.в.виде.показателей,.но,.чтобы.воспользоваться. обратным.вызовом.для.повторения.попытки,.нужно.учитывать.проблему.с.по- рядком.фиксаций: while (true) { ConsumerRecords for (ConsumerRecord System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map if (e != null) log.error("Commit failed for offsets {}", offsets, e); } }); } .Отправляем.запрос.на.фиксацию.и.продолжаем.работу,.но.в.случае.сбоя.фик- сации.записываем.в.журнал.информацию.о.сбое.и.соответствующие.смещения. Повтор асинхронной фиксации Простой способ обеспечить правильный порядок при асинхронных повторах — использовать монотонно возрастающий порядковый номер. Увеличивайте поряд- ковый номер при каждой фиксации и вставьте его во время фиксации в обратный вызов commitAsync. Когда будете готовы отправить повторный запрос, проверьте, равен ли порядковый номер фиксации в обратном вызове переменной экземпляра. Если да, то более поздняя фиксация не выполнялась и можно спокойно пробовать отправить запрос еще раз. Если же значение переменной экземпляра больше, не нужно пробовать повторно отправлять запрос, потому что уже был сделан более поздний запрос на фиксацию. Сочетание асинхронной и синхронной фиксации При.обычных.обстоятельствах.случайные.сбои.при.фиксации.(без.повторных.за- просов).—.незначительная.помеха,.ведь.если.проблема.носит.временный.характер,. то.следующая.фиксация.будет.выполнена.успешно..Но.если.мы.знаем,.что.речь.идет. о.последней.фиксации.перед.закрытием.потребителя.или.перебалансировкой,.то. лучше.позаботиться,.чтобы.она.точно.оказалась.успешной. 106 Глава 4 • Потребители Kafka: чтение данных из Kafka Поэтому.часто.непосредственно.перед.остановом.комбинируют. commitAsync() с. commitSync() .вот.таким.образом.(мы.обсудим.фиксацию.перед.перебалансиров- кой.в.разделе.о.прослушивании.на.предмет.перебалансировки): try { while (true) { ConsumerRecords for (ConsumerRecord System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(); } } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync(); } finally { consumer.close(); } } .Пока.все.нормально,.мы.используем. commitAsync() ..Это.быстрее,.и.если.одна. фиксация.пройдет.неудачно,.то.следующая.сыграет.роль.повторной.попытки. .Но.при.закрытии.никакой.следующей.фиксации.не.будет..Поэтому.нужно.вы- звать.метод. commitSync() ,.который.станет.повторять.попытки.вплоть.до.успеш- ного.выполнения.или.невосстановимого.сбоя. Фиксация заданного смещения Фиксация.последнего.смещения.происходит.только.при.завершении.обработки. пакетов..Но.что.делать,.если.требуется.выполнять.ее.чаще?.Что.делать,.если.метод. poll() .вернул.огромный.пакет.и.необходимо.зафиксировать.смещения.в.ходе.его. обработки,.чтобы.не.пришлось.обрабатывать.все.эти.строки.снова.в.случае.пере- балансировки?.Просто.вызвать.метод. commitAsync() .или. commitSync() .нельзя,. ведь.они.зафиксируют.последнее.возвращенное.смещение,.которое.вы.еще.не.об- работали. К.счастью,.API.потребителей.предоставляет.возможность.вызывать.методы. commitAsync() .или. commitSync() ,.передавая.им.ассоциативный.словарь.разделов. и.смещений,.которые.нужно.зафиксировать..Если.идет.процесс.обработки.пакета. записей.и.смещение.последнего.полученного.вами.из.раздела.3.в.теме.«покупате- ли».сообщения.равно.5000,.то.можете.вызвать.метод. commitSync() .для.фиксации. смещения.5000.для.раздела.3.в.теме.«покупатели»..А.поскольку.потребитель.может. отвечать.более.чем.за.один.раздел,.придется.отслеживать.смещения.во.всех.его. разделах,.что.приведет.к.дополнительному.усложнению.кода. Прослушивание на предмет перебалансировки 107 Вот.так.выглядит.фиксация.заданных.смещений: private Map new HashMap<>(); int count = 0; while (true) { ConsumerRecords for (ConsumerRecord { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata")); if (count % 1000 == 0) consumer.commitAsync(currentOffsets, null); count++; } } .Этот.ассоциативный.словарь.будем.использовать.для.отслеживания.смещений. вручную. .Напомним:. printf .здесь.лишь.заглушка.для.реальной.обработки.получаемых. записей. .После.чтения.каждой.записи.обновляем.ассоциативный.словарь.смещений,.ука- зывая.смещение.следующего.намеченного.для.обработки.сообщения..Именно. с.этого.места.мы.начнем.чтение.в.следующий.раз. .Здесь.мы.решили.фиксировать.текущие.смещения.через.каждые.1000.записей.. В.своем.приложении.можете.выполнять.фиксацию.по.времени.или,.возможно,. на.основе.содержимого.записей. .Я.решил.вызвать.здесь.метод. commitAsync() ,.но. commitSync() .тоже.прекрасно. подошел.бы..Конечно,.при.фиксации.конкретных.смещений.необходимо.вы- полнять.всю.показанную.в.предыдущих.разделах.обработку.ошибок. Прослушивание на предмет перебалансировки Как.упоминалось.в.предыдущем.разделе,.посвященном.фиксации.смещений,.по- требителю.необходимо.выполнить.определенную.«чистку».перед.завершением. выполнения,.а.также.перед.перебалансировкой.разделов. Если.известно,.что.раздел.вот-вот.перестанет.принадлежать.данному.потребителю,. то.желательно.зафиксировать.смещения.последних.обработанных.событий..Возмож- но,.придется.также.закрыть.дескрипторы.файлов,.соединения.с.базой.данных.и.т..п. 108 Глава 4 • Потребители Kafka: чтение данных из Kafka API.потребителей.позволяет.вашему.коду.работать.во.время.смены.(добавления/ удаления).принадлежности.разделов.потребителю..Для.этого.необходимо.пере- дать.объект. ConsumerRebalanceListener .при.вызове.обсуждавшегося.ранее.метода. subscribe() ..У.класса. ConsumerRebalanceListener .есть.два.доступных.для.реали- зации.метода: public void onPartitionsRevoked(Collection partitions) .—. вызывается.до.начала.перебалансировки.и.после.завершения.получения.сообще- ний.потребителем..Именно.в.этом.методе.необходимо.фиксировать.смещения,. чтобы.следующий.потребитель,.которому.будет.назначен.этот.раздел,.знал,. с.какого.места.начинать; public void onPartitionsAssigned(Collection partitions) .—. вызывается.после.переназначения.разделов.потребителю,.но.до.того,.как.он. начнет.получать.сообщения. В.следующем.примере.вы.увидите,.как.использовать.метод. onPartitionsRevoked() для.фиксации.смещений.перед.сменой.принадлежности.раздела.(в.следующем. разделе.приведем.развернутый.пример,.демонстрирующий.использование.метода. onPartitionsAssigned() ): private Map new HashMap<>(); private class HandleRebalance implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection partitions) { } public void onPartitionsRevoked(Collection partitions) { System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets); consumer.commitSync(currentOffsets); } } try { consumer.subscribe(topics, new HandleRebalance()); while (true) { ConsumerRecords consumer.poll(100); for (ConsumerRecord { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); currentOffsets.put(new TopicPartition(record.topic(), Получение записей с заданными смещениями 109 record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata")); } consumer.commitAsync(currentOffsets, null); } } catch (WakeupException e) { // Игнорируем, поскольку закрываемся } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync(currentOffsets); } finally { consumer.close(); System.out.println("Closed consumer and we are done"); } } .Начинаем.с.реализации.класса. ConsumerRebalanceListener .В.этом.примере.при.назначении.нового.раздела.не.требуется.ничего.делать,.мы. просто.начинаем.получать.сообщения. .Но.когда.потребитель.вот-вот.потеряет.раздел.из-за.перебалансировки,.необ- ходимо.зафиксировать.смещения..Обратите.внимание.на.то,.что.мы.фиксируем. последние.обработанные.смещения,.а.не.последние.смещения.во.все.еще.обра- батываемом.пакете..Делаем.это.из-за.возможной.смены.принадлежности.раз- дела.в.ходе.обработки.пакета..Мы.фиксируем.смещения.для.всех.разделов,.а.не. только.тех,.которые.«потеряем»,.—.раз.смещения.относятся.к.уже.обработанным. событиям,.никакого.вреда.это.не.принесет..И.мы.используем.метод. commitSync() для.гарантии.фиксации.смещений.до.перебалансировки. .Самое.главное:.передаем.объект. ConsumerRebalanceListener .в.метод. subscribe() для.вызова.потребителем. Получение записей с заданными смещениями До.сих.пор.мы.использовали.метод. poll() ,.чтобы.запустить.получение.сообщений. с.последнего.зафиксированного.смещения.в.каждом.из.разделов.и.дальнейшей. обработки.всех.сообщений.по.очереди..Однако.иногда.необходимо.начать.чтение. с.другого.смещения. Если.вы.хотели.бы.начать.чтение.всех.сообщений.с.начала.раздела.или.хотя.бы. пропустить.все.до.конца.разделы.и.получать.только.новые.сообщения,.то.мож- но.применить.специализированные.API. seekToBeginning(TopicPartition tp) и. seekToEnd(TopicPartition tp) Однако.API.Kafka.дает.возможность.переходить.и.к.конкретному.смещению..Ее.мож- но.использовать.для.множества.различных.целей,.например,.чтобы.вернуться.на. |