Skip to content

Commit

Permalink
Test uncaught exceptions against an actual kafka instance
Browse files Browse the repository at this point in the history
  • Loading branch information
guillermocalvo committed Jul 19, 2023
1 parent ef7c015 commit eb12223
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.micronaut.configuration.kafka.streams.optimization.OptimizationStream;
import io.micronaut.configuration.kafka.streams.startkafkastreams.StartKafkaStreamsOff;
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorStreamFactory;
import io.micronaut.configuration.kafka.streams.wordcount.WordCountStream;
import io.micronaut.context.annotation.BootstrapContextCompatible;
import io.micronaut.context.annotation.Value;
Expand Down Expand Up @@ -39,6 +40,10 @@ void initializeTopics() {
WordCountStream.OUTPUT,
WordCountStream.NAMED_WORD_COUNT_INPUT,
WordCountStream.NAMED_WORD_COUNT_OUTPUT,
OnErrorStreamFactory.ON_ERROR_NO_CONFIG_INPUT,
OnErrorStreamFactory.ON_ERROR_NO_CONFIG_OUTPUT,
OnErrorStreamFactory.ON_ERROR_REPLACE_INPUT,
OnErrorStreamFactory.ON_ERROR_REPLACE_OUTPUT,
StartKafkaStreamsOff.STREAMS_OFF_INPUT,
StartKafkaStreamsOff.STREAMS_OFF_OUTPUT,
OptimizationStream.OPTIMIZATION_ON_INPUT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.micronaut.configuration.kafka.streams

import groovy.util.logging.Slf4j
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorNoConfigClient
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorNoConfigListener
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorReplaceClient
import io.micronaut.configuration.kafka.streams.uncaught.OnErrorReplaceListener
import io.micronaut.context.annotation.Property
import io.micronaut.inject.qualifiers.Qualifiers
import org.apache.kafka.streams.KafkaStreams
import spock.lang.Shared

@Slf4j
@Property(name = 'spec.name', value = 'UncaughtExceptionsSpec')
class UncaughtExceptionsSpec extends AbstractTestContainersSpec {

@Shared
String onErrorNoConfigAppId = 'kafka-on-error-no-config-' + UUID.randomUUID().toString()

@Shared
String onErrorReplaceAppId = 'kafka-on-error-replace-' + UUID.randomUUID().toString()

protected Map<String, Object> getConfiguration() {
return super.getConfiguration() + [
'kafka.streams.on-error-no-config.client.id': UUID.randomUUID(),
'kafka.streams.on-error-no-config.application.id': onErrorNoConfigAppId,
'kafka.streams.on-error-no-config.group.id': UUID.randomUUID(),
'kafka.streams.on-error-replace.client.id': UUID.randomUUID(),
'kafka.streams.on-error-replace.application.id': onErrorReplaceAppId,
'kafka.streams.on-error-replace.group.id': UUID.randomUUID(),
'kafka.streams.on-error-replace.uncaught-exception-handler': 'REPLACE_THREAD']
}

void "test uncaught exception with no exception handler"() {
given: "a stream configured with no exception handler"
def stream = context.getBean(KafkaStreams, Qualifiers.byName('on-error-no-config'))
def client = context.getBean(OnErrorNoConfigClient)
def listener = context.getBean(OnErrorNoConfigListener)

when: "the stream thread throws an uncaught exception"
client.send('ERROR')
client.send('hello')

then: "the stream enters ERROR state"
conditions.eventually {
stream.state() == KafkaStreams.State.ERROR
}
stream.metadataForLocalThreads().empty == true
listener.received == null
}

void "test uncaught exception with REPLACE_THREAD"() {
given: "a stream configured with REPLACE_THREAD"
def stream = context.getBean(KafkaStreams, Qualifiers.byName("on-error-replace"))
def client = context.getBean(OnErrorReplaceClient)
def listener = context.getBean(OnErrorReplaceListener)

when: "the stream thread throws an uncaught exception"
client.send('ERROR')
client.send('hello')

then: "the stream replaces the thread and keeps running"
conditions.eventually {
listener.received == 'HELLO'
}
stream.state() == KafkaStreams.State.RUNNING
stream.metadataForLocalThreads().empty == false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaClient
public interface OnErrorNoConfigClient {

@Topic(OnErrorStreamFactory.ON_ERROR_NO_CONFIG_INPUT)
void send(String message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaListener(offsetReset = EARLIEST, groupId = "OnErrorNoConfigListener", uniqueGroupId = true)
public class OnErrorNoConfigListener {

public String received;

@Topic(OnErrorStreamFactory.ON_ERROR_NO_CONFIG_OUTPUT)
void receive(String message) {
received = message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaClient
public interface OnErrorReplaceClient {

@Topic(OnErrorStreamFactory.ON_ERROR_REPLACE_INPUT)
void send(String message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@KafkaListener(offsetReset = EARLIEST, groupId = "OnErrorReplaceListener", uniqueGroupId = true)
public class OnErrorReplaceListener {

public String received;

@Topic(OnErrorStreamFactory.ON_ERROR_REPLACE_OUTPUT)
void receive(String message) {
received = message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.micronaut.configuration.kafka.streams.uncaught;

import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.util.concurrent.atomic.AtomicBoolean;

@Requires(property = "spec.name", value = "UncaughtExceptionsSpec")
@Factory
public class OnErrorStreamFactory {

public static final String ON_ERROR_NO_CONFIG = "on-error-no-config";
public static final String ON_ERROR_NO_CONFIG_INPUT = "on-error-no-config-input";
public static final String ON_ERROR_NO_CONFIG_OUTPUT = "on-error-no-config-output";
public static final String ON_ERROR_REPLACE = "on-error-replace";
public static final String ON_ERROR_REPLACE_INPUT = "on-error-replace-input";
public static final String ON_ERROR_REPLACE_OUTPUT = "on-error-replace-output";

@Singleton
@Named(ON_ERROR_NO_CONFIG)
KStream<String, String> createOnErrorNoConfigStream(@Named(ON_ERROR_NO_CONFIG) ConfiguredStreamBuilder builder) {
final KStream<String, String> source = builder
.stream(ON_ERROR_NO_CONFIG_INPUT, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(makeErrorValueMapper());
source.to(ON_ERROR_REPLACE_OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
return source;
}

@Singleton
@Named(ON_ERROR_REPLACE)
KStream<String, String> createOnErrorReplaceThreadStream(@Named(ON_ERROR_REPLACE) ConfiguredStreamBuilder builder) {
final KStream<String, String> source = builder
.stream(ON_ERROR_REPLACE_INPUT, Consumed.with(Serdes.String(), Serdes.String()))
.mapValues(makeErrorValueMapper());
source.to(ON_ERROR_REPLACE_OUTPUT, Produced.with(Serdes.String(), Serdes.String()));
return source;
}

static ValueMapper<String, String> makeErrorValueMapper() {
final AtomicBoolean flag = new AtomicBoolean();
return value -> {
// Throw an exception only the first time we find "ERROR"
if (flag.compareAndSet(false, true) && value.equals("ERROR")) {
throw new IllegalStateException("Uh-oh! Prepare for an uncaught exception");
}
return value.toUpperCase();
};
}
}

0 comments on commit eb12223

Please sign in to comment.