Как направлять сообщения из kafka-sink в несколько тем

У меня есть модуль http-процессора spring-xd с http-outbound-gateway, который имеет errorChannel и outputChannel. Любое сообщение с HTTP 200 приходит в outputChannel, а остальные попадают в failureChannel.

Прямо сейчас модуль http-процессора подключается к Kafka-Sink с помощью исходящего адаптера kafka с TopicX. TopicX получает только сообщения HTTP 200 для дальнейшей обработки. Теперь нам нужно, чтобы сообщения в failureChannel направлялись в TopicY.

Как я могу отправлять сообщения в несколько тем kafka в kafka-sink. У меня есть httpStatusCode в заголовке сообщения. Версия Kafka, используемая в моем проекте, — 0.8.2, а версия Java — 1.7.

<!-- http-processor-config -->
<int-http:outbound-gateway
        request-channel="input"
        url-expression="'myUrlLink'"
        http-method="POST"
        expected-response-type="java.lang.String"
        charset="UTF-8"
        reply-timeout="10"
        reply-channel="output">

        <int-http:request-handler-advice-chain>
                    <bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
                        <property name="recoveryCallback">
                            <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
                                <constructor-arg ref="errorChannel" />
                            </bean>
                        </property>
                        <property name="retryTemplate" ref="retryTemplate" />
                    </bean>
        </int-http:request-handler-advice-chain>

</int-http:outbound-gateway>


<!-- Handle failed messages and route to failureChannel for specific http codes-->
<int:service-activator input-channel="errorChannel" ref="customErrorHandler" method="handleFailedRequest" output-channel="failureChannel"/>

В Kafka Sink у меня есть следующий контекст производителя:

    <int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          topic="${topicX}"
                                          key-class-type="java.lang.String"
                                          key-serializer="serializer"
                                          value-class-type="[B"
                                          value-serializer="valueSerializer"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

person Vidhya    schedule 28.03.2017    source источник


Ответы (2)


Правда не поддерживается и не будет. Spring XD уже в этом году EOL. Всем рекомендуется перейти на Spring Cloud Data Flow.

Для вашего варианта использования вы можете отредактировать конфигурацию модуля Kafka Sink. Добавьте еще один <int-kafka:outbound-channel-adapter> для другой темы. Чтобы решить, куда в какую тему отправлять входящее сообщение, можно добавить <router> в этот конфиг.

Или просто рассмотрите возможность использования Router Sink. И иметь два отдельных потока для каждого типа сообщений и, следовательно, каждой темы.

person Artem Bilan    schedule 28.03.2017

У меня наконец заработало. Прямо сейчас я нашел обходной путь с версией 0.8.x, добавив сплиттер в модуль http-процессора и добавив переменную kafka_topic в заголовок сообщения. Основываясь на коде состояния HTTP, я просто устанавливаю разные темы.

В Kafka-sink я добавил еще одну конфигурацию производителя с новой переменной имени темы, установленной через параметры XD. Я не могу придумать никакого другого решения, потому что повторно использую модуль kafka-source и kafka-sink в нескольких потоках.

Этот конкретный приемник kafka отправляет ввод в другой поток XD. Итак, добавлен фильтр заголовков для удаления kafka_topic в модуле kafka-source при запуске следующего потока.

Чтобы узнать больше: http://docs.spring.io/autorepo/docs/spring-kafka-dist/1.0.2.RELEASE/reference/html/_spring_integration.html

Найдите строки, чтобы установить целевую тему кафки. Это ключ.

person Vidhya    schedule 04.04.2017