Вопросы по теме 'kafka-producer-api'

ИНФОРМАЦИЯ Закрытое сокетное соединение для клиента /127.0.0.1:48452 с идентификатором сеанса 0x15698f5ac360001 (org.apache.zookeeper.server.NIOServerCnxn)
Я установил свежий зоопарк и кафку. Я начал их обоих. Затем, когда я хочу увидеть список тем с помощью этой команды: bin/kafka-topics.sh --list --zookeeper localhost 2181 Это дает мне закрытое соединение сокета. Вот снимок экрана:...
16651 просмотров

Как направлять сообщения из kafka-sink в несколько тем
У меня есть модуль http-процессора spring-xd с http-outbound-gateway, который имеет errorChannel и outputChannel. Любое сообщение с HTTP 200 приходит в outputChannel, а остальные попадают в failureChannel. Прямо сейчас модуль http-процессора...
1017 просмотров

Обойти Zookeeper в клиентах производителя/потребителя?
Это дополнительный вопрос к предыдущему обсуждению . Я думаю о Zookeeper как о координаторе экземпляров брокера Kafka или «шины сообщений». Я понимаю, почему мы можем захотеть, чтобы клиенты-производители/потребители совершали транзакции через...
216 просмотров

Как измерить время, затраченное на перебалансировку kafka?
Я очень плохо знаком с Кафкой. Так что этот вопрос может быть очень простым. Чего я пытаюсь добиться, так это выяснить время, необходимое для перебалансировки, когда брокер терпит неудачу, а затем снова добавляется. Из моего чтения документации (...
640 просмотров
schedule 12.10.2022

Подключите клиент CometD к производителю Kafka
Можно ли подключить клиент cometD к производителю Kafka? Какие-либо предложения? В настоящее время у меня есть клиент CometD на python, который извлекает данные в реальном времени из объекта Salesforce. Теперь я хочу передать эти данные...
911 просмотров

Когда вызывать метод закрытия KafkaProducer?
Как сказано в документе Кафки, Производитель является потокобезопасным, и совместное использование одного экземпляра производителя между потоками, как правило, будет быстрее, чем наличие нескольких экземпляров. Итак, у меня есть следующий...
1616 просмотров
schedule 04.02.2024

Какое свойство производителя Kafka использовать, если я больше не хочу получать данные после определенного порога?
Какое свойство производителя Kafka использовать для достижения следующего? Я использую UDP Kafka Bridge , который отправляет сообщение от UDP порт в тему Кафка. Если объем памяти на производителе Kafka превышает определенный МБ (скажем, 300 МБ),...
107 просмотров
schedule 14.04.2024

Когда закрывать производителя или потребителя
В последнее время у нас возникают некоторые проблемы с производительностью наших потребителей и производителей Kafka. Мы используем API Kafka Java в scala. Что считается хорошей практикой при открытии и закрытии объектов потребителей и...
1573 просмотров

Настройка кафки в aws ec2
У меня есть настройка какфа на экземпляре ec2. Я назначил эластичный IP-адрес этому экземпляру. Я могу запустить хранителя зоопарка и кафку и создавать темы. Я не могу подключиться к брокеру с моей локальной машины. Когда я искал, я понял, что мне...
186 просмотров

Приложение должно продолжать работать даже после разрыва соединения с Kafka.
Я публикую сообщения для внешней команды, которая получает сообщения для Kafka и возвращает нам ответ. Всякий раз, когда наше приложение работает и пытается отправить сообщение о публикации в Kafka, и если соединение с Kafka потеряно или отключено,...
84 просмотров

Есть ли в приложении Kafka Streams способ определить топологию с использованием подстановочного списка тем вывода?
У меня есть приложение Kafka Streams с несколькими схемами, которое обогащает запись через соединение с KTable, а затем передает обогащенную запись. Формат именования входных тем в настоящее время четко определен, но я меняю его на подстановочный...
357 просмотров

Клиент kafka отправляет запрос на раздел, где упал брокер
Я использую модуль kafka-node для отправки сообщения kafka. В кластерной среде, где у меня есть тема с 3 разделами и фактором репликации как 3. Описание темы - Topic:clusterTopic PartitionCount:3 ReplicationFactor:3...
459 просмотров

Spring Cloud Stream и Kafka: ошибка сериализации Kafka Producer
Я новичок в Spring Cloud Stream и Kafka. Я получаю следующую ошибку от производителя kafka при отправке строки в полезной нагрузке. Любая помощь или идеи приветствуются. Я пробовал использовать сериализатор / десериализатор bytearray, а также json...
3396 просмотров

не могу читать \ писать из темы кафки
У меня Kafka работает в контейнере на моем рабочем столе. Я могу легко подключиться к нему с помощью инструмента под названием «Kafka tool», где, например, я могу просматривать свои темы. У меня проблемы с чтением и записью в / из темы Kafka. что...
580 просмотров

Транзакционный производитель Kafka
Я пытаюсь сделать моего производителя kafka транзакционным. Я отправляю 10 сообщений. Если возникает какая-либо ошибка, не следует отправлять сообщения в kafka, т.е. ничего или все. Я использую Spring Boot KafkaTemplate. @Configuration...
595 просмотров

Как отправлять сообщения о ключах и значениях с помощью производителя консоли kafka
У меня есть вариант использования, когда мне нужно отправлять сообщения о ключевом значении с помощью Kafka Console Producer. Итак, как добиться этого с помощью команды Kafka Console Producer ?
14477 просмотров

Создание сообщения avro с использованием php-enqueue
Я изучаю способ создания сообщений avro с php на kafka, используя php-enqueue . В их документации указано, что вы можете использовать другие форматы, включая Apache Avro. По умолчанию транспорт сериализует сообщения в формате json, но вы...
379 просмотров

Как выбрать объект класса Case в качестве DataFrame в Kafka-Spark Structured Streaming
У меня есть класс случая: case class clickStream(userid:String, adId :String, timestamp:String) экземпляр которого я хочу отправить с KafkaProducer как: val record = new ProducerRecord[String,clickStream]( "clicktream",...
231 просмотров