diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/uncaught/MyStreamsUncaughtExceptionHandler.java b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/uncaught/MyStreamsUncaughtExceptionHandler.java index ff4b28dd9..b4eea8845 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/uncaught/MyStreamsUncaughtExceptionHandler.java +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/uncaught/MyStreamsUncaughtExceptionHandler.java @@ -21,7 +21,12 @@ public void onApplicationEvent(BeforeKafkaStreamStart event) { @Override public StreamThreadExceptionResponse handle(Throwable exception) { - return StreamThreadExceptionResponse.REPLACE_THREAD; + if (exception instanceof MyException) { + return StreamThreadExceptionResponse.REPLACE_THREAD; + } + return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION; } } -// end::imports[] +// end::clazz[] + +interface MyException { } diff --git a/src/main/docs/guide/kafkaStreams/kafkaStreamExceptions.adoc b/src/main/docs/guide/kafkaStreams/kafkaStreamExceptions.adoc index 1f124ac17..bb093c4cb 100644 --- a/src/main/docs/guide/kafkaStreams/kafkaStreamExceptions.adoc +++ b/src/main/docs/guide/kafkaStreams/kafkaStreamExceptions.adoc @@ -4,16 +4,7 @@ There are three possible https://kafka.apache.org/28/javadoc/org/apache/kafka/st TIP: You can find more details about this mechanism https://developer.confluent.io/tutorials/error-handling/confluent.html[here]. -To implement your own handler, you can listen to the application event api:io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart[] and configure the streams with your own business logic: - -[source,java] ----- -include::{testskafkastreams}/uncaught/MyStreamsUncaughtExceptionHandler.java[tags=imports, indent=0] - -include::{testskafkastreams}/uncaught/MyStreamsUncaughtExceptionHandler.java[tags=clazz, indent=0] ----- - -However, if you just want to take the same action every time, you can set the application property `kafka.streams.[STREAM-NAME].uncaught-exception-handler` to a valid action, such as `REPLACE_THREAD`. +If you just want to take the same action every time, you can set the application property `kafka.streams.[STREAM-NAME].uncaught-exception-handler` to a valid action, such as `REPLACE_THREAD`. For example in `application-test.yml`: @@ -24,3 +15,12 @@ kafka: my-stream: uncaught-exception-handler: REPLACE_THREAD ---- + +To implement your own handler, you can listen to the application event api:io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart[] and configure the streams with your own business logic: + +[source,java] +---- +include::{testskafkastreams}/uncaught/MyStreamsUncaughtExceptionHandler.java[tags=imports, indent=0] + +include::{testskafkastreams}/uncaught/MyStreamsUncaughtExceptionHandler.java[tags=clazz, indent=0] +----