Главная страница

Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


Скачать 7.59 Mb.
НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Дата21.06.2022
Размер7.59 Mb.
Формат файлаpdf
Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
ТипДокументы
#609074
страница10 из 39
1   ...   6   7   8   9   10   11   12   13   ...   39
75
batch.size
При.отправлении.в.один.раздел.нескольких.записей.производитель.соберет.их.
в.один.пакет..Этот.параметр.определяет.объем.памяти.в.байтах.(не.число.сообще- ний!).для.каждого.пакета..По.заполнении.пакета.все.входящие.в.него.сообщения.
отправляются..Но.это.не.значит,.что.производитель.будет.ждать.наполнения.паке- та..Он.может.отправлять.наполовину.полные.пакеты.и.даже.пакеты,.содержащие.
лишь.одно.сообщение..Следовательно,.задание.слишком.большого.размера.пакета.
приведет.не.к.задержкам.отправки,.а.лишь.к.использованию.большего.количества.
памяти.для.пакетов..Задание.слишком.маленького.размера.пакета.обусловит.до- полнительный.расход.памяти,.поскольку.производителю.придется.отправлять.
сообщения.чаще.
linger.ms
Параметр.
linger.ms
.управляет.длительностью.ожидания.дополнительных.сообще- ний.перед.отправкой.текущего.пакета..
KafkaProducer
.отправляет.пакет.сообще- ний.или.при.наполнении.текущего.пакета,.или.по.достижении.лимита.
linger.ms
По.умолчанию.производитель.будет.отправлять.сообщения,.как.только.появится.
свободный.поток.для.этого,.даже.если.в.пакете.содержится.лишь.одно.сообщение..
Устанавливая.параметр.
linger.ms
.в.значение.больше.0,.мы.указываем.производи- телю.на.необходимость.подождать.несколько.миллисекунд,.чтобы.перед.отправкой.
пакета.брокерам.в.него.были.добавлены.дополнительные.сообщения..Это.повышает.
длительность.задержки,.но.повышает.и.пропускную.способность,.поскольку.чем.
больше.сообщений.отправляются.одновременно,.тем.меньше.накладные.расходы.
из.расчета.на.одно.сообщение.
client.id
Значение.этого.параметра.может.быть.любой.строкой..Его.впоследствии.будут.
использовать.брокеры.для.идентификации.отправленных.клиентом.сообщений..
Применяется.для.журналирования.и.показателей,.а.также.задания.квот.
max.in.flight.requests.per.connection
Управляет.количеством.сообщений,.которые.производитель.может.отправить.
серверу,.не.получая.ответов..Высокое.значение.этого.параметра.приведет.к.по- вышенному.использованию.памяти,.но.одновременно.к.увеличению.пропускной.
способности..Однако.слишком.высокое.значение.уменьшит.пропускную.способ- ность.вследствие.снижения.эффективности.пакетной.обработки..Равное.1.значение.
этого.параметра.гарантирует.запись.сообщений.в.брокер.в.порядке.их.отправки.
даже.в.случае.повторов.отправки.

