diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/APIServiceController.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/APIServiceController.java index a0e08ca9a..4a4af5cdb 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/APIServiceController.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/APIServiceController.java @@ -151,7 +151,7 @@ public APIServiceController( ); topologyListBuilder.add(bsmSocketForwardTopology); - + logger.info("load = {}", props.getLoad()); if (props.getLoad()) { ArrayList topics = new ArrayList<>(); @@ -377,13 +377,13 @@ public APIServiceController( // OdeRawEncodedSpatJson // OdeSpatJson - var topicDescMap = admin.describeTopics(topics.toArray(new String[topics.size()])); - System.out.println("Found Topics: "); - for (var entry : topicDescMap.entrySet()) { - String topicName = entry.getKey(); - var desc = entry.getValue(); - System.out.println("TopicName: " + topicName +" "+ desc); - } +// var topicDescMap = admin.describeTopics(topics.toArray(new String[topics.size()])); +// System.out.println("Found Topics: "); +// for (var entry : topicDescMap.entrySet()) { +// String topicName = entry.getKey(); +// var desc = entry.getValue(); +// System.out.println("TopicName: " + topicName +" "+ desc); +// } } diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java index 1e0777020..9d0027736 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java @@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; import us.dot.its.jpo.conflictmonitor.AlwaysContinueProductionExceptionHandler; import us.dot.its.jpo.geojsonconverter.DateJsonMapper; import org.springframework.context.annotation.Bean; @@ -52,7 +53,7 @@ import org.springframework.boot.info.BuildProperties; @Configuration -@ConfigurationProperties() +@ConfigurationProperties("conflict.monitor.api") public class ConflictMonitorApiProperties { private static int maximumResponseSize; @@ -70,7 +71,7 @@ public class ConflictMonitorApiProperties { private static final String DEFAULT_KAFKA_PORT = "9092"; private String kafkaProducerType = AppContext.DEFAULT_KAFKA_PRODUCER_TYPE; private Boolean verboseJson = false; - private Boolean load = false; + @Getter @Setter private Boolean load; private String cmServerURL = ""; private String emailBroker = ""; private String emailFromAddress = "noreply@cimms.com"; @@ -131,14 +132,14 @@ public void setCors(String cors) { this.cors = cors; } - public boolean getLoad() { - return load; - } - - @Value("${load}") - public void setLoad(boolean load) { - this.load = load; - } +// public boolean getLoad() { +// return load; +// } +// +// @Value("${load}") +// public void setLoad(boolean load) { +// this.load = load; +// } public String getCmServerURL() { return cmServerURL; diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/controllers/StompSessionController.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/controllers/StompSessionController.java index dd0a38479..5352aac77 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/controllers/StompSessionController.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/controllers/StompSessionController.java @@ -17,7 +17,7 @@ /** * Component that keeps track of connected STOMP WebSocket clients. Starts Kafka Streams - * topologies when any clients connect, and stops them when there are none. + * topologies when any clients connect, and stops them when there are 0 clients. */ @Component @Slf4j @@ -25,7 +25,7 @@ public class StompSessionController { final List topologies; - private final Set sessions = Collections.synchronizedSet(new HashSet(10)); + private final Set sessions = Collections.synchronizedSet(new HashSet<>(10)); @Autowired @@ -37,6 +37,10 @@ public StompSessionController(APIServiceController apiServiceController) { public void handleSessionConnectEvent(SessionConnectEvent event) { String sessionId = getSessionIdFromHeader(event); log.info("Session Connect Event, session ID: {}, event: {}", sessionId, event); + if (sessionId == null) { + throw new RuntimeException("Null session ID from connect event. This should not happen."); + } + // Update sessions set and start kafka streams in an atomic operation for thread safety synchronized (sessions) { final int beforeNumSessions = sessions.size(); sessions.add(sessionId); @@ -50,6 +54,10 @@ public void handleSessionConnectEvent(SessionConnectEvent event) { @EventListener(SessionDisconnectEvent.class) public void handleSessionDisconnectEvent(SessionDisconnectEvent event) { log.info("Session Disconnect Event, session ID: {}, event: {}", event.getSessionId(), event); + if (event.getSessionId() == null) { + throw new RuntimeException("Null session ID from disconnect event. This should not happen."); + } + // Update sessions set and start kafka streams in an atomic operation for thread safety synchronized (sessions) { final int beforeNumSessions = sessions.size(); sessions.remove(event.getSessionId()); diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BaseTopology.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BaseTopology.java new file mode 100644 index 000000000..f26acc201 --- /dev/null +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BaseTopology.java @@ -0,0 +1,68 @@ +package us.dot.its.jpo.ode.api.topologies; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Getter; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import us.dot.its.jpo.ode.api.controllers.StompController; + +import java.time.Duration; +import java.util.Properties; + +/** + * Default implementation of common functionality for topologies + */ +public abstract class BaseTopology implements RestartableTopology{ + + protected abstract Logger getLogger(); + protected abstract Topology buildTopology(); + + protected final Topology topology; + protected KafkaStreams streams; + + @Getter + protected final String topicName; + + protected final Properties streamsProperties; + + public BaseTopology(String topicName, Properties streamsProperties) { + this.topicName = topicName; + this.streamsProperties = streamsProperties; + topology = buildTopology(); + } + + @Override + public void start() { + if (streams != null && streams.state().isRunningOrRebalancing()) { + throw new IllegalStateException("Start called while streams is already running."); + } + + streams = new KafkaStreams(topology, streamsProperties); + if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler); + if (stateListener != null) streams.setStateListener(stateListener); + streams.start(); + } + + @Override + public void stop() { + getLogger().info("Stopping topology for {}", topicName); + if (streams != null) { + // Trigger streams to shut down without blocking + streams.close(Duration.ZERO); + } + } + + KafkaStreams.StateListener stateListener; + public void registerStateListener(KafkaStreams.StateListener stateListener) { + this.stateListener = stateListener; + } + + StreamsUncaughtExceptionHandler exceptionHandler; + public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + +} diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BsmSocketForwardTopology.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BsmSocketForwardTopology.java index 2a990c2b5..27de11331 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BsmSocketForwardTopology.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BsmSocketForwardTopology.java @@ -1,6 +1,7 @@ package us.dot.its.jpo.ode.api.topologies; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; @@ -20,74 +21,33 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Properties; -public class BsmSocketForwardTopology implements RestartableTopology { +public class BsmSocketForwardTopology extends BaseTopology { - private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class); + protected final StompController controller; + protected final ObjectMapper objectMapper; - Topology topology; - KafkaStreams streams; - - @Getter - String topicName; - - Properties streamsProperties; - StompController controller; - ObjectMapper objectMapper; + private static final Logger logger = LoggerFactory.getLogger(BsmSocketForwardTopology.class); public BsmSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties){ - this.topicName = topicName; - this.streamsProperties = streamsProperties; + super(topicName, streamsProperties); this.controller = controller; this.objectMapper = new ObjectMapper(); - topology = buildTopology(); } @Override - public void start() { - if (streams != null && streams.state().isRunningOrRebalancing()) { - throw new IllegalStateException("Start called while streams is already running."); - } - - streams = new KafkaStreams(topology, streamsProperties); - if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler); - if (stateListener != null) streams.setStateListener(stateListener); - streams.start(); - } - public Topology buildTopology() { StreamsBuilder builder = new StreamsBuilder(); KStream inputStream = builder.stream(topicName, Consumed.with(JsonSerdes.BsmIntersectionIdKey(), JsonSerdes.OdeBsm())); - inputStream.foreach((key, value) -> { - controller.broadcastBSM(key, value); - }); + inputStream.foreach(controller::broadcastBSM); return builder.build(); } @Override - public void stop() { - logger.info("Stopping SPaT Socket Broadcast Topology."); - if (streams != null) { - streams.close(); - streams.cleanUp(); - streams = null; - } - logger.info("Stopped SPaT Socket Broadcast Topology."); - } - - StateListener stateListener; - public void registerStateListener(StateListener stateListener) { - this.stateListener = stateListener; - } - - StreamsUncaughtExceptionHandler exceptionHandler; - public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; + protected Logger getLogger() { + return logger; } - - - } diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/DataLoaderTopology.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/DataLoaderTopology.java index 54c178a46..78a6a6f9b 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/DataLoaderTopology.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/DataLoaderTopology.java @@ -1,13 +1,9 @@ package us.dot.its.jpo.ode.api.topologies; -import lombok.Getter; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.KafkaStreams.StateListener; -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; @@ -19,39 +15,25 @@ import java.util.Properties; -public class DataLoaderTopology implements RestartableTopology { +public class DataLoaderTopology extends BaseTopology { private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class); - Topology topology; - KafkaStreams streams; - - @Getter - String topicName; - Serde consumerSerde; DataLoader dataLoader; - Properties streamsProperties; public DataLoaderTopology(String topicName, Serde consumerSerde, DataLoader dataLoader, Properties streamsProperties){ - this.topicName = topicName; + super(topicName, streamsProperties); this.consumerSerde = consumerSerde; this.dataLoader = dataLoader; - this.streamsProperties = streamsProperties; - topology = buildTopology(); } @Override - public void start() { - if (streams != null && streams.state().isRunningOrRebalancing()) { - throw new IllegalStateException("Start called while streams is already running."); - } - streams = new KafkaStreams(topology, streamsProperties); - if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler); - if (stateListener != null) streams.setStateListener(stateListener); - streams.start(); + protected Logger getLogger() { + return logger; } + @Override public Topology buildTopology() { StreamsBuilder builder = new StreamsBuilder(); @@ -65,27 +47,4 @@ public Topology buildTopology() { } - @Override - public void stop() { - logger.info("Stopping Data Loading Topology."); - if (streams != null) { - streams.close(); - streams.cleanUp(); - streams = null; - } - logger.info("Stopped Data Loading Topology."); - } - - StateListener stateListener; - public void registerStateListener(StateListener stateListener) { - this.stateListener = stateListener; - } - - StreamsUncaughtExceptionHandler exceptionHandler; - public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } - - - } diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/EmailTopology.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/EmailTopology.java index 47eb84e7a..bd25e4717 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/EmailTopology.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/EmailTopology.java @@ -1,14 +1,10 @@ package us.dot.its.jpo.ode.api.topologies; -import lombok.Getter; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.KafkaStreams.StateListener; -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; @@ -22,37 +18,25 @@ -public class EmailTopology implements RestartableTopology { +public class EmailTopology extends BaseTopology { - private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class); + private static final Logger logger = LoggerFactory.getLogger(EmailTopology.class); - Topology topology; - KafkaStreams streams; - @Getter - String topicName; Serde consumerSerde; DataLoader dataLoader; - Properties streamsProperties; + public EmailTopology(String topicName, Serde consumerSerde, DataLoader dataLoader, Properties streamsProperties){ - this.topicName = topicName; + super(topicName, streamsProperties); this.consumerSerde = consumerSerde; this.dataLoader = dataLoader; - this.streamsProperties = streamsProperties; - topology = buildTopology(); } @Override - public void start() { - if (streams != null && streams.state().isRunningOrRebalancing()) { - throw new IllegalStateException("Start called while streams is already running."); - } - streams = new KafkaStreams(topology, streamsProperties); - if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler); - if (stateListener != null) streams.setStateListener(stateListener); - streams.start(); + protected Logger getLogger() { + return logger; } public Topology buildTopology() { @@ -68,27 +52,5 @@ public Topology buildTopology() { } - @Override - public void stop() { - logger.info("Stopping Data Loading Topology."); - if (streams != null) { - streams.close(); - streams.cleanUp(); - streams = null; - } - logger.info("Stopped Data Loading Topology."); - } - - StateListener stateListener; - public void registerStateListener(StateListener stateListener) { - this.stateListener = stateListener; - } - - StreamsUncaughtExceptionHandler exceptionHandler; - public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } - - } diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/MapSocketForwardTopology.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/MapSocketForwardTopology.java index d5d90dbb3..ecc41a6f2 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/MapSocketForwardTopology.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/MapSocketForwardTopology.java @@ -20,35 +20,20 @@ import java.util.Properties; -public class MapSocketForwardTopology implements RestartableTopology { +public class MapSocketForwardTopology extends BaseTopology { - private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class); + private static final Logger logger = LoggerFactory.getLogger(MapSocketForwardTopology.class); - Topology topology; - KafkaStreams streams; - - @Getter - String topicName; - - Properties streamsProperties; StompController controller; public MapSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties){ - this.topicName = topicName; - this.streamsProperties = streamsProperties; + super(topicName, streamsProperties); this.controller = controller; - topology = buildTopology(); } @Override - public void start() { - if (streams != null && streams.state().isRunningOrRebalancing()) { - throw new IllegalStateException("Start called while streams is already running."); - } - streams = new KafkaStreams(topology, streamsProperties); - if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler); - if (stateListener != null) streams.setStateListener(stateListener); - streams.start(); + protected Logger getLogger() { + return logger; } public Topology buildTopology() { @@ -64,27 +49,4 @@ public Topology buildTopology() { } - @Override - public void stop() { - logger.info("Stopping SPaT Socket Broadcast Topology."); - if (streams != null) { - streams.close(); - streams.cleanUp(); - streams = null; - } - logger.info("Stopped SPaT Socket Broadcast Topology."); - } - - StateListener stateListener; - public void registerStateListener(StateListener stateListener) { - this.stateListener = stateListener; - } - - StreamsUncaughtExceptionHandler exceptionHandler; - public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } - - - } diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/RestartableTopology.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/RestartableTopology.java index 854320b9b..3cc281be1 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/RestartableTopology.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/RestartableTopology.java @@ -1,7 +1,7 @@ package us.dot.its.jpo.ode.api.topologies; /** - * Interface for a Kafka Streams topology that can be started and stopped + * Interface for a Kafka Streams topology that can be stopped and restarted */ public interface RestartableTopology { void start(); diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/SpatSocketForwardTopology.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/SpatSocketForwardTopology.java index 4de131df2..18f57cbb9 100644 --- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/SpatSocketForwardTopology.java +++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/SpatSocketForwardTopology.java @@ -17,35 +17,20 @@ import org.slf4j.LoggerFactory; import java.util.Properties; -public class SpatSocketForwardTopology implements RestartableTopology { +public class SpatSocketForwardTopology extends BaseTopology { - private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class); + private static final Logger logger = LoggerFactory.getLogger(SpatSocketForwardTopology.class); - Topology topology; - KafkaStreams streams; - - @Getter - String topicName; - - Properties streamsProperties; StompController controller; - public SpatSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties){ - this.topicName = topicName; - this.streamsProperties = streamsProperties; + public SpatSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties) { + super(topicName, streamsProperties); this.controller = controller; - topology = buildTopology(); } @Override - public void start() { - if (streams != null && streams.state().isRunningOrRebalancing()) { - throw new IllegalStateException("Start called while streams is already running."); - } - streams = new KafkaStreams(topology, streamsProperties); - if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler); - if (stateListener != null) streams.setStateListener(stateListener); - streams.start(); + protected Logger getLogger() { + return logger; } public Topology buildTopology() { @@ -61,27 +46,4 @@ public Topology buildTopology() { } - @Override - public void stop() { - logger.info("Stopping SPaT Socket Broadcast Topology."); - if (streams != null) { - streams.close(); - streams.cleanUp(); - streams = null; - } - logger.info("Stopped SPaT Socket Broadcast Topology."); - } - - StateListener stateListener; - public void registerStateListener(StateListener stateListener) { - this.stateListener = stateListener; - } - - StreamsUncaughtExceptionHandler exceptionHandler; - public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } - - - } diff --git a/jpo-conflictvisualizer-api/src/main/resources/application.properties b/jpo-conflictvisualizer-api/src/main/resources/application.properties index 970db4ded..6399d3bba 100644 --- a/jpo-conflictvisualizer-api/src/main/resources/application.properties +++ b/jpo-conflictvisualizer-api/src/main/resources/application.properties @@ -22,7 +22,7 @@ security.enabled=true # logging.level.org.keycloak=DEBUG -load=false +conflict.monitor.api.load=true server.compression.enabled=true server.compression.mime-types=application/json,application/xml,text/html,text/xml,text/plain,application/javascript,text/css diff --git a/jpo-conflictvisualizer-api/src/main/resources/application.yaml b/jpo-conflictvisualizer-api/src/main/resources/application.yaml index 0da0937d6..ad0671e6e 100644 --- a/jpo-conflictvisualizer-api/src/main/resources/application.yaml +++ b/jpo-conflictvisualizer-api/src/main/resources/application.yaml @@ -36,3 +36,4 @@ kafka.topics: - name: topic.ProcessedMap cleanupPolicy: delete retentionMs: 300000 +