Главная страница
Навигация по странице:

  • Реализация пользовательской стратегии секционирования.

  • Рис. 4.5.

  • ИЗМЕНЕНИЯ ЛОГИКИ РАБОТЫ КОНТРОЛЬНЫХ СИГНАЛОВ В ПОСЛЕДНИХ ВЕРСИЯХ KAFKA

  • Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly


    Скачать 7.59 Mb.
    НазваниеApache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
    Дата21.06.2022
    Размер7.59 Mb.
    Формат файлаpdf
    Имя файлаApache Kafka. Потоковая обработка и анализ данных.pdf
    ТипДокументы
    #609074
    страница11 из 39
    1   ...   7   8   9   10   11   12   13   14   ...   39
    84 Глава 3 • Производители Kafka: запись сообщений в Kafka new ProducerRecord GenericRecord>("customerContacts", name, customer);
    producer.send(data);
    }
    }
    .Мы.по-прежнему.используем.тот.же.класс.
    KafkaAvroSerializer
    .И.передаем.URI.того.же.реестра.схем.
    .Но.теперь.нам.приходится.указывать.схему.Avro,.поскольку.ее.уже.не.предо- ставляет.сгенерированный.Avro.объект.
    .Тип.объекта.теперь.
    GenericRecord
    ..Мы.инициализируем.его.своей.схемой.
    и.предназначенными.для.записи.данными.
    .Значение.
    ProducerRecord
    .представляет.собой.просто.объект.
    GenericRecord
    ,.
    содержащий.схему.и.данные..Сериализатор.будет.знать,.как.получить.из.этой.
    записи.схему.данных,.сохранить.ее.в.реестре.схем.и.сериализовать.данные.из.
    объекта.
    Разделы
    В.предыдущих.примерах.создаваемые.нами.объекты.
    ProducerRecord
    .включали.на- звание.темы,.ключ.и.значение..Сообщения.Kafka.представляют.собой.пары.«ключ/
    значение»,.и.хотя.можно.создавать.объекты.
    ProducerRecord
    .только.с.темой.и.зна- чением.или.с.неопределенным.значением.по.умолчанию.для.ключа,.большинство.
    приложений.отправляют.записи.с.ключами..Ключи.служат.для.двух.целей:.они.
    представляют.собой.дополнительную.информацию,.сохраняемую.вместе.с.сообще- нием,.и.на.их.основе.определяется,.в.какой.раздел.темы.записывать.сообщение..
    Все.сообщения.с.одинаковым.ключом.попадут.в.один.раздел..Это.значит,.что,.если.
    каждый.процесс.читает.лишь.часть.разделов.темы.(подробнее.об.этом.в.главе.4),.
    все.записи.для.конкретного.ключа.будет.читать.один.и.тот.же.процесс..Для.созда- ния.записи.типа.«ключ/значение».нужно.просто.создать.объект.
    ProducerRecord
    ,.
    вот.так:
    ProducerRecord record =
    new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
    При.создании.сообщений.с.неопределенным.значением.ключа.можно.просто.его.
    не.указывать:
    ProducerRecord record =
    new ProducerRecord<>("CustomerCountry", "USA");
    .В.этом.примере.ключ.будет.равен.
    null
    ,.что.может.указывать,.например,.на.то,.
    что.имя.покупателя.в.форме.опущено.
    Если.ключ.равен.
    null
    .и.используется.метод.секционирования.по.умолчанию,.то.
    запись.будет.отправлена.в.один.из.доступных.разделов.темы.случайным.образом..

    Разделы 85
    Для.балансировки.сообщений.по.разделам.при.этом.будет.использоваться.цикли- ческий.алгоритм.
    Если.же.ключ.присутствует.и.используется.метод.секционирования.по.умолчанию,.
    Kafka.вычислит.хеш-значение.ключа.с.помощью.собственного.алгоритма.хеширо- вания,.так.что.хеш-значения.не.изменятся.при.обновлении.Java,.и.отправит.сооб- щение.в.конкретный.раздел.на.основе.полученного.результата..А.поскольку.важно,.
    чтобы.ключи.всегда.соответствовали.одним.и.тем.же.разделам,.для.вычисления.
    соответствия.используются.все.разделы.темы,.не.только.доступные..Это.значит,.
    что,.если.конкретный.раздел.недоступен.на.момент.записи.в.него.данных,.будет.
    возвращена.ошибка..Это.происходит.довольно.редко,.как.вы.увидите.в.главе.6,.
    когда.мы.будем.обсуждать.репликацию.и.доступность.Kafka.
    Соответствие.ключей.разделам.остается.согласованным.лишь.до.тех.пор,.пока.
    число.разделов.в.теме.не.меняется..Так.что.пока.это.число.постоянно,.вы.можете.
    быть.уверены,.например,.что.относящиеся.к.пользователю.045189.записи.всегда.
    будут.записываться.в.раздел.34..Эта.особенность.открывает.дорогу.для.всех.видов.
    оптимизации.при.чтении.данных.из.разделов..Но.как.только.вы.добавите.в.тему.
    новые.разделы,.такое.поведение.больше.нельзя.будет.гарантировать:.старые.записи.
    останутся.в.разделе.34,.а.новые.могут.оказаться.записанными.в.другой.раздел..Если.
    ключи.секционирования.важны,.простейшим.решением.будет.создавать.темы.с.до- статочным.количеством.разделов.(в.главе.2.мы.приводили.соображения.по.поводу.
    выбора.оптимального.количества.разделов).и.никогда.не.добавлять.новые.
    Реализация пользовательской стратегии секционирования..До.сих.пор.мы.обсуж- дали.особенности.метода.секционирования.по.умолчанию,.используемого.чаще.
    всего..Однако.Kafka.не.ограничивает.вас.лишь.хеш-разделами,.и.иногда.появляются.
    веские.причины.секционировать.данные.иначе..Например,.представьте,.что.вы.—.
    B2B-поставщик,.а.ваш.крупнейший.покупатель.—.компания.Banana,.производящая.
    карманные.устройства..Допустим,.что.более.10.%.ваших.ежедневных.транзакций.
    приходится.на.этого.покупателя..При.использовании.хеш-секционирования,.при- нятого.по.умолчанию,.записи.Banana.будут.распределяться.в.тот.же.раздел,.что.
    и.записи.других.заказчиков,.в.результате.чего.один.из.разделов.окажется.намного.
    больше.других..Это.приведет.к.исчерпанию.места.на.серверах,.замедлению.об- работки.и.т..д..На.самом.деле.лучше.выделить.для.покупателя.Banana.отдельный.
    раздел,.после.чего.применить.хеш-секционирование.для.распределения.остальных.
    заказчиков.по.разделам.
    Вот.пример.пользовательского.объекта.
    Partitioner
    :
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.record.InvalidRecordException;
    import org.apache.kafka.common.utils.Utils;
    public class BananaPartitioner implements Partitioner {

    86 Глава 3 • Производители Kafka: запись сообщений в Kafka public void configure(Map configs) {} public int partition(String topic, Object key, byte[] keyBytes,
    Object value, byte[] valueBytes, Cluster cluster)
    {
    List partitions =
    cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String))) throw new InvalidRecordException("We expect all messages to have customer name as key")
    if (((String) key).equals("Banana"))
    return numPartitions-1;
    // Banana всегда попадает в последний раздел
    // Другие записи распределяются по разделам путем хеширования return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }
    public void close() {}
    }
    .Интерфейс.объекта.секционирования.включает.методы.
    configure
    ,.
    partition и.
    close
    ..Здесь.мы.реализовали.только.метод.
    partition
    ,.хотя.следовало.бы.пере- давать.имя.нашего.особого.заказчика.через.метод.
    configure
    ,.а.не.зашивать.его.
    в.код.метода.
    partition
    .Мы.ожидаем.только.строковые.ключи,.так.что.в.противном.случае.генерируем.
    исключение.
    Старые API производителей
    В.этой.главе.мы.обсудили.Java.клиент-производитель,.включенный.в.пакет.
    org.apa- che.kafka.clients
    ..Однако.в.Apache.Kafka.все.еще.содержатся
    1
    .два.более.старых.
    клиента,.написанных.на.языке.Scala.и.включенных.в.пакет.
    kafka.producer
    .(часть.
    базового.модуля.Kafka)..Эти.производители.называются.
    SyncProducer
    .(в.зависи- мости.от.значения.параметра.
    acks
    .он.может.ждать.подтверждения.сервером.полу- чения.каждого.сообщения.или.пакета.сообщений,.прежде.чем.отправлять.новые.
    сообщения).и.
    AsyncProducer
    .(формирует.пакеты.сообщений.в.фоновом.режиме,.
    отправляет.их.в.отдельном.потоке.и.не.предоставляет.клиенту.никакой.информа- ции.об.успехе.или.неудаче.отправки).
    Поскольку.текущий.производитель.поддерживает.оба.типа.поведения,.намного.
    более.надежен.и.предоставляет.разработчику.гораздо.бо'льшие.возможности.кон-
    1.
    Начиная.с.версии.0.11.0,.они.считаются.устаревшими.и.оставлены.только.из.соображений.
    совместимости..—.Примеч. пер.

    Резюме 87
    троля,.мы.не.будем.обсуждать.эти.старые.API..Если.вас.интересует.возможность.
    их.использования,.подумайте.еще.раз,.после.чего.обратитесь.к.документации.Kafka.
    за.дополнительной.информацией.
    Резюме
    Мы.начали.эту.главу.с.простого.примера.производителя.—.всего.десять.строк.кода,.
    отправляющих.события.в.Kafka..Расширили.его.за.счет.добавления.обработки.
    ошибок.и.опытов.с.синхронной.и.асинхронной.отправкой..Затем.изучили.наиболее.
    важные.конфигурационные.параметры.производителя.и.выяснили,.как.они.влияют.
    на.его.поведение..Мы.обсудили.сериализаторы,.которые.служат.для.управления.
    форматом.записываемых.в.Kafka.событий..Мы.подробно.рассмотрели.Avro.—.один.
    из.многих.способов.сериализации.событий,.часто.используемый.вместе.с.Kafka..
    В.завершение.главы.обсудили.секционирование.в.Kafka.и.привели.пример.про- двинутой.методики.пользовательского.секционирования.
    Теперь,.разобравшись.с.записью.событий.в.Kafka,.в.главе.4.рассмотрим.все,.что.
    касается.чтения.событий.из.Kafka.

    4
    Потребители Kafka: чтение данных из Kafka
    Приложения,.читающие.данные.из.Kafka,.используют.объект.
    KafkaConsumer
    .для.
    подписки.на.темы.Kafka.и.получения.из.них.сообщений..Чтение.данных.из.Kafka.
    несколько.отличается.от.чтения.данных.из.других.систем.обмена.сообщениями:.
    некоторые.принципы.его.работы.и.идеи.весьма.оригинальны..Чтобы.научиться.
    использовать.API.потребителей,.необходимо.сначала.разобраться.с.этими.прин- ципами..Мы.начнем.с.пояснений.по.поводу.важнейших.из.них,.а.затем.рассмотрим.
    примеры,.демонстрирующие.различные.способы.использования.API.потребителей.
    для.реализации.приложений.с.разнообразными.требованиями.
    Принципы работы потребителей Kafka
    Чтобы.научиться.читать.данные.из.Kafka,.нужно.сначала.разобраться.с.концепция- ми.потребителей.и.групп.потребителей..Мы.рассмотрим.эти.понятия.в.следующих.
    разделах.
    Потребители и группы потребителей
    Представьте.себе.приложение,.читающее.данные.из.темы.Kafka,.проверяющее.их.
    и.записывающее.результаты.в.другое.хранилище.данных..Это.приложение.должно.
    будет.создать.объект-потребитель,.подписаться.на.соответствующую.тему.и.при- ступить.к.получению.сообщений,.их.проверке.и.записи.результатов..До.поры.до.
    времени.такая.схема.будет.работать,.но.что.если.производители.записывают.со- общения.в.тему.быстрее,.чем.приложение.может.их.проверить?.Если.чтение.и.об- работка.данных.ограничиваются.одним.потребителем,.приложение,.не.способное.
    справиться.с.темпом.поступления.сообщений,.будет.все.больше.и.больше.отставать..
    Понятно,.что.в.такой.ситуации.нужно.масштабировать.получение.сообщений.из.
    тем..Необходимо,.чтобы.несколько.потребителей.могли.делить.между.собой.дан- ные.и.читать.из.одной.темы.подобно.тому,.как.несколько.производителей.могут.
    писать.в.одну.тему.

    Принципы работы потребителей Kafka 89
    Потребители.Kafka.обычно.состоят.в.группе потребителей..Если.несколько.по- требителей.подписаны.на.одну.тему.и.относятся.к.одной.группе,.все.потребители.
    группы.будет.получать.сообщения.из.различных.подмножеств.разделов.группы.
    Рассмотрим.тему.T1.с.четырьмя.разделами..Предположим,.что.мы.создали.новый.
    потребитель.C1.—.единственный.потребитель.в.группе.G1.и.подписали.его.на.
    тему.T1..C1.будет.получать.все.сообщения.из.всех.четырех.разделов.темы.(рис..4.1).
    Рис. 4.1. Одна группа потребителей с четырьмя разделами
    Если.мы.добавим.в.группу.G1.еще.один.потребитель,.C2,.то.каждый.потребитель.
    будет.получать.сообщения.только.из.двух.разделов..Например,.сообщения.из.раз- делов.0.и.2.попадут.к.C1,.а.из.разделов.1.и.3.—.к.C2.(рис..4.2).
    Рис. 4.2. Четыре раздела разбиты по двум потребителям
    Если.бы.в.группе.G1.было.четыре.потребителя,.то.каждый.из.них.читал.бы.сообще- ния.из.своего.раздела.(рис..4.3).

    90 Глава 4 • Потребители Kafka: чтение данных из Kafka
    Рис. 4.3. Четыре потребителя, по одному разделу каждый
    Если.же.в.одной.группе.с.одной.темой.будет.больше.потребителей,.чем.разделов,.
    часть.потребителей.будут.простаивать.и.вообще.не.получать.сообщений.(рис..4.4).
    Рис. 4.4. Потребителей больше, чем разделов, поэтому часть из них простаивает
    Основной.способ.масштабирования.получения.данных.из.Kafka.—.добавление.
    новых.потребителей.в.группу..Потребители.Kafka.часто.выполняют.операции.с.вы- соким.значением.задержки,.например,.запись.данных.в.базу.или.занимающие.много.
    времени.вычисления.с.ними..В.этих.случаях.отдельный.потребитель.неизбежно.
    будет.отставать.от.темпов.поступления.данных.в.тему,.и.разделение.нагрузки.пу- тем.добавления.новых.потребителей,.каждый.из.которых.отвечает.лишь.за.часть.
    разделов.и.сообщений,.—.основной.метод.масштабирования..Поэтому.имеет.смысл.

    Принципы работы потребителей Kafka 91
    создавать.темы.с.большим.количеством.разделов,.ведь.это.дает.возможность.до- бавлять.новых.потребителей.при.возрастании.нагрузки..Помните,.что.нет.смысла.
    добавлять.столько.потребителей,.сколько.нужно,.чтобы.их.стало.больше,.чем.раз- делов.в.теме,.—.часть.из.них.будет.просто.простаивать..В.главе.2.мы.приводили.
    соображения.по.поводу.выбора.количества.разделов.в.теме.
    Помимо.добавления.потребителей.для.масштабирования.отдельного.приложения.
    широко.распространено.чтение.несколькими.приложениями.данных.из.одной.
    темы..На.самом.деле.одной.из.главных.задач.создания.Kafka.было.обеспечение.
    возможности.использования.записанных.в.темы.Kafka.данных.во.множестве.
    сценариев.в.организации..В.подобном.случае.хотелось.бы,.чтобы.каждое.из.при- ложений.получило.все.данные,.а.не.только.их.подмножество..А.чтобы.приложе- ние.получило.все.данные.из.темы,.у.нее.должна.быть.своя.группа.потребителей..
    В.отличие.от.многих.традиционных.систем.обмена.сообщениями,.Kafka.масшта- бируется.до.очень.больших.количеств.потребителей.и.их.групп.без.снижения.
    производительности.
    Если.мы.добавим.в.предыдущем.примере.новую.группу.G2.с.одним.потребителем,.
    он.прочитает.все.сообщения.из.темы.T1.вне.зависимости.от.группы.G1..В.G2.мо- жет.быть.свыше.одного.потребителя,.подобно.тому.как.было.в.G1,.но.G2.все.равно.
    получит.все.сообщения.вне.зависимости.от.других.групп.потребителей.(рис..4.5).
    Рис. 4.5. Вторая группа потребителей тоже читает все сообщения

    92 Глава 4 • Потребители Kafka: чтение данных из Kafka
    Резюмируем:.для.каждого.приложения,.которому.нужны.все.сообщения.из.одной.
    темы.или.нескольких,.создается.новая.группа.потребителей..В.существующую.
    группу.их.добавляют.при.необходимости.масштабирования.чтения.и.обработки.
    сообщений.из.тем,.так.что.до.каждого.дополнительного.потребителя.в.группе.до- ходит.только.подмножество.сообщений.
    Группы потребителей и перебалансировка разделов
    Как.мы.видели.в.предыдущем.разделе,.потребители.в.группе.делят.между.собой.
    разделы.тем,.на.которые.подписаны..Добавленный.в.группу.потребитель.начинает.
    получать.сообщения.из.разделов,.за.которые.ранее.отвечал.другой.потребитель..
    То.же.самое.происходит,.если.потребитель.останавливается.или.аварийно.заверша- ет.работу:.он.покидает.группу,.а.разделы,.за.которые.он.ранее.отвечал,.обрабатыва- ются.одним.из.оставшихся.потребителей..Переназначение.разделов.потребителям.
    происходит.также.при.изменении.тем,.которые.читает.группа.(например,.при.до- бавлении.администратором.новых.разделов).
    Передача.раздела.от.одного.потребителя.другому.называется.перебалансировкой.
    (rebalance)..Перебалансировка.важна,.потому.что.обеспечивает.группе.потребителей.
    масштабируемость.и.высокую.доступность,.позволяя.легко.и.безопасно.добав- лять.и.удалять.потребителей,.но.при.обычных.обстоятельствах.она.нежелательна..
    Во.время.перебалансировки.потребители.не.могут.получать.сообщения,.так.что.
    она.фактически.представляет.собой.краткий.интервал.недоступности.всей.группы.
    потребителей..Кроме.того,.при.передаче.разделов.от.одного.потребителя.другому.
    потребитель.утрачивает.свое.текущее.состояние:.если.он.кэшировал.какие-либо.
    данные,.ему.придется.обновить.кэши,.что.замедлит.приложение.на.время.восстанов- ления.состояния..В.этой.главе.мы.обсудим,.как.сделать.так,.чтобы.перебалансировка.
    не.представляла.опасности,.и.как.избежать.нежелательной.перебалансировки.
    Потребители.поддерживают.членство.в.группе.и.принадлежность.разделов.за.счет.
    отправки.назначенному.координатором группы.брокеру.Kafka.(для.разных.групп.
    потребителей.это.могут.быть.разные.брокеры).периодических.контрольных сигна-
    лов
    .(heartbeats)..До.тех.пор,.пока.потребитель.регулярно.отправляет.контрольные.
    сигналы,.он.считается.активным,.нормально.работающим.и.обрабатывающим.со- общения.с.относящихся.к.нему.разделов..Контрольные.сигналы.отправляются.во.
    время.опросов.(то.есть.при.извлечении.потребителем.записей).и.при.фиксации.
    полученных.им.записей.
    Если.потребитель.на.длительное.время.прекращает.отправку.контрольных.сигна- лов,.время.его.сеанса.истекает.и.координатор.группы.признает.его.неработающим.
    и.инициирует.перебалансировку..В.случае.аварийного.сбоя.потребителя.и.прекра- щения.обработки.им.сообщений.координатору.группы.хватит.нескольких.секунд.
    без.контрольных.сигналов,.чтобы.признать.его.неработающим.и.инициировать.
    перебалансировку..На.протяжении.этого.времени.никакие.сообщения.из.относя- щихся.к.данному.потребителю.разделов.обрабатываться.не.будут..Если.же.потре- битель.завершает.работу.штатным.образом,.он.извещает.координатора.группы.об.

    Принципы работы потребителей Kafka 93
    этом,.и.координатор.группы.инициирует.перебалансировку.немедленно,.сокращая.
    тем.самым.перерыв.в.обработке..Далее.в.этой.главе.мы.обсудим.параметры.конфи- гурации,.управляющие.частотой.отправки.контрольных.сигналов.и.длительностью.
    времени.ожидания.сеанса,.и.их.настройку.подходящим.для.вас.образом.
    ИЗМЕНЕНИЯ ЛОГИКИ РАБОТЫ КОНТРОЛЬНЫХ СИГНАЛОВ
    В ПОСЛЕДНИХ ВЕРСИЯХ KAFKA
    В.версии.0.10.1.Kafka.появился.отдельный.поток.для.контрольных.сигналов,.отправ- ляющий.их.и.между.опросами..Он.дает.возможность.сделать.частоту.контрольных.
    сигналов.(а.значит,.и.время,.которое.нужно.группе.потребителей.для.обнаружения.
    аварийного. сбоя. потребителя. и. отсутствия. от. него. контрольных. сигналов). неза- висимой.от.частоты.опросов.(определяемой.временем,.необходимым.для.обработки.
    возвращаемых. брокерами. данных).. В. новых. версиях. Kafka. можно. задать. макси- мальную.длительность.работы.потребителя.без.опросов,.по.истечении.которой.его.
    признают.отказавшим.и.будет.инициирована.перебалансировка..Этот.параметр.ис- пользуется. для. предотвращения. динамической. взаимоблокировки. (livelock),. при.
    которой.не.происходит.аварийного.сбоя.приложения,.но.и.задача.не.выполняется..
    Он.не.зависим.от.параметра.session.timeout.ms,.соответствующего.промежутку.вре- мени,.необходимому.для.обнаружения.отказа.потребителя.и.прекращения.отправки.
    контрольных.сигналов.
    Оставшаяся.часть.данной.главы.посвящена.обсуждению.некоторых.проблем.более.
    старых.версий.Kafka.и.тому,.как.разработчик.может.с.ними.справиться..Мы.обсудим.
    также,. что. делать. с. приложениями,. которые. обрабатывают. записи. слишком. долго..
    Если.вы.работаете.с.версией.Kafka.0.10.1.или.более.новой,.эта.информация.окажется.
    для. вас. не. слишком. актуальной.. Если. при. использовании. одной. из. новых. версий.
    приходится. иметь. дело. с. записями,. обработка. которых. занимает. больше. времени,.
    просто. подберите. значение. параметра. max.poll.interval.ms,. подходящее. для. более.
    длительных.промежутков.между.опросами.
    1   ...   7   8   9   10   11   12   13   14   ...   39


    написать администратору сайта