Главная страница

Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


Скачать 7.59 Mb.
НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Дата21.06.2022
Размер7.59 Mb.
Формат файлаpdf
Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
ТипДокументы
#609074
страница14 из 39
1   ...   10   11   12   13   14   15   16   17   ...   39
110 Глава 4 • Потребители Kafka: чтение данных из Kafka несколько.сообщений.назад.или.пропустить.несколько.сообщений.(если.чувстви- тельному.к.задержкам.приложению,.отстающему.от.графика,.нужно.перескочить.
к.более.актуальным.сообщениям)..Самый.интересный.сценарий.использования.
этой.возможности.—.когда.смещения.хранятся.не.в.Kafka.
Рассмотрим.распространенный.сценарий.использования:.приложение.читает.со- бытия.из.Kafka.(например,.поток.данных.о.маршрутах.перемещения.пользователей.
по.веб-сайту),.обрабатывает.данные.(к.примеру,.удаляет.записи,.соответствующие.
посещению.сайта.автоматизированными.программами,.а.не.живыми.пользовате- лями),.после.чего.сохраняет.результаты.в.базе.данных,.NoSQL-хранилище.или.
Hadoop..Допустим,.что.нам.не.хотелось.бы.ни.терять.какие-либо.данные,.ни.со- хранять.одни.и.те.же.результаты.в.базе.данных.дважды..В.этом.случае.цикл.по- требителя.выглядел.бы.примерно.так:
while (true) {
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
{
currentOffsets.put(new TopicPartition(record.topic(),
record.partition()),
record.offset());
processRecord(record);
storeRecordInDB(record);
consumer.commitAsync(currentOffsets);
}
}
В.этом.примере.мы.ведем.себя.совершенно.параноидально,.сохраняя.смещения.
после.обработки.каждой.записи..Однако.все.равно.остается.вероятность.аварий- ного.сбоя.приложения.после.сохранения.записи.в.базе.данных,.но.до.фиксации.
смещений,.вследствие.чего.запись.будет.обработана.снова.и.в.базе.данных.окажутся.
дубликаты.
Этого.можно.было.бы.избежать,.одномоментно.сохраняя.записи.и.смещения..
При.этом.или.запись.и.смещение.фиксировались.бы.вместе,.или.не.фиксирова- лось.бы.ни.то.ни.другое..Но.при.сохранении.записей.в.базе.данных,.а.смещений.—.
в.Kafka.это.невозможно.
А.если.заносить.и.запись,.и.смещение.в.базу.данных.в.одной.транзакции?.Тогда.
можно.быть.уверенным,.что.или.и.запись.обработана,.и.смещение.зафиксировано,.
или.не.выполнено.ни.то.ни.другое.и.запись.будет.обработана.повторно.
Остается.единственная.проблема:.если.смещение.хранится.в.базе.данных,.а.не.
в.Kafka,.то.как.наш.потребитель.узнает,.откуда.начинать.чтение.при.назначении.ему.
нового.раздела?.В.этом.нам.поможет.метод.
seek()
..При.начале.работы.потребителя.
или.назначении.нового.раздела.им.можно.воспользоваться.для.поиска.смещения.
в.базе.данных.и.перехода.к.нужному.месту.

