Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ project ('spring-kafka-test') {
compile ("org.mockito:mockito-core:$mockitoVersion", optional)

compile ("junit:junit:$junit4Version", optional)
compile "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion"

compile ("org.assertj:assertj-core:$assertjVersion", optional)
compile ("org.apache.logging.log4j:log4j-core:$log4jVersion", optional)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.test.condition;

import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.lang.reflect.AnnotatedElement;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
import org.junit.jupiter.api.extension.ExtensionContext.Store;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;

import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* JUnit5 condition for an embedded broker.
*
* @author Gary Russell
* @since 2.3
*
*/
public class EmbeddedKafkaCondition implements ExecutionCondition, AfterAllCallback, ParameterResolver {

private static final String EMBEDDED_BROKER = "embedded-kafka";

private static final ThreadLocal<EmbeddedKafkaBroker> BROKERS = new ThreadLocal<>();

@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
throws ParameterResolutionException {

return parameterContext.getParameter().getType().equals(EmbeddedKafkaBroker.class);
}

@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext context)
throws ParameterResolutionException {

EmbeddedKafkaBroker broker = getBrokerFromStore(context);
Assert.state(broker != null, "Could not find embedded broker instance");
return broker;
}

@Override
public void afterAll(ExtensionContext context) {
EmbeddedKafkaBroker broker = BROKERS.get();
if (broker != null) {
broker.destroy();
BROKERS.remove();
}
}

@Override
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
Optional<AnnotatedElement> element = context.getElement();
if (element.isPresent()) {
/*
* When running in a spring test context, the EmbeddedKafkaContextCustomizer will
* create the broker.
*/
if (AnnotatedElementUtils.findMergedAnnotation(element.get(), SpringJUnitConfig.class) == null) {
EmbeddedKafka embedded = AnnotatedElementUtils.findMergedAnnotation(element.get(), EmbeddedKafka.class);
if (embedded != null) {
EmbeddedKafkaBroker broker = getBrokerFromStore(context);
if (broker == null) {
broker = createBroker(embedded);
BROKERS.set(broker);
getStore(context).put(EMBEDDED_BROKER, broker);
}
}
}
}
return ConditionEvaluationResult.enabled("");
}

@SuppressWarnings("unchecked")
private EmbeddedKafkaBroker createBroker(EmbeddedKafka embedded) {
EmbeddedKafkaBroker broker;
broker = new EmbeddedKafkaBroker(embedded.count(),
embedded.controlledShutdown(), embedded.topics());
broker.kafkaPorts(embedded.ports());
Properties properties = new Properties();

for (String pair : embedded.brokerProperties()) {
if (!StringUtils.hasText(pair)) {
continue;
}
try {
properties.load(new StringReader(pair));
}
catch (Exception ex) {
throw new IllegalStateException("Failed to load broker property from [" + pair + "]",
ex);
}
}
if (StringUtils.hasText(embedded.brokerPropertiesLocation())) {
Resource propertiesResource = new PathMatchingResourcePatternResolver()
.getResource(embedded.brokerPropertiesLocation());
if (!propertiesResource.exists()) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource
+ "]: resource does not exist.");
}
try (InputStream in = propertiesResource.getInputStream()) {
Properties p = new Properties();
p.load(in);
p.forEach((key, value) -> properties.putIfAbsent(key, value));
}
catch (IOException ex) {
throw new IllegalStateException(
"Failed to load broker properties from [" + propertiesResource + "]", ex);
}
}
broker.brokerProperties((Map<String, String>) (Map<?, ?>) properties);
broker.afterPropertiesSet();
return broker;
}

private EmbeddedKafkaBroker getBrokerFromStore(ExtensionContext context) {
EmbeddedKafkaBroker broker = getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class) == null
? getStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class)
: getParentStore(context).get(EMBEDDED_BROKER, EmbeddedKafkaBroker.class);
return broker;
}

private Store getStore(ExtensionContext context) {
return context.getStore(Namespace.create(getClass(), context));
}

private Store getParentStore(ExtensionContext context) {
ExtensionContext parent = context.getParent().get();
return parent.getStore(Namespace.create(getClass(), parent));
}


public static EmbeddedKafkaBroker getBroker() {
return BROKERS.get();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/**
* Provides classes for JUnit5 conditions.
*/
package org.springframework.kafka.test.condition;
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;

/**
* Annotation that can be specified on a test class that runs Spring Kafka based tests.
Expand Down Expand Up @@ -59,6 +62,7 @@
*
* @see org.springframework.kafka.test.EmbeddedKafkaBroker
*/
@ExtendWith(EmbeddedKafkaCondition.class)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
Expand Down Expand Up @@ -108,8 +112,8 @@

/**
* Properties in form {@literal key=value} that should be added to the broker config
* before runs. Properties may contain property placeholders, e.g.
* {@code delete.topic.enable=${topic.delete:true}}.
* before runs. When used in a Spring test context, properties may contain property
* placeholders, e.g. {@code delete.topic.enable=${topic.delete:true}}.
* @return the properties to add
* @see #brokerPropertiesLocation()
* @see org.springframework.kafka.test.EmbeddedKafkaBroker#brokerProperties(java.util.Map)
Expand All @@ -118,10 +122,11 @@

/**
* Spring {@code Resource} url specifying the location of properties that should be
* added to the broker config. The {@code brokerPropertiesLocation} url and the
* properties themselves may contain placeholders that are resolved during
* initialization. Properties specified by {@link #brokerProperties()} will override
* properties found in {@code brokerPropertiesLocation}.
* added to the broker config. When used in a Spring test context, the
* {@code brokerPropertiesLocation} url and the properties themselves may contain
* placeholders that are resolved during initialization. Properties specified by
* {@link #brokerProperties()} will override properties found in
* {@code brokerPropertiesLocation}.
* @return a {@code Resource} url specifying the location of properties to add
* @see #brokerProperties()
* @see org.springframework.kafka.test.EmbeddedKafkaBroker#brokerProperties(java.util.Map)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.test.condition;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

/**
* @author Gary Russell
* @since 2.3
*
*/
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

@Test
public void test(EmbeddedKafkaBroker broker) {
assertThat(broker.getBrokersAsString()).isNotNull();
}

}
26 changes: 26 additions & 0 deletions src/reference/asciidoc/testing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,32 @@ Properties defined by `brokerProperties` override properties found in `brokerPro

You can use the `@EmbeddedKafka` annotation with JUnit 4 or JUnit 5.

[[embedded-kafka-junit5]]
==== @EmbeddedKafka Annotation with JUnit5

Starting with version 2.3, there are two ways to use the `@EmbeddedKafka` annotation with JUnit5.
When used with the `@SpringJunitConfig` annotation, the embedded broker is added to the test application context.
You can auto wire the broker into your test, at the class or method level, to get the broker address list.

When *not* using the spring test context, the `EmbdeddedKafkaCondition` creates a broker; the condition includes a parameter resolver so you can access the broker in your test method...

====
[source, java]
----
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {

@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}

}
----
====


==== Embedded Broker in `@SpringBootTest` Annotations

https://start.spring.io/[Spring Initializr] now automatically adds the `spring-kafka-test` dependency in test scope to the project configuration.
Expand Down