Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
75 batch.size При.отправлении.в.один.раздел.нескольких.записей.производитель.соберет.их. в.один.пакет..Этот.параметр.определяет.объем.памяти.в.байтах.(не.число.сообще- ний!).для.каждого.пакета..По.заполнении.пакета.все.входящие.в.него.сообщения. отправляются..Но.это.не.значит,.что.производитель.будет.ждать.наполнения.паке- та..Он.может.отправлять.наполовину.полные.пакеты.и.даже.пакеты,.содержащие. лишь.одно.сообщение..Следовательно,.задание.слишком.большого.размера.пакета. приведет.не.к.задержкам.отправки,.а.лишь.к.использованию.большего.количества. памяти.для.пакетов..Задание.слишком.маленького.размера.пакета.обусловит.до- полнительный.расход.памяти,.поскольку.производителю.придется.отправлять. сообщения.чаще. linger.ms Параметр. linger.ms .управляет.длительностью.ожидания.дополнительных.сообще- ний.перед.отправкой.текущего.пакета.. KafkaProducer .отправляет.пакет.сообще- ний.или.при.наполнении.текущего.пакета,.или.по.достижении.лимита. linger.ms По.умолчанию.производитель.будет.отправлять.сообщения,.как.только.появится. свободный.поток.для.этого,.даже.если.в.пакете.содержится.лишь.одно.сообщение.. Устанавливая.параметр. linger.ms .в.значение.больше.0,.мы.указываем.производи- телю.на.необходимость.подождать.несколько.миллисекунд,.чтобы.перед.отправкой. пакета.брокерам.в.него.были.добавлены.дополнительные.сообщения..Это.повышает. длительность.задержки,.но.повышает.и.пропускную.способность,.поскольку.чем. больше.сообщений.отправляются.одновременно,.тем.меньше.накладные.расходы. из.расчета.на.одно.сообщение. client.id Значение.этого.параметра.может.быть.любой.строкой..Его.впоследствии.будут. использовать.брокеры.для.идентификации.отправленных.клиентом.сообщений.. Применяется.для.журналирования.и.показателей,.а.также.задания.квот. max.in.flight.requests.per.connection Управляет.количеством.сообщений,.которые.производитель.может.отправить. серверу,.не.получая.ответов..Высокое.значение.этого.параметра.приведет.к.по- вышенному.использованию.памяти,.но.одновременно.к.увеличению.пропускной. способности..Однако.слишком.высокое.значение.уменьшит.пропускную.способ- ность.вследствие.снижения.эффективности.пакетной.обработки..Равное.1.значение. этого.параметра.гарантирует.запись.сообщений.в.брокер.в.порядке.их.отправки. даже.в.случае.повторов.отправки. 76 Глава 3 • Производители Kafka: запись сообщений в Kafka timeout.ms, request.timeout.ms и metadata.fetch.timeout.ms Эти.параметры.управляют.длительностью.ожидания.производителем.ответа. от.сервера.при.отправке.данных.( request.timeout.ms ).и.запросе.метаданных,. например,.текущих.ведущих.узлов.разделов,.в.которые.производится.запись. ( metada ta.fetch.timeout.ms )..Если.время.ожидания.истекло,.а.ответ.получен. не.был,.производитель.или.повторит.попытку.отправки,.или.вернет.ошибку.посред- ством.генерации.исключения.или.функции.обратного.вызова..Параметр. timeout.ms управляет.длительностью.ожидания.брокером.от.синхронизируемых.реплик.под- тверждения.того,.что.сообщение.получено,.в.соответствии.с.параметром. acks ..Брокер. вернет.ошибку,.если.время.ожидания.истечет,.а.подтверждения.получены.не.будут. max.block.ms Управляет.длительностью.блокировки.при.вызове.производителем.метода. send() и.запросе.явным.образом.метаданных.посредством.вызова. partitionsFor() ..Эти.ме- тоды.выполняют.блокировку.при.переполнении.буфера.отправки.производителя. или.недоступности.метаданных..По.истечении.времени.ожидания. max.block.ms генерирует.соответствующее.исключение. max.request.size Этот.параметр.задает.максимальный.размер.отправляемого.производителем.запро- са..Он.ограничивает.как.максимальный.размер.сообщения,.так.и.число.сообщений,. отсылаемых.в.одном.запросе..Например,.при.максимальном.размере.сообщения. по.умолчанию.1.Мбайт.производитель.может.как.максимум.отправить.одно.со- общение.размером.1.Мбайт.или.скомпоновать.в.один.запрос.1024.сообщения.по. 1.Кбайт.каждое..Кроме.того,.у.брокеров.тоже.есть.ограничение.максимального. размера.принимаемого.сообщения.( message.max.bytes )..Обычно.рекомендуется. делать.значения.этих.параметров.одинаковыми,.чтобы.производитель.не.отправлял. сообщения.неприемлемого.для.брокера.размера. receive.buffer.bytes и send.buffer.bytes Это.размеры.TCP-буферов.отправки.и.получения,.используемых.сокетами.при. записи.и.чтении.данных..Если.значение.этих.параметров.равно.–1,.будут.использо- ваться.значения.по.умолчанию.операционной.системы..Рекомендуется.повышать. их.в.случае,.когда.производители.или.потребители.взаимодействуют.с.брокерами. из.другого.ЦОД,.поскольку.подобные.сетевые.подключения.обычно.характеризу- ются.более.длительной.задержкой.и.низкой.пропускной.способностью.сети. Сериализаторы 77 Гарантии упорядоченности Apache Kafka сохраняет порядок сообщений внутри раздела. Это значит, что брокер запишет в раздел сообщения, отправленные из производителя в опре- деленном порядке, в том же порядке и все потребители будут читать их. Для некоторых сценариев использования упорядоченность играет важную роль. Помещение на счет 100 долларов и их снятие в дальнейшем сильно отличается от снятия с последующим помещением! Однако для некоторых сценариев ис- пользования порядок не имеет значения. Установка для параметра retries ненулевого значения, а для max.in.flights.re- quests.per.connection — значения, превышающего 1, означает, что брокер, которому не удалось записать первый пакет сообщений, сможет успешно запи- сать второй (уже отправленный), после чего вернуться к первому и успешно повторить попытку его записи, в результате чего порядок будет изменен на противоположный. Обычно в надежной системе нельзя взять и установить нулевое число повторов, так что если обеспечение упорядоченности критично важно, рекомендуем задать max.in.flight.requests.per.connection=1, чтобы гарантировать, что во время повтора попытки не будут отправляться другие сообщения, поскольку это может привести к изменению порядка на противоположный и существенному ограничению про- пускной способности производителя, поэтому так следует поступать лишь тогда, когда порядок важен. Сериализаторы Как.мы.видели.в.предыдущих.примерах,.конфигурация.производителя.обяза- тельно.включает.сериализаторы..Мы.уже.рассматривали.применение.сериали- затора.по.умолчанию.—. StringSerializer ..Kafka.также.включает.сериализаторы. для.целых.чисел.и.байтовых.массивов,.но.это.охватывает.далеко.не.все.сценарии. использования..В.конце.концов.вам.понадобится.сериализовывать.более.общие. виды.записей. Начнем.с.написания.пользовательского.сериализатора,.после.чего.покажем.реко- мендуемый.альтернативный.вариант.—.сериализатор.Avro. Пользовательские сериализаторы Когда.нужно.отправить.в.Kafka.объект.более.сложный,.чем.просто.строка.или.цело- численное.значение,.можно.или.воспользоваться.для.создания.записей.универсаль- ной.библиотекой.сериализации,.например,.Avro,.Thrift.либо.Protobuf,.или.создать. пользовательский.сериализатор.для.уже.используемых.объектов..Мы.настоятельно. рекомендуем.вам.воспользоваться.универсальной.библиотекой.сериализации.. Но.чтобы.разобраться,.как.работают.сериализаторы.и.почему.лучше.использовать. библиотеку.сериализации,.посмотрим,.что.требуется.для.написания.собственного. сериализатора. 78 Глава 3 • Производители Kafka: запись сообщений в Kafka Пусть.вместо.того,.чтобы.записывать.только.имя.покупателя,.вы.создали.простой. класс. Customer : public class Customer { private int customerID; private String customerName; public Customer(int ID, String name) { this.customerID = ID; this.customerName = name; } public int getID() { return customerID; } public String getName() { return customerName; } } Теперь.предположим,.что.вам.нужно.создать.пользовательский.сериализатор.для. этого.класса..Он.будет.выглядеть.примерно.так: import org.apache.kafka.common.errors.SerializationException; import java.nio.ByteBuffer; import java.util.Map; public class CustomerSerializer implements Serializer @Override public void configure(Map configs, boolean isKey) { // нечего настраивать } @Override /** Мы сериализуем объект Customer как: 4-байтное целое число, соответствующее customerId 4-байтное целое число, соответствующее длине customerName в байтах в кодировке UTF-8 (0, если имя не заполнено) N байт, соответствующих customerName в кодировке UTF-8 */ public byte[] serialize(String topic, Customer data) { try { byte[] serializedName; int stringSize; if (data == null) return null; else { if (data.getName() != null) { serializedName = data.getName().getBytes("UTF-8"); stringSize = serializedName.length; Сериализаторы 79 } else { serializedName = new byte[0]; stringSize = 0; } } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getID()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } } @Override public void close() { // нечего закрывать } } Настройка.производителя.с.использованием.этого. CustomerSerializer .дает.воз- можность.определить.тип. ProducerRecord Customer> .и.отправлять.данные. типа. Customer ,.непосредственно.передавая.объекты. Customer .производителю.. Приведенный.пример.очень.прост,.но.из.него.можно.понять,.насколько.ненадежен. такой.код..Если,.например,.у.нас.слишком.много.покупателей.и.понадобится.по- менять.тип. customerID .на. Long .или.добавить.в.тип. Customer .поле. startDate ,.то.мы. столкнемся.с.непростой.проблемой.поддержания.совместимости.между.старым. и.новым.форматами.сообщения..Отладка.проблем.совместимости.между.различ- ными.версиями.сериализаторов.и.десериализаторов.—.весьма.непростая.задача,. ведь.приходится.сравнивать.неформатированные.байтовые.массивы..Что.еще.хуже,. если.нескольким.группам.разработчиков.одной.компании.понадобится.записывать. данные.о.покупателях.в.Kafka,.им.придется.использовать.одинаковые.сериализа- торы.и.менять.код.совершенно.синхронно. Поэтому.мы.рекомендуем.использовать.существующие.сериализаторы.и.десери- ализаторы,.например,.JSON,.Apache.Avro,.Thrift.или.Protobuf..В.следующем.раз- деле.расскажем.про.Apache.Avro,.а.затем.покажем,.как.сериализовать.записи.Avro. и.отправлять.их.в.Kafka. Сериализация с помощью Apache Avro Apache.Avro.—.независимый.от.языка.программирования.формат.сериализации. данных..Этот.проект.создан.Дугом.Каттингом.(Doug.Cutting).для.обеспечения. возможности.использования.данных.совместно.с.большим.количеством.других. людей. 80 Глава 3 • Производители Kafka: запись сообщений в Kafka Данные.Avro.описываются.независимой.от.языка.схемой..Она.обычно.выполняется. в.формате.JSON,.а.сериализация.производится.в.двоичные.файлы,.хотя.сериализа- ция.в.JSON.тоже.поддерживается..При.записи.и.чтении.файлов.Avro.предполагает. наличие.схемы,.обычно.во.вложенном.в.файлы.виде. Одна.из.самых.интересных.возможностей.Avro,.благодаря.которой.он.так.хорошо. подходит.для.использования.в.системах.обмена.сообщениями.вроде.Kafka,.состоит. в.том,.что.при.переходе.записывающего.сообщения.приложения.на.новую.схему. читающее.данные.приложение.может.продолжать.обработку.сообщений.без.каких- либо.изменений.или.обновлений. Пусть.исходная.схема.выглядела.следующим.образом: {"namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "faxNumber", "type": ["null", "string"], "default": "null"} ] } .Поля. id .и. name .—.обязательные,.а.номер.факса.—.необязателен,.по.умолчанию. это.неопределенное.значение. Допустим,.что.эта.схема.использовалась.в.течение.нескольких.месяцев.и.в.таком. формате.было.сгенерировано.несколько.терабайт.данных..Теперь.предположим,. что.в.новой.версии.мы.решили.признать,.что.наступил.XXI.век,.и.вместо.номера. факса.будем.использовать.поле. Новая.схема.будет.выглядеть.вот.так: {"namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "email", "type": ["null", "string"], "default": "null"} ] } Теперь.после.обновления.до.новой.версии.в.старых.записях.будет.содержаться.поле. faxNumber ,.а.в.новых.—. ..Во.многих.организациях.обновления.выполняются. медленно,.на.протяжении.многих.месяцев..Так.что.придется.продумать.обработку. в.Kafka.всех.событий.еще.не.обновленными.приложениями,.использующими.но- мера.факса,.и.уже.обновленными.—.с.адресом.электронной.почты. Выполняющее.чтение.приложение.должно.содержать.вызовы.таких.методов,.как. getName() ,. getId() .и. getFaxNumber() ..Наткнувшись.на.записанное.по.новой.схеме. Сериализаторы 81 сообщение,.методы. getName() .и. getId() .продолжат.работать.без.всяких.изменений,. но.метод. getFaxNumber() .вернет. null ,.поскольку.сообщение.не.содержит.номера. факса. Теперь.предположим,.что.мы.модифицировали.читающее.приложение.и.в.нем. вместо.метода. getFaxNumber() .теперь.есть.метод. getEmail() ..Наткнувшись.на.за- писанное.по.старой.схеме.сообщение,.метод. getEmail() .вернет. null ,.поскольку. в.старых.сообщениях.нет.адреса.электронной.почты. Этот.пример.иллюстрирует.выгоды.использования.Avro:.хотя.мы.и.поменяли. схему.сообщений.без.изменения.всех.читающих.данные.приложений,.никаких.ис- ключений.или.серьезных.ошибок.не.возникло,.как.не.понадобилось.и.выполнять. дорогостоящие.обновления.существующих.данных. Однако.у.этого.сценария.есть.два.нюанса. Используемая.для.записи.данных.схема.и.схема,.которую.ожидает.читающее. данные.приложение,.должны.быть.совместимыми..В.документации.Avro.опи- саны.правила.совместимости.( http://www.bit.ly/2t9FmEb ). У.десериализатора.должен.быть.доступ.к.схеме,.использованной.при.записи. данных,.даже.если.она.отличается.от.схемы,.которую.ожидает.обращающееся. к.данным.приложение..В.файлах.Avro.схема.для.записи.включается.в.сам.файл,. но.для.сообщений.Kafka.есть.более.удачный.способ..Мы.рассмотрим.его.далее. Использование записей Avro с Kafka В.отличие.от.файлов.Avro,.при.использовании.которых.хранение.всей.схемы. в.файле.данных.дает.довольно.умеренные.накладные.расходы,.хранение.всей.схемы. в.каждой.записи.обычно.более.чем.вдвое.увеличивает.размер.последней..Однако. в.Avro.при.чтении.записи.необходима.полная.схема,.так.что.нам.нужно.поместить. ее.куда-то.в.другое.место..Для.этого.мы,.придерживаясь.распространенного.ар- хитектурного.паттерна,.воспользуемся.реестром схем (Schema Registry)..Реестр. схем.не.включен.в.Kafka,.но.существует.несколько.его.вариантов.с.открытым. исходным.кодом..Для.нашего.примера.воспользуемся.Confluent.Schema.Registry.. Код.Confluent.Schema.Registry.можно.найти.на.GitHub.( https://github.com/confluentinc/ schema-registry ),.его.также.можно.установить.в.виде.части.платформы.Confluent. ( http://docs.confluent.io/current/installation.html )..Если.вы.решите.использовать.этот.ре- естр.схем,.рекомендуем.заглянуть.в.его.документацию.( http://docs.confluent.io/current/ schema-registry/docs/index.html ). Идея.состоит.в.хранении.в.реестре.всех.используемых.для.записи.данных.в.Kafka. схем..В.этом.случае.можно.хранить.в.отправляемой.в.Kafka.записи.только.иденти- фикатор.схемы..Потребители.могут.в.дальнейшем.извлечь.запись.из.реестра.схем. по.идентификатору.и.десериализовать.данные..Самое.главное,.что.вся.работа.—. сохранение.схемы.в.реестре.и.извлечение.ее.при.необходимости.—.выполняется. 82 Глава 3 • Производители Kafka: запись сообщений в Kafka в.сериализаторах.и.десериализаторах..Код.производителя.отправляемых.в.Kafka. сообщений.просто.использует.сериализатор.Avro.так.же,.как.использовал.бы.любой. другой.сериализатор..Этот.процесс.показан.на.рис..3.2. Рис. 3.2. Блок-схема сериализации и десериализации записей Avro Вот.пример.отправки.сгенерированных.Avro.объектов.в.Kafka.(см..документацию. Avro.по.адресу. http://avro.apache.org/docs/current/ ,.чтобы.получить.информацию.о.ге- нерации.кода.с.его.помощью): Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", schemaUrl); String topic = "customerContacts"; int wait = 500; Producer // Генерация новых событий продолжается вплоть до нажатия ctrl+c while (true) { Customer customer = CustomerGenerator.getNext(); System.out.println("Generated customer " + customer.toString()); ProducerRecord new ProducerRecord<>(topic, customer.getId(), customer); producer.send(record); } Сериализаторы 83 .Для.сериализации.объектов.с.помощью.Avro.мы.используем.класс. KafkaAvroSe- rializer ..Обратите.внимание.на.то,.что. AvroSerializer .может.работать.с.про- стыми.типами.данных,.именно.поэтому.в.дальнейшем.объект. String .использу- ется.в.качестве.ключа.записи.и.объект. Customer .—.в.качестве.значения. schema.registry.url .—.новый.параметр,.указывающий.на.место.хранения.схем. Customer .—.сгенерированный.объект..Мы.сообщаем.производителю,.что.значе- ние.наших.записей.будет.представлять.собой.объект. Customer .Мы.также.создаем.экземпляр.класса. ProducerRecord .с.объектом. Customer .в.ка- честве.типа.значения.и.передаем.объект. Customer .при.создании.новой.записи. .Вот.и.все..Мы.отправили.запись.с.объектом. Customer ,.а. KafkaAvroSerializer позаботится.обо.всем.остальном. Но.что.если.нам.понадобится.использовать.обобщенные.объекты.Avro.вместо. сгенерированных.объектов.Avro?.Никаких.проблем..В.этом.случае.нужно.всего. лишь.указать.схему: Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", url); String schemaString = "{\"namespace\": \"customerManagement.avro\", \"type\": \"record\", " + "\"name\": \"Customer\"," + "\"fields\": [" + "{\"name\": \"id\", \"type\": \"int\"}," + "{\"name\": \"name\", \"type\": \"string\"}," + "{\"name\": \"email\", \"type\": [\"null\", \"string\"], \"default\":\"null\" }" + "]}"; Producer new KafkaProducer Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(schemaString); for (int nCustomers = 0; nCustomers < customers; nCustomers++) { String name = "exampleCustomer" + nCustomers; String email = "example " + nCustomers + "@example.com"; GenericRecord customer = new GenericData.Record(schema); customer.put("id", nCustomers); customer.put("name", name); customer.put("email", email); ProducerRecord |