Демонстрация использования наиболее часто используемых команд интерфейса командной строки Kafka

Введение

Apache Kafka — одна из наиболее часто используемых технологий, упрощающих создание архитектур с равномерной потоковой передачей. Несмотря на то, что его часто считают *просто* брокером сообщений (например, как RabbitMQ), Kafka определенно намного больше, чем просто это.

Kafka был внутренним проектом LinkedIn, исходный код которого был открыт в 2011 году и быстро превратился из брокера сообщений в полноценную платформу, обеспечивающую равномерную потоковую передачу в высоко масштабируемом, отказоустойчивом распределенном режиме.

В сегодняшней статье мы рассмотрим некоторые из наиболее часто используемых команд в интерфейсе командной строки Kafka. Не забудьте добавить это руководство в закладки, так как есть вероятность, что вам придется довольно часто обращаться к некоторым частям при выполнении определенных задач администрирования в кластерах Kafka.

1. Как перечислить все темы в кластере Kafka

Если вы хотите перечислить темы, включенные в конкретный брокер, следующая команда сделает свое дело:

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --list

Обратите внимание, что в более старых версиях вы также можете использовать конечную точку Zookeeper, как показано ниже:

$ kafka-topics \
  --zookeeper localhost:2181 \
  --list

Поскольку последние выпуски Kafka удалят зависимость от Zookeeper, я бы посоветовал использовать первую команду.

Наконец, если вы хотите перечислить темы по всему кластеру, обязательно включите всех брокеров в --bootstrap-server:

$ kafka-topics \
  --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
  --list

2. Как удалить тему из Kafka

Чтобы удалить топик из кластера, нужно передать флаг --delete вместе с брокерами и именем удаляемого топика.

$ kafka-topics \
   --bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
   --delete \
   --topic topic_for_deletion

3. Удалите несколько тем из Kafka с помощью регулярного выражения

При удалении тем из брокеров Kafka вы даже можете указать выражения, подобные регулярным выражениям, чтобы удалить несколько тем за один раз. Например, предположим, что мы хотим удалить все темы, начинающиеся с префикса test-. Следующая команда должна помочь:

$ kafka-topics \
    --bootstrap-server localhost:9092,localhost:9093,localhost:9094 
    --delete \
    --topic 'test-.*'

4. Как создать тему Кафки

Теперь, если вы хотите создать новую тему, вы можете просто сделать это, используя опцию --create в kafka-topics runner.

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic topic-name

Обратите внимание, что при создании раздела вы можете указать некоторые дополнительные параметры конфигурации, такие как количество разделов и/или коэффициент репликации. Например, чтобы создать тему с тремя разделами и коэффициентом репликации 2, подойдет следующая команда:

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic topic-name \
  --partitions 3 \
  --replication-factor 2

5. Получите подробную информацию о теме Kafka (количество разделов, реплики и т. д.)

Если вы хотите получить детали конфигурации определенной темы, вы можете использовать опцию --describe с kafka-topics runner.

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic topic-name

Вывод команды предоставит вам такую ​​информацию, как количество разделов, коэффициент репликации, лидер, реплики и ISR, а также дополнительные конфигурации (например, период хранения, если значение по умолчанию было изменено).

Topic:topic-name  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: topic-name Partition: 0    Leader: 0   Replicas: 0 Isr: 0

6. Как производить данные по теме Кафки

Теперь приступим к генерации данных. Чтобы получить данные по определенной теме, все, что вам нужно, это бегун kafka-console-consumer:

$ kafka-console-producer \ 
  --bootstrap-server localhost:9092 \
  --topic topic-name
> One event
> Oh, another event!

Если вы также хотите установить ключи для каждого сгенерированного сообщения, вы можете использовать некоторые дополнительные параметры (например, включить ключи и символ-разделитель ключей), как показано ниже:

$ kafka-console-producer \ 
  --bootstrap-server localhost:9092 \
  --topic topic-name \
  --property "parse.key=true" \
  --property "key.separator=:"
> key1:One event
> key2:Oh, another event!

7. Как использовать данные из темы Kafka

С другой стороны, если вы хотите получать в терминале сообщения из определенной темы Kafka, вам понадобится только kafka-console-consumer:

$ kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic topic-name \
  --from-beginning
One event
Oh, another event!

Если вы также хотите распечатать ключи, вы также можете указать символ-разделитель ключей, флаги для включения печати как для ключа, так и для значения, а также десериализаторы ключа и значения.

Например,

$ kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic topic-name \
  --from-beginning \
  --property key.separator="-" \
  --property print.key=true \
  --property print.value=true \
  --property  key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

8. Как очистить тему Кафки

Теперь, чтобы очистить тему Kafka и удалить все сообщения, вы можете просто временно изменить период хранения на одну секунду, а затем снова изменить его.

Чтобы изменить удержание на одну секунду, вы можете просто использовать флаг --add-config с бегуном kafka-configs:

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --alter \
  --entity-type topics \
  --entity-name topic-name \
  --add-config retention.ms=1000