76 Глава 3 • Производители Kafka: запись сообщений в Kafka timeout.ms, request.timeout.ms и metadata.fetch.timeout.ms
Эти.параметры.управляют.длительностью.ожидания.производителем.ответа.
от.сервера.при.отправке.данных.(
request.timeout.ms
).и.запросе.метаданных,.
например,.текущих.ведущих.узлов.разделов,.в.которые.производится.запись.
(
metada ta.fetch.timeout.ms
)..Если.время.ожидания.истекло,.а.ответ.получен.
не.был,.производитель.или.повторит.попытку.отправки,.или.вернет.ошибку.посред- ством.генерации.исключения.или.функции.обратного.вызова..Параметр.
timeout.ms управляет.длительностью.ожидания.брокером.от.синхронизируемых.реплик.под- тверждения.того,.что.сообщение.получено,.в.соответствии.с.параметром.
acks
..Брокер.
вернет.ошибку,.если.время.ожидания.истечет,.а.подтверждения.получены.не.будут.
max.block.ms
Управляет.длительностью.блокировки.при.вызове.производителем.метода.
send()
и.запросе.явным.образом.метаданных.посредством.вызова.
partitionsFor()
..Эти.ме- тоды.выполняют.блокировку.при.переполнении.буфера.отправки.производителя.
или.недоступности.метаданных..По.истечении.времени.ожидания.
max.block.ms генерирует.соответствующее.исключение.
max.request.size
Этот.параметр.задает.максимальный.размер.отправляемого.производителем.запро- са..Он.ограничивает.как.максимальный.размер.сообщения,.так.и.число.сообщений,.
отсылаемых.в.одном.запросе..Например,.при.максимальном.размере.сообщения.
по.умолчанию.1.Мбайт.производитель.может.как.максимум.отправить.одно.со- общение.размером.1.Мбайт.или.скомпоновать.в.один.запрос.1024.сообщения.по.
1.Кбайт.каждое..Кроме.того,.у.брокеров.тоже.есть.ограничение.максимального.
размера.принимаемого.сообщения.(
message.max.bytes
)..Обычно.рекомендуется.
делать.значения.этих.параметров.одинаковыми,.чтобы.производитель.не.отправлял.
сообщения.неприемлемого.для.брокера.размера.
receive.buffer.bytes и send.buffer.bytes
Это.размеры.TCP-буферов.отправки.и.получения,.используемых.сокетами.при.
записи.и.чтении.данных..Если.значение.этих.параметров.равно.–1,.будут.использо- ваться.значения.по.умолчанию.операционной.системы..Рекомендуется.повышать.
их.в.случае,.когда.производители.или.потребители.взаимодействуют.с.брокерами.
из.другого.ЦОД,.поскольку.подобные.сетевые.подключения.обычно.характеризу- ются.более.длительной.задержкой.и.низкой.пропускной.способностью.сети.

Сериализаторы 77
Гарантии упорядоченности
Apache Kafka сохраняет порядок сообщений внутри раздела. Это значит, что брокер запишет в раздел сообщения, отправленные из производителя в опре- деленном порядке, в том же порядке и все потребители будут читать их. Для некоторых сценариев использования упорядоченность играет важную роль.
Помещение на счет 100 долларов и их снятие в дальнейшем сильно отличается от снятия с последующим помещением! Однако для некоторых сценариев ис- пользования порядок не имеет значения.
Установка для параметра retries ненулевого значения, а для max.in.flights.re- quests.per.connection — значения, превышающего 1, означает, что брокер, которому не удалось записать первый пакет сообщений, сможет успешно запи- сать второй (уже отправленный), после чего вернуться к первому и успешно повторить попытку его записи, в результате чего порядок будет изменен на противоположный.
Обычно в надежной системе нельзя взять и установить нулевое число повторов, так что если обеспечение упорядоченности критично важно, рекомендуем задать max.in.flight.requests.per.connection=1, чтобы гарантировать, что во время повтора попытки не будут отправляться другие сообщения, поскольку это может привести к изменению порядка на противоположный и существенному ограничению про- пускной способности производителя, поэтому так следует поступать лишь тогда, когда порядок важен.
Сериализаторы
Как.мы.видели.в.предыдущих.примерах,.конфигурация.производителя.обяза- тельно.включает.сериализаторы..Мы.уже.рассматривали.применение.сериали- затора.по.умолчанию.—.
StringSerializer
..Kafka.также.включает.сериализаторы.
для.целых.чисел.и.байтовых.массивов,.но.это.охватывает.далеко.не.все.сценарии.
использования..В.конце.концов.вам.понадобится.сериализовывать.более.общие.
виды.записей.
Начнем.с.написания.пользовательского.сериализатора,.после.чего.покажем.реко- мендуемый.альтернативный.вариант.—.сериализатор.Avro.
Пользовательские сериализаторы
Когда.нужно.отправить.в.Kafka.объект.более.сложный,.чем.просто.строка.или.цело- численное.значение,.можно.или.воспользоваться.для.создания.записей.универсаль- ной.библиотекой.сериализации,.например,.Avro,.Thrift.либо.Protobuf,.или.создать.
пользовательский.сериализатор.для.уже.используемых.объектов..Мы.настоятельно.
рекомендуем.вам.воспользоваться.универсальной.библиотекой.сериализации..
Но.чтобы.разобраться,.как.работают.сериализаторы.и.почему.лучше.использовать.
библиотеку.сериализации,.посмотрим,.что.требуется.для.написания.собственного.
сериализатора.

