Support for setting uncaught exception handler #780

Merged · 10 commits · Aug 28, 2023
KafkaStreamsFactory.java
@@ -28,6 +28,8 @@
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.slf4j.Logger;
@@ -36,8 +38,12 @@
import java.io.Closeable;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import static java.util.Arrays.asList;
import static java.util.function.Predicate.not;

/**
* A factory that constructs the {@link KafkaStreams} bean.
@@ -51,6 +57,7 @@ public class KafkaStreamsFactory implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsFactory.class);

private static final String START_KAFKA_STREAMS_PROPERTY = "start-kafka-streams";
private static final String UNCAUGHT_EXCEPTION_HANDLER_PROPERTY = "uncaught-exception-handler";

private final Map<KafkaStreams, ConfiguredStreamBuilder> streams = new ConcurrentHashMap<>();

@@ -108,6 +115,7 @@ KafkaStreams kafkaStreams(
builder.getConfiguration(),
kafkaClientSupplier
);
setUncaughtExceptionHandler(builder.getConfiguration(), kafkaStreams);
final String startKafkaStreamsValue = builder.getConfiguration().getProperty(
START_KAFKA_STREAMS_PROPERTY, Boolean.TRUE.toString());
final boolean startKafkaStreams = Boolean.parseBoolean(startKafkaStreamsValue);
@@ -158,4 +166,35 @@ public void close() {
}
}

/**
* Set the uncaught exception handler for a given Kafka Streams instance.
*
* @param properties The configuration for the given Kafka Streams instance.
* @param kafkaStreams The Kafka Streams instance to configure.
* @return An optional exception handler if {@code uncaught-exception-handler} was configured.
*/
Optional<StreamsUncaughtExceptionHandler> setUncaughtExceptionHandler(Properties properties, KafkaStreams kafkaStreams) {
final Optional<StreamsUncaughtExceptionHandler> uncaughtExceptionHandler = Optional
.ofNullable(properties.getProperty(UNCAUGHT_EXCEPTION_HANDLER_PROPERTY))
.filter(not(String::isBlank))
.map(action -> {
try {
final StreamThreadExceptionResponse response = StreamThreadExceptionResponse.valueOf(action.toUpperCase());
return exception -> {
if (LOG.isWarnEnabled()) {
LOG.warn("Responding with {} to unexpected exception thrown by kafka stream thread", response, exception);
}
return response;
};
} catch (IllegalArgumentException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Ignoring illegal exception handler: {}. Please use one of: {}", action,
asList(StreamThreadExceptionResponse.values()));
}
return null;
Reviewer comment (Contributor): can we do something like?

    /**
     * Make an uncaught exception handler for a given kafka streams configuration.
     *
     * @param properties The kafka streams configuration.
     * @return An optional exception handler if {@code uncaught-exception-handler} was configured.
     */
    private Optional<StreamsUncaughtExceptionHandler> makeUncaughtExceptionHandler(Properties properties) {
        return Optional.ofNullable(properties.getProperty(UNCAUGHT_EXCEPTION_HANDLER_PROPERTY))
            .filter(not(String::isBlank))
            .flatMap(this::makeUncaughtExceptionHandler);
    }

    private Optional<StreamsUncaughtExceptionHandler> makeUncaughtExceptionHandler(String action) {
        try {
            final StreamThreadExceptionResponse response = StreamThreadExceptionResponse.valueOf(action.toUpperCase());
            return Optional.of(exception -> {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Responding with {} to unexpected exception thrown by kafka stream thread", response, exception);
                }
                return response;
            });
        } catch (IllegalArgumentException e) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("Ignoring illegal exception handler: {}. Please use one of: {}", action,
                    asList(StreamThreadExceptionResponse.values()));
            }
            return Optional.empty();
        }
    }

}
});
uncaughtExceptionHandler.ifPresent(kafkaStreams::setUncaughtExceptionHandler);
return uncaughtExceptionHandler;
}
}
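With this change, an uncaught exception handler can be enabled purely through configuration. As a minimal sketch (the stream name my-stream is hypothetical; the property path follows the kafka.streams.<stream-name> prefix used in the test configuration below), an application could set:

    kafka.streams.my-stream.uncaught-exception-handler: REPLACE_THREAD

Valid values are the StreamThreadExceptionResponse constants (REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION), matched case-insensitively; blank or unrecognized values are logged as warnings and ignored.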
KafkaStreamsFactorySpec.groovy
@@ -0,0 +1,64 @@
package io.micronaut.configuration.kafka.streams

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import spock.lang.Unroll

import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*

class KafkaStreamsFactorySpec extends AbstractTestContainersSpec {

void "set exception handler when no config is given"() {
given:
KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory)
KafkaStreams kafkaStreams = Mock()
Properties props = new Properties()

when:
def handler = kafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams)

then:
handler.empty
0 * _
}

void "set exception handler when no valid config is given"() {
given:
KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory)
KafkaStreams kafkaStreams = Mock()
Properties props = ['uncaught-exception-handler': config]

when:
def handler = kafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams)

then:
handler.empty
0 * _

where:
config << ['', ' ', 'ILLEGAL_VALUE', '!!REPLACE_THREAD!!']
}

@Unroll
void "set exception handler when given config is #config"(String config) {
given:
KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory)
KafkaStreams kafkaStreams = Mock()
Properties props = ['uncaught-exception-handler': config]

when:
def handler = kafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams)

then:
handler.present
handler.get().handle(null) == expected
1 * kafkaStreams.setUncaughtExceptionHandler(_ as StreamsUncaughtExceptionHandler)
0 * _

where:
config | expected
'replace_thread' | REPLACE_THREAD
'shutdown_CLIENT' | SHUTDOWN_CLIENT
'SHUTDOWN_APPLICATION' | SHUTDOWN_APPLICATION
}
}
@@ -2,6 +2,7 @@

import io.micronaut.configuration.kafka.streams.optimization.OptimizationStream;
import io.micronaut.configuration.kafka.streams.startkafkastreams.StartKafkaStreamsOff;
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorStreamFactory;
import io.micronaut.configuration.kafka.streams.wordcount.WordCountStream;
import io.micronaut.context.annotation.BootstrapContextCompatible;
import io.micronaut.context.annotation.Value;
@@ -39,6 +40,10 @@ void initializeTopics() {
WordCountStream.OUTPUT,
WordCountStream.NAMED_WORD_COUNT_INPUT,
WordCountStream.NAMED_WORD_COUNT_OUTPUT,
OnErrorStreamFactory.ON_ERROR_NO_CONFIG_INPUT,
OnErrorStreamFactory.ON_ERROR_NO_CONFIG_OUTPUT,
OnErrorStreamFactory.ON_ERROR_REPLACE_INPUT,
OnErrorStreamFactory.ON_ERROR_REPLACE_OUTPUT,
StartKafkaStreamsOff.STREAMS_OFF_INPUT,
StartKafkaStreamsOff.STREAMS_OFF_OUTPUT,
OptimizationStream.OPTIMIZATION_ON_INPUT,
UncaughtExceptionsSpec.groovy
@@ -0,0 +1,69 @@
package io.micronaut.configuration.kafka.streams

import groovy.util.logging.Slf4j
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorNoConfigClient
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorNoConfigListener
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorReplaceClient
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorReplaceListener
import io.micronaut.context.annotation.Property
import io.micronaut.inject.qualifiers.Qualifiers
import org.apache.kafka.streams.KafkaStreams
import spock.lang.Shared

@Slf4j
@Property(name = 'spec.name', value = 'UncaughtExceptionsSpec')
class UncaughtExceptionsSpec extends AbstractTestContainersSpec {

@Shared
String onErrorNoConfigAppId = 'kafka-on-error-no-config-' + UUID.randomUUID().toString()

@Shared
String onErrorReplaceAppId = 'kafka-on-error-replace-' + UUID.randomUUID().toString()

protected Map<String, Object> getConfiguration() {
return super.getConfiguration() + [
'kafka.streams.on-error-no-config.client.id': UUID.randomUUID(),
'kafka.streams.on-error-no-config.application.id': onErrorNoConfigAppId,
'kafka.streams.on-error-no-config.group.id': UUID.randomUUID(),
'kafka.streams.on-error-replace.client.id': UUID.randomUUID(),
'kafka.streams.on-error-replace.application.id': onErrorReplaceAppId,
'kafka.streams.on-error-replace.group.id': UUID.randomUUID(),
'kafka.streams.on-error-replace.uncaught-exception-handler': 'REPLACE_THREAD']
}

void "test uncaught exception with no exception handler"() {
given: "a stream configured with no exception handler"
def stream = context.getBean(KafkaStreams, Qualifiers.byName('on-error-no-config'))
def client = context.getBean(OnErrorNoConfigClient)
def listener = context.getBean(OnErrorNoConfigListener)

when: "the stream thread throws an uncaught exception"
client.send('ERROR')
client.send('hello')

then: "the stream enters ERROR state"
conditions.eventually {
stream.state() == KafkaStreams.State.ERROR
}
stream.metadataForLocalThreads().empty == true
listener.received == null
}

void "test uncaught exception with REPLACE_THREAD"() {
given: "a stream configured with REPLACE_THREAD"
def stream = context.getBean(KafkaStreams, Qualifiers.byName("on-error-replace"))
def client = context.getBean(OnErrorReplaceClient)
def listener = context.getBean(OnErrorReplaceListener)

when: "the stream thread throws an uncaught exception"
client.send('ERROR')
client.send('hello')

then: "the stream replaces the thread and keeps running"
conditions.eventually {
listener.received == 'HELLO'
}
stream.state() == KafkaStreams.State.RUNNING
stream.metadataForLocalThreads().empty == false
}
}
MyStreamsUncaughtExceptionHandler.java
@@ -0,0 +1,32 @@
package io.micronaut.configuration.kafka.streams.uncaught;

// tag::imports[]
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventListener;
import jakarta.inject.Singleton;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
// end::imports[]

@Requires(property = "do.not.enable", value = "just for documentation purposes")
// tag::clazz[]
@Singleton
public class MyStreamsUncaughtExceptionHandler
implements ApplicationEventListener<BeforeKafkaStreamStart>, StreamsUncaughtExceptionHandler {

@Override
public void onApplicationEvent(BeforeKafkaStreamStart event) {
event.getKafkaStreams().setUncaughtExceptionHandler(this);
}

@Override
public StreamThreadExceptionResponse handle(Throwable exception) {
if (exception instanceof MyException) {
return StreamThreadExceptionResponse.REPLACE_THREAD;
}
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
}
// end::clazz[]

interface MyException { }
OnErrorNoConfigClient.java
@@ -0,0 +1,13 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaClient
public interface OnErrorNoConfigClient {

@Topic(OnErrorStreamFactory.ON_ERROR_NO_CONFIG_INPUT)
void send(String message);
}
OnErrorNoConfigListener.java
@@ -0,0 +1,19 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaListener(offsetReset = EARLIEST, groupId = "OnErrorNoConfigListener", uniqueGroupId = true)
public class OnErrorNoConfigListener {

public String received;

@Topic(OnErrorStreamFactory.ON_ERROR_NO_CONFIG_OUTPUT)
void receive(String message) {
received = message;
}
}
OnErrorReplaceClient.java
@@ -0,0 +1,13 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaClient
public interface OnErrorReplaceClient {

@Topic(OnErrorStreamFactory.ON_ERROR_REPLACE_INPUT)
void send(String message);
}
OnErrorReplaceListener.java
@@ -0,0 +1,19 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaListener(offsetReset = EARLIEST, groupId = "OnErrorReplaceListener", uniqueGroupId = true)
public class OnErrorReplaceListener {

public String received;

@Topic(OnErrorStreamFactory.ON_ERROR_REPLACE_OUTPUT)
void receive(String message) {
received = message;
}
}
OnErrorStreamFactory.java
@@ -0,0 +1,57 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.concurrent.atomic.AtomicBoolean;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@Factory
public class OnErrorStreamFactory {

public static final String ON_ERROR_NO_CONFIG = "on-error-no-config";
public static final String ON_ERROR_NO_CONFIG_INPUT = "on-error-no-config-input";
public static final String ON_ERROR_NO_CONFIG_OUTPUT = "on-error-no-config-output";
public static final String ON_ERROR_REPLACE = "on-error-replace";
public static final String ON_ERROR_REPLACE_INPUT = "on-error-replace-input";
public static final String ON_ERROR_REPLACE_OUTPUT = "on-error-replace-output";

@Singleton
@Named(ON_ERROR_NO_CONFIG)
KStream<String, String> createOnErrorNoConfigStream(@Named(ON_ERROR_NO_CONFIG) ConfiguredStreamBuilder builder) {
final KStream<String, String> source = builder
.stream(ON_ERROR_NO_CONFIG_INPUT, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(makeErrorValueMapper());
source.to(ON_ERROR_NO_CONFIG_OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
return source;
}

@Singleton
@Named(ON_ERROR_REPLACE)
KStream<String, String> createOnErrorReplaceThreadStream(@Named(ON_ERROR_REPLACE) ConfiguredStreamBuilder builder) {
final KStream<String, String> source = builder
.stream(ON_ERROR_REPLACE_INPUT, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(makeErrorValueMapper());
source.to(ON_ERROR_REPLACE_OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
return source;
}

static ValueMapper<String, String> makeErrorValueMapper() {
final AtomicBoolean flag = new AtomicBoolean();
return value -> {
// Throw an exception only the first time we find "ERROR"
if (value.equals("ERROR") && flag.compareAndSet(false, true)) {
throw new IllegalStateException("Uh-oh! Prepare for an uncaught exception");
}
return value.toUpperCase();
};
}
}