diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java index e1f1eb4f0..0293f08e9 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/processor/KafkaConsumerProcessor.java @@ -793,7 +793,7 @@ record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.h }); recordMetadataProducer = recordMetadataProducer.onErrorResume((Function>) throwable -> { - handleException(consumerState, new KafkaListenerException( + handleException(consumerState.consumerBean, new KafkaListenerException( "Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), throwable, consumerState.consumerBean, @@ -823,7 +823,7 @@ record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.h ); return producerSend(consumerState, kafkaProducer, record).doOnError(ex -> { - handleException(consumerState, new KafkaListenerException( + handleException(consumerState.consumerBean, new KafkaListenerException( "Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(), throwable, consumerState.consumerBean, diff --git a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorHandlingSpec.groovy b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorHandlingSpec.groovy index 94a67fb95..42540008c 100644 --- a/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorHandlingSpec.groovy +++ b/kafka/src/test/groovy/io/micronaut/configuration/kafka/errors/KafkaErrorHandlingSpec.groovy @@ -8,6 +8,7 @@ import io.micronaut.configuration.kafka.exceptions.KafkaListenerException import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler import io.micronaut.context.annotation.Requires import org.apache.kafka.common.TopicPartition +import reactor.core.publisher.Mono import java.util.concurrent.atomic.AtomicInteger @@ -37,6 +38,19 @@ class KafkaErrorHandlingSpec extends AbstractEmbeddedServerSpec { } } + void "test custom exception handler in reactive consumer"() { + when:"A reactive consumer with custom exception handler throws a Mono error" + ErrorClient myClient = context.getBean(ErrorClient) + myClient.sendMessage("One") + + ErrorCausingReactiveConsumer myConsumer = context.getBean(ErrorCausingReactiveConsumer) + + then:"The bean's exception handler is used" + conditions.eventually { + myConsumer.exceptionHandled + } + } + @Requires(property = 'spec.name', value = 'KafkaErrorHandlingSpec') @KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC) static class ErrorCausingConsumer implements KafkaListenerExceptionHandler { @@ -62,6 +76,22 @@ class KafkaErrorHandlingSpec extends AbstractEmbeddedServerSpec { } } + @Requires(property = 'spec.name', value = 'KafkaErrorHandlingSpec') + @KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC) + static class ErrorCausingReactiveConsumer implements KafkaListenerExceptionHandler { + boolean exceptionHandled = false + + @Topic("errors") + Mono handleMessage(String message) { + return Mono.error(new RuntimeException()) + } + + @Override + void handle(KafkaListenerException exception) { + this.exceptionHandled = true + } + } + @Requires(property = 'spec.name', value = 'KafkaErrorHandlingSpec') @KafkaClient static interface ErrorClient {