Skip to content

Commit

Permalink
Add guide section to document the new feature
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermocalvo committed Jul 19, 2023
1 parent eb12223 commit de8b0f3
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.micronaut.configuration.kafka.streams.uncaught;

// tag::imports[]
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
// end::imports[]

@Requires(property = "do.not.enable", value = "just for documentation purposes")
// tag::clazz[]
@Singleton
public class MyStreamsUncaughtExceptionHandler
implements ApplicationEventListener<BeforeKafkaStreamStart>, StreamsUncaughtExceptionHandler {

@Override
public void onApplicationEvent(BeforeKafkaStreamStart event) {
event.getKafkaStreams().setUncaughtExceptionHandler(this);
}

@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
}
// end::imports[]
26 changes: 26 additions & 0 deletions src/main/docs/guide/kafkaStreams/kafkaStreamExceptions.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
Since version 2.8.0, Kafka allows you to https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.html[handle uncaught exceptions] that may be thrown from your streams. This handler must return the action that must be taken, depending on the thrown exception.

There are three possible https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.html[responses]: `REPLACE_THREAD`, `SHUTDOWN_CLIENT`, or `SHUTDOWN_APPLICATION`.

TIP: You can find more details about this mechanism https://developer.confluent.io/tutorials/error-handling/confluent.html[here].

To implement your own handler, you can listen to the application event api:io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart[] and configure the streams with your own business logic:

[source,java]
----
include::{testskafkastreams}/uncaught/MyStreamsUncaughtExceptionHandler.java[tags=imports, indent=0]
include::{testskafkastreams}/uncaught/MyStreamsUncaughtExceptionHandler.java[tags=clazz, indent=0]
----

However, if you just want to take the same action every time, you can set the application property `kafka.streams.[STREAM-NAME].uncaught-exception-handler` to a valid action, such as `REPLACE_THREAD`.

For example in `application-test.yml`:

[source,yaml]
----
kafka:
streams:
my-stream:
uncaught-exception-handler: REPLACE_THREAD
----
1 change: 1 addition & 0 deletions src/main/docs/guide/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ kafkaStreams:
title: Kafka Streams
kafkaStreamInteractiveQuery: Interactive Query Service
kafkaStreamHealth: Kafka Stream Health Checks
kafkaStreamExceptions: Handling Uncaught Exceptions
repository: Repository

0 comments on commit de8b0f3

Please sign in to comment.