Skip to content

Commit

Permalink
Fix recreate topology each time started
Browse files Browse the repository at this point in the history
  • Loading branch information
iyourshaw committed Dec 13, 2024
1 parent 9795a50 commit b821ae5
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class BaseTopology implements RestartableTopology{
protected abstract Logger getLogger();
protected abstract Topology buildTopology();

protected final Topology topology;
protected Topology topology;
protected KafkaStreams streams;

@Getter
Expand All @@ -31,15 +31,14 @@ public abstract class BaseTopology implements RestartableTopology{
public BaseTopology(String topicName, Properties streamsProperties) {
this.topicName = topicName;
this.streamsProperties = streamsProperties;
topology = buildTopology();
}

@Override
public void start() {
if (streams != null && streams.state().isRunningOrRebalancing()) {
throw new IllegalStateException("Start called while streams is already running.");
}

topology = buildTopology();
streams = new KafkaStreams(topology, streamsProperties);
if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler);
if (stateListener != null) streams.setStateListener(stateListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public Topology buildTopology() {

KStream<BsmIntersectionIdKey, OdeBsmData> inputStream = builder.stream(topicName, Consumed.with(JsonSerdes.BsmIntersectionIdKey(), JsonSerdes.OdeBsm()));

inputStream.foreach(controller::broadcastBSM);
inputStream.foreach((key, value) -> {
controller.broadcastBSM(key, value);
});

return builder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
Expand Down Expand Up @@ -39,14 +40,14 @@ protected Logger getLogger() {
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();

KStream<String, ProcessedMap<LineString>> inputStream = builder.stream(topicName, Consumed.with(Serdes.String(), JsonSerdes.ProcessedMapGeoJson()));
KStream<String, ProcessedMap<LineString>> inputStream
= builder.stream(topicName, Consumed.with(Serdes.String(), JsonSerdes.ProcessedMapGeoJson()));

inputStream.foreach((key, value) -> {
controller.broadcastMap(value);
});

return builder.build();

}

}

0 comments on commit b821ae5

Please sign in to comment.