Вопросы по теме 'apache-kafka-streams'

Kafka Streams с поисковыми данными в HDFS
Я пишу приложение с Kafka Streams (v0.10.0.1) и хочу обогатить обрабатываемые записи данными поиска. Эти данные (файл с отметкой времени) записываются в каталог HDFS ежедневно (или 2-3 раза в день). Как я могу загрузить это в приложение Kafka...
867 просмотров

API потребителя/производителя Kafka
Как подсчитать, сколько всего сообщений было отправлено в тему в Kafka и сколько было потреблено или зафиксировано потребителем в то время? Я инициирую коннектор kafka как- Map<String, String> kafkaParams = new HashMap<>();...
79 просмотров

При запросе глобального хранилища состояний в Kafka Streams возникает исключение Null
При создании глобальной таблицы KTable с использованием API процессора и функции addGlobalStore в результирующем хранилище отображается ОК. Но последующие попытки перебрать содержимое магазина приводят к следующему исключению: Exception in...
2924 просмотров
schedule 05.11.2023

Агрегируйте большой объем данных с помощью потоков Kafka
Я пытаюсь агрегировать большой объем данных, используя временные окна разного размера, используя Kafka Streams. Я увеличил размер кеша до 2 ГБ, но когда я установил размер окна в 1 час, я получил загрузку процессора на 100%, и приложение начало...
626 просмотров

Поток Kafka с HTTP-запросом
Я хочу реализовать следующий поток потока кафки from(kafka topic) -> transform (here should be http request) -> to (kafka topic) Правильно ли устанавливать http-запрос во время трансформации в потоке kafka или правильнее использовать...
615 просмотров
schedule 20.01.2024

Почему моя топология Kafka Streams не воспроизводится/не обрабатывается правильно?
У меня есть топология, которая выглядит так: KTable<ByteString, User> users = topology.table(USERS); KStream<ByteString, JoinRequest> joinRequests = topology.stream(JOIN_REQUESTS) .mapValues(entityTopologyProcessor::userNew)...
779 просмотров
schedule 13.11.2023

Kafka Streams GlobalKTable синхронизация с приложениями
При работе с обычными k-потоками kafka сохраняет смещения каждого приложения по своим внутренним темам смещения. При перезапуске приложения приложения повторно обрабатывают темы в зависимости от auto.offset.reset политики. Это действительно...
1554 просмотров
schedule 21.04.2024

Потоки Kafka и работа с окнами для подсчета во временном окне
Я новичок в Stackoverflow, так что простите меня, если вопрос задан неправильно. Любая помощь/вдохновение очень ценится! Я использую потоки Kafka для фильтрации входящих данных в мою базу данных. Входящие сообщения выглядят как...
3370 просмотров
schedule 19.11.2023

Приведение потоков Kafka к проблемам со строками с помощью KTable при группировке и агрегировании
У меня есть поток Kafka с входящими сообщениями, который выглядит как sensor_code: x, time: 1526978768, address: Y . Я хочу создать KTable, в котором будет храниться каждый уникальный адрес для каждого кода датчика. KTable KTable<String,...
1530 просмотров

Потоки Kafka: потоки потоков против хранилища
Допустим, у нас есть экземпляр со следующей конфигурацией и состояниями, поддерживаемыми в магазине. тема - 1 разделы - 6 num.stream.threads - 6 Toplogy источник - 1 низкоуровневые процессоры - 3 (по одному процессору для ежедневного,...
264 просмотров
schedule 16.10.2023

потоковая передача kafka: java.nio.file.DirectoryNotEmptyException
У нас возникла проблема с удалением каталога состояний в потоковом приложении Kafka. Мы запускаем приложение на собственной контейнерной платформе. Мы будем очень признательны за понимание этого вопроса. Журнал исключения: 2018-09-18 09:...
1216 просмотров
schedule 28.04.2024

Как запланировать периодическое задание по количеству обработанных сообщений?
Я хочу использовать API процессора Kafka для обработки сообщений от Kafka. Я хотел бы периодически вызывать какую-то функцию - что-то вроде: context.schedule(IntervalMS,punctuationType, somePunctuator) , где somePunctuator выполняет какое-то...
101 просмотров
schedule 14.10.2022

Модульное тестирование топологии kafka, использующей соединения kstream
У меня есть топология, которая выполняет два соединения kstream, проблема, с которой я сталкиваюсь, заключается в попытке выполнить модульное тестирование с помощью TopologyTestDriver, отправляющего пару ConsumerRecords с помощью pipeInput, а затем...
574 просмотров

Как прослушивать несколько тем с помощью нескольких StreamListener Spring Cloud Stream и Kafka Stream
Я хочу послушать две темы Kafka, как в приведенном ниже коде, и есть два исходных события, которые необходимо обработать и преобразовать в другое событие. Итак, я хочу послушать эти два события в одном EnableBinding...
1807 просмотров

Как использовать интерактивный запрос в топологии процесса kafka в Spring-Cloud-Stream?
Можно ли использовать интерактивный запрос (InteractiveQueryService) в Spring Cloud Stream в классе с аннотацией @EnableBinding или внутри метода с @StreamListener? Я попытался создать экземпляр ReadOnlyKeyValueStore в предоставленном...
924 просмотров

Spring-Cloud-Stream обрабатывает сообщения kafka только после того, как хранилища состояний, материализованные при запуске приложения, были полностью заполнены и готовы
Ссылаясь на это решение , мой файл spring-cloud-stream application.yml имеет следующую конфигурацию: #application.yml spring.cloud.stream.bindings.input: destination: my-topic-name contentType: application/json consumer:...
923 просмотров

Есть ли в приложении Kafka Streams способ определить топологию с использованием подстановочного списка тем вывода?
У меня есть приложение Kafka Streams с несколькими схемами, которое обогащает запись через соединение с KTable, а затем передает обогащенную запись. Формат именования входных тем в настоящее время четко определен, но я меняю его на подстановочный...
357 просмотров

Ошибка приложения Kafka-streams с использованием метода mapValue() с Gson
Я пишу приложение kafka-streams, которое получает данные из темы «topic_one» (данные были получены из MySQL). Затем я хочу получить часть (раздел «после», см. ниже) этих данных с интерфейсом KStream для выполнения других операций. Но у меня ошибка с...
1231 просмотров
schedule 15.12.2023

Почему я должен использовать KStream или KTable?
Я читал, но не понимал слишком многого. Я читал, что могу использовать KTable вместо сжатия журнала. Или у него есть еще много возможностей. Однако мне не удалось найти на этот счет хорошего примера. Я тоже не увидел этого в хорошем источнике,...
1007 просмотров
schedule 11.08.2022

Потоки Kafka: присоединяйтесь во время загрузки
У меня есть две темы довольно разного объема (может быть что-то вроде 1000 событий, генерируемых в левой теме для каждого события в правой теме). Я пытаюсь leftJoin объединить эти две темы, и у меня сложилось впечатление, что окно соединения...
162 просмотров