И чтобы вернуть его обратно, просто удалите этот конфиг (чтобы затем использовалось сохранение по умолчанию):

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --alter \
  --entity-type topics \
  --entity-name topic-name \
  --delete-config retention.ms

9. Как составить список всех групп потребителей по всем темам

Каждая тема Кафки может иметь множество групп потребителей. Чтобы вывести список всех групп потребителей по всем темам в кластере Kafka, вы можете просто использовать флаг --list с бегуном kafka-consumer-groups.

$ kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --list

Вывод будет содержать имена всех групп потребителей:

test-consumer-group-1
test-consumer-group-2
test-consumer-group-3
...

10. Получите подробную информацию о конкретной группе потребителей (смещения, отставание и т. д.)

Теперь, чтобы получить более подробную информацию о конкретной группе потребителей, вы можете использовать флаг --describe вместе с именем группы потребителей (это можно вывести с помощью команды, которую мы продемонстрировали в предыдущем разделе).

$ kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-group

И вывод будет содержать следующую информацию:

TOPIC 
PARTITION 
CURRENT-OFFSET 
LOG-END-OFFSET 
LAG 
CONSUMER-ID 
HOST 
CLIENT-ID

11. Получите список всех активных участников в группе потребителей

Опцию --describe также можно комбинировать с дополнительными опциями, чтобы получить еще больше информации о конкретной группе потребителей.

Чтобы перечислить всех активных участников в определенной группе потребителей, вы также можете указать флаг --members:

$ kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-group \
  --members

Вот пример вывода приведенной выше команды:

CONSUMER-ID          HOST           CLIENT-ID       #PARTITIONS
consumer1-3fc8d..   /127.0.0.1      consumer1       2
consumer4-117fe..   /127.0.0.1      consumer4       1
consumer2-e76ea..   /127.0.0.1      consumer2       3
consumer3-ecea4..   /127.0.0.1

Чтобы получить еще больше подробностей, вы также можете использовать параметр --verbose, который также будет сообщать о разделах, назначенных каждому участнику.

$ kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-group \
  --members \
  --verbose

И результат должен выглядеть так, как показано ниже.

CONSUMER-ID HOST       CLIENT-ID  #PARTITIONS ASSIGNMENT
..          /127.0.0.1 consumer1  2           topic1(0), topic2(0)
..          /127.0.0.1 consumer4  1           topic3(2)
..          /127.0.0.1 consumer2  3           topic2(1), topic3(0,1)
..          /127.0.0.1  consumer3 0           -

12. Увеличьте количество разделов тем Kafka

Если по какой-либо причине вам нужно увеличить количество разделов для определенной темы, то вы можете использовать флаг --alter для того, чтобы указать новое — увеличить — количество разделов.

$ kafka-topics \
  --bootstrap-server localhost:9092 \
  --alter \
  --topic topic-name \
  --partitions 40

13. Добавьте конфигурацию в Kafka Broker

Еще одна распространенная вещь, которую вам может понадобиться сделать, это добавить дополнительные параметры конфигурации для определенного (или даже для всех) брокера.

Например, предположим, что мы хотим указать количество фоновых потоков, которые будут использоваться для очистки журнала. Это поведение можно настроить с помощью параметра конфигурации log.cleaner.threads.

Для этого вы можете запустить бегун kafka-configs вместе с информацией о брокере и --add-config, как показано ниже.

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 0
  --alter \
  --add-config log.cleaner.threads=2

Если вы хотите добавить конфигурацию ко всем брокерам в кластере, вы можете использовать флаг --entity-default (вместо указания конкретного брокера через (--entity-name broker-id), как показано ниже.

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-default \
  --alter \
  --add-config log.cleaner.threads=2

Здесь важно отметить, что для эффективной работы некоторых конфигураций может потребоваться перезапуск брокеров. Вы можете проверить режим обновления каждой отдельной конфигурации брокера в соответствующем разделе официальной документации.

14. Удаление конфигурации из брокера Kafka

С другой стороны, вы также можете удалить некоторые конфигурации из определенного брокера. Все, что вам нужно, это указать идентификатор брокера (--entity-name) и флаг --delete-config, чтобы указать опцию, которую вы хотите удалить.

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 0 \
  --alter \
  --delete-config log.cleaner.threads

Опять же, вместо того, чтобы указывать идентификатор брокера через --entity-name, вы можете указать флаг --entity-default, если хотите, чтобы удаление было эффективным для каждого брокера в кластере.

15. Опишите текущие конфигурации динамического брокера для брокера Kafka.

Теперь, чтобы получить текущую конфигурацию конкретного брокера, вы можете указать идентификатор брокера (например, --entity-name) вместе с флагом --describe при выполнении kafka-configs runner.

$ kafka-configs \
  --bootstrap-server localhost:9092 \
  --entity-type brokers \
  --entity-name 0 
  --describe

Похожие статьи, которые вам также могут понравиться: