Improve logging and tests #506

Merged
1 commit merged on Mar 21, 2022
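Summary of the changes in this diff:

- Trace/info log statements in the producer and consumer code paths are wrapped in LOG.isTraceEnabled() / LOG.isInfoEnabled() guards so their arguments are not evaluated when the level is disabled.
- A new logMethod(ExecutableMethod) helper renders methods as DeclaringType#methodName instead of the verbose method/context toString() previously interpolated into log messages.
- resumeTopicPartitions() now only logs and calls Consumer.resume() when there are actually partitions to resume.
- Test beans are scoped with @Requires(property = "spec.name", ...) so one spec's listeners and clients do not start inside another spec's application context, and ConsumerRegistrySpec is rebased onto the shared AbstractKafkaContainerSpec.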
@@ -192,7 +192,7 @@ private Object returnSynchronous(MethodInvocationContext<Object, Object> context
for (Object o : batchValue) {
ProducerRecord record = buildProducerRecord(context, producerState, o);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
LOG.trace("@KafkaClient method [" + logMethod(context) + "] Sending producer record: " + record);
}

Object result;
@@ -213,7 +213,9 @@ private Object returnSynchronous(MethodInvocationContext<Object, Object> context
} else {
ProducerRecord record = buildProducerRecord(context, producerState, value);

LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(context), record);
}

Object result;
if (producerState.maxBlock != null) {
@@ -318,7 +320,7 @@ public void onComplete() {

ProducerRecord record = buildProducerRecord(context, producerState, value);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [" + context + "] Sending producer record: " + record);
LOG.trace("@KafkaClient method [" + logMethod(context) + "] Sending producer record: " + record);
}

boolean transactional = producerState.transactional;
@@ -422,8 +424,9 @@ private Flux<Object> buildSendFluxForReactiveValue(MethodInvocationContext<Objec
Argument<?> finalReturnType = returnType;
Flux<Object> sendFlowable = valueFlowable.flatMap(o -> {
ProducerRecord record = buildProducerRecord(context, producerState, o);

LOG.trace("@KafkaClient method [{}] Sending producer record: {}", context, record);
if (LOG.isTraceEnabled()) {
LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(context), record);
}

return producerSend(kafkaProducer, record)
.map(metadata -> convertResult(metadata, finalReturnType, o, producerState.bodyArgument))
@@ -727,6 +730,10 @@ private ProducerState getProducer(MethodInvocationContext<?, ?> context) {
});
}

+ private static String logMethod(ExecutableMethod<?, ?> method) {
+ return method.getDeclaringType().getSimpleName() + "#" + method.getName();
+ }
+
private static final class ProducerState {

private final Producer<?, ?> kafkaProducer;
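Two idioms recur throughout the file above: trace statements are wrapped in LOG.isTraceEnabled(), and the method label comes from the new logMethod helper rather than the context's toString(). The guard pays off even for the parameterized {} form, because SLF4J defers only the formatting of arguments, not their evaluation: without the guard, logMethod(context) (or the string concatenation) would still run on every send with tracing off. A minimal self-contained sketch of the idiom (class and method names here are illustrative, not from the PR):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class GuardedTraceExample {

    private static final Logger LOG = LoggerFactory.getLogger(GuardedTraceExample.class);

    void send(Object record) {
        // Guarded: the "Type#method" label is never built, and the record is
        // never formatted, unless TRACE is actually enabled.
        if (LOG.isTraceEnabled()) {
            LOG.trace("@KafkaClient method [{}] Sending producer record: {}", logMethod(), record);
        }
    }

    // Same shape as the helper added by this PR: a short "Type#method" label
    // instead of the verbose MethodInvocationContext toString().
    private static String logMethod() {
        return GuardedTraceExample.class.getSimpleName() + "#send";
    }
}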
@@ -363,15 +363,15 @@ private void debugDeserializationConfiguration(final ExecutableMethod<?, ?> meth
}
final Optional keyDeserializer = consumerConfiguration.getKeyDeserializer();
if (consumerConfiguration.getKeyDeserializer().isPresent()) {
LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), method);
LOG.debug("Using key deserializer [{}] for Kafka listener: {}", keyDeserializer.get(), logMethod(method));
} else {
LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), method);
LOG.debug("Using key deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), logMethod(method));
}
final Optional valueDeserializer = consumerConfiguration.getValueDeserializer();
if (valueDeserializer.isPresent()) {
LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), method);
LOG.debug("Using value deserializer [{}] for Kafka listener: {}", valueDeserializer.get(), logMethod(method));
} else {
LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), method);
LOG.debug("Using value deserializer [{}] for Kafka listener: {}", properties.getProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), logMethod(method));
}
}

@@ -504,7 +504,9 @@ private boolean processConsumerRecords(final ConsumerState consumerState,

for (final ConsumerRecord<?, ?> consumerRecord : consumerRecords) {

LOG.trace("Kafka consumer [{}] received record: {}", method, consumerRecord);
if (LOG.isTraceEnabled()) {
LOG.trace("Kafka consumer [{}] received record: {}", logMethod(method), consumerRecord);
}

if (trackPartitions) {
final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
@@ -658,7 +660,10 @@ private static void setupConsumerSubscription(final ExecutableMethod<?, ?> metho
} else {
kafkaConsumer.subscribe(topics);
}
LOG.info("Kafka listener [{}] subscribed to topics: {}", method, topics);

if (LOG.isInfoEnabled()) {
LOG.info("Kafka listener [{}] subscribed to topics: {}", logMethod(method), topics);
}
}

if (hasPatterns) {
@@ -674,7 +679,9 @@ private static void setupConsumerSubscription(final ExecutableMethod<?, ?> metho
} else {
kafkaConsumer.subscribe(compiledPattern);
}
LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", method, pattern);
if (LOG.isInfoEnabled()) {
LOG.info("Kafka listener [{}] subscribed to topics pattern: {}", logMethod(method), pattern);
}
}
}
}
@@ -846,7 +853,7 @@ record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.h
List<RecordMetadata> listRecords = recordMetadataProducer.collectList().block();
LOG.trace("Method [{}] produced record metadata: {}", method, listRecords);
} else {
- recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", method, recordMetadata));
+ recordMetadataProducer.subscribe(recordMetadata -> LOG.trace("Method [{}] produced record metadata: {}", logMethod(method), recordMetadata));
}
}

@@ -957,6 +964,10 @@ private static OffsetCommitCallback resolveCommitCallback(final Object consumerB
};
}

+ private static String logMethod(ExecutableMethod<?, ?> method) {
+ return method.getDeclaringType().getSimpleName() + "#" + method.getName();
+ }
+
/**
* The internal state of the consumer.
*
@@ -1087,8 +1098,10 @@ synchronized void resumeTopicPartitions() {
final List<TopicPartition> toResume = paused.stream()
.filter(topicPartition -> _pauseRequests == null || !_pauseRequests.contains(topicPartition))
.collect(Collectors.toList());
LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", clientId, toResume);
kafkaConsumer.resume(toResume);
if (!toResume.isEmpty()) {
LOG.debug("Resuming Kafka consumption for Consumer [{}] from topic partition: {}", clientId, toResume);
kafkaConsumer.resume(toResume);
}
if (_pausedTopicPartitions != null) {
toResume.forEach(_pausedTopicPartitions::remove);
}
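The resumeTopicPartitions() change above follows the same theme as the logging guards: the partitions to resume are computed first, and the consumer only logs and calls resume() when that list is non-empty, instead of emitting a "Resuming ... from topic partition: []" debug line and a no-op resume() call on every pass when nothing is paused.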
@@ -1,44 +1,22 @@
package io.micronaut.configuration.kafka.annotation

+ import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.configuration.kafka.ConsumerRegistry
- import io.micronaut.context.ApplicationContext
+ import io.micronaut.context.annotation.Requires
import io.micronaut.messaging.annotation.MessageBody
- import io.micronaut.runtime.server.EmbeddedServer
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
- import org.testcontainers.containers.KafkaContainer
- import spock.lang.AutoCleanup
- import spock.lang.Shared
- import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import java.util.concurrent.ConcurrentSkipListSet

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST
- import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS

- class ConsumerRegistrySpec extends Specification {
-
- @Shared
- @AutoCleanup
- KafkaContainer kafkaContainer = new KafkaContainer()
-
- @Shared
- @AutoCleanup
- EmbeddedServer embeddedServer
-
- @Shared
- @AutoCleanup
- ApplicationContext context
-
- void setupSpec() {
- kafkaContainer.start()
- embeddedServer = ApplicationContext.run(EmbeddedServer,
- ['kafka.bootstrap.servers' : kafkaContainer.bootstrapServers,
- 'micrometer.metrics.enabled' : true,
- 'endpoints.metrics.sensitive': false,
- (EMBEDDED_TOPICS) : ['fruits']])
- context = embeddedServer.applicationContext
- }
+ class ConsumerRegistrySpec extends AbstractKafkaContainerSpec {
+
+ @Override
+ protected Map<String, Object> getConfiguration() {
+ return super.getConfiguration() + ['micrometer.metrics.enabled' : true, 'endpoints.metrics.sensitive': false]
+ }

void 'test consumer registry'() {
@@ -87,12 +65,14 @@ class ConsumerRegistrySpec extends Specification {
topicPartitions[0].partition() == 0
}

+ @Requires(property = 'spec.name', value = 'ConsumerRegistrySpec')
@KafkaClient
static interface BicycleClient {
@Topic('bicycles')
void send(@KafkaKey String make, @MessageBody String model)
}

+ @Requires(property = 'spec.name', value = 'ConsumerRegistrySpec')
@KafkaListener(clientId = 'bicycle-client', offsetReset = EARLIEST)
static class BicycleListener {

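ConsumerRegistrySpec previously started its own KafkaContainer and EmbeddedServer in setupSpec(); it now inherits that lifecycle from the shared AbstractKafkaContainerSpec base class and contributes only its metrics settings via the getConfiguration() override. The base class presumably also supplies the spec.name property (this is inferred, not shown in the diff), which is what the new @Requires(property = 'spec.name', value = 'ConsumerRegistrySpec') guards on the inner client and listener key on.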
@@ -113,28 +113,32 @@ class KafkaErrorsSpec extends AbstractEmbeddedServerSpec {
}
}

+ @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RESUME_AT_NEXT_RECORD))
static class TestListenerWithErrorStrategyResumeAtNextRecord extends AbstractTestListener {
}

+ @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR))
static class TestListenerWithErrorStrategyRetryOnError extends AbstractTestListener {
}

+ @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 10))
static class TestListenerWithErrorStrategyRetryOnError10Times extends AbstractTestListener {
}

+ @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST)
static class TestListenerWithErrorStrategyNone extends AbstractTestListener {
}

+ @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = OffsetStrategy.SYNC_PER_RECORD, errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 10))
static class TestListenerSyncPerRecordWithErrorStrategyRetryOnError10Times extends AbstractTestListener {
}

@Slf4j
+ @Requires(property = 'spec.name', value = 'KafkaErrorsSpec')
static abstract class AbstractTestListener implements KafkaListenerExceptionHandler {

TreeSet<Integer> partitions = []
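Each test listener in KafkaErrorsSpec now carries @Requires(property = 'spec.name', value = 'KafkaErrorsSpec'). Without such a guard, every @KafkaListener bean on the test classpath is eligible in every test's application context, so listeners defined for one spec could start, join consumer groups, and steal records during unrelated tests; scoping them by spec.name keeps each spec's consumers to itself.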
2 changes: 2 additions & 0 deletions kafka/src/test/groovy/io/micronaut/test/DisabledConsumer.java
@@ -3,8 +3,10 @@
import io.micronaut.configuration.kafka.annotation.KafkaKey;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
+ import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;

+ @Requires(property = "spec.name", value = "DisabledSpec")
@Singleton
public class DisabledConsumer {

@@ -14,7 +14,7 @@ class DisabledSpec extends Specification {

void "Starting app with kafka disabled works correctly"() {
given:
ApplicationContext ctx = ApplicationContext.run("kafka.enabled": "false")
ApplicationContext ctx = ApplicationContext.run("kafka.enabled": "false", "spec.name": getClass().getSimpleName())

when: "test that the service has been created correctly"
DisabledTestService service = ctx.getBean(DisabledTestService)
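DisabledSpec shows the other half of the spec.name idiom: the spec passes spec.name into ApplicationContext.run(...), and only beans whose @Requires matches that value are created. A minimal sketch of the pattern in plain Java, assuming Micronaut's annotation processor is on the compile path (bean and property names are illustrative):

import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;

import java.util.Map;

public class SpecNameIsolationExample {

    // Only instantiated when the context was started with spec.name=DisabledSpec,
    // mirroring the guard added to DisabledConsumer above.
    @Requires(property = "spec.name", value = "DisabledSpec")
    @Singleton
    static class SpecScopedBean {
    }

    public static void main(String[] args) {
        try (ApplicationContext ctx = ApplicationContext.run(Map.<String, Object>of("spec.name", "DisabledSpec"))) {
            System.out.println(ctx.containsBean(SpecScopedBean.class)); // true: property matches
        }
        try (ApplicationContext ctx = ApplicationContext.run(Map.<String, Object>of("spec.name", "SomeOtherSpec"))) {
            System.out.println(ctx.containsBean(SpecScopedBean.class)); // false: bean is skipped
        }
    }
}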