diff --git a/build.gradle b/build.gradle index 712bc727e1..a528efbffd 100644 --- a/build.gradle +++ b/build.gradle @@ -266,6 +266,7 @@ project ('spring-kafka-test') { compile ("org.mockito:mockito-core:$mockitoVersion", optional) compile ("junit:junit:$junit4Version", optional) + compile "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion" compile ("org.assertj:assertj-core:$assertjVersion", optional) compile ("org.apache.logging.log4j:log4j-core:$log4jVersion", optional) diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java new file mode 100644 index 0000000000..48b170d254 --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/EmbeddedKafkaCondition.java @@ -0,0 +1,171 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.test.condition; + +import java.io.IOException; +import java.io.InputStream; +import java.io.StringReader; +import java.lang.reflect.AnnotatedElement; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.ConditionEvaluationResult; +import org.junit.jupiter.api.extension.ExecutionCondition; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ExtensionContext.Namespace; +import org.junit.jupiter.api.extension.ExtensionContext.Store; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * JUnit5 condition for an embedded broker. + * + * @author Gary Russell + * @since 2.3 + * + */ +public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver { + + private static final String EMBEDDED_BROKER = "embedded-kafka"; + + private static final ThreadLocal BROKERS = new ThreadLocal<>(); + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) + throws ParameterResolutionException { + + return parameterContext.getParameter().getType().equals(EmbeddedKafkaBroker.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext context) + throws ParameterResolutionException { + + EmbeddedKafkaBroker broker = getBrokerFromStore(context); + Assert.state(broker != null, "Could not find embedded broker instance"); + return broker; + } + + @Override + public void afterAll(ExtensionContext context) { + EmbeddedKafkaBroker broker = BROKERS.get(); + if (broker != null) { + broker.destroy(); + BROKERS.remove(); + } + } + + @Override + public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) { + Optional element = context.getElement(); + if (element.isPresent()) { + /* + * When running in a spring test context, the EmbeddedKafkaContextCustomizer will + * create the broker. + */ + if (AnnotatedElementUtils.findMergedAnnotation(element.get(), SpringJUnitConfig.class) == null) { + EmbeddedKafka embedded = AnnotatedElementUtils.findMergedAnnotation(element.get(), EmbeddedKafka.class); + if (embedded != null) { + EmbeddedKafkaBroker broker = getBrokerFromStore(context); + if (broker == null) { + broker = createBroker(embedded); + BROKERS.set(broker); + getStore(context).put(EMBEDDED_BROKER, broker); + } + } + } + } + return ConditionEvaluationResult.enabled(""); + } + + @SuppressWarnings("unchecked") + private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) { + EmbeddedKafkaBroker broker; + broker = new EmbeddedKafkaBroker(embedded.count(), + embedded.controlledShutdown(), embedded.topics()); + broker.kafkaPorts(embedded.ports()); + Properties properties = new Properties(); + + for (String pair : embedded.brokerProperties()) { + if (!StringUtils.hasText(pair)) { + continue; + } + try { + properties.load(new StringReader(pair)); + } + catch (Exception ex) { + throw new IllegalStateException("Failed to load broker property from [" + pair + "]", + ex); + } + } + if (StringUtils.hasText(embedded.brokerPropertiesLocation())) { + Resource propertiesResource = new PathMatchingResourcePatternResolver() + .getResource(embedded.brokerPropertiesLocation()); + if (!propertiesResource.exists()) { + throw new IllegalStateException( + "Failed to load broker properties from [" + propertiesResource + + "]: resource does not exist."); + } + try (InputStream in = propertiesResource.getInputStream()) { + Properties p = new Properties(); + p.load(in); + p.forEach((key, value) -> properties.putIfAbsent(key, value)); + } + catch (IOException ex) { + throw new IllegalStateException( + "Failed to load broker properties from [" + propertiesResource + "]", ex); + } + } + broker.brokerProperties((Map) (Map) properties); + broker.afterPropertiesSet(); + return broker; + } + + private EmbeddedKafkaBroker getBrokerFromStore(ExtensionContext context) { + EmbeddedKafkaBroker broker = getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class) == null + ? getStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class) + : getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class); + return broker; + } + + private Store getStore(ExtensionContext context) { + return context.getStore(Namespace.create(getClass(), context)); + } + + private Store getParentStore(ExtensionContext context) { + ExtensionContext parent = context.getParent().get(); + return parent.getStore(Namespace.create(getClass(), parent)); + } + + + public static EmbeddedKafkaBroker getBroker() { + return BROKERS.get(); + } + +} diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/package-info.java new file mode 100644 index 0000000000..a2ac916502 --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes for JUnit5 conditions. + */ +package org.springframework.kafka.test.condition; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java index 45fb3614cc..a7839630da 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafka.java @@ -23,7 +23,10 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import org.junit.jupiter.api.extension.ExtendWith; + import org.springframework.core.annotation.AliasFor; +import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; /** * Annotation that can be specified on a test class that runs Spring Kafka based tests. @@ -59,6 +62,7 @@ * * @see org.springframework.kafka.test.EmbeddedKafkaBroker */ +@ExtendWith(EmbeddedKafkaCondition.class) @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @@ -108,8 +112,8 @@ /** * Properties in form {@literal key=value} that should be added to the broker config - * before runs. Properties may contain property placeholders, e.g. - * {@code delete.topic.enable=${topic.delete:true}}. + * before runs. When used in a Spring test context, properties may contain property + * placeholders, e.g. {@code delete.topic.enable=${topic.delete:true}}. * @return the properties to add * @see #brokerPropertiesLocation() * @see org.springframework.kafka.test.EmbeddedKafkaBroker#brokerProperties(java.util.Map) @@ -118,10 +122,11 @@ /** * Spring {@code Resource} url specifying the location of properties that should be - * added to the broker config. The {@code brokerPropertiesLocation} url and the - * properties themselves may contain placeholders that are resolved during - * initialization. Properties specified by {@link #brokerProperties()} will override - * properties found in {@code brokerPropertiesLocation}. + * added to the broker config. When used in a Spring test context, the + * {@code brokerPropertiesLocation} url and the properties themselves may contain + * placeholders that are resolved during initialization. Properties specified by + * {@link #brokerProperties()} will override properties found in + * {@code brokerPropertiesLocation}. * @return a {@code Resource} url specifying the location of properties to add * @see #brokerProperties() * @see org.springframework.kafka.test.EmbeddedKafkaBroker#brokerProperties(java.util.Map) diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java new file mode 100644 index 0000000000..861da22319 --- /dev/null +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/condition/EmbeddedKafkaConditionTests.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.test.condition; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; + +/** + * @author Gary Russell + * @since 2.3 + * + */ +@EmbeddedKafka +public class EmbeddedKafkaConditionTests { + + @Test + public void test(EmbeddedKafkaBroker broker) { + assertThat(broker.getBrokersAsString()).isNotNull(); + } + +} diff --git a/src/reference/asciidoc/testing.adoc b/src/reference/asciidoc/testing.adoc index fabf1ac97b..93aedf936a 100644 --- a/src/reference/asciidoc/testing.adoc +++ b/src/reference/asciidoc/testing.adoc @@ -263,6 +263,32 @@ Properties defined by `brokerProperties` override properties found in `brokerPro You can use the `@EmbeddedKafka` annotation with JUnit 4 or JUnit 5. +[[embedded-kafka-junit5]] +==== @EmbeddedKafka Annotation with JUnit5 + +Starting with version 2.3, there are two ways to use the `@EmbeddedKafka` annotation with JUnit5. +When used with the `@SpringJunitConfig` annotation, the embedded broker is added to the test application context. +You can auto wire the broker into your test, at the class or method level, to get the broker address list. + +When *not* using the spring test context, the `EmbdeddedKafkaCondition` creates a broker; the condition includes a parameter resolver so you can access the broker in your test method... + +==== +[source, java] +---- +@EmbeddedKafka +public class EmbeddedKafkaConditionTests { + + @Test + public void test(EmbeddedKafkaBroker broker) { + String brokerList = broker.getBrokersAsString(); + ... + } + +} +---- +==== + + ==== Embedded Broker in `@SpringBootTest` Annotations https://start.spring.io/[Spring Initializr] now automatically adds the `spring-kafka-test` dependency in test scope to the project configuration.