Apache Kafka. Потоковая обработка и анализ данных. Apache Kafka. Потоковая обработка и анализ данныхСерия Бестселлеры OReilly
Скачать 7.59 Mb.
|
Задание настроек темы Во время создания темы можно также явным образом указывать для нее ре- плики или переопределять настройки. Ни одну из этих операций мы не будем рассматривать. Сведения о переопределении настроек вы можете найти далее в этой главе. Передать их сценарию kafka-topics.sh можно с помощью параметра командной строки --config. Переназначение разделов также описывается далее в этой главе. Названия.тем.могут.содержать.алфавитно-цифровые.символы,.а.также.символы. подчеркивания,.тире.и.точки. Именование тем Разрешается, хотя и не рекомендуется начинать названия тем с двух символов подчеркивания. Подобные темы считаются предназначенными для внутренних це- лей кластера (как, например, __consumer_offsets, предназначенная для хранения смещений для группы потребителей). Не рекомендуется применять в названиях в одном кластере и точки, и подчеркивания, поскольку при использовании на- званий тем в названиях показателей внутри Kafka точки заменяются подчерки- ваниями (например, topic.1 в показателях превращается в topic_1). Выполните.сценарий. kafka-topics.sh .со.следующими.параметрами: kafka-topics.sh --zookeeper --replication-factor Операции с темами 215 В.результате.кластер.создаст.тему.с.заданными.названием.и.количеством.разделов.. Для.каждого.раздела.он.подберет.заданное.число.подходящих.реплик..Это.значит,. что.если.кластер.настроен.для.распределения.реплик.с.учетом.стоек,.то.реплики. разделов.будут.находиться.в.отдельных.стойках..Если.же.такое.поведение.нежела- тельно,.укажите.аргумент.командной.строки. --disable-rack-aware Например,.создадим.тему.my-topic.с.8.разделами,.в.каждом.из.которых.по.две. реплики: # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --create --topic my-topic --replication-factor 2 --partitions 8 Created topic "my-topic". # Игнорирование ошибки в том случае, когда тема уже существует При использовании этого сценария для автоматизации может оказаться полезным аргумент --if-not-exists, при котором не возвращается ошибка в случае, когда тема уже существует. Добавление разделов Иногда.оказывается.нужно.увеличить.количество.разделов.темы..Темы.мас- штабируются.и.реплицируются.в.кластере.посредством.разделов,.так.что.чаще. всего.число.разделов.увеличивают.при.необходимости.расширения.темы.или. снижения.нагрузки.на.отдельный.раздел..Можно.также.увеличивать.темы,.если. требуется.несколько.экземпляров.одного.и.того.же.потребителя.в.рамках.одной. группы,.поскольку.только.один.участник.группы.потребителей.может.читать. один.раздел. Тонкая настройка тем с ключами С точки зрения потребителей добавление разделов в темы, содержащие со- общения с ключами, может оказаться весьма непростой задачей. Дело в том, что соответствие ключей разделам может меняться при смене числа разделов. Поэтому рекомендуется задавать число разделов для тем, содержащих сообще- ния с ключами, однократно при их создании и стараться не менять их размер. Игнорирование ошибки в случае, когда темы не существует Хотя для команды --alter существует аргумент --if-exists, использовать его не ре- комендуется, поскольку в этом случае команда не будет возвращать ошибки, если модифицируемой темы не существует. Это может замаскировать проблему с отсутствием нужной темы. 216 Глава 9 • Администрирование Kafka Например,.увеличим.количество.разделов.для.темы.my-topic.до.16: # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --topic my-topic --partitions 16 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded! # Уменьшение количества разделов Уменьшить количество разделов темы невозможно. Причина, по которой эта возможность не поддерживается, такова: удаление раздела из темы привело бы к удалению части его данных и несогласованности с точки зрения клиента. Кроме того, попытка перераспределения данных по оставшимся разделам — задача непростая, чреватая неправильным упорядочением сообщений. При необходимости уменьшить число разделов следует удалить тему и создать ее заново. Удаление темы Даже.не.содержащая.сообщений.тема.расходует.ресурсы.кластера,.включая.диско- вое.пространство,.открытые.дескрипторы.файлов.и.оперативную.память..Если.тема. больше.не.нужна,.следует.ее.удалить,.чтобы.освободить.эти.ресурсы..Для.этого. параметр.конфигурации.брокеров.кластера. delete.topic.enable .должен.быть. равен. true ..Если.же.этот.параметр.установлен.в. false ,.запросы.на.удаление.тем. будут.проигнорированы. Данные будут утеряны Удаление темы приводит к удалению всех ее сообщений. Эта операция необ- ратима, так что выполняйте ее осторожно. Например,.удалим.тему.с.названием.my-topic: # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --topic my-topic Topic my-topic is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. # Вывод списка всех тем кластера Утилита.для.работы.с.темами.может.также.вывести.список.всех.тем.в.кластере.. Формат.списка.—.по.одной.теме.в.строке,.упорядоченность.отсутствует. Операции с темами 217 Например,.выведем.список.всех.тем.кластера: # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --list my-topic - marked for deletion other-topic # Подробное описание тем В.числе.прочего.можно.получить.подробную.информацию.по.одной.или.несколь- ким.темам.кластера..Выводимая.информация.включает.количество.разделов,. переопределения.настроек.тем.и.список.разделов.с.распределением.реплик.. Можно.ограничиться.информацией.по.одной.теме,.указав.для.команды.аргу- мент. --topic Например,.выведем.описание.всех.тем.кластера: # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe Topic:other-topic PartitionCount:8 ReplicationFactor:2 Configs: Topic:other-topic Partition: 0 ... Replicas: 1,0 Isr: 1,0 Topic:other-topic Partition: 1 ... Replicas: 0,1 Isr: 0,1 Topic:other-topic Partition: 2 ... Replicas: 1,0 Isr: 1,0 Topic:other-topic Partition: 3 ... Replicas: 0,1 Isr: 0,1 Topic:other-topic Partition: 4 ... Replicas: 1,0 Isr: 1,0 Topic:other-topic Partition: 5 ... Replicas: 0,1 Isr: 0,1 Topic:other-topic Partition: 6 ... Replicas: 1,0 Isr: 1,0 Topic:other-topic Partition: 7 ... Replicas: 0,1 Isr: 0,1 # У.команды. describe .есть.несколько.полезных.параметров.для.фильтрации.выво- димой.информации..Они.могут.пригодиться.при.диагностике.проблем.с.кластером.. В.случае.их.использования.не.задавайте.аргумент. –topic ,.поскольку.смысл.состоит. в.том,.чтобы.найти.все.темы.или.разделы.кластера,.соответствующие.заданному. критерию..Эти.параметры.не.работают.с.командой. list ,.описанной.в.предыдущем. разделе. Для.поиска.всех.тем,.у.которых.были.переопределены.настройки,.воспользуйтесь. аргументом. --topics-with-overrides ..При.этом.будут.выведены.только.те.темы,. чьи.настройки.отличаются.от.настроек.кластера.по.умолчанию. Существует.два.фильтра,.которые.можно.использовать.для.поиска.проблемных. разделов..Аргумент. --under-replicated-partitions .выводит.все.разделы,.одна. или.более.реплик.которых.не.согласованы.с.ведущей.репликой..Аргумент. --un- available-partitions .выводит.все.разделы,.у.которых.нет.ведущей.реплики.. Это.более.серьезная.проблема,.означающая,.что.раздел.в.настоящий.момент. отключен.и.недоступен.для.клиентов-потребителей.и.клиентов-производителей. 218 Глава 9 • Администрирование Kafka Например,.выведем.список.недореплицированных.разделов: # kafka-topics.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --under-replicated-partitions Topic: other-topic Partition: 2 Leader: 0 Replicas: 1,0 Isr: 0 Topic: other-topic Partition: 4 Leader: 0 Replicas: 1,0 Isr: 0 # Группы потребителей Управление.группами.потребителей.в.Kafka.осуществляется.в.двух.местах:.инфор- мация.для.потребителей.старых.версий.хранится.в.ZooKeeper,.новых.—.в.брокерах. Kafka..Утилиту.kafka-consumer-groups.sh.можно.использовать.для.вывода.и.описа- ния.обоих.типов.групп,.а.также.для.удаления.информации.о.группах.потребителей. и.смещениях,.но.только.если.это.группы.потребителей.старых.версий,.хранимых. в.ZooKeeper..Работая.с.группами.потребителей.старых.версий,.можно.обращаться. к.кластеру.Kafka.посредством.задания.для.утилиты.параметра.командной.строки. --zookeeper ..А.для.групп.потребителей.новых.версий.необходимо.использовать. параметр. --bootstrap-server .с.указанием.имени.хоста.и.порта.брокера.Kafka,. к.которому.должно.осуществляться.подключение. Вывод списка и описание групп Для.вывода.списка.групп.потребителей,.если.это.старые.клиенты-потребители,. необходимо.использовать.параметры. --zookeeper .и. --list ..Для.новых.восполь- зуйтесь.параметрами. --bootstrap-server ,. --list .и. --new-consumer Например,.выведем.список.групп.старой.версии: # kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --list console-consumer-79697 myconsumer # И.список.групп.новой.версии: # kafka-consumer-groups.sh --new-consumer --bootstrap-server kafka1.example.com:9092 --list kafka-python-test my-new-consumer # Более.подробное.описание.любой.из.перечисленных.групп.можно.получить,.за- менив.параметр. --list .на. --describe .и.добавив.параметр. --group ..В.результате. этого.будут.выведены.все.темы,.которые.читает.группа,.а.также.смещения.для.всех. разделов.тем. Группы потребителей 219 Например,.выведем.подробную.информацию.о.группе.потребителей.старой.версии. с.названием.testgroup: # kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --describe --group testgroup GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER myconsumer my-topic 0 1688 1688 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 1 1418 1418 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 2 1314 1315 1 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 3 2012 2012 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 4 1089 1089 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 5 1429 1432 3 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 6 1634 1634 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 myconsumer my-topic 7 2261 2261 0 myconsumer_host1.example.com-1478188622741-7dab5ca7-0 # В.табл..9.1.описаны.выводимые.здесь.поля. Таблица 9.1. Поля вывода информации о группе потребителей старой версии с названием testgroup Поле Описание GROUP Название.группы.потребителей TOPIC Название.читаемой.темы PARTITION Идентификатор.читаемого.раздела CURRENT-OFFSET Последнее.смещение,.зафиксированное.группой.потребителей.для. данного.раздела.темы..Представляет.собой.позицию.потребителя. в.разделе LOG-END-OFFSET Текущее.максимальное.смещение.для.данного.раздела.темы.из. имеющихся.в.брокере LAG Разница.между.Current-Offset.потребителя.и.Log-End-Offset.брокера. для.данного.раздела.темы OWNER Член.группы.потребителей,.который.в.настоящий.момент.выполняет. потребление.данного.раздела.темы..Представляет.собой.произвольный. идентификатор,.задаваемый.самим.членом.группы..Не.обязан. содержать.имя.хоста.потребителя 220 Глава 9 • Администрирование Kafka Удаление группы Удаление.группы.потребителей.поддерживается.только.для.клиентов.—.потре- бителей.старых.версий..Это.действие.приводит.к.удалению.из.ZooKeeper.всей. группы,.включая.все.сохраненные.смещения.для.всех.потребляемых.группой.тем.. Чтобы.выполнить.удаление,.необходимо.прекратить.работу.всех.потребителей. группы..Если.не.сделать.этого.заранее,.поведение.потребителей.может.оказаться. непредсказуемым.из-за.удаления.метаданных.ZooKeeper.для.группы.во.время.их. использования. Например,.удалим.группу.потребителей.testgroup: # kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --group testgroup Deleted all consumer group information for group testgroup in zookeeper. # Ту.же.самую.команду.можно.использовать.и.для.удаления.смещений.для.отдель- ной.читаемой.группой.темы,.не.удаляя.всю.группу..Опять.же.рекомендуется.пред- варительно.остановить.работу.группы.потребителей.или.исключить.с.помощью. настроек.потребление.ею.удаляемой.темы. Например,.удалим.смещения.для.темы.my-topic.из.группы.потребителей.test- group: # kafka-consumer-groups.sh --zookeeper zoo1.example.com:2181/kafka-cluster --delete --group testgroup --topic my-topic Deleted consumer group information for group testgroup topic my-topic in zookeeper. # Управление смещениями Помимо.отображения.и.удаления.смещений.групп.потребителей.с.помощью. клиентов.старых.версий.существует.возможность.извлечения.этих.смещений. и.сохранения.их.в.пакете..Это.может.пригодиться.для.сброса.значения.смещений. конкретного.потребителя.в.случае.возникновения.проблемы,.из-за.которой.нужно. будет.перечитать.сообщения.или.перескочить.через.проблемное.сообщение.(на- пример,.плохо.отформатированное.сообщение,.обработать.которое.у.потребителя. не.получается). Управление зафиксированными в Kafka смещениями Сейчас не существует утилиты для управления зафиксированными в Kafka смеще- ниями клиента-потребителя. Эта возможность доступна только для потребителей, фиксирующих смещения в ZooKeeper. Чтобы управлять смещениями группы по- требителей, фиксирующих смещения в Kafka, для фиксации смещений придется воспользоваться имеющимися API клиента. Группы потребителей 221 Экспорт смещений Именованного.сценария.для.экспорта.смещений.не.существует,.но.можно.вос- пользоваться.сценарием. kafka-run-class.sh ,.чтобы.выполнить.соответствующий. Java-класс.в.подходящей.среде..В.результате.экспорта.смещений.будет.создан. файл,.содержащий.разделы.всех.тем.группы.и.их.смещения.в.подходящем.для. утилиты.импорта.формате..В.этом.файле.в.каждой.строке.будет.по.одному.разделу. темы.в.следующем.формате: /consumers/GROUPNAME/off sets/topic/TOPICNAME/PARTITIONID:OFFSET. Например,.экспортируем.смещения.группы.потребителей.testgroup.в.файл. offsets : # kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect zoo1.example.com:2181/kafka-cluster --group testgroup --output-file offsets # cat offsets /consumers/testgroup/offsets/my-topic/0:8905 /consumers/testgroup/offsets/my-topic/1:8915 /consumers/testgroup/offsets/my-topic/2:9845 /consumers/testgroup/offsets/my-topic/3:8072 /consumers/testgroup/offsets/my-topic/4:8008 /consumers/testgroup/offsets/my-topic/5:8319 /consumers/testgroup/offsets/my-topic/6:8102 /consumers/testgroup/offsets/my-topic/7:12739 # Импорт смещений Утилита.импорта.смещений.представляет.собой.противоположность.утилиты.экс- порта..Она.принимает.на.входе.файл,.полученный.в.результате.работы.утилиты. экспорта.из.прошлого.раздела,.и.устанавливает.на.его.основе.текущие.смещения. группы.потребителей..Общепринятая.практика.—.экспортировать.текущие.сме- щения.группы.потребителей,.делать.резервную.копию.файла.и.вносить.в.файл. изменения,.задавая.для.смещений.нужные.значения..Обратите.внимание.на.то,. что.для.команды.импорта.параметр. --group .не.используется,.поскольку.название. группы.потребителей.включено.в.импортируемый.файл. Предварительно остановите работу потребителей Перед выполнением этого шага нужно остановить все потребители группы. Они не будут читать новые смещения, записываемые в ходе работы группы по- требителей, а просто перекроют импортированные. Например,.импортируем.смещения.группы.потребителей.testgroup.из.файла. offsets : # kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect zoo1.example.com:2181/kafka-cluster --input-file offsets # 222 Глава 9 • Администрирование Kafka Динамические изменения конфигурации Существует.возможность.перекрытия.настроек.тем.и.квот.клиентов.во.время.рабо- ты.кластера..В.будущем.планируется.добавить.еще.больше.динамических.настроек,. поэтому.мы.поместили.их.в.отдельную.утилиту.командной.строки,. kafka-configs. sh ..Это.позволяет.задавать.настройки.для.отдельных.тем.и.ID.клиентов..Кластер. использует.заданные.настройки.на.постоянной.основе..Они.хранятся.в.ZooKeeper,. и.каждый.брокер.читает.их.при.запуске..В.утилитах.и.документации.о.подобных. динамических.настройках.говорят.как.о.настройках для отдельных тем.(per-topic). или.настройках для отдельных клиентов.(per-client),.а.также.используют.термин. «переопределение настроек».(override). Как.и.для.предыдущих.утилит,.необходимо.указать.строку.подключения.ZooKeeper. для.кластера.с.помощью.аргумента. --zookeeper ..В.последующих.примерах.пред- полагается,.что.строка.подключения.ZooKeeper.имеет.вид. zoo1.example.com:2181/ kafka-cluster Переопределение значений настроек тем по умолчанию Сочетать.различные.сценарии.использования.в.одном.кластере.можно,.меняя. множество.настроек.отдельных.тем..У.большинства.этих.настроек.есть.задаваемые. в.конфигурации.брокера.значения.по.умолчанию,.которые.и.будут.применяться,. если.их.не.переопределить. Формат.команды.изменения.настроек.темы: kafka-configs.sh --zookeeper zoo1.example.com:2181/kafka-cluster --alter --entity-type topics --entity-name --add-config В.табл..9.2.приведены.возможные.настройки.(ключи.конфигурации).тем. Таблица 9.2. Допустимые ключи [конфигурации] тем Ключ конфигурации Описание cleanup.policy При.значении.compact.сообщения.темы.будут.отбрасываться,. а.из.сообщений.с.заданным.ключом.будет.сохраняться.только.самое. последнее.(сжатые.журналы) compression.type Тип.сжатия,.используемый.брокером.при.записи.на.диск.пакетов. сообщений.для.данной.темы..Допустимые.значения.gzip,.snappy. и.lz4 delete.retention.ms Длительность.(в.миллисекундах).хранения.отметок.об.удалении. для.данной.темы..Имеет.смысл.только.для.тем.со.сжатием. журналов |