Skip to content

Commit

Permalink
fix: close kafka producer connection without delay.
Browse files Browse the repository at this point in the history
  • Loading branch information
RafalPodlesSpyrosoft authored and podlesrafal committed Jul 9, 2024
1 parent a57f1c7 commit 9f434e5
Showing 1 changed file with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import io.gravitee.reporter.api.Reporter;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -115,10 +117,18 @@ protected void doStart() {
protected void doStop() throws Exception {
super.doStop();
if (this.producer != null) {
this.producer.close();
// Need to do unwrap producer because of bug in kafka vertx library: https://github.com/vert-x3/vertx-kafka-client/issues/271
Producer<String, AuditMessageValueDto> producer = this.producer.unwrap();
Context ctx = vertx.getOrCreateContext();
ctx.executeBlocking(() -> {
producer.close();
return null;
});
}
log.info("Kafka producer closed");
}


@Override
public boolean canSearch() {
return false;
Expand Down

0 comments on commit 9f434e5

Please sign in to comment.