Skip to content

Commit

Permalink
Added configurable linger time to Conflict Visualizer
Browse files Browse the repository at this point in the history
  • Loading branch information
John-Wiens committed Oct 23, 2024
1 parent 9753d0a commit 979c3a2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class ConflictMonitorApiProperties {
private String securitySvcsSignatureEndpoint = "sign";



private int lingerMs = 0;


@Autowired
Expand Down Expand Up @@ -185,6 +185,15 @@ public void setKafkaBrokers(String kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
}

@Value("${kafka.linger_ms}")
public void setKafkaLingerMs(int lingerMs) {
this.lingerMs = lingerMs;
}

public int getKafkaLingerMs() {
return lingerMs;
}

public String getKafkaProducerType() {
return kafkaProducerType;
}
Expand Down Expand Up @@ -257,6 +266,8 @@ public void setKafkaTopicsDisabledSet(Set<String> kafkaTopicsDisabledSet) {
this.kafkaTopicsDisabledSet = kafkaTopicsDisabledSet;
}




@Bean
public ObjectMapper defaultMapper() {
Expand Down Expand Up @@ -383,7 +394,8 @@ public Properties createStreamProperties(String name) {
// streamProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);

streamProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
streamProps.put(ProducerConfig.LINGER_MS_CONFIG, 50);
streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getKafkaLingerMs());


if (confluentCloudEnabled) {
streamProps.put("ssl.endpoint.identification.algorithm", "https");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ schema.bsm: classpath:schemas/bsm.schema.json
schema.map: classpath:schemas/map.schema.json
schema.spat: classpath:schemas/spat.schema.json


# Amount of time to wait to try and increase batching
kafka.linger_ms: 50

kafka.topics:
autoCreateTopics: false
numPartitions: 1
Expand Down

0 comments on commit 979c3a2

Please sign in to comment.