Skip to content

Commit

Permalink
Clean up imports
Browse files Browse the repository at this point in the history
  • Loading branch information
iyourshaw committed Dec 13, 2024
1 parent a25dc0d commit c1b7a98
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import java.util.ArrayList;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;


import com.google.common.collect.ImmutableList;
import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -17,7 +15,6 @@

import org.springframework.stereotype.Controller;

import us.dot.its.jpo.conflictmonitor.monitor.algorithms.StreamsTopology;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.ConnectionOfTravelAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.LaneDirectionOfTravelAssessment;
import us.dot.its.jpo.conflictmonitor.monitor.models.assessments.StopLinePassageAssessment;
Expand Down Expand Up @@ -359,7 +356,7 @@ public APIServiceController(
topologyListBuilder.add(notificationTopology);

DataLoaderTopology<BsmEvent> bsmEventsTopology = new DataLoaderTopology<BsmEvent>(
"topic.CMBsmEvents",
"topic.CmBsmEvents",
JsonSerdes.BsmEvent(),
bsmEventRepo,
props.createStreamProperties("bsmEvents"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.AbstractSubProtocolEvent;
import org.springframework.web.socket.messaging.SessionConnectEvent;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import us.dot.its.jpo.ode.api.APIServiceController;
import us.dot.its.jpo.ode.api.topologies.RestartableTopology;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;


/**
* Component that keeps track of connected STOMP WebSocket clients. Starts Kafka Streams
Expand All @@ -37,9 +39,11 @@ public StompSessionController(APIServiceController apiServiceController) {
public void handleSessionConnectEvent(SessionConnectEvent event) {
String sessionId = getSessionIdFromHeader(event);
log.info("Session Connect Event, session ID: {}, event: {}", sessionId, event);

if (sessionId == null) {
throw new RuntimeException("Null session ID from connect event. This should not happen.");
}

// Update sessions set and start kafka streams in an atomic operation for thread safety
synchronized (sessions) {
final int beforeNumSessions = sessions.size();
Expand All @@ -54,10 +58,12 @@ public void handleSessionConnectEvent(SessionConnectEvent event) {
@EventListener(SessionDisconnectEvent.class)
public void handleSessionDisconnectEvent(SessionDisconnectEvent event) {
log.info("Session Disconnect Event, session ID: {}, event: {}", event.getSessionId(), event);

if (event.getSessionId() == null) {
throw new RuntimeException("Null session ID from disconnect event. This should not happen.");
}
// Update sessions set and start kafka streams in an atomic operation for thread safety

// Update sessions set and stop kafka streams in an atomic operation for thread safety
synchronized (sessions) {
final int beforeNumSessions = sessions.size();
sessions.remove(event.getSessionId());
Expand Down Expand Up @@ -96,11 +102,13 @@ private void stopKafkaStreams() {
log.info("Stopped all Kafka Streams");
}

private static final String SIMP_SESSION_ID = "simpSessionId";

private String getSessionIdFromHeader(AbstractSubProtocolEvent event) {
var message = event.getMessage();
MessageHeaders headers = message.getHeaders();
if (headers.containsKey("simpSessionId")) {
return headers.get("simpSessionId", String.class);
if (headers.containsKey(SIMP_SESSION_ID)) {
return headers.get(SIMP_SESSION_ID, String.class);
}
return null;
}
Expand Down

0 comments on commit c1b7a98

Please sign in to comment.