Получение записей с заданными смещениями 111
Вот.набросок.примера.использования.этой.методики..Чтобы.начинать.обработку.
с.хранимых.в.базе.данных.смещений,.используем.класс.
ConsumerRebalanceListener и.метод.
seek()
:
public class SaveOffsetsOnRebalance implements
ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection
partitions) {
commitDBTransaction();
}
public void onPartitionsAssigned(Collection
partitions) {
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition)); while (true) {
ConsumerRecords records =
consumer.poll(100);
for (ConsumerRecord record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(),
record.offset());
}
commitDBTransaction();
}
.Используем.тут.фиктивный.метод,.предназначенный.для.фиксации.транзакций.
в.базе.данных..Идея.состоит.во.вставке.записей.и.смещений.в.базу.данных.в.про- цессе.обработки.записей,.так.что.для.обеспечения.сохранности.данных.нужно.
только.зафиксировать.транзакции.перед.«потерей».раздела.
.У.нас.также.есть.фиктивный.метод.для.извлечения.смещений.из.базы.данных,.
после.чего.путем.вызова.
seek()
мы.переходим.к.этим.записям.при.назначении.
новых.разделов.
.В.самом.начале.работы.потребителя.после.подписки.на.темы.мы.один.раз.вы- зываем.метод.
poll()
.для.присоединения.к.группе.потребителей.и.назначения.
разделов,.после.чего.сразу.же.переходим.с.помощью.
seek()
.к.нужному.сме- щению.в.назначенных.разделах..Не.забывайте,.что.
seek()
.лишь.меняет.место,.

