diff --git a/jpo-geojsonconverter/src/main/java/us/dot/its/jpo/geojsonconverter/GeoJsonConverterProperties.java b/jpo-geojsonconverter/src/main/java/us/dot/its/jpo/geojsonconverter/GeoJsonConverterProperties.java index 311b421..cf2e26e 100644 --- a/jpo-geojsonconverter/src/main/java/us/dot/its/jpo/geojsonconverter/GeoJsonConverterProperties.java +++ b/jpo-geojsonconverter/src/main/java/us/dot/its/jpo/geojsonconverter/GeoJsonConverterProperties.java @@ -64,6 +64,8 @@ public class GeoJsonConverterProperties implements EnvironmentAware { //BSM private String kafkaTopicOdeBsmJson = "topic.OdeBsmJson"; private String kafkaTopicProcessedBsm = "topic.ProcessedBsm"; + + private int lingerMs = 0; private GeometryOutputMode geometryOutputMode = GeometryOutputMode.GEOJSON_ONLY; @@ -119,6 +121,10 @@ public Properties createStreamProperties(String name) { // Configure the state store location streamProps.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/ode/kafka-streams"); + streamProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"); + + streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getKafkaLingerMs()); + if (confluentCloudEnabled) { streamProps.put("ssl.endpoint.identification.algorithm", "https"); streamProps.put("security.protocol", "SASL_SSL"); @@ -231,4 +237,14 @@ public void setGeometryOutputMode(String gomString) { public GeometryOutputMode getGeometryOutputMode() { return geometryOutputMode; } + + + @Value("${kafka.linger_ms}") + public void setKafkaLingerMs(int lingerMs) { + this.lingerMs = lingerMs; + } + + public int getKafkaLingerMs() { + return lingerMs; + } } \ No newline at end of file diff --git a/jpo-geojsonconverter/src/main/resources/application.yaml b/jpo-geojsonconverter/src/main/resources/application.yaml index b5c9b9c..5ca64b4 100644 --- a/jpo-geojsonconverter/src/main/resources/application.yaml +++ b/jpo-geojsonconverter/src/main/resources/application.yaml @@ -37,4 +37,6 @@ kafka.topics: retentionMs: 300000 # Jackson Properties -spring.jackson.serialization.write-dates-as-timestamps: false \ No newline at end of file +spring.jackson.serialization.write-dates-as-timestamps: false + +kafka.linger_ms: 50 \ No newline at end of file