78 Глава 3 • Производители Kafka: запись сообщений в Kafka
Пусть.вместо.того,.чтобы.записывать.только.имя.покупателя,.вы.создали.простой.
класс.
Customer
:
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
Теперь.предположим,.что.вам.нужно.создать.пользовательский.сериализатор.для.
этого.класса..Он.будет.выглядеть.примерно.так:
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer {
@Override public void configure(Map configs, boolean isKey) {
// нечего настраивать
}
@Override
/**
Мы сериализуем объект Customer как:
4-байтное целое число, соответствующее customerId
4-байтное целое число, соответствующее длине customerName в байтах в кодировке UTF-8 (0, если имя не заполнено)
N байт, соответствующих customerName в кодировке UTF-8
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializedName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;

Сериализаторы 79
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override public void close() {
// нечего закрывать
}
}
Настройка.производителя.с.использованием.этого.
CustomerSerializer
.дает.воз- можность.определить.тип.
ProducerRecord,.
Customer>
.и.отправлять.данные.
типа.
Customer
,.непосредственно.передавая.объекты.
Customer
.производителю..
Приведенный.пример.очень.прост,.но.из.него.можно.понять,.насколько.ненадежен.
такой.код..Если,.например,.у.нас.слишком.много.покупателей.и.понадобится.по- менять.тип.
customerID
.на.
Long
.или.добавить.в.тип.
Customer
.поле.
startDate
,.то.мы.
столкнемся.с.непростой.проблемой.поддержания.совместимости.между.старым.
и.новым.форматами.сообщения..Отладка.проблем.совместимости.между.различ- ными.версиями.сериализаторов.и.десериализаторов.—.весьма.непростая.задача,.
ведь.приходится.сравнивать.неформатированные.байтовые.массивы..Что.еще.хуже,.
если.нескольким.группам.разработчиков.одной.компании.понадобится.записывать.
данные.о.покупателях.в.Kafka,.им.придется.использовать.одинаковые.сериализа- торы.и.менять.код.совершенно.синхронно.
Поэтому.мы.рекомендуем.использовать.существующие.сериализаторы.и.десери- ализаторы,.например,.JSON,.Apache.Avro,.Thrift.или.Protobuf..В.следующем.раз- деле.расскажем.про.Apache.Avro,.а.затем.покажем,.как.сериализовать.записи.Avro.
и.отправлять.их.в.Kafka.
Сериализация с помощью Apache Avro
Apache.Avro.—.независимый.от.языка.программирования.формат.сериализации.
данных..Этот.проект.создан.Дугом.Каттингом.(Doug.Cutting).для.обеспечения.
возможности.использования.данных.совместно.с.большим.количеством.других.
людей.

80 Глава 3 • Производители Kafka: запись сообщений в Kafka
Данные.Avro.описываются.независимой.от.языка.схемой..Она.обычно.выполняется.
в.формате.JSON,.а.сериализация.производится.в.двоичные.файлы,.хотя.сериализа- ция.в.JSON.тоже.поддерживается..При.записи.и.чтении.файлов.Avro.предполагает.
наличие.схемы,.обычно.во.вложенном.в.файлы.виде.
Одна.из.самых.интересных.возможностей.Avro,.благодаря.которой.он.так.хорошо.
подходит.для.использования.в.системах.обмена.сообщениями.вроде.Kafka,.состоит.
в.том,.что.при.переходе.записывающего.сообщения.приложения.на.новую.схему.
читающее.данные.приложение.может.продолжать.обработку.сообщений.без.каких- либо.изменений.или.обновлений.
Пусть.исходная.схема.выглядела.следующим.образом:
{"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "faxNumber", "type": ["null", "string"], "default": "null"}
]
}
.Поля.
id
.и.
name
.—.обязательные,.а.номер.факса.—.необязателен,.по.умолчанию.
это.неопределенное.значение.
Допустим,.что.эта.схема.использовалась.в.течение.нескольких.месяцев.и.в.таком.
формате.было.сгенерировано.несколько.терабайт.данных..Теперь.предположим,.
что.в.новой.версии.мы.решили.признать,.что.наступил.XXI.век,.и.вместо.номера.
факса.будем.использовать.поле.
email
Новая.схема.будет.выглядеть.вот.так:
{"namespace": "customerManagement.avro",
"type": "record",
"name": "Customer",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "email", "type": ["null", "string"], "default": "null"}
]
}
Теперь.после.обновления.до.новой.версии.в.старых.записях.будет.содержаться.поле.
faxNumber
,.а.в.новых.—.
email
..Во.многих.организациях.обновления.выполняются.
медленно,.на.протяжении.многих.месяцев..Так.что.придется.продумать.обработку.
в.Kafka.всех.событий.еще.не.обновленными.приложениями,.использующими.но- мера.факса,.и.уже.обновленными.—.с.адресом.электронной.почты.
Выполняющее.чтение.приложение.должно.содержать.вызовы.таких.методов,.как.
getName()
,.
getId()
.и.
getFaxNumber()
..Наткнувшись.на.записанное.по.новой.схеме.

Сериализаторы 81
сообщение,.методы.
getName()
.и.
getId()
.продолжат.работать.без.всяких.изменений,.
но.метод.
getFaxNumber()
.вернет.
null
,.поскольку.сообщение.не.содержит.номера.
факса.
Теперь.предположим,.что.мы.модифицировали.читающее.приложение.и.в.нем.
вместо.метода.
getFaxNumber()
.теперь.есть.метод.
getEmail()
..Наткнувшись.на.за- писанное.по.старой.схеме.сообщение,.метод.
getEmail()
.вернет.
null
,.поскольку.
в.старых.сообщениях.нет.адреса.электронной.почты.
Этот.пример.иллюстрирует.выгоды.использования.Avro:.хотя.мы.и.поменяли.
схему.сообщений.без.изменения.всех.читающих.данные.приложений,.никаких.ис- ключений.или.серьезных.ошибок.не.возникло,.как.не.понадобилось.и.выполнять.
дорогостоящие.обновления.существующих.данных.
Однако.у.этого.сценария.есть.два.нюанса.
‰
‰
Используемая.для.записи.данных.схема.и.схема,.которую.ожидает.читающее.
данные.приложение,.должны.быть.совместимыми..В.документации.Avro.опи- саны.правила.совместимости.(
http://www.bit.ly/2t9FmEb
).
‰
‰
У.десериализатора.должен.быть.доступ.к.схеме,.использованной.при.записи.
данных,.даже.если.она.отличается.от.схемы,.которую.ожидает.обращающееся.
к.данным.приложение..В.файлах.Avro.схема.для.записи.включается.в.сам.файл,.
но.для.сообщений.Kafka.есть.более.удачный.способ..Мы.рассмотрим.его.далее.
Использование записей Avro с Kafka
В.отличие.от.файлов.Avro,.при.использовании.которых.хранение.всей.схемы.
в.файле.данных.дает.довольно.умеренные.накладные.расходы,.хранение.всей.схемы.
в.каждой.записи.обычно.более.чем.вдвое.увеличивает.размер.последней..Однако.
в.Avro.при.чтении.записи.необходима.полная.схема,.так.что.нам.нужно.поместить.
ее.куда-то.в.другое.место..Для.этого.мы,.придерживаясь.распространенного.ар- хитектурного.паттерна,.воспользуемся.реестром схем (Schema Registry)..Реестр.
схем.не.включен.в.Kafka,.но.существует.несколько.его.вариантов.с.открытым.
исходным.кодом..Для.нашего.примера.воспользуемся.Confluent.Schema.Registry..
Код.Confluent.Schema.Registry.можно.найти.на.GitHub.(
https://github.com/confluentinc/
schema-registry
),.его.также.можно.установить.в.виде.части.платформы.Confluent.
(
http://docs.confluent.io/current/installation.html
)..Если.вы.решите.использовать.этот.ре- естр.схем,.рекомендуем.заглянуть.в.его.документацию.(
http://docs.confluent.io/current/
schema-registry/docs/index.html
).
Идея.состоит.в.хранении.в.реестре.всех.используемых.для.записи.данных.в.Kafka.
схем..В.этом.случае.можно.хранить.в.отправляемой.в.Kafka.записи.только.иденти- фикатор.схемы..Потребители.могут.в.дальнейшем.извлечь.запись.из.реестра.схем.
по.идентификатору.и.десериализовать.данные..Самое.главное,.что.вся.работа.—.
сохранение.схемы.в.реестре.и.извлечение.ее.при.необходимости.—.выполняется.

82 Глава 3 • Производители Kafka: запись сообщений в Kafka в.сериализаторах.и.десериализаторах..Код.производителя.отправляемых.в.Kafka.
сообщений.просто.использует.сериализатор.Avro.так.же,.как.использовал.бы.любой.
другой.сериализатор..Этот.процесс.показан.на.рис..3.2.
Рис. 3.2. Блок-схема сериализации и десериализации записей Avro
Вот.пример.отправки.сгенерированных.Avro.объектов.в.Kafka.(см..документацию.
Avro.по.адресу.
http://avro.apache.org/docs/current/
,.чтобы.получить.информацию.о.ге- нерации.кода.с.его.помощью):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer producer = new KafkaProducer Customer>(props);
// Генерация новых событий продолжается вплоть до нажатия ctrl+c while (true) {
Customer customer = CustomerGenerator.getNext();
System.out.println("Generated customer " +
customer.toString());
ProducerRecord record =
new ProducerRecord<>(topic, customer.getId(), customer); producer.send(record);
}

Сериализаторы 83
.Для.сериализации.объектов.с.помощью.Avro.мы.используем.класс.
KafkaAvroSe- rializer
..Обратите.внимание.на.то,.что.
AvroSerializer
.может.работать.с.про- стыми.типами.данных,.именно.поэтому.в.дальнейшем.объект.
String
.использу- ется.в.качестве.ключа.записи.и.объект.
Customer
.—.в.качестве.значения.
schema.registry.url
.—.новый.параметр,.указывающий.на.место.хранения.схем.
Customer
.—.сгенерированный.объект..Мы.сообщаем.производителю,.что.значе- ние.наших.записей.будет.представлять.собой.объект.
Customer
.Мы.также.создаем.экземпляр.класса.
ProducerRecord
.с.объектом.
Customer
.в.ка- честве.типа.значения.и.передаем.объект.
Customer
.при.создании.новой.записи.
.Вот.и.все..Мы.отправили.запись.с.объектом.
Customer
,.а.
KafkaAvroSerializer позаботится.обо.всем.остальном.
Но.что.если.нам.понадобится.использовать.обобщенные.объекты.Avro.вместо.
сгенерированных.объектов.Avro?.Никаких.проблем..В.этом.случае.нужно.всего.
лишь.указать.схему:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);
String schemaString = "{\"namespace\": \"customerManagement.avro\",
\"type\": \"record\", " +
"\"name\": \"Customer\"," +
"\"fields\": [" +
"{\"name\": \"id\", \"type\": \"int\"}," +
"{\"name\": \"name\", \"type\": \"string\"}," +
"{\"name\": \"email\",
\"type\": [\"null\", \"string\"],
\"default\":\"null\" }" +
"]}";
Producer producer =
new KafkaProducer(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
String name = "exampleCustomer" + nCustomers;
String email = "example " + nCustomers + "@example.com";
GenericRecord customer = new GenericData.Record(schema); customer.put("id", nCustomers);
customer.put("name", name);
customer.put("email", email);
ProducerRecord data =

1   ...   6   7   8   9   10   11   12   13   ...   39


написать администратору сайта