diff --git a/gravitee-am-reporter/gravitee-am-reporter-kafka/src/main/java/io/gravitee/am/reporter/kafka/audit/KafkaAuditReporter.java b/gravitee-am-reporter/gravitee-am-reporter-kafka/src/main/java/io/gravitee/am/reporter/kafka/audit/KafkaAuditReporter.java index 1416f3ba9b3..d931000a34e 100644 --- a/gravitee-am-reporter/gravitee-am-reporter-kafka/src/main/java/io/gravitee/am/reporter/kafka/audit/KafkaAuditReporter.java +++ b/gravitee-am-reporter/gravitee-am-reporter-kafka/src/main/java/io/gravitee/am/reporter/kafka/audit/KafkaAuditReporter.java @@ -32,10 +32,12 @@ import io.gravitee.reporter.api.Reporter; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; +import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.kafka.client.producer.KafkaProducer; import io.vertx.kafka.client.producer.KafkaProducerRecord; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.util.StringUtils; @@ -115,10 +117,18 @@ protected void doStart() { protected void doStop() throws Exception { super.doStop(); if (this.producer != null) { - this.producer.close(); + // Need to do unwrap producer because of bug in kafka vertx library: https://github.com/vert-x3/vertx-kafka-client/issues/271 + Producer producer = this.producer.unwrap(); + Context ctx = vertx.getOrCreateContext(); + ctx.executeBlocking(() -> { + producer.close(); + return null; + }); } + log.info("Kafka producer closed"); } + @Override public boolean canSearch() { return false;