Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
152 Глава 6 • Надежная доставка данных Фиксация сообщений и фиксация событий Зафиксированное смещение отличается от зафиксированного сообщения (com- mitted message), которое, как обсуждалось ранее, представляет собой сообще- ние, записанное во все согласованные реплики и доступное потребителям. Зафиксированные смещения (committed offsets) — это смещения, отправленные потребителем в Kafka в подтверждение получения и обработки ею всех сообще- ний в разделе вплоть до этого конкретного смещения. В.главе.4.мы.подробно.обсуждали.API.потребителей.и.видели.множество.методов. фиксации.смещений..Здесь.рассмотрим.некоторые.важные.соображения.и.до- ступные.альтернативы,.но.за.подробностями.использования.API.отправляем.вас. к.главе.4. Свойства конфигурации потребителей, важные для надежной обработки Существует.четыре.параметра.конфигурации.потребителей,.без.понимания.кото- рых.не.получится.настроить.их.достаточно.надежное.поведение. Первый.из.них,. group.id ,.очень.подробно.описан.в.главе.4..Основная.его.идея:. если.у.двух.потребителей.одинаковый.идентификатор.группы.и.они.подписаны. на.одну.тему,.каждый.получает.в.обработку.подмножество.разделов.темы.и.будет. читать.только.часть.сообщений,.но.группа.в.целом.прочитает.все.сообщения..Если. необходимо,.чтобы.отдельный.потребитель.увидел.каждое.из.сообщений.темы,.то. у.него.должен.быть.уникальный. group.id Второй.параметр.—. auto.offset.reset ..Он.определяет,.что.потребитель.будет. делать,.если.никаких.смещений.не.было.зафиксировано.(например,.при.первона- чальном.запуске.потребителя).или.когда.потребитель.запросил.смещения,.которых. нет.в.брокере.(почему.так.случается,.вы.можете.узнать.из.главы.4)..У.этого.параме- тра.есть.только.два.значения..Если.выбрать. earliest ,.то.при.отсутствии.коррект- ного.смещения.потребитель.начнет.с.начала.раздела..Это.приведет.к.повторной. обработке.множества.сообщений,.но.гарантирует.минимальные.потери.данных.. Если.же.выбрать. latest ,.то.потребитель.начнет.с.конца.раздела..Это.минимизирует. повторную.обработку,.но.почти.наверняка.приведет.к.пропуску.потребителем.не- которых.сообщений. Третий.из.этих.параметров.—. enable.auto.commit ..Нужно.принять.непростое.реше- ние:.разрешить.ли.потребителю.фиксировать.смещения.вместо.вас.по.расписанию. или.самостоятельно.фиксировать.смещения.в.своем.коде?.Основное.преимущество. автоматической.фиксации.смещений.в.том,.что.при.реализации.потребителей.на. одну.заботу.окажется.меньше..Если.выполнять.всю.обработку.прочитанных.записей. внутри.цикла.опроса.потребителя,.то.автоматическая.фиксация.смещений.гаран- тирует.невозможность.фиксации.необработанного.смещения.(если.вы.не.помните,. Использование потребителей в надежной системе 153 что.такое.цикл.опроса.потребителя,.—.обратитесь.к.главе.4)..Основной.недостаток. автоматической.фиксации.смещений.—.отсутствие.контроля.числа.дубликатов,. которые.придется.обработать,.поскольку.потребитель.останавливается.после.об- работки.части.записей,.но.до.запуска.автоматической.фиксации..Если.вы.занимае- тесь.чем-то.замысловатым.вроде.передачи.записей.другому.потоку.для.обработки. в.фоновом.режиме,.то.могут.оказаться.автоматически.зафиксированы.смещения.для. уже.прочитанных,.но,.возможно,.еще.не.обработанных.потребителем.записей. Четвертый.из.этих.параметров.связан.с.третьим.и.называется. auto.commit.inter- val.ms ..Если.вы.выберете.автоматическую.фиксацию.смещений,.то.этот.параметр. даст.вам.возможность.настроить.ее.частоту..Значение.по.умолчанию.—.каждые. 5.секунд..В.целом.более.высокая.частота.фиксации.приводит.к.дополнительным. вычислительным.расходам,.но.снижает.число.дубликатов,.которые.могут.возни- кать.при.останове.потребителя. Фиксация смещений в потребителях явным образом Если.вы.используете.автоматическую.фиксацию.смещений,.то.фиксация.смещений. явным.образом.вас.не.интересует..Но.если.вам.не.помешал.бы.больший.контроль. за.временем.фиксации.смещений,.чтобы.уменьшить.число.дубликатов.или,.до- пустим,.потому.что.обработка.событий.происходит.вне.основного.цикла.опроса. потребителя,.то.стоит.обдумать,.как.фиксировать.смещения. Мы.не.станем.рассматривать.здесь.механизм.и.API.фиксации.смещений,.поскольку. уже.сделали.это.в.главе.4..Вместо.этого.обсудим.важные.соображения.относитель- но.разработки.потребителей.для.надежной.обработки.данных..Начнем.с.простых. и,.наверное,.очевидных.вещей.и.постепенно.перейдем.к.более.сложным.схемам. Всегда фиксируйте смещения после обработки событий Это.не.составит.проблемы,.если.вся.обработка.происходит.внутри.цикла.опроса,. а.состояние.между.итерациями.цикла.опроса.не.сохраняется.(например,.произ- водится.агрегирование)..Можно.воспользоваться.параметром.автоматической. фиксации.или.фиксировать.события.в.конце.цикла.опроса. Частота фиксации — компромисс между производительностью и числом дубликатов, возникающих при аварийном сбое Даже.в.простейшем.случае,.когда.вся.обработка.происходит.внутри.цикла.опроса,. а.состояние.между.итерациями.цикла.опроса.не.сохраняется,.можно.или.выполнять. фиксацию.несколько.раз.внутри.цикла.(возможно,.после.каждого.обработанного. события),.или.фиксировать.только.один.раз.в.несколько.итераций..Фиксация. подобно.отправке.сообщений.при. acks=all .обусловливает.некоторые.накладные. расходы,.так.что.вам.нужно.найти.компромисс. 154 Глава 6 • Надежная доставка данных Убедитесь, что фиксируете правильные смещения Часто.допускаемая.при.фиксации.во.время.цикла.опроса.ошибка.состоит.в.слу- чайной.фиксации.последнего.прочитанного.при.опросе.смещения,.а.не.последнего. обработанного..Помните,.что.чрезвычайно.важно.всегда.фиксировать.смещения. сообщений.после.их.обработки.—.фиксация.смещений.для.прочитанных,.но.еще. не.обработанных.сообщений.приводит.к.потере.сообщений.потребителями..В.гла- ве.4.приведены.соответствующие.примеры. Перераспределение При.проектировании.приложения.не.забывайте,.что.время.от.времени.будут. происходить.перераспределения.потребителей.и.вам.придется.обрабатывать.их. должным.образом..В.главе.4.приведено.несколько.примеров,.но.в.более.глобаль- ном.смысле.это.значит,.что.перед.сменой.принадлежности.разделов.придется.за- фиксировать.смещения.и.при.назначении.новых.разделов.очистить.сохраненные. состояния. Потребителям может понадобиться повторить попытку В.некоторых.случаях.после.выполнения.опроса.и.обработки.записей.оказывается,. что.часть.записей.обработана.не.полностью.и.их.придется.обработать.позже..Допу- стим,.вы.пытались.перенести.записи.из.Kafka.в.базу.данных,.но.оказалось,.что.она. в.данный.момент.недоступна,.так.что.нужно.будет.повторить.попытку..Отметим,. что.в.отличие.от.обычных.систем.обмена.сообщениями.по.типу.«публикация/ подписка»,.вы.фиксируете.смещения,.но.не.подтверждаете.получение.отдельных. сообщений..Это.значит,.что.если.вы.не.смогли.обработать.запись.№.30,.но.успешно. обработали.запись.№.31,.то.фиксировать.31-ю.запись.не.следует.—.это.приведет. к.фиксации.всех.записей.до.31-й,.включая.30-ю,.что.было.бы.нежелательно..Вместо. этого.попробуйте.один.из.следующих.двух.вариантов. 1.. Столкнувшись.с.ошибкой,.которую.можно.разрешить.путем.повтора,.зафикси- руйте.последнюю.успешно.обработанную.запись..Затем.сохраните.ожидающие. обработки.записи.в.буфере,.чтобы.следующая.итерация.опроса.их.не.затерла,. и.продолжайте.обработку.(см..пояснения.в.главе.4)..Вам.может.потребовать- ся.продолжить.выполнение.цикла.опроса.вместе.с.обработкой.всех.записей.. Можете.воспользоваться.методом. pause() .потребителя.для.упрощения.повто- ров,.чтобы.гарантировать,.что.дополнительные.опросы.не.вернут.дополнитель- ные.данные. 2.. Столкнувшись.с.ошибкой,.которую.можно.разрешить.путем.повтора,.запишите. ее.в.отдельную.тему.и.продолжайте.выполнение..Для.обработки.записей.из. Использование потребителей в надежной системе 155 этой.темы.для.повторов.можно.воспользоваться.отдельной.группой.потребите- лей..Или.один.и.тот.же.потребитель.может.подписаться.как.на.основную.тему,. так.и.на.тему.для.повторов.с.приостановкой.между.повторами.потребления. данных.из.темы.для.повторов..Эта.схема.работы.напоминает.очереди.зависших. сообщений.(dead-letter.queue),.используемые.во.многих.системах.обмена.со- общениями. Потребителям может потребоваться сохранение состояния В.некоторых.приложениях.необходимо.сохранять.состояние.между.вызовами. опроса..Например,.если.нужно.вычислить.скользящее.среднее,.приходится.об- новлять.значение.среднего.при.каждом.опросе.Kafka.на.предмет.новых.событий.. В.случае.перезапуска.процесса.необходимо.не.только.начать.получение.с.последне- го.смещения,.но.и.восстановить.соответствующее.скользящее.среднее..Сделать.это. можно,.в.частности,.записав.последнее.накопленное.значение.в.тему.для.резуль- татов.одновременно.с.фиксацией.смещения..Это.значит,.что.поток.может.начать. работу.с.того.места,.где.остановился,.подхватив.последнее.накопленное.значение.. Однако.это.не.решает.проблемы,.ведь.Kafka.пока.не.предоставляет.функциональ- ности.транзакций..Может.произойти.аварийный.сбой.после.записи.последнего. результата,.но.до.фиксации.смещений,.или.наоборот..В.целом,.это.довольно.слож- ная.проблема,.и.мы.рекомендуем,.вместо.того.чтобы.пытаться.решить.ее.своими. силами,.обратиться.к.таким.библиотекам,.как.Kafka.Streams,.предоставляющим. высокоуровневые.DSL-подобные.API.для.агрегирования,.реализации.соединений,. оконных.функций.и.другой.сложной.аналитики. Длительная обработка записей Иногда.обработка.записей.занимает.много.времени..Например,.в.случае.взаи- модействия.с.блокирующим.обработку.или.выполняющим.очень.длительные. вычисления.сервисом..Как.вы.помните,.в.некоторых.версиях.Kafka.невозможно. остановить.выполнение.опросов.более.чем.на.несколько.секунд.(см..подробности. в.главе.4)..Даже.если.вы.не.хотите.обрабатывать.дальнейшие.записи,.все.равно. должны.продолжать.опросы,.чтобы.клиент.мог.отправлять.контрольные.сигналы. брокеру..Распространенная.схема.действий.в.подобных.случаях.такова:.передать. по.возможности.данные.пулу.из.множества.потоков.для.ускорения.работы.за.счет. параллельной.обработки..После.передачи.данных.потокам-исполнителям.можно. приостановить.потребитель.и.продолжать.выполнение.опросов.без.фактического. извлечения.дополнительных.данных.вплоть.до.завершения.выполнения.потоков- исполнителей..После.этого.можно.возобновить.работу.потребителя..А.поскольку. потребитель.не.прекращает.выполнение.опросов,.то.контрольные.сигналы.будут. отправляться.по.плану.и.перераспределение.запущено.не.будет. 156 Глава 6 • Надежная доставка данных Строго однократная доставка Для.некоторых.приложений.требуется.доставить.данные.не.просто.как.минимум. один.раз.(то.есть.без.потерь.данных),.а.ровно.один.раз..Хотя.в.настоящий.момент. Kafka.не.обеспечивает.поддержки.строго.однократной.доставки,.у.потребителей. есть.в.запасе.несколько.трюков,.позволяющих.гарантировать,.что.каждое.сообще- ние.в.Kafka.будет.записано.во.внешнюю.систему.ровно.один.раз.(обратите.внима- ние.на.то,.что.при.этом.не.отсеиваются.дубликаты,.которые.могли.возникнуть.при. отправке.данных.в.Kafka). Простейший.и,.наверное,.самый.распространенный.способ.строго.однократной. доставки.—.запись.результатов.в.систему,.поддерживающую.уникальные.ключи.. В.числе.таких.систем.все.хранилища.данных.типа.«ключ/значение»,.все.реля- ционные.базы.данных,.Elasticsearch.и,.вероятно,.множество.других.хранилищ. данных..При.записи.данных.в.такую.систему,.как.реляционная.база.данных.или. Elasticsearch,.или.сама.запись.содержит.уникальный.ключ.(довольно.распро- страненная.ситуация),.или.можно.создать.такой.ключ.на.основе.сочетания.темы,. раздела.и.смещения,.которое.однозначно.идентифицирует.запись.Kafka..Если.вы. уже.занесли.запись.в.виде.значения.с.уникальным.ключом,.а.позднее.случайно.про- читали.ее.снова,.то.вы.просто.запишете.те.же.ключ.и.значение..Хранилище.данных. при.этом.перезапишет.существующую.запись,.и.результат.будет.точно.таким.же,. как.и.без.случайного.дубликата..Эта.очень.распространенная.и.удобная.схема.на- зывается.идемпотентной операцией записи.(idempotent.write). Другой.вариант.возможен.при.записи.в.систему.с.поддержкой.транзакций..Про- стейший.пример.—.реляционные.базы.данных,.но.в.HDFS.есть.атомарные.пере- именования,.часто.используемые.для.тех.же.целей..Суть.процедуры.состоит. в.объединении.записей.и.их.смещении.в.одну.транзакцию..чтобы.добиться.согласо- ванности..В.начале.работы.извлекаются.смещения.последних.записей,.занесенных. во.внешнее.хранилище,.после.этого.применяется.метод. consumer.seek() ,.чтобы. начать.потребление.данных.с.этих.смещений..Пример.реализации.такого.варианта. приведен.в.главе.4. Проверка надежности системы Пройдя.процесс.выяснения.требований.надежности,.настроив.брокеры.и.клиенты,. воспользовавшись.API.оптимальным.для.конкретного.сценария.использования. образом,.можно.расслабиться.и.запускать.систему.в.промышленную.эксплуатацию. в.полной.уверенности,.что.ни.одно.событие.не.будет.пропущено,.правда? Можете.так.и.поступить,.но.лучше.сначала.выполнить.хотя.бы.небольшую. проверку..Мы.рекомендуем.выполнять.три.уровня.проверки:.проверку.конфи- гурации,.проверку.приложения.и.мониторинг.приложения.при.промышленной. эксплуатации..Рассмотрим.каждый.из.этих.этапов.и.разберемся,.что.нужно.про- верять.и.как. Проверка надежности системы 157 Проверка конфигурации Можно.легко.протестировать.настройки.брокера.и.клиента.независимо.от.логики. приложения,.так.и.рекомендуется.поступить.по.двум.причинам. Благодаря.этому.можно.проверить,.соответствует.ли.выбранная.конфигурация. вашим.требованиям. Это.хорошее.упражнение.на.прослеживание.ожидаемого.поведения.системы.. Данная.глава.носит.скорее.теоретический.характер,.так.что.важно.проверить,. насколько.эта.теория.применима.на.практике. Kafka.содержит.две.утилиты,.предназначенные.для.такой.проверки..Пакет. org.apa- che.kafka.tools .включает.классы. VerifiableProducer .и. VerifiableConsumer Их.можно.запускать.в.виде.утилит.командной.строки.или.встраивать.во.фрейм- ворк.автоматизированного.тестирования. Смысл.процедуры.состоит.в.генерации.контрольным.производителем.последова- тельности.сообщений.с.номерами.от.1.до.выбранного.вами.значения..Этот.произ- водитель.можно.настраивать.точно.так.же,.как.и.ваш.собственный,.задавая.нужное. значение.параметра. acks ,.количество.попыток.повторов.и.частоту,.с.которой.гене- рируются.сообщения..Он.выведет.для.каждого.отправленного.брокеру.сообщения. уведомление.об.ошибке.или.успехе.отправки.в.зависимости.от.полученных.под- тверждений..Контрольный.потребитель.позволяет.выполнить.дополнительную. проверку..Он.потребляет.события.(обычно.исходящие.от.контрольного.произво- дителя).и.выводит.их.в.соответствующем.порядке..А.также.выводит.информацию. о.фиксациях.и.перераспределении. Стоит.также.задуматься.о.том,.какие.тесты.имеет.смысл.выполнить. Выбор.ведущей.реплики:.что.произойдет,.если.остановить.ведущую.реплику?. Сколько.времени.займет.возобновление.нормальной.работы.производителя. и.потребителя? Выбор.контроллера:.через.какое.время.система.возобновит.работу.после.пере- запуска.контроллера? Плавающий.перезапуск:.можно.ли.перезапускать.брокеры.по.одному.без.потери. сообщений? «Нечистый».выбор.ведущей.реплики:.что.произойдет,.если.отключать.все.ре- плики.раздела.по.одной,.чтобы.они.точно.переставали.быть.согласованными,. после.чего.запустить.несогласованный.брокер?.Что.должно.произойти.для. возобновления.работы?.Приемлемо.ли.это? Выбрав.сценарий.тестирования,.вы.запускаете.контрольный.производитель.и.кон- трольный.потребитель.и.выполняете.выбранный.сценарий.—.например,.останав- ливаете.ведущую.реплику.раздела,.для.которой.генерирует.данные.производитель.. Если.вы.ожидаете.лишь.небольшой.паузы,.после.которой.функционирование. возобновится.без.потери.каких-либо.данных,.то.проверьте,.совпадает.ли.число. 158 Глава 6 • Надежная доставка данных сообщений,.сгенерированных.производителем,.и.число.сообщений,.потребленных. потребителем. Репозиторий.исходного.кода.Apache.Kafka.включает.обширный.набор.тестов.. Многие.из.них.основаны.на.одном.и.том.же.принципе.—.например,.проверке.функ- ционирования.плавающих.обновлений.с.помощью.контрольного.производителя. и.контрольного.потребителя. Проверка приложений Убедившись,.что.настройки.брокера.и.клиента.соответствуют.требованиям,.можете. приступить.к.проверке.того,.обеспечивает.ли.ваше.приложение.необходимые.га- рантии..Это.включает.проверку.таких.вещей,.как.пользовательский.код.обработки. ошибок,.фиксация.смещений,.перераспределение.прослушивателей.и.других.мест,. в.которых.логика.приложения.взаимодействует.с.клиентскими.библиотеками.Kafka. Конечно,.поскольку.приложение.ваше,.вам.виднее,.как.его.тестировать,.мы.можем. лишь.подсказать.некоторые.моменты..Надеемся,.что.в.процессе.разработки.вы. используете.комплексные.тесты..Но.как.бы.вы.ни.проверяли.приложение,.мы. рекомендуем.запускать.тесты.при.различных.сбойных.состояниях: потере.клиентами.соединения.с.сервером.(помочь.в.моделировании.сетевых. сбоев.может.системный.администратор); выборе.ведущей.реплики; плавающем.перезапуске.брокеров; плавающем.перезапуске.потребителей; плавающем.перезапуске.производителей. В.каждом.из.этих.сценариев.существует.ожидаемое поведение.—.то,.что.вы.хотели. получить,.когда.создавали.приложение..И.вы.можете.выполнить.тест,.чтобы.уви- деть,.что.произойдет.на.самом.деле..Например,.когда.вы.планировали.плавающий. перезапуск.потребителей,.то.ожидали.небольшой.паузы.вследствие.перераспреде- ления.потребителей,.после.которой.потребление.продолжилось.бы.при.не.более. чем.1000.дублирующихся.значений..Тест.покажет,.действительно.ли.приложение. фиксирует.смещения.и.выполняет.перераспределение.подобным.образом. Мониторинг надежности при промышленной эксплуатации Тестирование.приложения.играет.важную.роль,.но.не.заменяет.необходимости. непрерывного.мониторинга.системы.для.контроля.потоков.данных.при.промыш- ленной.эксплуатации..В.главе.9.приведены.подробные.советы.по.мониторингу. состояния.кластера.Kafka,.но.помимо.этого.важно.контролировать.также.клиенты. и.потоки.данных.в.системе. |