Клиент kafka отправляет запрос на раздел, где упал брокер

Я использую модуль kafka-node для отправки сообщения kafka. В кластерной среде, где у меня есть тема с 3 разделами и фактором репликации как 3.

Описание темы -

Topic:clusterTopic      PartitionCount:3        ReplicationFactor:3    Configs:min.insync.replicas=2,segment.bytes=1073741824
        Topic: clusterTopic     Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: clusterTopic     Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 1,2,3
        Topic: clusterTopic     Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 1,2,3

Конфигурация производителя -

        "requireAcks": 1,
        "attributes": 2,
        "partitionerType": 2,
        "retries": 2

Когда я отправляю данные, он следует за типом раздела как циклический (2), как в циклическом режиме

когда я следую нижеприведенным шагам

  • Получите экземпляр HighLevelProducer, подключенный к kafka: 9092, kafka: 9093
  • Отправить сообщение
  • остановить kafka-server: 9092 вручную
  • попробуйте отправить другое сообщение с помощью HighLevelProducer, и send () вызовет обратный вызов с ошибкой: TimeoutError: Время ожидания запроса истекло через 30000 мс

Я ожидаю, что если раздел недоступен (поскольку брокер не работает), производитель должен автоматически отправлять данные в следующий доступный раздел, но я теряю сообщение из-за исключения

Исключение составляет следующее -

  TimeoutError: Request timed out after 3000ms
    at new TimeoutError (\package\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
    at Timeout.timeoutId._createTimeout [as _onTimeout] (\package\node_modules\kafka-node\lib\kafkaClient.js:980:14)
    at ontimeout (timers.js:424:11)
    at tryOnTimeout (timers.js:288:5)
    at listOnTimeout (timers.js:251:5)
    at Timer.processTimers (timers.js:211:10)
(node:56416) [DEP0079] DeprecationWarning: Custom inspection function on Objects via .inspect() is deprecated
  kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
  kafka-node:KafkaClient createBroker kafka1 9092 +1ms
  kafka-node:KafkaClient kafka-node-client reconnecting to kafka1:9092 +3s
  kafka-node:KafkaClient createBroker kafka1 9092 +0ms

person Anand Jain    schedule 23.09.2019    source источник
comment
Вы уверены, что репликация работает? Отображаются ли в синхронизированных репликах всех доступных брокеров?   -  person OneCricketeer    schedule 23.09.2019
comment
Пожалуйста, добавьте более подробную информацию к вашему вопросу, а именно настройку min.insync.replicas для темы и acks, retries и delivery.timeout.ms в вашем разработчике.   -  person mazaneicha    schedule 24.09.2019
comment
@ cricket_007 да репликация работает нормально (см. описание темы)   -  person Anand Jain    schedule 24.09.2019


Ответы (1)


Пожалуйста, отправьте серверы начальной загрузки для подтверждения, но я полагаю, что вы испытываете, основываясь на имеющейся информации, следующее:

  • У вас установлено значение min.insync.replicas равным 2
  • У вас есть acks, установленное на 1

С этими настройками производитель отправит событие реплике лидера и предположит, что сообщение безопасно.

Если он не удастся сразу после отправки, но до того, как последователи догнали его, вы потеряете сообщение, так как ожидаете только одного подтверждения.

Однако с точки зрения брокера вы указываете, что для доступности темы требуется наличие двух синхронизируемых реплик. По умолчанию только синхронизированные реплики могут быть избраны лидерами. Поскольку отказ первого из них приведет к рассинхронизации подписчиков, ваша тема может быть отключена. Вы можете проверить это в своих тестах, предполагая некоторые настройки.

Чтобы исправить это, попробуйте следующее:

  1. Если высокая доступность наиболее важна, установите min.insync.replicas на 1 и acks на 1.
  2. Если потеря данных неприемлема, установите min.insync.replicas равным 2 и подтвердите все

Вы также можете установить unclean.leader.election.enable на true для высокой доступности, так как это позволит выбрать лидером несинхронизированную реплику, но тогда существует вероятность потери данных.

person Charl    schedule 03.10.2019
comment
Эй, я не думаю, что проблема связана с репликами ack и insync (я пробовал то, что вы предложили по-прежнему с такой же ошибкой у производителя), но это связано с partitionerType, который является циклическим. Таким образом, данные отправляются в раздел 1,2, а затем в 3, но если я остановлю брокера, где присутствует какой-либо из разделов, модуль попытается отправить его туда же. Я хочу, чтобы он обнаружил следующий доступный раздел / брокера и повторно отправил данные - person Anand Jain; 04.10.2019
comment
Вы пытались установить unclean.leader.election.enable на true? Если производитель отправит запрос мертвому брокеру, он завершится ошибкой и впоследствии попробует другого брокера, но только при наличии лидера для этого раздела. Глядя на свои показатели с точки зрения брокера, видите ли вы какие-либо автономные разделы, когда это происходит? - person Charl; 04.10.2019
comment
Я попытался установить для unclean.leader.election.enable значение true, но я все еще получаю сообщение об ошибке подключения для отключенного брокера. Он повторяет попытку 5 раз, а затем останавливается с исключением KafkaJSNumberOfRetriesExceeded. предыдущая ошибка - ошибка подключения: подключите ECONNREFUSED к брокеру - person Anand Jain; 04.10.2019
comment
Не могли бы вы установить это в конфигурации своего производителя и отправить журнал: logLevel: logLevel.DEBUG - person Charl; 04.10.2019
comment
Обновил вопрос за исключением - person Anand Jain; 04.10.2019