diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index b0419066d..a22c86bf5 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -102,6 +102,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.time.Instant; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -328,8 +329,22 @@ public void close() { consumerState.kafkaConsumer.wakeup(); } for (ConsumerState consumerState : consumers.values()) { - while (consumerState.closedState == ConsumerCloseState.POLLING) { - LOG.trace("consumer not closed yet"); + if (consumerState.closedState == ConsumerCloseState.POLLING) { + final Instant start = Instant.now(); + Instant silentTime = start; + do { + if (LOG.isTraceEnabled()) { + final Instant now = Instant.now(); + if (now.isAfter(silentTime)) { + LOG.trace("Consumer {} is not closed yet (waiting {})", consumerState.clientId, Duration.between(start, now)); + // Inhibit TRACE messages for a while to avoid polluting the logs + silentTime = now.plusSeconds(5); + } + } + } while (consumerState.closedState == ConsumerCloseState.POLLING); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Consumer {} is closed", consumerState.clientId); } } consumers.clear(); diff --git a/kafka/src/test/resources/logback.xml b/kafka/src/test/resources/logback.xml index cb4676ec9..31ce8cd88 100644 --- a/kafka/src/test/resources/logback.xml +++ b/kafka/src/test/resources/logback.xml @@ -14,5 +14,5 @@ - +