Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix exception handling for reactive consumers #455

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.h
});

recordMetadataProducer = recordMetadataProducer.onErrorResume((Function<Throwable, Publisher<RecordMetadata>>) throwable -> {
handleException(consumerState, new KafkaListenerException(
handleException(consumerState.consumerBean, new KafkaListenerException(
"Error occurred processing record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(),
throwable,
consumerState.consumerBean,
Expand Down Expand Up @@ -823,7 +823,7 @@ record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.h
);

return producerSend(consumerState, kafkaProducer, record).doOnError(ex -> {
handleException(consumerState, new KafkaListenerException(
handleException(consumerState.consumerBean, new KafkaListenerException(
"Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + method + "]: " + throwable.getMessage(),
throwable,
consumerState.consumerBean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.micronaut.configuration.kafka.exceptions.KafkaListenerException
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler
import io.micronaut.context.annotation.Requires
import org.apache.kafka.common.TopicPartition
import reactor.core.publisher.Mono

import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -37,6 +38,19 @@ class KafkaErrorHandlingSpec extends AbstractEmbeddedServerSpec {
}
}

void "test custom exception handler in reactive consumer"() {
when:"A reactive consumer with custom exception handler throws a Mono error"
ErrorClient myClient = context.getBean(ErrorClient)
myClient.sendMessage("One")

ErrorCausingReactiveConsumer myConsumer = context.getBean(ErrorCausingReactiveConsumer)

then:"The bean's exception handler is used"
conditions.eventually {
myConsumer.exceptionHandled
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorHandlingSpec')
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC)
static class ErrorCausingConsumer implements KafkaListenerExceptionHandler {
Expand All @@ -62,6 +76,22 @@ class KafkaErrorHandlingSpec extends AbstractEmbeddedServerSpec {
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorHandlingSpec')
@KafkaListener(offsetReset = EARLIEST, offsetStrategy = SYNC)
static class ErrorCausingReactiveConsumer implements KafkaListenerExceptionHandler {
boolean exceptionHandled = false

@Topic("errors")
Mono<Void> handleMessage(String message) {
return Mono.error(new RuntimeException())
}

@Override
void handle(KafkaListenerException exception) {
this.exceptionHandled = true
}
}

@Requires(property = 'spec.name', value = 'KafkaErrorHandlingSpec')
@KafkaClient
static interface ErrorClient {
Expand Down