У меня есть приложение Kafka Streams с несколькими схемами, которое обогащает запись через соединение с KTable, а затем передает обогащенную запись.
Формат именования входных тем в настоящее время четко определен, но я меняю его на подстановочный знак. Я хочу определить входную тему каждой записи, получить выходную тему с помощью замены регулярного выражения и отправить ее.
Например. Во время прослушивания event.raw.*
запись приходит на event.raw.foo
, и я хочу передать ее на event.foo
.
Я понимаю, что могу получить темы ввода через Processor API:
public class EnrichmentProcessor extends AbstractProcessor<String, GenericRecord> {
@Override
public void process(String key, GenericRecord value) {
//Do Join...
//Determine output topic and forward
String outputTopic = context().topic().replaceFirst(".raw.", ".");
context().forward(key, value, To.child(outputTopic));
context().commit();
}
}
Но это не помогает мне, когда я пытаюсь определить свою топологию, потому что у меня нет возможности заранее узнать, какой будет моя тема вывода.
InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder();
topologyBuilder.addSource("SOURCE", stringDeserializer, genericRecordDeserializer, "event.raw.*")
.addProcessor("ENRICHER", EnrichmentProcessor::new, "SOURCE")
.addSink("OUTPUT", outputTopic, stringSerializer, genericRecordSerializer, "ENRICHER"); // How can I register all possible output topics here?
Кто-нибудь раньше решал подобную ситуацию?
Я знаю, что если бы у меня был список возможных имен выходных тем, я мог бы определить несколько приемников в топологии, но я не собираюсь этого делать.
Есть ли способ определить топологию для динамически выделяемых имен выходных тем, когда у меня нет заранее жестко закодированного списка возможных имен выходных тем?