From 022eadb9e78ef79488e15e06681a6abc200104b6 Mon Sep 17 00:00:00 2001
From: Denis Stepanov
Date: Mon, 21 Mar 2022 19:44:47 +0800
Subject: [PATCH] Improve logging and tests

---
 .../KafkaClientIntroductionAdvice.java        | 17 +++++---
 .../processor/KafkaConsumerProcessor.java     | 33 ++++++++++-----
 .../annotation/ConsumerRegistrySpec.groovy    | 40 +++++--------------
 .../kafka/errors/KafkaErrorsSpec.groovy       |  6 ++-
 .../io/micronaut/test/DisabledConsumer.java   |  2 +
 .../io/micronaut/test/DisabledSpec.groovy     |  2 +-
 6 files changed, 53 insertions(+), 47 deletions(-)

diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java
index eba4ab2bf..fece5f8f4 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/intercept/KafkaClientIntroductionAdvice.java
@@ -192,7 +192,7 @@ private Object returnSynchronous(MethodInvocationContext context
             for (Object o : batchValue) {
                 ProducerRecord record = buildProducerRecord(context, producerState, o);
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
+                    LOG.trace("@KafkaClient method [" + logMethod(context) + "] Sending producer record: " + record);
                 }
 
                 Object result;
@@ -213,7 +213,9 @@ private Object returnSynchronous(MethodInvocationContext context
         } else {
             ProducerRecord record = buildProducerRecord(context, producerState, value);
 
-            LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(context), record);
+            }
 
             Object result;
             if (producerState.maxBlock != null) {
@@ -318,7 +320,7 @@ public void onComplete() {
                 ProducerRecord record = buildProducerRecord(context, producerState, value);
 
                 if (LOG.isTraceEnabled()) {
-                    LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
+                    LOG.trace("@KafkaClient method [" + logMethod(context) + "] Sending producer record: " + record);
                 }
 
                 boolean transactional = producerState.transactional;
@@ -422,8 +424,9 @@ private Flux buildSendFluxForReactiveValue(MethodInvocationContext
         finalReturnType = returnType;
         Flux sendFlowable = valueFlowable.flatMap(o -> {
             ProducerRecord record = buildProducerRecord(context, producerState, o);
-
-            LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(context), record);
+            }
 
             return producerSend(kafkaProducer, record)
                 .map(metadata -> convertResult(metadata, finalReturnType, o, producerState.bodyArgument))
@@ -727,6 +730,10 @@ private ProducerState getProducer(MethodInvocationContext context) {
         });
     }
 
+    private static String logMethod(ExecutableMethod method) {
+        return method.getDeclaringType().getSimpleName() + "#" + method.getName();
+    }
+
     private static final class ProducerState {
 
         private final Producer kafkaProducer;
diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
index 584a488a8..28f7d2cd0 100644
--- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
+++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java
@@ -363,15 +363,15 @@ private void debugDeserializationConfiguration(final ExecutableMethod meth
         }
         final Optional keyDeserializer = consumerConfiguration.getKeyDeserializer();
         if (consumerConfiguration.getKeyDeserializer().isPresent()) {
-            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), method);
+            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), logMethod(method));
         } else {
-            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), method);
+            LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), logMethod(method));
         }
         final Optional valueDeserializer = consumerConfiguration.getValueDeserializer();
         if (valueDeserializer.isPresent()) {
-            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), method);
+            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), logMethod(method));
         } else {
-            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), method);
+            LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), logMethod(method));
         }
     }
 
@@ -504,7 +504,9 @@ private boolean processConsumerRecords(final ConsumerState consumerState,
 
         for (final ConsumerRecord consumerRecord : consumerRecords) {
 
-            LOG.trace("Kafka consumer [{}] received record: {}", method, consumerRecord);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Kafka consumer [{}] received record: {}", logMethod(method), consumerRecord);
+            }
 
             if (trackPartitions) {
                 final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
@@ -658,7 +660,10 @@ private static void setupConsumerSubscription(final ExecutableMethod metho
             } else {
                 kafkaConsumer.subscribe(topics);
             }
-            LOG.info("Kafka listener [{}] subscribed to topics: {}", method, topics);
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Kafka listener [{}] subscribed to topics: {}", logMethod(method), topics);
+            }
         }
 
         if (hasPatterns) {
@@ -674,7 +679,9 @@ private static void setupConsumerSubscription(final ExecutableMethod metho
             } else {
                 kafkaConsumer.subscribe(compiledPattern);
             }
-            LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", method, pattern);
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", logMethod(method), pattern);
+            }
         }
     }
 }
@@ -846,7 +853,7 @@ record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.h
                 List listRecords = recordMetadataProducer.collectList().block();
                 LOG.trace("Method [{}] produced record metadata: {}", method, listRecords);
             } else {
-                recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", method, recordMetadata));
+                recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", logMethod(method), recordMetadata));
             }
         }
 
@@ -957,6 +964,10 @@ private static OffsetCommitCallback resolveCommitCallback(final Object consumerB
         };
     }
 
+    private static String logMethod(ExecutableMethod method) {
+        return method.getDeclaringType().getSimpleName() + "#" + method.getName();
+    }
+
     /**
      * The internal state of the consumer.
      *
@@ -1087,8 +1098,10 @@ synchronized void resumeTopicPartitions() {
             final List toResume = paused.stream()
                     .filter(topicPartition -> _pauseRequests == null || !_pauseRequests.contains(topicPartition))
                     .collect(Collectors.toList());
-            LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", clientId, toResume);
-            kafkaConsumer.resume(toResume);
+            if (!toResume.isEmpty()) {
+                LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", clientId, toResume);
+                kafkaConsumer.resume(toResume);
+            }
             if (_pausedTopicPartitions != null) {
                 toResume.forEach(_pausedTopicPartitions::remove);
             }
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/ConsumerRegistrySpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/ConsumerRegistrySpec.groovy
index ba49ff691..b50a658a7 100644
--- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/ConsumerRegistrySpec.groovy
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/annotation/ConsumerRegistrySpec.groovy
@@ -1,44 +1,22 @@
 package io.micronaut.configuration.kafka.annotation
 
+import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
 import io.micronaut.configuration.kafka.ConsumerRegistry
-import io.micronaut.context.ApplicationContext
+import io.micronaut.context.annotation.Requires
 import io.micronaut.messaging.annotation.MessageBody
-import io.micronaut.runtime.server.EmbeddedServer
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.common.TopicPartition
-import org.testcontainers.containers.KafkaContainer
-import spock.lang.AutoCleanup
-import spock.lang.Shared
-import spock.lang.Specification
 import spock.util.concurrent.PollingConditions
 
 import java.util.concurrent.ConcurrentSkipListSet
 
 import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
-import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS
-
-class ConsumerRegistrySpec extends Specification {
-
-    @Shared
-    @AutoCleanup
-    KafkaContainer kafkaContainer = new KafkaContainer()
-
-    @Shared
-    @AutoCleanup
-    EmbeddedServer embeddedServer
-
-    @Shared
-    @AutoCleanup
-    ApplicationContext context
-
-    void setupSpec() {
-        kafkaContainer.start()
-        embeddedServer = ApplicationContext.run(EmbeddedServer,
-                ['kafka.bootstrap.servers'   : kafkaContainer.bootstrapServers,
-                 'micrometer.metrics.enabled' : true,
-                 'endpoints.metrics.sensitive': false,
-                 (EMBEDDED_TOPICS)           : ['fruits']])
-        context = embeddedServer.applicationContext
+
+class ConsumerRegistrySpec extends AbstractKafkaContainerSpec {
+
+    @Override
+    protected Map getConfiguration() {
+        return super.getConfiguration() + ['micrometer.metrics.enabled' : true, 'endpoints.metrics.sensitive': false]
     }
 
     void 'test consumer registry'() {
@@ -87,12 +65,14 @@ class ConsumerRegistrySpec extends Specification {
         topicPartitions[0].partition() == 0
     }
 
+    @Requires(property = 'spec.name', value = 'ConsumerRegistrySpec')
     @KafkaClient
     static interface BicycleClient {
 
         @Topic('bicycles')
        void send(@KafkaKey String make, @MessageBody String model)
     }
 
+    @Requires(property = 'spec.name', value = 'ConsumerRegistrySpec')
     @KafkaListener(clientId = 'bicycle-client', offsetReset = EARLIEST)
     static class BicycleListener {
diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorsSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorsSpec.groovy
index 0fce4fc37..529a99455 100644
--- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorsSpec.groovy
+++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorsSpec.groovy
@@ -113,28 +113,32 @@
         }
     }
 
+    @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
     @KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RESUME_AT_NEXT_RECORD))
     static class TestListenerWithErrorStrategyResumeAtNextRecord extends AbstractTestListener {
     }
 
+    @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
     @KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR))
     static class TestListenerWithErrorStrategyRetryOnError extends AbstractTestListener {
     }
 
+    @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
     @KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 10))
     static class TestListenerWithErrorStrategyRetryOnError10Times extends AbstractTestListener {
     }
 
+    @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
     @KafkaListener(offsetReset = EARLIEST)
     static class TestListenerWithErrorStrategyNone extends AbstractTestListener {
     }
 
+    @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
     @KafkaListener(offsetReset = EARLIEST, offsetStrategy = OffsetStrategy.SYNC_PER_RECORD, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 10))
     static class TestListenerSyncPerRecordWithErrorStrategyRetryOnError10Times extends AbstractTestListener {
     }
 
     @Slf4j
-    @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
     static abstract class AbstractTestListener implements KafkaListenerExceptionHandler {
 
         TreeSet partitions = []
diff --git a/kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java b/kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java
index 2311b0783..3e2fd0b54 100644
--- a/kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java
+++ b/kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java
@@ -3,8 +3,10 @@
 import io.micronaut.configuration.kafka.annotation.KafkaKey;
 import io.micronaut.configuration.kafka.annotation.KafkaListener;
 import io.micronaut.configuration.kafka.annotation.Topic;
+import io.micronaut.context.annotation.Requires;
 import jakarta.inject.Singleton;
 
+@Requires(property = "spec.name", value = "DisabledSpec")
 @Singleton
 public class DisabledConsumer {
diff --git a/kafka/src/test/groovy/io/micronaut/test/DisabledSpec.groovy b/kafka/src/test/groovy/io/micronaut/test/DisabledSpec.groovy
index 8fccd6904..ac86b6b90 100644
--- a/kafka/src/test/groovy/io/micronaut/test/DisabledSpec.groovy
+++ b/kafka/src/test/groovy/io/micronaut/test/DisabledSpec.groovy
@@ -14,7 +14,7 @@ class DisabledSpec extends Specification {
 
     void "Starting app with kafka disabled works correctly"() {
         given:
-        ApplicationContext ctx = ApplicationContext.run("kafka.enabled": "false")
+        ApplicationContext ctx = ApplicationContext.run("kafka.enabled": "false", "spec.name": getClass().getSimpleName())
 
         when: "test that the service has been created correctly"
         DisabledTestService service = ctx.getBean(DisabledTestService)
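
Note on the logging pattern this patch applies throughout: logMethod() renders a @KafkaClient or @KafkaListener method as a short "DeclaringType#methodName" tag instead of the full toString() of the MethodInvocationContext/ExecutableMethod, and the isTraceEnabled()/isInfoEnabled() guards keep the tag computation (and any string concatenation) from running when the level is disabled. A minimal standalone sketch of both ideas follows; the LoggingSketch class and its send() method are illustrative only, not part of the patch:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    final class LoggingSketch {

        private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

        // Same idea as the patch's logMethod(ExecutableMethod): a compact
        // "DeclaringType#methodName" tag for log output.
        private static String logMethod(Class<?> declaringType, String methodName) {
            return declaringType.getSimpleName() + "#" + methodName;
        }

        void send(Object record) {
            // The guard ensures logMethod(...) only runs when TRACE is on; with
            // plain string concatenation it also skips building the message.
            if (LOG.isTraceEnabled()) {
                LOG.trace("@KafkaClient method [{}] Sending producer record: {}",
                        logMethod(LoggingSketch.class, "send"), record);
            }
        }
    }

With the parameterized {} form, SLF4J already defers formatting and the arguments' toString(), so the guard mainly spares the eager logMethod(...) call; for the concatenated form in returnSynchronous() it also avoids building the record's string representation on every send.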
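
The @Requires(property = 'spec.name', ...) annotations added to the test beans follow the usual Micronaut test-isolation pattern: a listener or client bean is only loaded into an application context whose configuration sets the matching spec.name property (as DisabledSpec now does via getClass().getSimpleName()), so beans declared for one spec cannot consume records while another spec runs. A minimal sketch of the idea, with a hypothetical OrdersSpec/OrdersListener and "orders" topic:

    import io.micronaut.configuration.kafka.annotation.KafkaListener;
    import io.micronaut.configuration.kafka.annotation.OffsetReset;
    import io.micronaut.configuration.kafka.annotation.Topic;
    import io.micronaut.context.annotation.Requires;

    // Only created when the context is started with spec.name=OrdersSpec,
    // e.g. ApplicationContext.run("spec.name": "OrdersSpec", ...) in the spec.
    @Requires(property = "spec.name", value = "OrdersSpec")
    @KafkaListener(offsetReset = OffsetReset.EARLIEST)
    public class OrdersListener {

        @Topic("orders")
        void receive(String order) {
            // handle the record
        }
    }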