Skip to content

Commit

Permalink
Per #345, log the topology that is being started.
Browse files Browse the repository at this point in the history
  • Loading branch information
sskrla authored and sskrla committed Mar 14, 2021
1 parent 48c0e1b commit bc3b7c2
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.event.ApplicationEventPublisher;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PreDestroy;
import javax.inject.Singleton;
Expand All @@ -41,6 +45,8 @@
@Factory
public class KafkaStreamsFactory implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsFactory.class);

private final Map<KafkaStreams, ConfiguredStreamBuilder> streams = new ConcurrentHashMap<>();

private final ApplicationEventPublisher eventPublisher;
Expand Down Expand Up @@ -84,15 +90,20 @@ public Map<KafkaStreams, ConfiguredStreamBuilder> getStreams() {
@EachBean(ConfiguredStreamBuilder.class)
@Context
KafkaStreams kafkaStreams(
@Parameter String name,
ConfiguredStreamBuilder builder,
KStream<?, ?>... kStreams
) {
Topology topology = builder.build(builder.getConfiguration());
KafkaStreams kafkaStreams = new KafkaStreams(
builder.build(builder.getConfiguration()),
topology,
builder.getConfiguration()
);
eventPublisher.publishEvent(new BeforeKafkaStreamStart(kafkaStreams, kStreams));
streams.put(kafkaStreams, builder);
if(LOG.isDebugEnabled()) {
LOG.debug("Initializing Application {} with topology:\n{}", name, topology.describe().toString());
}
kafkaStreams.start();
eventPublisher.publishEvent(new AfterKafkaStreamsStart(kafkaStreams, kStreams));
return kafkaStreams;
Expand Down

0 comments on commit bc3b7c2

Please sign in to comment.