Демонстрация использования наиболее часто используемых команд интерфейса командной строки Kafka
Введение
Apache Kafka — одна из наиболее часто используемых технологий, упрощающих создание архитектур с равномерной потоковой передачей. Несмотря на то, что его часто считают *просто* брокером сообщений (например, как RabbitMQ), Kafka определенно намного больше, чем просто это.
Kafka был внутренним проектом LinkedIn, исходный код которого был открыт в 2011 году и быстро превратился из брокера сообщений в полноценную платформу, обеспечивающую равномерную потоковую передачу в высоко масштабируемом, отказоустойчивом распределенном режиме.
В сегодняшней статье мы рассмотрим некоторые из наиболее часто используемых команд в интерфейсе командной строки Kafka. Не забудьте добавить это руководство в закладки, так как есть вероятность, что вам придется довольно часто обращаться к некоторым частям при выполнении определенных задач администрирования в кластерах Kafka.
1. Как перечислить все темы в кластере Kafka
Если вы хотите перечислить темы, включенные в конкретный брокер, следующая команда сделает свое дело:
$ kafka-topics \
--bootstrap-server localhost:9092 \
--list
Обратите внимание, что в более старых версиях вы также можете использовать конечную точку Zookeeper, как показано ниже:
$ kafka-topics \
--zookeeper localhost:2181 \
--list
Поскольку последние выпуски Kafka удалят зависимость от Zookeeper, я бы посоветовал использовать первую команду.
Наконец, если вы хотите перечислить темы по всему кластеру, обязательно включите всех брокеров в --bootstrap-server
:
$ kafka-topics \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--list
2. Как удалить тему из Kafka
Чтобы удалить топик из кластера, нужно передать флаг --delete
вместе с брокерами и именем удаляемого топика.
$ kafka-topics \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094 \
--delete \
--topic topic_for_deletion
3. Удалите несколько тем из Kafka с помощью регулярного выражения
При удалении тем из брокеров Kafka вы даже можете указать выражения, подобные регулярным выражениям, чтобы удалить несколько тем за один раз. Например, предположим, что мы хотим удалить все темы, начинающиеся с префикса test-
. Следующая команда должна помочь:
$ kafka-topics \
--bootstrap-server localhost:9092,localhost:9093,localhost:9094
--delete \
--topic 'test-.*'
4. Как создать тему Кафки
Теперь, если вы хотите создать новую тему, вы можете просто сделать это, используя опцию --create
в kafka-topics
runner.
$ kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic topic-name
Обратите внимание, что при создании раздела вы можете указать некоторые дополнительные параметры конфигурации, такие как количество разделов и/или коэффициент репликации. Например, чтобы создать тему с тремя разделами и коэффициентом репликации 2, подойдет следующая команда:
$ kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic topic-name \
--partitions 3 \
--replication-factor 2
5. Получите подробную информацию о теме Kafka (количество разделов, реплики и т. д.)
Если вы хотите получить детали конфигурации определенной темы, вы можете использовать опцию --describe
с kafka-topics
runner.
$ kafka-topics \
--bootstrap-server localhost:9092 \
--describe \
--topic topic-name
Вывод команды предоставит вам такую информацию, как количество разделов, коэффициент репликации, лидер, реплики и ISR, а также дополнительные конфигурации (например, период хранения, если значение по умолчанию было изменено).
Topic:topic-name PartitionCount:1 ReplicationFactor:1 Configs:
Topic: topic-name Partition: 0 Leader: 0 Replicas: 0 Isr: 0
6. Как производить данные по теме Кафки
Теперь приступим к генерации данных. Чтобы получить данные по определенной теме, все, что вам нужно, это бегун kafka-console-consumer
:
$ kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic topic-name
> One event
> Oh, another event!
Если вы также хотите установить ключи для каждого сгенерированного сообщения, вы можете использовать некоторые дополнительные параметры (например, включить ключи и символ-разделитель ключей), как показано ниже:
$ kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic topic-name \
--property "parse.key=true" \
--property "key.separator=:"
> key1:One event
> key2:Oh, another event!
7. Как использовать данные из темы Kafka
С другой стороны, если вы хотите получать в терминале сообщения из определенной темы Kafka, вам понадобится только kafka-console-consumer
:
$ kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic topic-name \
--from-beginning
One event
Oh, another event!
Если вы также хотите распечатать ключи, вы также можете указать символ-разделитель ключей, флаги для включения печати как для ключа, так и для значения, а также десериализаторы ключа и значения.
Например,
$ kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic topic-name \
--from-beginning \
--property key.separator="-" \
--property print.key=true \
--property print.value=true \
--property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
8. Как очистить тему Кафки
Теперь, чтобы очистить тему Kafka и удалить все сообщения, вы можете просто временно изменить период хранения на одну секунду, а затем снова изменить его.
Чтобы изменить удержание на одну секунду, вы можете просто использовать флаг --add-config
с бегуном kafka-configs
:
$ kafka-configs \
--bootstrap-server localhost:9092 \
--alter \
--entity-type topics \
--entity-name topic-name \
--add-config retention.ms=1000
И чтобы вернуть его обратно, просто удалите этот конфиг (чтобы затем использовалось сохранение по умолчанию):
$ kafka-configs \
--bootstrap-server localhost:9092 \
--alter \
--entity-type topics \
--entity-name topic-name \
--delete-config retention.ms
9. Как составить список всех групп потребителей по всем темам
Каждая тема Кафки может иметь множество групп потребителей. Чтобы вывести список всех групп потребителей по всем темам в кластере Kafka, вы можете просто использовать флаг --list
с бегуном kafka-consumer-groups
.
$ kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--list
Вывод будет содержать имена всех групп потребителей:
test-consumer-group-1
test-consumer-group-2
test-consumer-group-3
...
10. Получите подробную информацию о конкретной группе потребителей (смещения, отставание и т. д.)
Теперь, чтобы получить более подробную информацию о конкретной группе потребителей, вы можете использовать флаг --describe
вместе с именем группы потребителей (это можно вывести с помощью команды, которую мы продемонстрировали в предыдущем разделе).
$ kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group my-group
И вывод будет содержать следующую информацию:
TOPIC
PARTITION
CURRENT-OFFSET
LOG-END-OFFSET
LAG
CONSUMER-ID
HOST
CLIENT-ID
11. Получите список всех активных участников в группе потребителей
Опцию --describe
также можно комбинировать с дополнительными опциями, чтобы получить еще больше информации о конкретной группе потребителей.
Чтобы перечислить всех активных участников в определенной группе потребителей, вы также можете указать флаг --members
:
$ kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group my-group \
--members
Вот пример вывода приведенной выше команды:
CONSUMER-ID HOST CLIENT-ID #PARTITIONS
consumer1-3fc8d.. /127.0.0.1 consumer1 2
consumer4-117fe.. /127.0.0.1 consumer4 1
consumer2-e76ea.. /127.0.0.1 consumer2 3
consumer3-ecea4.. /127.0.0.1
Чтобы получить еще больше подробностей, вы также можете использовать параметр --verbose
, который также будет сообщать о разделах, назначенных каждому участнику.
$ kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--describe \
--group my-group \
--members \
--verbose
И результат должен выглядеть так, как показано ниже.
CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT
.. /127.0.0.1 consumer1 2 topic1(0), topic2(0)
.. /127.0.0.1 consumer4 1 topic3(2)
.. /127.0.0.1 consumer2 3 topic2(1), topic3(0,1)
.. /127.0.0.1 consumer3 0 -
12. Увеличьте количество разделов тем Kafka
Если по какой-либо причине вам нужно увеличить количество разделов для определенной темы, то вы можете использовать флаг --alter
для того, чтобы указать новое — увеличить — количество разделов.
$ kafka-topics \
--bootstrap-server localhost:9092 \
--alter \
--topic topic-name \
--partitions 40
13. Добавьте конфигурацию в Kafka Broker
Еще одна распространенная вещь, которую вам может понадобиться сделать, это добавить дополнительные параметры конфигурации для определенного (или даже для всех) брокера.
Например, предположим, что мы хотим указать количество фоновых потоков, которые будут использоваться для очистки журнала. Это поведение можно настроить с помощью параметра конфигурации log.cleaner.threads
.
Для этого вы можете запустить бегун kafka-configs
вместе с информацией о брокере и --add-config
, как показано ниже.
$ kafka-configs \
--bootstrap-server localhost:9092 \
--entity-type brokers \
--entity-name 0
--alter \
--add-config log.cleaner.threads=2
Если вы хотите добавить конфигурацию ко всем брокерам в кластере, вы можете использовать флаг --entity-default
(вместо указания конкретного брокера через (--entity-name broker-id
), как показано ниже.
$ kafka-configs \
--bootstrap-server localhost:9092 \
--entity-type brokers \
--entity-default \
--alter \
--add-config log.cleaner.threads=2
Здесь важно отметить, что для эффективной работы некоторых конфигураций может потребоваться перезапуск брокеров. Вы можете проверить режим обновления каждой отдельной конфигурации брокера в соответствующем разделе официальной документации.
14. Удаление конфигурации из брокера Kafka
С другой стороны, вы также можете удалить некоторые конфигурации из определенного брокера. Все, что вам нужно, это указать идентификатор брокера (--entity-name
) и флаг --delete-config
, чтобы указать опцию, которую вы хотите удалить.
$ kafka-configs \
--bootstrap-server localhost:9092 \
--entity-type brokers \
--entity-name 0 \
--alter \
--delete-config log.cleaner.threads
Опять же, вместо того, чтобы указывать идентификатор брокера через --entity-name
, вы можете указать флаг --entity-default
, если хотите, чтобы удаление было эффективным для каждого брокера в кластере.
15. Опишите текущие конфигурации динамического брокера для брокера Kafka.
Теперь, чтобы получить текущую конфигурацию конкретного брокера, вы можете указать идентификатор брокера (например, --entity-name
) вместе с флагом --describe
при выполнении kafka-configs
runner.
$ kafka-configs \
--bootstrap-server localhost:9092 \
--entity-type brokers \
--entity-name 0
--describe
Похожие статьи, которые вам также могут понравиться: