diff --git a/pom.xml b/pom.xml
index 0e97162..3d77906 100644
--- a/pom.xml
+++ b/pom.xml
@@ -454,8 +454,8 @@
org.slf4j
org.apache.commons.logging
- 80
- 71
+ 81
+ 72
true
diff --git a/src/main/java/io/strimzi/test/container/KafkaVersionService.java b/src/main/java/io/strimzi/test/container/KafkaVersionService.java
index 1102ac0..fb9dd84 100644
--- a/src/main/java/io/strimzi/test/container/KafkaVersionService.java
+++ b/src/main/java/io/strimzi/test/container/KafkaVersionService.java
@@ -14,6 +14,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
@@ -23,6 +25,7 @@
class KafkaVersionService {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaVersionService.class);
+ private static final Pattern KAFKA_VERSION_PATTERN = Pattern.compile(".*-kafka-(\\d+\\.\\d+\\.\\d+)$");
private static class InstanceHolder {
public static final KafkaVersionService INSTANCE = new KafkaVersionService();
@@ -72,7 +75,7 @@ protected static String strimziTestContainerImageName(String kafkaVersion) {
// using strimzi-test-container images
if (kafkaVersion == null || kafkaVersion.isEmpty()) {
imageName = KafkaVersionService.getInstance().latestRelease().getImage();
- kafkaVersion = KafkaVersionService.getInstance().latestRelease().getImage();
+ kafkaVersion = KafkaVersionService.getInstance().latestRelease().getVersion();
LOGGER.info("No Kafka version specified. Using latest release: {}", kafkaVersion);
} else {
for (KafkaVersion kv : KafkaVersionService.getInstance().logicalKafkaVersionEntities) {
@@ -135,6 +138,29 @@ public static int compareVersions(String version1, String version2) {
return components.length - otherComponents.length;
}
+ /**
+ * Extracts the Kafka version from a Docker image name.
+ *
+ *
Expects the image name to contain "-kafka-" followed by the version number at the end.
+ * For example:
+ *
+ * - "quay.io/strimzi-test-container/test-container:0.107.0-rc1-kafka-3.7.1" returns "3.7.1"
+ * - "quay.io/strimzi-test-container/test-container:0.105.0-kafka-3.6.0" returns "3.6.0"
+ *
+ *
+ * @param imageName the Docker image name (e.g., "quay.io/...:0.107.0-rc1-kafka-3.7.1")
+ * @return the extracted Kafka version (e.g., "3.7.1")
+ * @throws IllegalArgumentException if the version cannot be extracted
+ */
+ public static String extractVersionFromImageName(final String imageName) {
+ final Matcher matcher = KAFKA_VERSION_PATTERN.matcher(imageName);
+ if (matcher.find()) {
+ return matcher.group(1); // Returns the Kafka version string like "3.9.0"
+ } else {
+ throw new IllegalArgumentException("Cannot extract Kafka version from image name: " + imageName);
+ }
+ }
+
/**
* Get the Kafka version in the following format (i.e., "3.0.0")
*
diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java
index 9edeefb..d9e5111 100644
--- a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java
+++ b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java
@@ -159,6 +159,7 @@ protected void doStart() {
if (!this.imageNameProvider.isDone()) {
this.imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(this.kafkaVersion));
}
+
try {
if (this.useKraft && ((this.kafkaVersion != null && this.kafkaVersion.startsWith("2.")) || this.imageNameProvider.get().contains("2.8.2"))) {
throw new UnsupportedKraftKafkaVersionException("Specified Kafka version " + this.kafkaVersion + " is not supported in KRaft mode.");
@@ -359,10 +360,27 @@ protected String[] buildListenersConfig(final InspectContainerResponse container
if (this.useKraft) {
final String controllerListenerName = "CONTROLLER";
+ final int controllerPort = 9094;
// adding Controller listener for Kraft mode
- // (DNS alias for multi-node setup; that way we other nodes can connect and communicate between each other)
- // we can't use 0.0.0.0 because https://github.com/apache/kafka/commit/9be27e715a209a892941bf35e66859d9c39c28c4
- kafkaListeners.append(controllerListenerName).append("://" + NETWORK_ALIAS_PREFIX + this.brokerId + ":9094");
+ kafkaListeners.append(controllerListenerName).append("://0.0.0.0:").append(controllerPort);
+ try {
+ if ((this.kafkaVersion != null && KafkaVersionService.KafkaVersion.compareVersions(this.kafkaVersion, "3.9.0") >= 0) ||
+ KafkaVersionService.KafkaVersion.compareVersions(KafkaVersionService.KafkaVersion.extractVersionFromImageName(this.imageNameProvider.get()), "3.9.0") >= 0) {
+ // We add CONTROLLER listener to advertised.listeners only when Kafka version is >= `3.9.0`, older version failed with:
+ // Exception in thread "main" java.lang.IllegalArgumentException: requirement failed:
+ // The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when
+ // process.roles contains the broker role because Kafka clients that send requests via advertised listeners do not
+ // send requests to KRaft controllers -- they only send requests to KRaft brokers.
+ advertisedListeners.append(",")
+ .append(controllerListenerName)
+ .append("://")
+ .append(getHost())
+ .append(":")
+ .append(controllerPort);
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
this.listenerNames.add(controllerListenerName);
}
diff --git a/src/test/java/io/strimzi/test/container/KafkaVersionServiceTest.java b/src/test/java/io/strimzi/test/container/KafkaVersionServiceTest.java
index 50dd9a2..b7bdbac 100644
--- a/src/test/java/io/strimzi/test/container/KafkaVersionServiceTest.java
+++ b/src/test/java/io/strimzi/test/container/KafkaVersionServiceTest.java
@@ -8,6 +8,7 @@
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
public class KafkaVersionServiceTest {
@@ -37,4 +38,45 @@ void testKafkaVersionAttributes() {
assertThat(KAFKA_VERSION_3_2_0.getVersion(), CoreMatchers.is("3.2.0"));
assertThat(KAFKA_VERSION_3_2_0.getImage(), CoreMatchers.is("custom-image2"));
}
+
+ @Test
+ void testExtractVersionFromImageNameRC() {
+ String imageName = "quay.io/strimzi-test-container/test-container:0.107.0-rc1-kafka-3.7.1";
+ String expectedVersion = "3.7.1";
+
+ String extractedVersion = KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName);
+
+ assertThat(extractedVersion, CoreMatchers.is(expectedVersion));
+ }
+
+ @Test
+ void testExtractVersionFromImageNameGA() {
+ String imageName = "quay.io/strimzi-test-container/test-container:0.105.0-kafka-3.6.0";
+ String expectedVersion = "3.6.0";
+
+ String extractedVersion = KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName);
+
+ assertThat(extractedVersion, CoreMatchers.is(expectedVersion));
+ }
+
+ @Test
+ void testExtractVersionFromImageNameInvalidFormatNoKafka() {
+ String imageName = "quay.io/strimzi-test-container/test-container:0.107.0-rc1-nokafka-3.7.1";
+ Exception exception = assertThrows(IllegalArgumentException.class, () -> KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName));
+ assertThat(exception.getMessage(), CoreMatchers.containsString("Cannot extract Kafka version from image name"));
+ }
+
+ @Test
+ void testExtractVersionFromImageNameInvalidFormatNoVersion() {
+ String imageName = "quay.io/strimzi-test-container/test-container:0.107.0-rc1-kafka-";
+ Exception exception = assertThrows(IllegalArgumentException.class, () -> KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName));
+ assertThat(exception.getMessage(), CoreMatchers.containsString("Cannot extract Kafka version from image name"));
+ }
+
+ @Test
+ void testExtractVersionFromImageNameAdditionalSuffix() {
+ String imageName = "quay.io/strimzi-test-container/test-container:0.107.0-rc1-kafka-3.7.1-suffix";
+ Exception exception = assertThrows(IllegalArgumentException.class, () -> KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName));
+ assertThat(exception.getMessage(), CoreMatchers.containsString("Cannot extract Kafka version from image name"));
+ }
}
diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerMockTest.java b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerMockTest.java
index cb28a26..1c92730 100644
--- a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerMockTest.java
+++ b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerMockTest.java
@@ -23,10 +23,46 @@
public class StrimziKafkaContainerMockTest {
+ private final static String KAFKA_3_9_0 = "3.9.0";
+
private StrimziKafkaContainer kafkaContainer;
@Test
- void testBuildListenersConfigSingleNetwork() {
+ void testBuildListenersConfigSingleNetworkWithKRaftAndSpecificVersion() {
+ // Mocking InspectContainerResponse
+ InspectContainerResponse containerInfo = Mockito.mock(InspectContainerResponse.class);
+ NetworkSettings networkSettings = Mockito.mock(NetworkSettings.class);
+ Mockito.when(containerInfo.getNetworkSettings()).thenReturn(networkSettings);
+
+ // Mocking network settings with a single network
+ Map networks = new HashMap<>();
+ ContainerNetwork containerNetwork = Mockito.mock(ContainerNetwork.class);
+ Mockito.when(containerNetwork.getIpAddress()).thenReturn("172.17.0.2");
+ networks.put("bridge", containerNetwork);
+ Mockito.when(networkSettings.getNetworks()).thenReturn(networks);
+
+ // Mocking getBootstrapServers
+ kafkaContainer = new StrimziKafkaContainer() {
+ @Override
+ public String getBootstrapServers() {
+ return "PLAINTEXT://localhost:9092";
+ }
+ };
+
+ String[] listenersConfig = kafkaContainer
+ .withKafkaVersion(KAFKA_3_9_0)
+ .withKraft()
+ .buildListenersConfig(containerInfo);
+
+ String expectedListeners = "PLAINTEXT://0.0.0.0:9092,BROKER1://0.0.0.0:9091,CONTROLLER://0.0.0.0:9094";
+ String expectedAdvertisedListeners = "PLAINTEXT://localhost:9092,BROKER1://172.17.0.2:9091,CONTROLLER://localhost:9094";
+
+ assertThat(listenersConfig[0], is(expectedListeners));
+ assertThat(listenersConfig[1], is(expectedAdvertisedListeners));
+ }
+
+ @Test
+ void testBuildListenersConfigSingleNetworkWithKafka390() {
// Mocking InspectContainerResponse
InspectContainerResponse containerInfo = Mockito.mock(InspectContainerResponse.class);
NetworkSettings networkSettings = Mockito.mock(NetworkSettings.class);
@@ -105,7 +141,7 @@ void testBuildListenersConfigWithKRaft() {
Mockito.when(networkSettings.getNetworks()).thenReturn(networks);
// Enabling KRaft mode
- kafkaContainer = new StrimziKafkaContainer() {
+ kafkaContainer = new StrimziKafkaContainer("quay.io/strimzi-test-container/test-container:0.109.0-kafka-3.9.0") {
@Override
public String getBootstrapServers() {
return "PLAINTEXT://localhost:9092";
@@ -115,8 +151,8 @@ public String getBootstrapServers() {
String[] listenersConfig = kafkaContainer.buildListenersConfig(containerInfo);
- String expectedListeners = "PLAINTEXT://0.0.0.0:9092,BROKER1://0.0.0.0:9091,CONTROLLER://broker-0:9094";
- String expectedAdvertisedListeners = "PLAINTEXT://localhost:9092,BROKER1://172.17.0.2:9091";
+ String expectedListeners = "PLAINTEXT://0.0.0.0:9092,BROKER1://0.0.0.0:9091,CONTROLLER://0.0.0.0:9094";
+ String expectedAdvertisedListeners = "PLAINTEXT://localhost:9092,BROKER1://172.17.0.2:9091,CONTROLLER://localhost:9094";
assertThat(listenersConfig[0], is(expectedListeners));
assertThat(listenersConfig[1], is(expectedAdvertisedListeners));