Support for setting uncaught exception handler (#780)
* Implement mechanism to set uncaught exception handler

* Add unit tests for KafkaStreamsFactory

* Test uncaught exceptions against an actual kafka instance

* Add guide section to document the new feature

* Reverse the order of the approaches to show the simpler option first

* Refactor unit tests to try and fix SonarQube

* Small refactor

* Add tests to check the custom exception handler

* Use `configuration` macro instead of `source`

---------

Co-authored-by: Sergio del Amo <sergio.delamo@softamo.com>
guillermocalvo and sdelamo authored Aug 28, 2023
1 parent 2056ac0 commit e75ed8e
Showing 14 changed files with 457 additions and 0 deletions.
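
The change offers two ways to react to an uncaught exception thrown by a Kafka Streams thread. The simpler option is a per-stream `uncaught-exception-handler` property whose value is one of the StreamThreadExceptionResponse names (REPLACE_THREAD, SHUTDOWN_CLIENT or SHUTDOWN_APPLICATION); the alternative is a custom StreamsUncaughtExceptionHandler bean that registers itself through a BeforeKafkaStreamStart event listener, as shown by MyStreamsUncaughtExceptionHandler below. A minimal sketch of the property-based option, in the Groovy map form the new specs use (the stream name 'my-stream' is a placeholder, not part of the commit):

// Sketch only: 'my-stream' is a hypothetical stream name; the property name and
// the allowed values come from KafkaStreamsFactory and its spec in this commit.
def streamsConfig = [
        'kafka.streams.my-stream.uncaught-exception-handler': 'REPLACE_THREAD' // or SHUTDOWN_CLIENT / SHUTDOWN_APPLICATION
]

An unrecognized value is ignored with a warning and no handler is installed, as KafkaStreamsFactory.makeUncaughtExceptionHandler below shows.
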
@@ -28,6 +28,8 @@
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.slf4j.Logger;
@@ -36,8 +38,12 @@
import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Arrays.asList;
import static java.util.function.Predicate.not;

/**
 * A factory that constructs the {@link KafkaStreams} bean.
@@ -51,6 +57,7 @@ public class KafkaStreamsFactory implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsFactory.class);

    private static final String START_KAFKA_STREAMS_PROPERTY = "start-kafka-streams";
    private static final String UNCAUGHT_EXCEPTION_HANDLER_PROPERTY = "uncaught-exception-handler";

    private final Map<KafkaStreams, ConfiguredStreamBuilder> streams = new ConcurrentHashMap<>();

@@ -108,6 +115,7 @@ KafkaStreams kafkaStreams(
            builder.getConfiguration(),
            kafkaClientSupplier
        );
        makeUncaughtExceptionHandler(builder.getConfiguration()).ifPresent(kafkaStreams::setUncaughtExceptionHandler);
        final String startKafkaStreamsValue = builder.getConfiguration().getProperty(
            START_KAFKA_STREAMS_PROPERTY, Boolean.TRUE.toString());
        final boolean startKafkaStreams = Boolean.parseBoolean(startKafkaStreamsValue);
@@ -158,4 +166,31 @@ public void close() {
        }
    }

    /**
     * Makes an uncaught exception handler for a given Kafka Streams configuration.
     *
     * @param properties The Kafka Streams configuration.
     * @return An optional exception handler if {@code uncaught-exception-handler} was configured.
     */
    Optional<StreamsUncaughtExceptionHandler> makeUncaughtExceptionHandler(Properties properties) {
        return Optional.ofNullable(properties.getProperty(UNCAUGHT_EXCEPTION_HANDLER_PROPERTY))
            .filter(not(String::isBlank))
            .map(action -> {
                try {
                    final StreamThreadExceptionResponse response = StreamThreadExceptionResponse.valueOf(action.toUpperCase());
                    return exception -> {
                        if (LOG.isWarnEnabled()) {
                            LOG.warn("Responding with {} to unexpected exception thrown by kafka stream thread", response, exception);
                        }
                        return response;
                    };
                } catch (IllegalArgumentException e) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Ignoring illegal exception handler: {}. Please use one of: {}", action,
                            asList(StreamThreadExceptionResponse.values()));
                    }
                    return null;
                }
            });
    }
}
@@ -0,0 +1,64 @@
package io.micronaut.configuration.kafka.streams

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.configuration.kafka.streams.uncaught.CustomUncaughtHandlerStreamFactory
import io.micronaut.configuration.kafka.streams.uncaught.MyStreamsUncaughtExceptionHandler
import io.micronaut.context.annotation.Property
import io.micronaut.context.annotation.Requires
import io.micronaut.inject.qualifiers.Qualifiers
import org.apache.kafka.streams.KafkaStreams
import spock.lang.Shared

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST

@Property(name = 'spec.name', value = 'CustomUncaughtHandlerSpec')
class CustomUncaughtHandlerSpec extends AbstractTestContainersSpec {

    protected Map<String, Object> getConfiguration() {
        return super.getConfiguration() + [
                'kafka.streams.my-custom-handler.client.id' : UUID.randomUUID(),
                'kafka.streams.my-custom-handler.application.id' : 'my-custom-handler-' + UUID.randomUUID().toString(),
                'kafka.streams.my-custom-handler.group.id' : UUID.randomUUID()
        ]
    }

    void "test uncaught exception with custom handler"() {
        given: "a stream configured with custom handler"
        def stream = context.getBean(KafkaStreams, Qualifiers.byName(CustomUncaughtHandlerStreamFactory.CUSTOM_HANDLER))
        def client = context.getBean(MyCustomHandlerClient)
        def listener = context.getBean(MyCustomHandlerListener)

        when: "the stream thread throws an uncaught exception"
        client.send('ERROR')
        client.send('hello')

        then: "the stream replaces the thread and keeps running"
        conditions.eventually {
            listener.received == 'HELLO'
        }
        stream.state() == KafkaStreams.State.RUNNING
        stream.metadataForLocalThreads().empty == false

        and: "the custom uncaught exception handler avoided the danger"
        context.getBean(MyStreamsUncaughtExceptionHandler).dangerAvoided
    }

    @Requires(property = "spec.name", value = "CustomUncaughtHandlerSpec")
    @KafkaClient
    static interface MyCustomHandlerClient {
        @Topic(CustomUncaughtHandlerStreamFactory.CUSTOM_HANDLER_INPUT)
        void send(String message)
    }

    @Requires(property = "spec.name", value = "CustomUncaughtHandlerSpec")
    @KafkaListener(offsetReset = EARLIEST, groupId = "MyCustomHandlerListener", uniqueGroupId = true)
    static class MyCustomHandlerListener {
        String received

        @Topic(CustomUncaughtHandlerStreamFactory.CUSTOM_HANDLER_OUTPUT)
        void receive(String message) {
            received = message
        }
    }
}
@@ -0,0 +1,56 @@
package io.micronaut.configuration.kafka.streams

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import spock.lang.Unroll

import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*

class KafkaStreamsFactorySpec extends AbstractTestContainersSpec {

void "set exception handler when no config is given"() {
given:
KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory)
Properties props = new Properties()

when:
Optional<StreamsUncaughtExceptionHandler> handler = kafkaStreamsFactory.makeUncaughtExceptionHandler(props)

then:
handler.empty
}

void "set exception handler when no valid config is given"() {
given:
KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory)
Properties props = ['uncaught-exception-handler': config]

when:
Optional<StreamsUncaughtExceptionHandler> handler = kafkaStreamsFactory.makeUncaughtExceptionHandler(props)

then:
handler.empty

where:
config << ['', ' ', 'ILLEGAL_VALUE', '!!REPLACE_THREAD!!']
}

@Unroll
void "set exception handler when given config is #config"(String config) {
given:
KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory)
Properties props = ['uncaught-exception-handler': config]

when:
Optional<StreamsUncaughtExceptionHandler> handler = kafkaStreamsFactory.makeUncaughtExceptionHandler(props)

then:
handler.present
handler.get().handle(null) == expected

where:
config | expected
'replace_thread' | REPLACE_THREAD
'shutdown_CLIENT' | SHUTDOWN_CLIENT
'SHUTDOWN_APPLICATION' | SHUTDOWN_APPLICATION
}
}
@@ -2,6 +2,8 @@

import io.micronaut.configuration.kafka.streams.optimization.OptimizationStream;
import io.micronaut.configuration.kafka.streams.startkafkastreams.StartKafkaStreamsOff;
import io.micronaut.configuration.kafka.streams.uncaught.CustomUncaughtHandlerStreamFactory;
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorStreamFactory;
import io.micronaut.configuration.kafka.streams.wordcount.WordCountStream;
import io.micronaut.context.annotation.BootstrapContextCompatible;
import io.micronaut.context.annotation.Value;
@@ -39,6 +41,12 @@ void initializeTopics() {
            WordCountStream.OUTPUT,
            WordCountStream.NAMED_WORD_COUNT_INPUT,
            WordCountStream.NAMED_WORD_COUNT_OUTPUT,
            OnErrorStreamFactory.ON_ERROR_NO_CONFIG_INPUT,
            OnErrorStreamFactory.ON_ERROR_NO_CONFIG_OUTPUT,
            OnErrorStreamFactory.ON_ERROR_REPLACE_INPUT,
            OnErrorStreamFactory.ON_ERROR_REPLACE_OUTPUT,
            CustomUncaughtHandlerStreamFactory.CUSTOM_HANDLER_INPUT,
            CustomUncaughtHandlerStreamFactory.CUSTOM_HANDLER_OUTPUT,
            StartKafkaStreamsOff.STREAMS_OFF_INPUT,
            StartKafkaStreamsOff.STREAMS_OFF_OUTPUT,
            OptimizationStream.OPTIMIZATION_ON_INPUT,
@@ -0,0 +1,69 @@
package io.micronaut.configuration.kafka.streams

import groovy.util.logging.Slf4j
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorNoConfigClient
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorNoConfigListener
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorReplaceClient
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorReplaceListener
import io.micronaut.context.annotation.Property
import io.micronaut.inject.qualifiers.Qualifiers
import org.apache.kafka.streams.KafkaStreams
import spock.lang.Shared

@Slf4j
@Property(name = 'spec.name', value = 'UncaughtExceptionsSpec')
class UncaughtExceptionsSpec extends AbstractTestContainersSpec {

    @Shared
    String onErrorNoConfigAppId = 'kafka-on-error-no-config-' + UUID.randomUUID().toString()

    @Shared
    String onErrorReplaceAppId = 'kafka-on-error-replace-' + UUID.randomUUID().toString()

    protected Map<String, Object> getConfiguration() {
        return super.getConfiguration() + [
                'kafka.streams.on-error-no-config.client.id': UUID.randomUUID(),
                'kafka.streams.on-error-no-config.application.id': onErrorNoConfigAppId,
                'kafka.streams.on-error-no-config.group.id': UUID.randomUUID(),
                'kafka.streams.on-error-replace.client.id': UUID.randomUUID(),
                'kafka.streams.on-error-replace.application.id': onErrorReplaceAppId,
                'kafka.streams.on-error-replace.group.id': UUID.randomUUID(),
                'kafka.streams.on-error-replace.uncaught-exception-handler': 'REPLACE_THREAD']
    }

    void "test uncaught exception with no exception handler"() {
        given: "a stream configured with no exception handler"
        def stream = context.getBean(KafkaStreams, Qualifiers.byName('on-error-no-config'))
        def client = context.getBean(OnErrorNoConfigClient)
        def listener = context.getBean(OnErrorNoConfigListener)

        when: "the stream thread throws an uncaught exception"
        client.send('ERROR')
        client.send('hello')

        then: "the stream enters ERROR state"
        conditions.eventually {
            stream.state() == KafkaStreams.State.ERROR
        }
        stream.metadataForLocalThreads().empty == true
        listener.received == null
    }

    void "test uncaught exception with REPLACE_THREAD"() {
        given: "a stream configured with REPLACE_THREAD"
        def stream = context.getBean(KafkaStreams, Qualifiers.byName("on-error-replace"))
        def client = context.getBean(OnErrorReplaceClient)
        def listener = context.getBean(OnErrorReplaceListener)

        when: "the stream thread throws an uncaught exception"
        client.send('ERROR')
        client.send('hello')

        then: "the stream replaces the thread and keeps running"
        conditions.eventually {
            listener.received == 'HELLO'
        }
        stream.state() == KafkaStreams.State.RUNNING
        stream.metadataForLocalThreads().empty == false
    }
}
@@ -0,0 +1,38 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.concurrent.atomic.AtomicBoolean;

@Requires(property = "spec.name", value = "CustomUncaughtHandlerSpec")
@Factory
public class CustomUncaughtHandlerStreamFactory {
    public static final String CUSTOM_HANDLER = "my-custom-handler";
    public static final String CUSTOM_HANDLER_INPUT = "my-custom-handler-input";
    public static final String CUSTOM_HANDLER_OUTPUT = "my-custom-handler-output";

    @Singleton
    @Named(CUSTOM_HANDLER)
    KStream<String, String> createMyCustomHandlerStream(@Named(CUSTOM_HANDLER) ConfiguredStreamBuilder builder) {
        final AtomicBoolean flag = new AtomicBoolean();
        final KStream<String, String> source = builder
            .stream(CUSTOM_HANDLER_INPUT, Consumed.with(Serdes.String(), Serdes.String()))
            .mapValues(value -> {
                // Throw a custom exception only the first time we find "ERROR"
                if (flag.compareAndSet(false, true) && value.equals("ERROR")) {
                    throw new MyException("Uh-oh! Prepare for an uncaught exception");
                }
                return value.toUpperCase();
            });
        source.to(CUSTOM_HANDLER_OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
        return source;
    }
}
@@ -0,0 +1,39 @@
package io.micronaut.configuration.kafka.streams.uncaught;

// tag::imports[]
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
// end::imports[]

@Requires(property = "spec.name", value = "CustomUncaughtHandlerSpec")
// tag::clazz[]
@Singleton
public class MyStreamsUncaughtExceptionHandler
    implements ApplicationEventListener<BeforeKafkaStreamStart>, StreamsUncaughtExceptionHandler {

    boolean dangerAvoided = false;

    @Override
    public void onApplicationEvent(BeforeKafkaStreamStart event) {
        event.getKafkaStreams().setUncaughtExceptionHandler(this);
    }

    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        if (exception.getCause() instanceof MyException) {
            this.dangerAvoided = true;
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        }
        return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
    }
}
// end::clazz[]

class MyException extends RuntimeException {
    public MyException(String message) {
        super(message);
    }
}
@@ -0,0 +1,13 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaClient
public interface OnErrorNoConfigClient {

    @Topic(OnErrorStreamFactory.ON_ERROR_NO_CONFIG_INPUT)
    void send(String message);
}
@@ -0,0 +1,19 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaListener(offsetReset = EARLIEST, groupId = "OnErrorNoConfigListener", uniqueGroupId = true)
public class OnErrorNoConfigListener {

    public String received;

    @Topic(OnErrorStreamFactory.ON_ERROR_NO_CONFIG_OUTPUT)
    void receive(String message) {
        received = message;
    }
}