112 Глава 4 • Потребители Kafka: чтение данных из Kafka откуда.мы.получаем.данные,.так.что.нужные.сообщения.будут.извлечены.при.
следующем.вызове.метода.
poll()
..Если.в.вызове.
seek()
.содержалась.ошибка.
(например,.такого.смещения.не.существует),.
poll()
.сгенерирует.исключение.
.Еще.один.фиктивный.метод:.на.этот.раз.мы.обновляем.таблицу.в.базе.данных,.
в.которой.хранятся.смещения..Мы.предполагаем,.что.обновление.записей.про- исходит.быстро,.так.что.обновляем.все.записи,.а.фиксация.—.медленно,.поэтому.
фиксируем.транзакцию.только.в.конце.обработки.пакета..Однако.есть.несколь- ко.вариантов.оптимизации.этих.действий.
Существует.множество.способов.реализации.строго.однократной.доставки.путем.
хранения.смещений.и.данных.во.внешнем.хранилище,.но.все.они.требуют.исполь- зования.класса.
ConsumerRebalanceListener
.и.метода.
seek()
,.чтобы.гарантировать.
своевременное.сохранение.смещений.и.чтение.сообщений.потребителем.с.пра- вильного.места.
Выход из цикла
Ранее.в.этой.главе.при.обсуждении.цикла.опроса.я.советовал.не.волноваться.по.
поводу.выполнения.опроса.в.бесконечном.цикле,.потому.что.мы.скоро.обсудим,.
как.аккуратно.выйти.из.этого.цикла..Что.ж,.поговорим.об.этом.
Если.вы.решите.выйти.из.цикла.опроса,.вам.понадобится.еще.один.поток.вы- полнения.для.вызова.метода.
consumer.wakeup()
..Если.цикл.опроса.выполняется.
в.главном.потоке,.это.можно.сделать.из.потока.
ShutdownHook
..Отметим,.что.
consu- mer.wakeup()
.—.единственный.метод.потребителя,.который.можно.безопасно.вызы- вать.из.другого.потока..Вызов.метода.
wakeup()
.приведет.к.завершению.выполнения.
метода.
poll()
.с.генерацией.исключения.
WakeupException
..А.если.
consumer.wakeup()
был.вызван.в.момент,.когда.поток.не.ожидает.опроса,.то.исключение.будет.вызвано.
при.вызове.метода.
poll()
.во.время.следующей.итерации..Обрабатывать.исключение.
WakeupException
.не.требуется,.но.перед.завершением.выполнения.потока.нужно.
вызвать.
consumer.close()
..Закрытие.потребителя.приведет.при.необходимости.
к.фиксации.смещений.и.отправке.координатору.группы.сообщения.о.том,.что.по- требитель.покидает.группу..Координатор.группы.сразу.же.инициирует.перебаланси- ровку,.и.вам.не.придется.ждать.истечения.времени.ожидания.сеанса.для.назначения.
разделов.закрываемого.потребителя.другому.потребителю.из.данной.группы.
Если.потребитель.работает.в.главном.потоке.приложения,.то.код.завершения.его.
выполнения.выглядит.следующим.образом.(пример.немного.сокращен,.полный.
вариант.можно.найти.по.адресу:.
http://bit.ly/2u47e9A
):
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup(); try {
mainThread.join();
} catch (InterruptedException e) {

Десериализаторы 113
e.printStackTrace();
}
}
});
try {
// Выполняем цикл вплоть до нажатия ctrl+c, об очистке
// при завершении выполнения позаботится ShutdownHook while (true) {
ConsumerRecords records =
movingAvg.consumer.poll(1000);
System.out.println(System.currentTimeMillis() + "
-- waiting for data...");
for (ConsumerRecord record :
records) {
System.out.printf("offset = %d, key = %s,
value = %s\n",
record.offset(), record.key(),
record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" +
consumer.position(tp));
movingAvg.consumer.commitSync();
}
} catch (WakeupException e) {
// Игнорируем
} finally {
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
ShutdownHook
.работает.в.отдельном.потоке,.так.что.единственное,.что.можно.
сделать.безопасно,.—.это.вызвать.
wakeup
.для.выхода.из.цикла.
poll()
.В.результате.вызова.
wakeup
.из.другого.потока.
poll
.сгенерирует.исключение.
WakeupException
..Его.лучше.перехватить,.чтобы.не.произошло.непредвиденного.
завершения.выполнения.приложения,.но.ничего.делать.с.ним.не.требуется.
.Перед.завершением.выполнения.потребителя.аккуратно.закрываем.его.
Десериализаторы
Как.уже.обсуждалось.в.предыдущей.главе,.для.производителей.Kafka.требуются.се-
риализаторы.для.преобразования.объектов.в.отправляемые.в.Kafka.байтовые.мас- сивы..А.для.потребителей.Kafka.необходимы.десериализаторы.для.преобразования.
полученных.из.Kafka.байтовых.массивов.в.объекты.Java..В.предыдущих.примерах.
мы.просто.считали,.что.ключ.и.значение.всех.сообщений.—.строки,.и.оставили.
в.настройках.потребителей.сериализатор.по.умолчанию.—.
StringDeserializer

114 Глава 4 • Потребители Kafka: чтение данных из Kafka
В.главе.3,.посвященной.производителям.Kafka,.мы.наблюдали.сериализацию.поль- зовательских.типов.данных,.а.также.использование.Avro.и.объектов.
AvroSerializer для.генерации.объектов.Avro.на.основе.описания.схемы.и.последующей.их.сериа- лизации.при.отправке.сообщений.в.Kafka..Теперь.изучим.создание.пользователь- ских.десериализаторов.для.ваших.собственных.объектов,.а.также.использование.
десериализаторов.Avro.
Вполне.очевидно,.что.используемый.для.отправки.событий.в.Kafka.сериализатор.
должен.соответствовать.десериализатору,.применяемому.при.их.получении.от- туда..Ничего.хорошего.из.сериализации.с.помощью.
IntSerializer
.с.последующей.
десериализацией.посредством.
StringDeserializer
.не.выйдет..Это.значит,.что,.как.
разработчик,.вы.должны.отслеживать,.какие.сериализаторы.использовались.для.
записи.в.каждую.из.тем,.и.гарантировать,.что.темы.содержат.только.такие.данные,.
которые.понятны.используемым.вами.десериализаторам..Как.раз.в.этом.состоит.
одно.из.преимуществ.сериализации.и.десериализации.с.помощью.Avro.и.репозито- рия.схем.—.
AvroSerializer
.гарантирует,.что.все.записываемые.в.конкретную.тему.
данные.совместимы.со.схемой.темы,.а.значит,.их.можно.будет.десериализовать.
с.помощью.соответствующего.десериализатора.и.схемы..Любые.несовместимо- сти.—.на.стороне.производителя.или.потребителя.—.легко.поддаются.перехвату.
с.выводом.соответствующего.сообщения.об.ошибке,.так.что.нет.нужды.в.отладке.
байтовых.массивов.в.поисках.ошибок.сериализации.
Начнем.с.небольшого.примера.написания.пользовательского.десериализатора,.
хотя.этот.вариант.применяется.реже.прочих,.после.чего.перейдем.к.примеру.ис- пользования.Avro.для.десериализации.ключей.и.значений.
Пользовательские сериализаторы
Возьмем.тот.же.пользовательский.объект,.который.мы.сериализовали.в.главе.3,.
и.напишем.для.него.десериализатор:
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}

Десериализаторы 115
Пользовательский.десериализатор.выглядит.следующим.образом:
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerDeserializer implements
Deserializer {
@Override public void configure(Map configs, boolean isKey) {
// настраивать нечего
}
@Override public Customer deserialize(String topic, byte[] data) {
int id;
int nameSize;
String name;
try {
if (data == null)
return null;
if (data.length < 8)
throw new SerializationException("Size of data received by
IntegerDeserializer is shorter than expected");
ByteBuffer buffer = ByteBuffer.wrap(data);
id = buffer.getInt();
String nameSize = buffer.getInt();
byte[] nameBytes = new Array[Byte](nameSize);
buffer.get(nameBytes);
name = new String(nameBytes, 'UTF-8');
return new Customer(id, name);
} catch (Exception e) {
throw new SerializationException("Error when serializing
Customer to byte[] " + e);
}
}
@Override public void close() {
// закрывать нечего
}
}
.Потребителю.требуется.также.реализация.класса.
Customer
,.причем.как.класс,.
так.и.сериализатор.должны.совпадать.в.приложении-производителе.и.при- ложении-потребителе..В.большой.компании.со.множеством.потребителей.

116 Глава 4 • Потребители Kafka: чтение данных из Kafka и.производителей,.совместно.работающих.с.данными,.это.представляет.собой.
непростую.задачу.
.Мы.просто.меняем.логику.сериализатора.на.противоположную.ей.—.извлекаем.
идентификатор.и.имя.покупателя.из.байтового.массива.и.используем.их.для.
формирования.нужного.объекта.
Использующий.этот.сериализатор.код.потребителя.будет.выглядеть.примерно.так:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.CustomerDeserializer");
KafkaConsumer consumer =
new KafkaConsumer<>(props);
consumer.subscribe("customerCountries")
while (true) {
ConsumerRecords records =
consumer.poll(100);
for (ConsumerRecord record : records)
{
System.out.println("current customer Id: " +
record.value().getID() + " and current customer name: " + record.value().getName());
}
}
Важно.отметить,.что.реализовывать.пользовательские.сериализаторы.и.десери- ализаторы.не.рекомендуется..Такое.решение.приводит.к.сильному.сцеплению.
производителей.и.потребителей,.ненадежно.и.чревато.возникновением.ошибок..
Лучше.воспользоваться.стандартным.форматом.сообщений,.например,.JSON,.
Thrift,.Protobuf.или.Avro..Сейчас.мы.рассмотрим.использование.десериализаторов.
Avro.в.потребителе.Kafka..Основные.сведения.о.библиотеке.Apache.Avro,.ее.схемах.
и.совместимости.схем.приведены.в.главе.3.
Использование десериализации Avro в потребителе Kafka
Предположим,.что.мы.используем.показанный.в.главе.3.класс.
Customer
..Чтобы.
получать.такие.объекты.из.Kafka,.необходимо.реализовать.примерно.такое.при- ложение-потребитель:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.serializer",

Автономный потребитель: зачем и как использовать потребитель без группы 117
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroDeserializer"); props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"
KafkaConsumer consumer = new
KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Reading topic:" + topic);
while (true) {
ConsumerRecords records =
consumer.poll(1000); for (ConsumerRecord record: records) {
System.out.println("Current customer name is: " +
record.value().getName());
}
consumer.commitSync();
}-
.Для.десериализации.сообщений.Avro.используем.класс.
KafkaAvroDeserializer schema.registry.url
.—.параметр,.указывающий.на.место.хранения.схем..С.его.
помощью.потребитель.может.использовать.зарегистрированную.производите- лем.схему.для.десериализации.сообщения.
.Указываем.сгенерированный.класс.
Customer
.в.качестве.типа.значения.записи.
record.value()
.представляет.собой.экземпляр.класса.
Customer
,.и.его.можно.
использовать.соответствующим.образом.
Автономный потребитель: зачем и как использовать потребитель без группы
До.сих.пор.мы.обсуждали.группы.потребителей,.в.которых.потребителям.автома- тически.назначаются.разделы.и.которые.автоматически.подвергаются.перебалан- сировке.при.добавлении.или.удалении.потребителей..Обычно.такое.поведение.—.
именно.то,.что.требуется,.но.в.некоторых.случаях.хочется.чего-то.более.простого..
Иногда.у.вас.заведомо.один.потребитель,.которому.нужно.всегда.читать.данные.из.
всех.разделов.темы.или.из.конкретного.раздела.темы..В.этом.случае.оснований.для.
организации.группы.потребителей.или.перебалансировки.нет,.достаточно.просто.
назначить.потребителю.соответствующие.тему.и/или.разделы,.получать.сообщения.
и.периодически.фиксировать.смещения.
Если.вы.точно.знаете,.какие.разделы.должен.читать.потребитель,.то.не.подписыва-
етесь.на.тему,.а.просто.назначаете.себе.несколько.разделов..Пользователь.может.
или.подписываться.на.темы.и.состоять.в.группе.потребителей,.или.назначать.себе.
разделы,.но.не.то.и.другое.одновременно.

118 Глава 4 • Потребители Kafka: чтение данных из Kafka
Вот.пример,.в.котором.потребитель.назначает.себе.все.разделы.конкретной.темы.
и.получает.из.них.сообщения:
List partitionInfos = null;
partitionInfos = consumer.partitionsFor("topic"); if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(),
partition.partition()));
consumer.assign(partitions); while (true) {
ConsumerRecords records =
consumer.poll(1000);
for (ConsumerRecord record: records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitSync();
}
}
.Начинаем.с.запроса.у.кластера.доступных.в.данной.теме.разделов..Если.вы.со- бираетесь.получать.только.данные.из.конкретного.раздела,.то.можете.эту.часть.
пропустить.
.Выяснив,.какие.разделы.нам.нужны,.вызываем.метод.
assign()
,.передавая.ему.
их.список.
Если.не.считать.отсутствия.перебалансировок.и.необходимости.вручную.искать.
разделы,.все.остальное.происходит.как.обычно..Не.забывайте,.что.при.добавлении.
в.тему.новых.разделов.потребитель.уведомлен.не.будет..Об.этом.вам.придется.
позаботиться.самим,.периодически.обращаясь.к.
consumer.partitionsFor()
.или.
просто.перезапуская.приложение.при.добавлении.разделов.
Старые API потребителей
В.этой.главе.мы.обсудили.Java-клиент.
KafkaConsumer
,.включенный.в.пакет.
org.apa- che.kafka.clients
..Однако.на.момент.написания.данной.книги.в.Apache.Kafka.все.
еще.есть.два.более.старых.клиента,.написанных.на.языке.Scala.и.включенных.
в.пакет.
kafka.consumer
,.являющийся.частью.базового.модуля.Kafka..Один.из.этих.
потребителей.носит.название.
SimpleConsumer
.(и.он.не.так.уж.прост)..
SimpleConsumer представляет.собой.тонкий.адаптер.для.API.Kafka,.позволяющий.получать.данные.
из.конкретных.разделов.и.с.конкретных.смещений..Еще.один.старый.API.называ- ется.высокоуровневым.потребителем.или.
ZookeeperConsumerConnector
..Высоко-

Резюме
1   ...   10   11   12   13   14   15   16   17   ...   39


написать администратору сайта