Skip to content

Commit

Permalink
Improve logging and tests (#506)
Browse files Browse the repository at this point in the history
  • Loading branch information
dstepanov authored Mar 21, 2022
1 parent 747c981 commit 5561ca7
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ private Object returnSynchronous(MethodInvocationContext<Object, Object> context
for (Object o : batchValue) {
ProducerRecord record = buildProducerRecord(context, producerState, o);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
LOG.trace("@KafkaClient method [" + logMethod(context) + "] Sending producer record: " + record);
}

Object result;
Expand All @@ -213,7 +213,9 @@ private Object returnSynchronous(MethodInvocationContext<Object, Object> context
} else {
ProducerRecord record = buildProducerRecord(context, producerState, value);

LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(context), record);
}

Object result;
if (producerState.maxBlock != null) {
Expand Down Expand Up @@ -318,7 +320,7 @@ public void onComplete() {

ProducerRecord record = buildProducerRecord(context, producerState, value);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
LOG.trace("@KafkaClient method [" + logMethod(context) + "] Sending producer record: " + record);
}

boolean transactional = producerState.transactional;
Expand Down Expand Up @@ -422,8 +424,9 @@ private Flux<Object> buildSendFluxForReactiveValue(MethodInvocationContext<Objec
Argument<?> finalReturnType = returnType;
Flux<Object> sendFlowable = valueFlowable.flatMap(o -> {
ProducerRecord record = buildProducerRecord(context, producerState, o);

LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(context), record);
}

return producerSend(kafkaProducer, record)
.map(metadata -> convertResult(metadata, finalReturnType, o, producerState.bodyArgument))
Expand Down Expand Up @@ -727,6 +730,10 @@ private ProducerState getProducer(MethodInvocationContext<?, ?> context) {
});
}

private static String logMethod(ExecutableMethod<?, ?> method) {
return method.getDeclaringType().getSimpleName() + "#" + method.getName();
}

private static final class ProducerState {

private final Producer<?, ?> kafkaProducer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,15 +363,15 @@ private void debugDeserializationConfiguration(final ExecutableMethod<?, ?> meth
}
final Optional keyDeserializer = consumerConfiguration.getKeyDeserializer();
if (consumerConfiguration.getKeyDeserializer().isPresent()) {
LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), method);
LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), logMethod(method));
} else {
LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), method);
LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), logMethod(method));
}
final Optional valueDeserializer = consumerConfiguration.getValueDeserializer();
if (valueDeserializer.isPresent()) {
LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), method);
LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), logMethod(method));
} else {
LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), method);
LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), logMethod(method));
}
}

Expand Down Expand Up @@ -504,7 +504,9 @@ private boolean processConsumerRecords(final ConsumerState consumerState,

for (final ConsumerRecord<?, ?> consumerRecord : consumerRecords) {

LOG.trace("Kafka consumer [{}] received record: {}", method, consumerRecord);
if (LOG.isTraceEnabled()) {
LOG.trace("Kafka consumer [{}] received record: {}", logMethod(method), consumerRecord);
}

if (trackPartitions) {
final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
Expand Down Expand Up @@ -658,7 +660,10 @@ private static void setupConsumerSubscription(final ExecutableMethod<?, ?> metho
} else {
kafkaConsumer.subscribe(topics);
}
LOG.info("Kafka listener [{}] subscribed to topics: {}", method, topics);

if (LOG.isInfoEnabled()) {
LOG.info("Kafka listener [{}] subscribed to topics: {}", logMethod(method), topics);
}
}

if (hasPatterns) {
Expand All @@ -674,7 +679,9 @@ private static void setupConsumerSubscription(final ExecutableMethod<?, ?> metho
} else {
kafkaConsumer.subscribe(compiledPattern);
}
LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", method, pattern);
if (LOG.isInfoEnabled()) {
LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", logMethod(method), pattern);
}
}
}
}
Expand Down Expand Up @@ -846,7 +853,7 @@ record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.h
List<RecordMetadata> listRecords = recordMetadataProducer.collectList().block();
LOG.trace("Method [{}] produced record metadata: {}", method, listRecords);
} else {
recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", method, recordMetadata));
recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", logMethod(method), recordMetadata));
}
}

Expand Down Expand Up @@ -957,6 +964,10 @@ private static OffsetCommitCallback resolveCommitCallback(final Object consumerB
};
}

