From 979c3a25da18c0938880ec39bd19cf504b0d08b0 Mon Sep 17 00:00:00 2001
From: john-wiens <jwiens3141@gmail.com>
Date: Wed, 23 Oct 2024 15:31:12 -0600
Subject: [PATCH] Added configurable linger time to Conflict Visualizer

---
 .../ode/api/ConflictMonitorApiProperties.java    | 16 ++++++++++++++--
 .../src/main/resources/application.yaml          |  4 ++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java
index 12efc1890..1e0777020 100644
--- a/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java
+++ b/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/ConflictMonitorApiProperties.java
@@ -98,7 +98,7 @@ public class ConflictMonitorApiProperties {
     private String securitySvcsSignatureEndpoint = "sign";
 
 
-
+    private int lingerMs = 0;
     
 
     @Autowired
@@ -185,6 +185,15 @@ public void setKafkaBrokers(String kafkaBrokers) {
         this.kafkaBrokers = kafkaBrokers;
     }
 
+    @Value("${kafka.linger_ms}")
+    public void setKafkaLingerMs(int lingerMs) {
+        this.lingerMs = lingerMs;
+    }
+
+    public int getKafkaLingerMs() {
+        return lingerMs;
+    }
+
     public String getKafkaProducerType() {
         return kafkaProducerType;
     }
@@ -257,6 +266,8 @@ public void setKafkaTopicsDisabledSet(Set<String> kafkaTopicsDisabledSet) {
         this.kafkaTopicsDisabledSet = kafkaTopicsDisabledSet;
     }
 
+    
+
 
     @Bean
     public ObjectMapper defaultMapper() {
@@ -383,7 +394,8 @@ public Properties createStreamProperties(String name) {
         // streamProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
 
         streamProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
-        streamProps.put(ProducerConfig.LINGER_MS_CONFIG, 50);
+        streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getKafkaLingerMs());
+
 
         if (confluentCloudEnabled) {
             streamProps.put("ssl.endpoint.identification.algorithm", "https");
diff --git a/jpo-conflictvisualizer-api/src/main/resources/application.yaml b/jpo-conflictvisualizer-api/src/main/resources/application.yaml
index 51041b7fc..0da0937d6 100644
--- a/jpo-conflictvisualizer-api/src/main/resources/application.yaml
+++ b/jpo-conflictvisualizer-api/src/main/resources/application.yaml
@@ -21,6 +21,10 @@ schema.bsm: classpath:schemas/bsm.schema.json
 schema.map: classpath:schemas/map.schema.json
 schema.spat: classpath:schemas/spat.schema.json
 
+
+# Amount of time to wait to try and increase batching
+kafka.linger_ms: 50
+
 kafka.topics:
   autoCreateTopics: false
   numPartitions: 1