Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
110 Глава 4 • Потребители Kafka: чтение данных из Kafka несколько.сообщений.назад.или.пропустить.несколько.сообщений.(если.чувстви- тельному.к.задержкам.приложению,.отстающему.от.графика,.нужно.перескочить. к.более.актуальным.сообщениям)..Самый.интересный.сценарий.использования. этой.возможности.—.когда.смещения.хранятся.не.в.Kafka. Рассмотрим.распространенный.сценарий.использования:.приложение.читает.со- бытия.из.Kafka.(например,.поток.данных.о.маршрутах.перемещения.пользователей. по.веб-сайту),.обрабатывает.данные.(к.примеру,.удаляет.записи,.соответствующие. посещению.сайта.автоматизированными.программами,.а.не.живыми.пользовате- лями),.после.чего.сохраняет.результаты.в.базе.данных,.NoSQL-хранилище.или. Hadoop..Допустим,.что.нам.не.хотелось.бы.ни.терять.какие-либо.данные,.ни.со- хранять.одни.и.те.же.результаты.в.базе.данных.дважды..В.этом.случае.цикл.по- требителя.выглядел.бы.примерно.так: while (true) { ConsumerRecords for (ConsumerRecord { currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset()); processRecord(record); storeRecordInDB(record); consumer.commitAsync(currentOffsets); } } В.этом.примере.мы.ведем.себя.совершенно.параноидально,.сохраняя.смещения. после.обработки.каждой.записи..Однако.все.равно.остается.вероятность.аварий- ного.сбоя.приложения.после.сохранения.записи.в.базе.данных,.но.до.фиксации. смещений,.вследствие.чего.запись.будет.обработана.снова.и.в.базе.данных.окажутся. дубликаты. Этого.можно.было.бы.избежать,.одномоментно.сохраняя.записи.и.смещения.. При.этом.или.запись.и.смещение.фиксировались.бы.вместе,.или.не.фиксирова- лось.бы.ни.то.ни.другое..Но.при.сохранении.записей.в.базе.данных,.а.смещений.—. в.Kafka.это.невозможно. А.если.заносить.и.запись,.и.смещение.в.базу.данных.в.одной.транзакции?.Тогда. можно.быть.уверенным,.что.или.и.запись.обработана,.и.смещение.зафиксировано,. или.не.выполнено.ни.то.ни.другое.и.запись.будет.обработана.повторно. Остается.единственная.проблема:.если.смещение.хранится.в.базе.данных,.а.не. в.Kafka,.то.как.наш.потребитель.узнает,.откуда.начинать.чтение.при.назначении.ему. нового.раздела?.В.этом.нам.поможет.метод. seek() ..При.начале.работы.потребителя. или.назначении.нового.раздела.им.можно.воспользоваться.для.поиска.смещения. в.базе.данных.и.перехода.к.нужному.месту. Получение записей с заданными смещениями 111 Вот.набросок.примера.использования.этой.методики..Чтобы.начинать.обработку. с.хранимых.в.базе.данных.смещений,.используем.класс. ConsumerRebalanceListener и.метод. seek() : public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener { public void onPartitionsRevoked(Collection partitions) { commitDBTransaction(); } public void onPartitionsAssigned(Collection partitions) { for(TopicPartition partition: partitions) consumer.seek(partition, getOffsetFromDB(partition)); } } } consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer)); consumer.poll(0); for (TopicPartition partition: consumer.assignment()) consumer.seek(partition, getOffsetFromDB(partition)); while (true) { ConsumerRecords consumer.poll(100); for (ConsumerRecord { processRecord(record); storeRecordInDB(record); storeOffsetInDB(record.topic(), record.partition(), record.offset()); } commitDBTransaction(); } .Используем.тут.фиктивный.метод,.предназначенный.для.фиксации.транзакций. в.базе.данных..Идея.состоит.во.вставке.записей.и.смещений.в.базу.данных.в.про- цессе.обработки.записей,.так.что.для.обеспечения.сохранности.данных.нужно. только.зафиксировать.транзакции.перед.«потерей».раздела. .У.нас.также.есть.фиктивный.метод.для.извлечения.смещений.из.базы.данных,. после.чего.путем.вызова. seek() мы.переходим.к.этим.записям.при.назначении. новых.разделов. .В.самом.начале.работы.потребителя.после.подписки.на.темы.мы.один.раз.вы- зываем.метод. poll() .для.присоединения.к.группе.потребителей.и.назначения. разделов,.после.чего.сразу.же.переходим.с.помощью. seek() .к.нужному.сме- щению.в.назначенных.разделах..Не.забывайте,.что. seek() .лишь.меняет.место,. 112 Глава 4 • Потребители Kafka: чтение данных из Kafka откуда.мы.получаем.данные,.так.что.нужные.сообщения.будут.извлечены.при. следующем.вызове.метода. poll() ..Если.в.вызове. seek() .содержалась.ошибка. (например,.такого.смещения.не.существует),. poll() .сгенерирует.исключение. .Еще.один.фиктивный.метод:.на.этот.раз.мы.обновляем.таблицу.в.базе.данных,. в.которой.хранятся.смещения..Мы.предполагаем,.что.обновление.записей.про- исходит.быстро,.так.что.обновляем.все.записи,.а.фиксация.—.медленно,.поэтому. фиксируем.транзакцию.только.в.конце.обработки.пакета..Однако.есть.несколь- ко.вариантов.оптимизации.этих.действий. Существует.множество.способов.реализации.строго.однократной.доставки.путем. хранения.смещений.и.данных.во.внешнем.хранилище,.но.все.они.требуют.исполь- зования.класса. ConsumerRebalanceListener .и.метода. seek() ,.чтобы.гарантировать. своевременное.сохранение.смещений.и.чтение.сообщений.потребителем.с.пра- вильного.места. Выход из цикла Ранее.в.этой.главе.при.обсуждении.цикла.опроса.я.советовал.не.волноваться.по. поводу.выполнения.опроса.в.бесконечном.цикле,.потому.что.мы.скоро.обсудим,. как.аккуратно.выйти.из.этого.цикла..Что.ж,.поговорим.об.этом. Если.вы.решите.выйти.из.цикла.опроса,.вам.понадобится.еще.один.поток.вы- полнения.для.вызова.метода. consumer.wakeup() ..Если.цикл.опроса.выполняется. в.главном.потоке,.это.можно.сделать.из.потока. ShutdownHook ..Отметим,.что. consu- mer.wakeup() .—.единственный.метод.потребителя,.который.можно.безопасно.вызы- вать.из.другого.потока..Вызов.метода. wakeup() .приведет.к.завершению.выполнения. метода. poll() .с.генерацией.исключения. WakeupException ..А.если. consumer.wakeup() был.вызван.в.момент,.когда.поток.не.ожидает.опроса,.то.исключение.будет.вызвано. при.вызове.метода. poll() .во.время.следующей.итерации..Обрабатывать.исключение. WakeupException .не.требуется,.но.перед.завершением.выполнения.потока.нужно. вызвать. consumer.close() ..Закрытие.потребителя.приведет.при.необходимости. к.фиксации.смещений.и.отправке.координатору.группы.сообщения.о.том,.что.по- требитель.покидает.группу..Координатор.группы.сразу.же.инициирует.перебаланси- ровку,.и.вам.не.придется.ждать.истечения.времени.ожидания.сеанса.для.назначения. разделов.закрываемого.потребителя.другому.потребителю.из.данной.группы. Если.потребитель.работает.в.главном.потоке.приложения,.то.код.завершения.его. выполнения.выглядит.следующим.образом.(пример.немного.сокращен,.полный. вариант.можно.найти.по.адресу:. http://bit.ly/2u47e9A ): Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { System.out.println("Starting exit..."); consumer.wakeup(); try { mainThread.join(); } catch (InterruptedException e) { Десериализаторы 113 e.printStackTrace(); } } }); try { // Выполняем цикл вплоть до нажатия ctrl+c, об очистке // при завершении выполнения позаботится ShutdownHook while (true) { ConsumerRecords movingAvg.consumer.poll(1000); System.out.println(System.currentTimeMillis() + " -- waiting for data..."); for (ConsumerRecord records) { System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } for (TopicPartition tp: consumer.assignment()) System.out.println("Committing offset at position:" + consumer.position(tp)); movingAvg.consumer.commitSync(); } } catch (WakeupException e) { // Игнорируем } finally { consumer.close(); System.out.println("Closed consumer and we are done"); } } ShutdownHook .работает.в.отдельном.потоке,.так.что.единственное,.что.можно. сделать.безопасно,.—.это.вызвать. wakeup .для.выхода.из.цикла. poll() .В.результате.вызова. wakeup .из.другого.потока. poll .сгенерирует.исключение. WakeupException ..Его.лучше.перехватить,.чтобы.не.произошло.непредвиденного. завершения.выполнения.приложения,.но.ничего.делать.с.ним.не.требуется. .Перед.завершением.выполнения.потребителя.аккуратно.закрываем.его. Десериализаторы Как.уже.обсуждалось.в.предыдущей.главе,.для.производителей.Kafka.требуются.се- риализаторы.для.преобразования.объектов.в.отправляемые.в.Kafka.байтовые.мас- сивы..А.для.потребителей.Kafka.необходимы.десериализаторы.для.преобразования. полученных.из.Kafka.байтовых.массивов.в.объекты.Java..В.предыдущих.примерах. мы.просто.считали,.что.ключ.и.значение.всех.сообщений.—.строки,.и.оставили. в.настройках.потребителей.сериализатор.по.умолчанию.—. StringDeserializer 114 Глава 4 • Потребители Kafka: чтение данных из Kafka В.главе.3,.посвященной.производителям.Kafka,.мы.наблюдали.сериализацию.поль- зовательских.типов.данных,.а.также.использование.Avro.и.объектов. AvroSerializer для.генерации.объектов.Avro.на.основе.описания.схемы.и.последующей.их.сериа- лизации.при.отправке.сообщений.в.Kafka..Теперь.изучим.создание.пользователь- ских.десериализаторов.для.ваших.собственных.объектов,.а.также.использование. десериализаторов.Avro. Вполне.очевидно,.что.используемый.для.отправки.событий.в.Kafka.сериализатор. должен.соответствовать.десериализатору,.применяемому.при.их.получении.от- туда..Ничего.хорошего.из.сериализации.с.помощью. IntSerializer .с.последующей. десериализацией.посредством. StringDeserializer .не.выйдет..Это.значит,.что,.как. разработчик,.вы.должны.отслеживать,.какие.сериализаторы.использовались.для. записи.в.каждую.из.тем,.и.гарантировать,.что.темы.содержат.только.такие.данные,. которые.понятны.используемым.вами.десериализаторам..Как.раз.в.этом.состоит. одно.из.преимуществ.сериализации.и.десериализации.с.помощью.Avro.и.репозито- рия.схем.—. AvroSerializer .гарантирует,.что.все.записываемые.в.конкретную.тему. данные.совместимы.со.схемой.темы,.а.значит,.их.можно.будет.десериализовать. с.помощью.соответствующего.десериализатора.и.схемы..Любые.несовместимо- сти.—.на.стороне.производителя.или.потребителя.—.легко.поддаются.перехвату. с.выводом.соответствующего.сообщения.об.ошибке,.так.что.нет.нужды.в.отладке. байтовых.массивов.в.поисках.ошибок.сериализации. Начнем.с.небольшого.примера.написания.пользовательского.десериализатора,. хотя.этот.вариант.применяется.реже.прочих,.после.чего.перейдем.к.примеру.ис- пользования.Avro.для.десериализации.ключей.и.значений. Пользовательские сериализаторы Возьмем.тот.же.пользовательский.объект,.который.мы.сериализовали.в.главе.3,. и.напишем.для.него.десериализатор: public class Customer { private int customerID; private String customerName; public Customer(int ID, String name) { this.customerID = ID; this.customerName = name; } public int getID() { return customerID; } public String getName() { return customerName; } } Десериализаторы 115 Пользовательский.десериализатор.выглядит.следующим.образом: import org.apache.kafka.common.errors.SerializationException; import java.nio.ByteBuffer; import java.util.Map; public class CustomerDeserializer implements Deserializer @Override public void configure(Map configs, boolean isKey) { // настраивать нечего } @Override public Customer deserialize(String topic, byte[] data) { int id; int nameSize; String name; try { if (data == null) return null; if (data.length < 8) throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected"); ByteBuffer buffer = ByteBuffer.wrap(data); id = buffer.getInt(); String nameSize = buffer.getInt(); byte[] nameBytes = new Array[Byte](nameSize); buffer.get(nameBytes); name = new String(nameBytes, 'UTF-8'); return new Customer(id, name); } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } } @Override public void close() { // закрывать нечего } } .Потребителю.требуется.также.реализация.класса. Customer ,.причем.как.класс,. так.и.сериализатор.должны.совпадать.в.приложении-производителе.и.при- ложении-потребителе..В.большой.компании.со.множеством.потребителей. 116 Глава 4 • Потребители Kafka: чтение данных из Kafka и.производителей,.совместно.работающих.с.данными,.это.представляет.собой. непростую.задачу. .Мы.просто.меняем.логику.сериализатора.на.противоположную.ей.—.извлекаем. идентификатор.и.имя.покупателя.из.байтового.массива.и.используем.их.для. формирования.нужного.объекта. Использующий.этот.сериализатор.код.потребителя.будет.выглядеть.примерно.так: Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.CustomerDeserializer"); KafkaConsumer new KafkaConsumer<>(props); consumer.subscribe("customerCountries") while (true) { ConsumerRecords consumer.poll(100); for (ConsumerRecord { System.out.println("current customer Id: " + record.value().getID() + " and current customer name: " + record.value().getName()); } } Важно.отметить,.что.реализовывать.пользовательские.сериализаторы.и.десери- ализаторы.не.рекомендуется..Такое.решение.приводит.к.сильному.сцеплению. производителей.и.потребителей,.ненадежно.и.чревато.возникновением.ошибок.. Лучше.воспользоваться.стандартным.форматом.сообщений,.например,.JSON,. Thrift,.Protobuf.или.Avro..Сейчас.мы.рассмотрим.использование.десериализаторов. Avro.в.потребителе.Kafka..Основные.сведения.о.библиотеке.Apache.Avro,.ее.схемах. и.совместимости.схем.приведены.в.главе.3. Использование десериализации Avro в потребителе Kafka Предположим,.что.мы.используем.показанный.в.главе.3.класс. Customer ..Чтобы. получать.такие.объекты.из.Kafka,.необходимо.реализовать.примерно.такое.при- ложение-потребитель: Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.serializer", Автономный потребитель: зачем и как использовать потребитель без группы 117 "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); props.put("schema.registry.url", schemaUrl); String topic = "customerContacts" KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url)); consumer.subscribe(Collections.singletonList(topic)); System.out.println("Reading topic:" + topic); while (true) { ConsumerRecords consumer.poll(1000); for (ConsumerRecord System.out.println("Current customer name is: " + record.value().getName()); } consumer.commitSync(); }- .Для.десериализации.сообщений.Avro.используем.класс. KafkaAvroDeserializer schema.registry.url .—.параметр,.указывающий.на.место.хранения.схем..С.его. помощью.потребитель.может.использовать.зарегистрированную.производите- лем.схему.для.десериализации.сообщения. .Указываем.сгенерированный.класс. Customer .в.качестве.типа.значения.записи. record.value() .представляет.собой.экземпляр.класса. Customer ,.и.его.можно. использовать.соответствующим.образом. Автономный потребитель: зачем и как использовать потребитель без группы До.сих.пор.мы.обсуждали.группы.потребителей,.в.которых.потребителям.автома- тически.назначаются.разделы.и.которые.автоматически.подвергаются.перебалан- сировке.при.добавлении.или.удалении.потребителей..Обычно.такое.поведение.—. именно.то,.что.требуется,.но.в.некоторых.случаях.хочется.чего-то.более.простого.. Иногда.у.вас.заведомо.один.потребитель,.которому.нужно.всегда.читать.данные.из. всех.разделов.темы.или.из.конкретного.раздела.темы..В.этом.случае.оснований.для. организации.группы.потребителей.или.перебалансировки.нет,.достаточно.просто. назначить.потребителю.соответствующие.тему.и/или.разделы,.получать.сообщения. и.периодически.фиксировать.смещения. Если.вы.точно.знаете,.какие.разделы.должен.читать.потребитель,.то.не.подписыва- етесь.на.тему,.а.просто.назначаете.себе.несколько.разделов..Пользователь.может. или.подписываться.на.темы.и.состоять.в.группе.потребителей,.или.назначать.себе. разделы,.но.не.то.и.другое.одновременно. 118 Глава 4 • Потребители Kafka: чтение данных из Kafka Вот.пример,.в.котором.потребитель.назначает.себе.все.разделы.конкретной.темы. и.получает.из.них.сообщения: List partitionInfos = null; partitionInfos = consumer.partitionsFor("topic"); if (partitionInfos != null) { for (PartitionInfo partition : partitionInfos) partitions.add(new TopicPartition(partition.topic(), partition.partition())); consumer.assign(partitions); while (true) { ConsumerRecords consumer.poll(1000); for (ConsumerRecord System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitSync(); } } .Начинаем.с.запроса.у.кластера.доступных.в.данной.теме.разделов..Если.вы.со- бираетесь.получать.только.данные.из.конкретного.раздела,.то.можете.эту.часть. пропустить. .Выяснив,.какие.разделы.нам.нужны,.вызываем.метод. assign() ,.передавая.ему. их.список. Если.не.считать.отсутствия.перебалансировок.и.необходимости.вручную.искать. разделы,.все.остальное.происходит.как.обычно..Не.забывайте,.что.при.добавлении. в.тему.новых.разделов.потребитель.уведомлен.не.будет..Об.этом.вам.придется. позаботиться.самим,.периодически.обращаясь.к. consumer.partitionsFor() .или. просто.перезапуская.приложение.при.добавлении.разделов. Старые API потребителей В.этой.главе.мы.обсудили.Java-клиент. KafkaConsumer ,.включенный.в.пакет. org.apa- che.kafka.clients ..Однако.на.момент.написания.данной.книги.в.Apache.Kafka.все. еще.есть.два.более.старых.клиента,.написанных.на.языке.Scala.и.включенных. в.пакет. kafka.consumer ,.являющийся.частью.базового.модуля.Kafka..Один.из.этих. потребителей.носит.название. SimpleConsumer .(и.он.не.так.уж.прост).. SimpleConsumer представляет.собой.тонкий.адаптер.для.API.Kafka,.позволяющий.получать.данные. из.конкретных.разделов.и.с.конкретных.смещений..Еще.один.старый.API.называ- ется.высокоуровневым.потребителем.или. ZookeeperConsumerConnector ..Высоко- |