From bc3b7c2f94067b588538868e0d60fb9eb4650060 Mon Sep 17 00:00:00 2001 From: Steve Skrla Date: Sun, 14 Mar 2021 09:31:50 -0500 Subject: [PATCH] Per #345, log the topology that is being started. --- .../kafka/streams/KafkaStreamsFactory.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java index ac1023213..9189c8b57 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java @@ -20,9 +20,13 @@ import io.micronaut.context.annotation.Context; import io.micronaut.context.annotation.EachBean; import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Parameter; import io.micronaut.context.event.ApplicationEventPublisher; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.PreDestroy; import javax.inject.Singleton; @@ -41,6 +45,8 @@ @Factory public class KafkaStreamsFactory implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsFactory.class); + private final Map streams = new ConcurrentHashMap<>(); private final ApplicationEventPublisher eventPublisher; @@ -84,15 +90,20 @@ public Map getStreams() { @EachBean(ConfiguredStreamBuilder.class) @Context KafkaStreams kafkaStreams( + @Parameter String name, ConfiguredStreamBuilder builder, KStream... kStreams ) { + Topology topology = builder.build(builder.getConfiguration()); KafkaStreams kafkaStreams = new KafkaStreams( - builder.build(builder.getConfiguration()), + topology, builder.getConfiguration() ); eventPublisher.publishEvent(new BeforeKafkaStreamStart(kafkaStreams, kStreams)); streams.put(kafkaStreams, builder); + if(LOG.isDebugEnabled()) { + LOG.debug("Initializing Application {} with topology:\n{}", name, topology.describe().toString()); + } kafkaStreams.start(); eventPublisher.publishEvent(new AfterKafkaStreamsStart(kafkaStreams, kStreams)); return kafkaStreams;