private static String logMethod(ExecutableMethod<?, ?> method) {
return method.getDeclaringType().getSimpleName() + "#" + method.getName();
}

/**
* The internal state of the consumer.
*
Expand Down Expand Up @@ -1087,8 +1098,10 @@ synchronized void resumeTopicPartitions() {
final List<TopicPartition> toResume = paused.stream()
.filter(topicPartition -> _pauseRequests == null || !_pauseRequests.contains(topicPartition))
.collect(Collectors.toList());
LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", clientId, toResume);
kafkaConsumer.resume(toResume);
if (!toResume.isEmpty()) {
LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", clientId, toResume);
kafkaConsumer.resume(toResume);
}
if (_pausedTopicPartitions != null) {
toResume.forEach(_pausedTopicPartitions::remove);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,44 +1,22 @@
package io.micronaut.configuration.kafka.annotation

import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.configuration.kafka.ConsumerRegistry
import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.MessageBody
import io.micronaut.runtime.server.EmbeddedServer
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import org.testcontainers.containers.KafkaContainer
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import java.util.concurrent.ConcurrentSkipListSet

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS

class ConsumerRegistrySpec extends Specification {

@Shared
@AutoCleanup
KafkaContainer kafkaContainer = new KafkaContainer()

@Shared
@AutoCleanup
EmbeddedServer embeddedServer

@Shared
@AutoCleanup
ApplicationContext context

void setupSpec() {
kafkaContainer.start()
embeddedServer = ApplicationContext.run(EmbeddedServer,
['kafka.bootstrap.servers' : kafkaContainer.bootstrapServers,
'micrometer.metrics.enabled' : true,
'endpoints.metrics.sensitive': false,
(EMBEDDED_TOPICS) : ['fruits']])
context = embeddedServer.applicationContext

class ConsumerRegistrySpec extends AbstractKafkaContainerSpec {

@Override
protected Map<String, Object> getConfiguration() {
return super.getConfiguration() + ['micrometer.metrics.enabled' : true, 'endpoints.metrics.sensitive': false]
}

void 'test consumer registry'() {
Expand Down Expand Up @@ -87,12 +65,14 @@ class ConsumerRegistrySpec extends Specification {
topicPartitions[0].partition() == 0
}

@Requires(property = 'spec.name', value = 'ConsumerRegistrySpec')
@KafkaClient
static interface BicycleClient {
@Topic('bicycles')
void send(@KafkaKey String make, @MessageBody String model)
}

@Requires(property = 'spec.name', value = 'ConsumerRegistrySpec')
@KafkaListener(clientId = 'bicycle-client', offsetReset = EARLIEST)
static class BicycleListener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,28 +113,32 @@ class KafkaErrorsSpec extends AbstractEmbeddedServerSpec {
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RESUME_AT_NEXT_RECORD))
static class TestListenerWithErrorStrategyResumeAtNextRecord extends AbstractTestListener {
}

@Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR))
static class TestListenerWithErrorStrategyRetryOnError extends AbstractTestListener {
}

@Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 10))
static class TestListenerWithErrorStrategyRetryOnError10Times extends AbstractTestListener {
}

@Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST)
static class TestListenerWithErrorStrategyNone extends AbstractTestListener {
}

@Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = OffsetStrategy.SYNC_PER_RECORD, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 10))
static class TestListenerSyncPerRecordWithErrorStrategyRetryOnError10Times extends AbstractTestListener {
}

@Slf4j
@Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
static abstract class AbstractTestListener implements KafkaListenerExceptionHandler {

TreeSet<Integer> partitions = []
Expand Down
2 changes: 2 additions & 0 deletions kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;

@Requires(property = "spec.name", value = "DisabledSpec")
@Singleton
public class DisabledConsumer {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class DisabledSpec extends Specification {

void "Starting app with kafka disabled works correctly"() {
given:
ApplicationContext ctx = ApplicationContext.run("kafka.enabled": "false")
ApplicationContext ctx = ApplicationContext.run("kafka.enabled": "false", "spec.name": getClass().getSimpleName())

when: "test that the service has been created correctly"
DisabledTestService service = ctx.getBean(DisabledTestService)
Expand Down

0 comments on commit 5561ca7

Please sign in to comment.