diff --git a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java index 20b3d7a80..9d0475936 100644 --- a/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java +++ b/kafka-streams/src/main/java/io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.java @@ -173,7 +173,7 @@ public void close() { * @param kafkaStreams The kafka streams to configure. * @return An optional exception handler if {@code uncaught-exception-handler} was configured. */ - private static Optional setUncaughtExceptionHandler(Properties properties, KafkaStreams kafkaStreams) { + Optional setUncaughtExceptionHandler(Properties properties, KafkaStreams kafkaStreams) { final Optional uncaughtExceptionHandler = Optional .ofNullable(properties.getProperty(UNCAUGHT_EXCEPTION_HANDLER_PROPERTY)) .filter(not(String::isBlank)) diff --git a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsFactorySpec.groovy b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsFactorySpec.groovy index c31bef15a..8e8f952a3 100644 --- a/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsFactorySpec.groovy +++ b/kafka-streams/src/test/groovy/io/micronaut/configuration/kafka/streams/KafkaStreamsFactorySpec.groovy @@ -2,20 +2,20 @@ package io.micronaut.configuration.kafka.streams import org.apache.kafka.streams.KafkaStreams import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler -import spock.lang.Specification import spock.lang.Unroll import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.* -class KafkaStreamsFactorySpec extends Specification { +class KafkaStreamsFactorySpec extends AbstractTestContainersSpec { void "set exception handler when no config is given"() { given: + KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory) KafkaStreams kafkaStreams = Mock() Properties props = new Properties() when: - def handler = KafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams) + def handler = kafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams) then: handler.empty @@ -24,11 +24,12 @@ class KafkaStreamsFactorySpec extends Specification { void "set exception handler when no valid config is given"() { given: + KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory) KafkaStreams kafkaStreams = Mock() Properties props = ['uncaught-exception-handler': config] when: - def handler = KafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams) + def handler = kafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams) then: handler.empty @@ -41,16 +42,18 @@ class KafkaStreamsFactorySpec extends Specification { @Unroll void "set exception handler when given config is #config"(String config) { given: + KafkaStreamsFactory kafkaStreamsFactory = context.getBean(KafkaStreamsFactory) KafkaStreams kafkaStreams = Mock() Properties props = ['uncaught-exception-handler': config] when: - def handler = KafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams) + def handler = kafkaStreamsFactory.setUncaughtExceptionHandler(props, kafkaStreams) then: handler.present handler.get().handle(null) == expected 1 * kafkaStreams.setUncaughtExceptionHandler(_ as StreamsUncaughtExceptionHandler) + 0 * _ where: config | expected