From bd4218c7b57d90d4a45bc7bb1c1eb5d8aac6125c Mon Sep 17 00:00:00 2001
From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com>
Date: Tue, 17 Dec 2024 20:42:09 -0700
Subject: [PATCH] Create BaseSeekToEndListener
---
.../ode/api/ConflictMonitorApiProperties.java | 4 +-
.../ode/api/kafka/BaseSeekToEndListener.java | 33 ++++++++++
.../api/kafka/BsmSocketForwardListener.java | 10 +--
.../ode/api/kafka/KafkaConsumerConfig.java | 65 +++++++++++++------
.../api/kafka/MapSocketForwardListener.java | 10 +--
.../api/kafka/SpatSocketForwardListener.java | 23 +------
.../src/main/resources/logback.xml | 1 +
7 files changed, 94 insertions(+), 52 deletions(-)
create mode 100644 jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/BaseSeekToEndListener.java
diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java
index 4dad10b44..415497a80 100644
--- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java
+++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java
@@ -62,8 +62,8 @@ public class ConflictMonitorApiProperties {
private static final Logger logger = LoggerFactory.getLogger(ConflictMonitorApiProperties.class);
private boolean confluentCloudEnabled = false;
- private String confluentKey = null;
- private String confluentSecret = null;
+ @Getter private String confluentKey = null;
+ @Getter private String confluentSecret = null;
private String version;
public static final int OUTPUT_SCHEMA_VERSION = 6;
diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/BaseSeekToEndListener.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/BaseSeekToEndListener.java
new file mode 100644
index 000000000..69a8d8bf0
--- /dev/null
+++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/BaseSeekToEndListener.java
@@ -0,0 +1,33 @@
+package us.dot.its.jpo.ode.api.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.listener.AbstractConsumerSeekAware;
+import us.dot.its.jpo.ode.api.controllers.StompController;
+
+import java.util.Map;
+
+/**
+ * Base class for Kafka listeners that seek to the last offset before consuming when starting up.
+ * See
+ * Spring Kafka: Seeking to a specific offset
+ */
+@Slf4j
+public class BaseSeekToEndListener extends AbstractConsumerSeekAware {
+
+ protected final StompController stompController;
+
+ public BaseSeekToEndListener(StompController stompController) {
+ this.stompController = stompController;
+ }
+
+ @Override
+ public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) {
+ log.info("Seek to end for TopicPartitions {}", assignments.keySet());
+ callback.seekToEnd(assignments.keySet());
+ super.onPartitionsAssigned(assignments, callback);
+ }
+
+}
diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/BsmSocketForwardListener.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/BsmSocketForwardListener.java
index 9f6d0f60c..f690d4f64 100644
--- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/BsmSocketForwardListener.java
+++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/BsmSocketForwardListener.java
@@ -1,22 +1,21 @@
package us.dot.its.jpo.ode.api.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey;
import us.dot.its.jpo.ode.api.controllers.StompController;
import us.dot.its.jpo.ode.model.OdeBsmData;
@Component
-public class BsmSocketForwardListener extends AbstractConsumerSeekAware {
-
- final StompController stompController;
+@Slf4j
+public class BsmSocketForwardListener extends BaseSeekToEndListener {
@Autowired
public BsmSocketForwardListener(StompController stompController) {
- this.stompController = stompController;
+ super(stompController);
}
@KafkaListener(
@@ -28,6 +27,7 @@ public BsmSocketForwardListener(StompController stompController) {
autoStartup = "false")
public void listen(ConsumerRecord record) {
stompController.broadcastBSM(record.key(), record.value());
+ log.trace("Received bsm with offset {}", record.offset());
}
diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaConsumerConfig.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaConsumerConfig.java
index ca176ca59..d7aa11c82 100644
--- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaConsumerConfig.java
+++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaConsumerConfig.java
@@ -1,7 +1,10 @@
package us.dot.its.jpo.ode.api.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -12,9 +15,9 @@
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
-import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;
import us.dot.its.jpo.geojsonconverter.serialization.deserializers.JsonDeserializer;
import us.dot.its.jpo.geojsonconverter.serialization.deserializers.ProcessedMapDeserializer;
+import us.dot.its.jpo.ode.api.ConflictMonitorApiProperties;
import us.dot.its.jpo.ode.model.OdeBsmData;
import java.util.HashMap;
@@ -23,11 +26,15 @@
// Ref. https://github.com/eugenp/tutorials/blob/master/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/KafkaConsumerConfig.java
@EnableKafka
@Configuration
+@Slf4j
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
+ @Autowired
+ ConflictMonitorApiProperties properties;
+
@Bean
public ConcurrentKafkaListenerContainerFactory spatListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
@@ -54,37 +61,55 @@ public ConcurrentKafkaListenerContainerFactory
@Bean
public DefaultKafkaConsumerFactory spatConsumerFactory() {
- Map props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new StringDeserializer());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(ProcessedSpat.class));
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- return new DefaultKafkaConsumerFactory<>(props,
- new StringDeserializer(),
- new JsonDeserializer<>(ProcessedSpat.class));
+ return consumerFactory(new StringDeserializer(), new JsonDeserializer<>(ProcessedSpat.class));
}
@Bean
public DefaultKafkaConsumerFactory> mapConsumerFactory() {
- Map props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new StringDeserializer());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new ProcessedMapDeserializer<>(LineString.class));
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- return new DefaultKafkaConsumerFactory<>(props,
+ return consumerFactory(
new StringDeserializer(),
new ProcessedMapDeserializer<>(LineString.class));
}
@Bean
public DefaultKafkaConsumerFactory bsmConsumerFactory() {
+ return consumerFactory(
+ new JsonDeserializer<>(BsmIntersectionIdKey.class),
+ new JsonDeserializer<>(OdeBsmData.class));
+ }
+
+ private DefaultKafkaConsumerFactory consumerFactory(
+ Deserializer keyDeserializer, Deserializer valueDeserializer) {
+ Map props = commonProps();
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
+ return new DefaultKafkaConsumerFactory(props,
+ keyDeserializer,
+ valueDeserializer);
+ }
+
+ private Map commonProps() {
Map props = new HashMap<>();
+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(BsmIntersectionIdKey.class));
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(OdeBsmData.class));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- return new DefaultKafkaConsumerFactory<>(props,
- new JsonDeserializer<>(BsmIntersectionIdKey.class),
- new JsonDeserializer<>(OdeBsmData.class));
+ props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
+
+ if (properties.getConfluentCloudStatus()) {
+ props.put("ssl.endpoint.identification.algorithm", "https");
+ props.put("security.protocol", "SASL_SSL");
+ props.put("sasl.mechanism", "PLAIN");
+
+ if (properties.getConfluentKey() != null && properties.getConfluentSecret() != null) {
+ String auth = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
+ "username=\"" + properties.getConfluentKey() + "\" " +
+ "password=\"" + properties.getConfluentSecret() + "\";";
+ props.put("sasl.jaas.config", auth);
+ } else {
+ log.error(
+ "Environment variables CONFLUENT_KEY and CONFLUENT_SECRET are not set. Set these in the .env file to use Confluent Cloud");
+ }
+ }
+ return props;
}
}
diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/MapSocketForwardListener.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/MapSocketForwardListener.java
index bb4498268..09af7dd71 100644
--- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/MapSocketForwardListener.java
+++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/MapSocketForwardListener.java
@@ -1,22 +1,21 @@
package us.dot.its.jpo.ode.api.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.ode.api.controllers.StompController;
@Component
-public class MapSocketForwardListener extends AbstractConsumerSeekAware {
-
- final StompController stompController;
+@Slf4j
+public class MapSocketForwardListener extends BaseSeekToEndListener {
@Autowired
public MapSocketForwardListener(StompController stompController) {
- this.stompController = stompController;
+ super(stompController);
}
@KafkaListener(id = ListenerIds.MAP,
@@ -27,5 +26,6 @@ public MapSocketForwardListener(StompController stompController) {
autoStartup = "false")
public void listen(ConsumerRecord> record) {
stompController.broadcastMap(record.value());
+ log.trace("Received map with offset {}", record.offset());
}
}
diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/SpatSocketForwardListener.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/SpatSocketForwardListener.java
index 1ea78ad58..1a4f707f7 100644
--- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/SpatSocketForwardListener.java
+++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/SpatSocketForwardListener.java
@@ -2,37 +2,20 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.ode.api.controllers.StompController;
-import java.util.Map;
-/**
- * Kafka listener that can seek to the latest offset.
- * See
- * Spring Kafka: Seeking to a specific offset
- */
@Component
@Slf4j
-public class SpatSocketForwardListener extends AbstractConsumerSeekAware {
-
- final StompController stompController;
+public class SpatSocketForwardListener extends BaseSeekToEndListener {
@Autowired
public SpatSocketForwardListener(StompController stompController) {
- this.stompController = stompController;
- }
-
- @Override
- public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) {
- log.info("Seek to end for TopicPartitions {}", assignments.keySet());
- callback.seekToEnd(assignments.keySet());
- super.onPartitionsAssigned(assignments, callback);
+ super(stompController);
}
@KafkaListener(id = ListenerIds.SPAT,
@@ -43,7 +26,7 @@ public void onPartitionsAssigned(Map assignments, Consumer
autoStartup = "false")
public void listen(ConsumerRecord record) {
stompController.broadcastSpat(record.value());
- log.debug("Received spat with offset {}", record.offset());
+ log.trace("Received spat with offset {}", record.offset());
}
}
diff --git a/jpo-conflictvisualizer-api/src/main/resources/logback.xml b/jpo-conflictvisualizer-api/src/main/resources/logback.xml
index d265443a6..008136d9b 100644
--- a/jpo-conflictvisualizer-api/src/main/resources/logback.xml
+++ b/jpo-conflictvisualizer-api/src/main/resources/logback.xml
@@ -10,6 +10,7 @@
+