Skip to content

Commit

Permalink
Create base class for topologies
Browse files Browse the repository at this point in the history
  • Loading branch information
iyourshaw committed Dec 13, 2024
1 parent b8f258f commit 9795a50
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public APIServiceController(
);
topologyListBuilder.add(bsmSocketForwardTopology);


logger.info("load = {}", props.getLoad());
if (props.getLoad()) {

ArrayList<String> topics = new ArrayList<>();
Expand Down Expand Up @@ -377,13 +377,13 @@ public APIServiceController(
// OdeRawEncodedSpatJson
// OdeSpatJson

var topicDescMap = admin.describeTopics(topics.toArray(new String[topics.size()]));
System.out.println("Found Topics: ");
for (var entry : topicDescMap.entrySet()) {
String topicName = entry.getKey();
var desc = entry.getValue();
System.out.println("TopicName: " + topicName +" "+ desc);
}
// var topicDescMap = admin.describeTopics(topics.toArray(new String[topics.size()]));
// System.out.println("Found Topics: ");
// for (var entry : topicDescMap.entrySet()) {
// String topicName = entry.getKey();
// var desc = entry.getValue();
// System.out.println("TopicName: " + topicName +" "+ desc);
// }


}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import us.dot.its.jpo.conflictmonitor.AlwaysContinueProductionExceptionHandler;
import us.dot.its.jpo.geojsonconverter.DateJsonMapper;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -52,7 +53,7 @@
import org.springframework.boot.info.BuildProperties;

@Configuration
@ConfigurationProperties()
@ConfigurationProperties("conflict.monitor.api")
public class ConflictMonitorApiProperties {

private static int maximumResponseSize;
Expand All @@ -70,7 +71,7 @@ public class ConflictMonitorApiProperties {
private static final String DEFAULT_KAFKA_PORT = "9092";
private String kafkaProducerType = AppContext.DEFAULT_KAFKA_PRODUCER_TYPE;
private Boolean verboseJson = false;
private Boolean load = false;
@Getter @Setter private Boolean load;
private String cmServerURL = "";
private String emailBroker = "";
private String emailFromAddress = "noreply@cimms.com";
Expand Down Expand Up @@ -131,14 +132,14 @@ public void setCors(String cors) {
this.cors = cors;
}

public boolean getLoad() {
return load;
}

@Value("${load}")
public void setLoad(boolean load) {
this.load = load;
}
// public boolean getLoad() {
// return load;
// }
//
// @Value("${load}")
// public void setLoad(boolean load) {
// this.load = load;
// }

public String getCmServerURL() {
return cmServerURL;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

/**
* Component that keeps track of connected STOMP WebSocket clients. Starts Kafka Streams
* topologies when any clients connect, and stops them when there are none.
* topologies when any clients connect, and stops them when there are 0 clients.
*/
@Component
@Slf4j
public class StompSessionController {

final List<RestartableTopology> topologies;

private final Set<String> sessions = Collections.synchronizedSet(new HashSet<String>(10));
private final Set<String> sessions = Collections.synchronizedSet(new HashSet<>(10));


@Autowired
Expand All @@ -37,6 +37,10 @@ public StompSessionController(APIServiceController apiServiceController) {
public void handleSessionConnectEvent(SessionConnectEvent event) {
String sessionId = getSessionIdFromHeader(event);
log.info("Session Connect Event, session ID: {}, event: {}", sessionId, event);
if (sessionId == null) {
throw new RuntimeException("Null session ID from connect event. This should not happen.");
}
// Update sessions set and start kafka streams in an atomic operation for thread safety
synchronized (sessions) {
final int beforeNumSessions = sessions.size();
sessions.add(sessionId);
Expand All @@ -50,6 +54,10 @@ public void handleSessionConnectEvent(SessionConnectEvent event) {
@EventListener(SessionDisconnectEvent.class)
public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
log.info("Session Disconnect Event, session ID: {}, event: {}", event.getSessionId(), event);
if (event.getSessionId() == null) {
throw new RuntimeException("Null session ID from disconnect event. This should not happen.");
}
// Update sessions set and start kafka streams in an atomic operation for thread safety
synchronized (sessions) {
final int beforeNumSessions = sessions.size();
sessions.remove(event.getSessionId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package us.dot.its.jpo.ode.api.topologies;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.dot.its.jpo.ode.api.controllers.StompController;

import java.time.Duration;
import java.util.Properties;

/**
* Default implementation of common functionality for topologies
*/
/**
 * Default implementation of common functionality shared by restartable Kafka Streams
 * topologies: lifecycle management (start/stop), optional state-listener and
 * uncaught-exception-handler registration, and the topic/properties plumbing.
 *
 * <p>Subclasses supply the actual processing graph via {@link #buildTopology()} and a
 * class-specific logger via {@link #getLogger()}.
 *
 * <p>NOTE(review): the imports of {@code ObjectMapper} and {@code StompController} in
 * this file appear unused by this class — presumably leftovers; confirm and remove.
 */
public abstract class BaseTopology implements RestartableTopology {

    /** Logger of the concrete subclass, so log lines carry the right class name. */
    protected abstract Logger getLogger();

    /** Builds the Kafka Streams processing graph for this topology. */
    protected abstract Topology buildTopology();

    // Built lazily on first start() rather than in the constructor. Calling the
    // overridable buildTopology() from this constructor would run it before the
    // subclass's own fields are initialized (e.g. a subclass whose buildTopology()
    // references a field assigned in its constructor would see null), which is why
    // this field cannot be final.
    protected Topology topology;

    // Current streams instance; null until the first start().
    protected KafkaStreams streams;

    /** Name of the Kafka topic this topology consumes. */
    @Getter
    protected final String topicName;

    /** Kafka Streams configuration passed to each new KafkaStreams instance. */
    protected final Properties streamsProperties;

    public BaseTopology(String topicName, Properties streamsProperties) {
        this.topicName = topicName;
        this.streamsProperties = streamsProperties;
        // Intentionally NOT calling buildTopology() here — see comment on `topology`.
    }

    /**
     * Starts (or restarts) the topology. The processing graph is built on the first
     * call, after subclass construction has fully completed.
     *
     * @throws IllegalStateException if the topology is already running or rebalancing
     */
    @Override
    public void start() {
        if (streams != null && streams.state().isRunningOrRebalancing()) {
            throw new IllegalStateException("Start called while streams is already running.");
        }

        if (topology == null) {
            topology = buildTopology();
        }

        getLogger().info("Starting topology for {}", topicName);
        streams = new KafkaStreams(topology, streamsProperties);
        // Handlers/listeners are only applied if they were registered before start().
        if (exceptionHandler != null) {
            streams.setUncaughtExceptionHandler(exceptionHandler);
        }
        if (stateListener != null) {
            streams.setStateListener(stateListener);
        }
        streams.start();
    }

    /**
     * Signals the topology to shut down. Safe to call when never started.
     */
    @Override
    public void stop() {
        getLogger().info("Stopping topology for {}", topicName);
        if (streams != null) {
            // Duration.ZERO triggers shutdown without blocking the caller.
            streams.close(Duration.ZERO);
        }
    }

    // Optional state listener, applied to each KafkaStreams instance created by start().
    KafkaStreams.StateListener stateListener;

    /** Registers a state listener; takes effect on the next start(). */
    public void registerStateListener(KafkaStreams.StateListener stateListener) {
        this.stateListener = stateListener;
    }

    // Optional uncaught-exception handler, applied on each start().
    StreamsUncaughtExceptionHandler exceptionHandler;

    /** Registers an uncaught-exception handler; takes effect on the next start(). */
    public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package us.dot.its.jpo.ode.api.topologies;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
Expand All @@ -20,74 +21,33 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Properties;

public class BsmSocketForwardTopology implements RestartableTopology {
public class BsmSocketForwardTopology extends BaseTopology {

private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class);
protected final StompController controller;
protected final ObjectMapper objectMapper;

Topology topology;
KafkaStreams streams;

@Getter
String topicName;

Properties streamsProperties;
StompController controller;
ObjectMapper objectMapper;
private static final Logger logger = LoggerFactory.getLogger(BsmSocketForwardTopology.class);

public BsmSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties){
this.topicName = topicName;
this.streamsProperties = streamsProperties;
super(topicName, streamsProperties);
this.controller = controller;
this.objectMapper = new ObjectMapper();
topology = buildTopology();
}

@Override
public void start() {
if (streams != null && streams.state().isRunningOrRebalancing()) {
throw new IllegalStateException("Start called while streams is already running.");
}

streams = new KafkaStreams(topology, streamsProperties);
if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler);
if (stateListener != null) streams.setStateListener(stateListener);
streams.start();
}

public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();

KStream<BsmIntersectionIdKey, OdeBsmData> inputStream = builder.stream(topicName, Consumed.with(JsonSerdes.BsmIntersectionIdKey(), JsonSerdes.OdeBsm()));

inputStream.foreach((key, value) -> {
controller.broadcastBSM(key, value);
});
inputStream.foreach(controller::broadcastBSM);

return builder.build();

}

@Override
public void stop() {
logger.info("Stopping SPaT Socket Broadcast Topology.");
if (streams != null) {
streams.close();
streams.cleanUp();
streams = null;
}
logger.info("Stopped SPaT Socket Broadcast Topology.");
}

StateListener stateListener;
public void registerStateListener(StateListener stateListener) {
this.stateListener = stateListener;
}

StreamsUncaughtExceptionHandler exceptionHandler;
public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
protected Logger getLogger() {
return logger;
}



}
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package us.dot.its.jpo.ode.api.topologies;

import lombok.Getter;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.KafkaStreams.StateListener;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

Expand All @@ -19,39 +15,25 @@

import java.util.Properties;

public class DataLoaderTopology<T> implements RestartableTopology {
public class DataLoaderTopology<T> extends BaseTopology {

private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class);

Topology topology;
KafkaStreams streams;

@Getter
String topicName;

Serde<T> consumerSerde;
DataLoader<T> dataLoader;
Properties streamsProperties;

public DataLoaderTopology(String topicName, Serde<T> consumerSerde, DataLoader<T> dataLoader, Properties streamsProperties){
this.topicName = topicName;
super(topicName, streamsProperties);
this.consumerSerde = consumerSerde;
this.dataLoader = dataLoader;
this.streamsProperties = streamsProperties;
topology = buildTopology();
}

@Override
public void start() {
if (streams != null && streams.state().isRunningOrRebalancing()) {
throw new IllegalStateException("Start called while streams is already running.");
}
streams = new KafkaStreams(topology, streamsProperties);
if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler);
if (stateListener != null) streams.setStateListener(stateListener);
streams.start();
protected Logger getLogger() {
return logger;
}

@Override
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();

Expand All @@ -65,27 +47,4 @@ public Topology buildTopology() {

}

@Override
public void stop() {
logger.info("Stopping Data Loading Topology.");
if (streams != null) {
streams.close();
streams.cleanUp();
streams = null;
}
logger.info("Stopped Data Loading Topology.");
}

StateListener stateListener;
public void registerStateListener(StateListener stateListener) {
this.stateListener = stateListener;
}

StreamsUncaughtExceptionHandler exceptionHandler;
public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}



}
Loading

0 comments on commit 9795a50

Please sign in to comment.