Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions spring-kafka-docs/src/main/asciidoc/testing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ Instead of default `spring.embedded.kafka.brokers` system property, the address
For this purpose a `spring.embedded.kafka.brokers.property` (`EmbeddedKafkaBroker.BROKER_LIST_PROPERTY`) system property can be set before starting an embedded Kafka.
For example, with Spring Boot a `spring.kafka.bootstrap-servers` configuration property is expected to be set for auto-configuring Kafka client, respectively.
So, before running tests with an embedded Kafka on random ports, we can set `spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers` as a system property - and the `EmbeddedKafkaBroker` will use it to expose its broker addresses.
This is now the default value for this property (starting with version 3.0.10).

With the `EmbeddedKafkaBroker.brokerProperties(Map<String, String>)`, you can provide additional properties for the Kafka servers.
See https://kafka.apache.org/documentation/#brokerconfigs[Kafka Config] for more information about possible broker properties.
Expand Down Expand Up @@ -235,7 +236,8 @@ In addition, these properties can be provided:
Essentially these properties mimic some of the `@EmbeddedKafka` attributes.

See more information about configuration properties and how to provide them in the https://junit.org/junit5/docs/current/user-guide/#running-tests-config-params[JUnit 5 User Guide].
For example, a `spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers` entry (for testing in Spring Boot application) can be added into a `junit-platform.properties` file in the testing classpath.
For example, a `spring.embedded.kafka.brokers.property=my.bootstrap-servers` entry can be added into a `junit-platform.properties` file in the testing classpath.
Starting with version 3.0.10, the broker automatically sets this to `spring.kafka.bootstrap-servers`, by default, for testing with Spring Boot applications.

NOTE: It is recommended to not combine a global embedded Kafka and per-test class in a single test suite.
Both of them share the same system properties, so it is very likely going to lead to unexpected behavior.
Expand Down Expand Up @@ -422,7 +424,7 @@ The following example shows how to use an `@EmbeddedKafka` Annotation to create
----
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
bootstrapServersProperty = "spring.kafka.bootstrap-servers") // this is now the default
public class MyApplicationTests {

@Autowired
Expand All @@ -437,6 +439,7 @@ public class MyApplicationTests {
----
====

NOTE: The `bootstrapServersProperty` is automatically set to `spring.kafka.bootstrap-servers`, by default, starting with version 3.0.10.

==== Hamcrest Matchers

Expand Down
2 changes: 2 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,5 @@ Similarly, `RECEIVED_MESSAGE_KEY` is replaced by `RECEIVED_KEY` and `RECEIVED_PA

Version 3.0.7 introduced a `MockConsumerFactory` and `MockProducerFactory`.
See <<mock-cons-prod>> for more information.

Starting with version 3.0.10, the embedded Kafka broker, by default, sets the Spring Boot property `spring.kafka.bootstrap-servers` to the address(es) of the embedded broker(s).
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -181,7 +181,7 @@ public class EmbeddedKafkaBroker implements InitializingBean, DisposableBean {

private int zkSessionTimeout = DEFAULT_ZK_SESSION_TIMEOUT;

private String brokerListProperty;
private String brokerListProperty = "spring.kafka.bootstrap-servers";

private volatile ZooKeeperClient zooKeeperClient;

Expand Down Expand Up @@ -257,6 +257,8 @@ public EmbeddedKafkaBroker kafkaPorts(int... ports) {

/**
* Set the system property with this name to the list of broker addresses.
* Defaults to {@code spring.kafka.bootstrap-servers} for Spring Boot
* compatibility, since 3.0.10.
* @param brokerListProperty the brokerListProperty to set
* @return this broker.
* @since 2.3
Expand Down Expand Up @@ -374,10 +376,10 @@ public void afterPropertiesSet() {
if (this.brokerListProperty == null) {
this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
}
if (this.brokerListProperty == null) {
this.brokerListProperty = SPRING_EMBEDDED_KAFKA_BROKERS;
if (this.brokerListProperty != null) {
System.setProperty(this.brokerListProperty, getBrokersAsString());
}
System.setProperty(this.brokerListProperty, getBrokersAsString());
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
}

Expand Down Expand Up @@ -591,7 +593,8 @@ public <T> T doWithAdminFunction(Function<AdminClient, T> callback) {

@Override
public void destroy() {
System.getProperties().remove(brokerListProperty);
System.getProperties().remove(this.brokerListProperty);
System.getProperties().remove(SPRING_EMBEDDED_KAFKA_BROKERS);
System.getProperties().remove(SPRING_EMBEDDED_ZOOKEEPER_CONNECT);
for (KafkaServer kafkaServer : this.kafkaServers) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -151,13 +151,14 @@
String brokerPropertiesLocation() default "";

/**
* The property name to set with the bootstrap server addresses instead of the default
* The property name to set with the bootstrap server addresses as well as the default
* {@value org.springframework.kafka.test.EmbeddedKafkaBroker#SPRING_EMBEDDED_KAFKA_BROKERS}.
* Defaults to {@code spring.kafka.bootstrap-servers} since 3.0.10.
* @return the property name.
* @since 2.3
* @see org.springframework.kafka.test.EmbeddedKafkaBroker#brokerListProperty(String)
*/
String bootstrapServersProperty() default "";
String bootstrapServersProperty() default "spring.kafka.bootstrap-servers";

/**
* Timeout for internal ZK client connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,10 @@ public void testPlanExecutionStarted(TestPlan testPlan) {
this.embeddedKafkaBroker =
new EmbeddedKafkaBroker(count, false, partitions, topics)
.brokerProperties(brokerProperties)
.brokerListProperty(brokerListProperty)
.kafkaPorts(ports);
if (brokerListProperty != null) {
this.embeddedKafkaBroker.brokerListProperty(brokerListProperty);
}
this.embeddedKafkaBroker.afterPropertiesSet();

this.logger.info("Started global Embedded Kafka on: " + this.embeddedKafkaBroker.getBrokersAsString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,10 +30,16 @@ public class EmbeddedKafkaBrokerTests {
@Test
void testUpDown() {
EmbeddedKafkaBroker kafka = new EmbeddedKafkaBroker(1);
kafka.brokerListProperty("foo.bar");
kafka.afterPropertiesSet();
assertThat(kafka.getZookeeperConnectionString()).startsWith("127");
assertThat(System.getProperty("foo.bar")).isNotNull();
assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS))
.isEqualTo(System.getProperty("foo.bar"));
kafka.destroy();
assertThat(kafka.getZookeeperConnectionString()).isNull();
assertThat(System.getProperty("foo.bar")).isNull();
assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull();
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
spring.kafka.embedded.count=2
spring.embedded.kafka.brokers.property=spring.kafka.bootstrap-servers
spring.kafka.embedded.topics=topic1,topic2