From 4a4d4cb545a525d631117fc0fe2e5e9bdc14f4be Mon Sep 17 00:00:00 2001 From: Jeremy Grelle Date: Mon, 4 Mar 2024 17:13:58 -0500 Subject: [PATCH] Allow reactive consumer methods with error strategy Processing of @KafkaListener methods is enhanced to allow reactive consumer methods annotated with @Blocking to work the same as non-reactive consumer methods in conjunction with retry error strategies. Resolves #967 --- .../kafka/processor/ConsumerState.java | 6 +- .../errors/KafkaErrorStrategySpec.groovy | 58 +++++++++++++++++++ .../docs/guide/kafkaListener/kafkaErrors.adoc | 2 + 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java index 46233a3ff..60614f156 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/ConsumerState.java @@ -267,8 +267,7 @@ protected void handleResultFlux( boolean isBlocking ) { final Flux recordMetadataProducer = publisher - .flatMap(value -> sendToDestination(value, consumerRecord, consumerRecords)) - .onErrorResume(error -> handleSendToError(error, consumerRecords, consumerRecord)); + .flatMap(value -> sendToDestination(value, consumerRecord, consumerRecords)); if (isBlocking) { List listRecords = recordMetadataProducer.collectList().block(); @@ -298,7 +297,8 @@ private Publisher sendToDestination(Object value, ConsumerRecord value.getClass() ); } - return Flux.create(emitter -> sendToDestination(emitter, kafkaProducer, key, value, consumerRecord, consumerRecords)); + Flux result = Flux.create(emitter -> sendToDestination(emitter, kafkaProducer, key, value, consumerRecord, consumerRecords)); + return result.onErrorResume(error -> handleSendToError(error, consumerRecords, consumerRecord)); } private void sendToDestination(FluxSink emitter, Producer kafkaProducer, Object key, Object value, ConsumerRecord consumerRecord, ConsumerRecords consumerRecords) { diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorStrategySpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorStrategySpec.groovy index dfe2627a5..8cd0fdab6 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorStrategySpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorStrategySpec.groovy @@ -11,10 +11,12 @@ import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler import io.micronaut.configuration.kafka.retry.ConditionalRetryBehaviourHandler import io.micronaut.context.annotation.Property import io.micronaut.context.annotation.Requires +import io.micronaut.core.annotation.Blocking import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.TopicPartition import org.slf4j.Logger import org.slf4j.LoggerFactory +import reactor.core.publisher.Mono import spock.lang.Unroll import java.util.concurrent.atomic.AtomicInteger @@ -269,6 +271,28 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec { myConsumer.errors[3].message == "Three #6" } + void "test reactive consumer when error strategy is 'retry on error' and 'handle all exceptions' is true"() { + when: "A reactive consumer signals an error" + RetryReactiveHandleAllErrorClient myClient = context.getBean(RetryReactiveHandleAllErrorClient) + myClient.sendMessage("One") + myClient.sendMessage("Two") + myClient.sendMessage("Three") + + RetryReactiveHandleAllErrorCausingConsumer myConsumer = context.getBean(RetryReactiveHandleAllErrorCausingConsumer) + + then: "Messages are consumed eventually" + conditions.eventually { + myConsumer.received == ["One", "Two", "Two", "Three", "Three", "Three"] + myConsumer.count.get() == 6 + } + and: "messages were retried and all exceptions were handled" + myConsumer.errors.size() == 4 + myConsumer.errors[0].message == "Two #2" + myConsumer.errors[1].message == "Three #4" + myConsumer.errors[2].message == "Three #5" + myConsumer.errors[3].message == "Three #6" + } + @Unroll void "test when error strategy is 'retry on error' with #type retry count"(String type) { when: "A consumer throws an exception" @@ -576,6 +600,33 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec { } } + @Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec') + @KafkaListener( + offsetReset = EARLIEST, + offsetStrategy = SYNC, + errorStrategy = @ErrorStrategy(value = RETRY_ON_ERROR, retryCount = 2, handleAllExceptions = true) + ) + static class RetryReactiveHandleAllErrorCausingConsumer implements KafkaListenerExceptionHandler { + AtomicInteger count = new AtomicInteger(0) + List received = [] + List errors = [] + + @Blocking + @Topic("errors-retry-reactive-handle-all-exceptions") + Mono handleMessage(String message) { + received << message + if (count.getAndIncrement() == 1 || message == 'Three') { + return Mono.error(new RuntimeException("${message} #${count}")) + } + return Mono.just(Boolean.TRUE) + } + + @Override + void handle(KafkaListenerException exception) { + errors << exception + } + } + @Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec') @KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC, errorStrategy = @ErrorStrategy(value = NONE)) static class PollNextErrorCausingConsumer implements KafkaListenerExceptionHandler { @@ -852,6 +903,13 @@ class KafkaErrorStrategySpec extends AbstractEmbeddedServerSpec { void sendMessage(String message) } + @Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec') + @KafkaClient + static interface RetryReactiveHandleAllErrorClient { + @Topic("errors-retry-reactive-handle-all-exceptions") + void sendMessage(String message) + } + @Requires(property = 'spec.name', value = 'KafkaErrorStrategySpec') @KafkaClient static interface PollNextErrorClient { diff --git a/src/main/docs/guide/kafkaListener/kafkaErrors.adoc b/src/main/docs/guide/kafkaListener/kafkaErrors.adoc index a3fc7a448..9df018cf5 100644 --- a/src/main/docs/guide/kafkaListener/kafkaErrors.adoc +++ b/src/main/docs/guide/kafkaListener/kafkaErrors.adoc @@ -23,6 +23,8 @@ You can choose one of the error strategies: NOTE: The error strategies apply only for non-batch messages processing. +NOTE: When using retry error strategies in combination with reactive consumer methods, it is necessary to add the `@Blocking` annotation to the reactive consumer method. + You can also make the number of retries configurable by using `retryCountValue`: .Dynamically Configuring Retries