Есть ли в приложении Kafka Streams способ определить топологию с использованием подстановочного списка тем вывода?

У меня есть приложение Kafka Streams с несколькими схемами, которое обогащает запись через соединение с KTable, а затем передает обогащенную запись.

Формат именования входных тем в настоящее время четко определен, но я меняю его на подстановочный знак. Я хочу определить входную тему каждой записи, получить выходную тему с помощью замены регулярного выражения и отправить ее.

Например. Во время прослушивания event.raw.* запись приходит на event.raw.foo, и я хочу передать ее на event.foo.

Я понимаю, что могу получить темы ввода через Processor API:

public class EnrichmentProcessor extends AbstractProcessor<String, GenericRecord> {

    @Override
    public void process(String key, GenericRecord value) {
        //Do Join...

        //Determine output topic and forward
        String outputTopic = context().topic().replaceFirst(".raw.", ".");
        context().forward(key, value, To.child(outputTopic));
        context().commit();
    }
}

Но это не помогает мне, когда я пытаюсь определить свою топологию, потому что у меня нет возможности заранее узнать, какой будет моя тема вывода.

  InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder();
        topologyBuilder.addSource("SOURCE", stringDeserializer, genericRecordDeserializer, "event.raw.*")
        .addProcessor("ENRICHER", EnrichmentProcessor::new, "SOURCE")
        .addSink("OUTPUT", outputTopic, stringSerializer, genericRecordSerializer, "ENRICHER"); // How can I register all possible output topics here?

Кто-нибудь раньше решал подобную ситуацию?

Я знаю, что если бы у меня был список возможных имен выходных тем, я мог бы определить несколько приемников в топологии, но я не собираюсь этого делать.

Есть ли способ определить топологию для динамически выделяемых имен выходных тем, когда у меня нет заранее жестко закодированного списка возможных имен выходных тем?


person larjudge    schedule 24.06.2019    source источник


Ответы (1)


Это должно быть возможно: вы можете использовать Topology#addSink(..., new TopicNameExtractor(){...}, ...) для динамической установки имени выходной темы. TopicNameExtractor имеет доступ к RecordContext, который позволяет вам получить имя темы ввода через context.topic(). Следовательно, вы должны иметь возможность вычислить имя выходной темы на основе имени входной темы.

person Matthias J. Sax    schedule 24.06.2019
comment
Привет, Матиас, спасибо, что так быстро вернулся. Извините за задержку с ответом. Я реализовал это в своей кодовой базе, но это не дает мне именно того, что мне нужно. Функция возвращает event.raw.* вместо event.raw.foo. - person larjudge; 27.06.2019
comment
Это странно. context.topic() должен возвращать не шаблон, а фактическое название темы. Не уверен, почему что-то пошло не так. Вы можете поделиться своим кодом? Как вам протестировать код? - person Matthias J. Sax; 27.06.2019