Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Controller listener to the advertised.listeners #111

Merged
merged 9 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,8 @@
<avoidCallsTo>org.slf4j</avoidCallsTo>
<avoidCallsTo>org.apache.commons.logging</avoidCallsTo>
</avoidCallsTo>
<mutationThreshold>80</mutationThreshold>
<coverageThreshold>71</coverageThreshold>
<mutationThreshold>81</mutationThreshold>
<coverageThreshold>72</coverageThreshold>
<verbose>true</verbose>
</configuration>
</plugin>
Expand Down
28 changes: 27 additions & 1 deletion src/main/java/io/strimzi/test/container/KafkaVersionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
Expand All @@ -23,6 +25,7 @@
class KafkaVersionService {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaVersionService.class);
private static final Pattern KAFKA_VERSION_PATTERN = Pattern.compile(".*-kafka-(\\d+\\.\\d+\\.\\d+)$");

private static class InstanceHolder {
public static final KafkaVersionService INSTANCE = new KafkaVersionService();
Expand Down Expand Up @@ -72,7 +75,7 @@ protected static String strimziTestContainerImageName(String kafkaVersion) {
// using strimzi-test-container images
if (kafkaVersion == null || kafkaVersion.isEmpty()) {
imageName = KafkaVersionService.getInstance().latestRelease().getImage();
kafkaVersion = KafkaVersionService.getInstance().latestRelease().getImage();
kafkaVersion = KafkaVersionService.getInstance().latestRelease().getVersion();
LOGGER.info("No Kafka version specified. Using latest release: {}", kafkaVersion);
} else {
for (KafkaVersion kv : KafkaVersionService.getInstance().logicalKafkaVersionEntities) {
Expand Down Expand Up @@ -135,6 +138,29 @@ public static int compareVersions(String version1, String version2) {
return components.length - otherComponents.length;
}

/**
* Extracts the Kafka version from a Docker image name.
*
* <p>Expects the image name to contain "-kafka-" followed by the version number at the end.
* For example:
* <ul>
* <li>"quay.io/strimzi-test-container/test-container:0.107.0-rc1-kafka-3.7.1" returns "3.7.1"</li>
* <li>"quay.io/strimzi-test-container/test-container:0.105.0-kafka-3.6.0" returns "3.6.0"</li>
* </ul>
*
* @param imageName the Docker image name (e.g., "quay.io/...:0.107.0-rc1-kafka-3.7.1")
* @return the extracted Kafka version (e.g., "3.7.1")
* @throws IllegalArgumentException if the version cannot be extracted
*/
public static String extractVersionFromImageName(final String imageName) {
final Matcher matcher = KAFKA_VERSION_PATTERN.matcher(imageName);
if (matcher.find()) {
return matcher.group(1); // Returns the Kafka version string like "3.9.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty unreliable. I think we need to have better way to configure KAfka version than through the container image. If you do not want to solve it in this PR it might be fine, but we should have issues to track this.

This is pretty likel to fail when using custom images when in various ofline / disconnected environments, when you want to fix to use some specific image through digest etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was thinking about the same with custom images. I can create an issue when I solve that also for custom images or at least log a warning when using custom images.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should work like this:

  • The builders should have a withVersion field/method
  • If not set, the default version (the latest Kafka version) should be used
  • The image should be by the users always set to correspond to the version

I think that is the way it should work. (As I said, I'm happy to leave it for later and not block this PR.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think I have come up with a more clear solution. Let me know what you think

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, that doesn't seem to work :/. I will have to think about that.....created issue [1] to track it. And I rollback that solution I had previously.

[1] - #112

} else {
throw new IllegalArgumentException("Cannot extract Kafka version from image name: " + imageName);
}
}

/**
* Get the Kafka version in the following format (i.e., "3.0.0")
*
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ protected void doStart() {
if (!this.imageNameProvider.isDone()) {
this.imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(this.kafkaVersion));
}

try {
if (this.useKraft && ((this.kafkaVersion != null && this.kafkaVersion.startsWith("2.")) || this.imageNameProvider.get().contains("2.8.2"))) {
throw new UnsupportedKraftKafkaVersionException("Specified Kafka version " + this.kafkaVersion + " is not supported in KRaft mode.");
Expand Down Expand Up @@ -359,10 +360,27 @@ protected String[] buildListenersConfig(final InspectContainerResponse container

if (this.useKraft) {
final String controllerListenerName = "CONTROLLER";
final int controllerPort = 9094;
// adding Controller listener for Kraft mode
// (DNS alias for multi-node setup; that way we other nodes can connect and communicate between each other)
// we can't use 0.0.0.0 because https://github.com/apache/kafka/commit/9be27e715a209a892941bf35e66859d9c39c28c4
kafkaListeners.append(controllerListenerName).append("://" + NETWORK_ALIAS_PREFIX + this.brokerId + ":9094");
kafkaListeners.append(controllerListenerName).append("://0.0.0.0:").append(controllerPort);
try {
if ((this.kafkaVersion != null && KafkaVersionService.KafkaVersion.compareVersions(this.kafkaVersion, "3.9.0") >= 0) ||
KafkaVersionService.KafkaVersion.compareVersions(KafkaVersionService.KafkaVersion.extractVersionFromImageName(this.imageNameProvider.get()), "3.9.0") >= 0) {
// We add CONTROLLER listener to advertised.listeners only when Kafka version is >= `3.9.0`, older version failed with:
// Exception in thread "main" java.lang.IllegalArgumentException: requirement failed:
// The advertised.listeners config must not contain KRaft controller listeners from controller.listener.names when
// process.roles contains the broker role because Kafka clients that send requests via advertised listeners do not
// send requests to KRaft controllers -- they only send requests to KRaft brokers.
advertisedListeners.append(",")
.append(controllerListenerName)
.append("://")
.append(getHost())
.append(":")
.append(controllerPort);
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
this.listenerNames.add(controllerListenerName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class KafkaVersionServiceTest {

Expand Down Expand Up @@ -37,4 +38,45 @@ void testKafkaVersionAttributes() {
assertThat(KAFKA_VERSION_3_2_0.getVersion(), CoreMatchers.is("3.2.0"));
assertThat(KAFKA_VERSION_3_2_0.getImage(), CoreMatchers.is("custom-image2"));
}

@Test
void testExtractVersionFromImageNameRC() {
String imageName = "quay.io/strimzi-test-container/test-container:0.107.0-rc1-kafka-3.7.1";
String expectedVersion = "3.7.1";

String extractedVersion = KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName);

assertThat(extractedVersion, CoreMatchers.is(expectedVersion));
}

@Test
void testExtractVersionFromImageNameGA() {
String imageName = "quay.io/strimzi-test-container/test-container:0.105.0-kafka-3.6.0";
String expectedVersion = "3.6.0";

String extractedVersion = KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName);

assertThat(extractedVersion, CoreMatchers.is(expectedVersion));
}

@Test
void testExtractVersionFromImageNameInvalidFormatNoKafka() {
String imageName = "quay.io/strimzi-test-container/test-container:0.107.0-rc1-nokafka-3.7.1";
Exception exception = assertThrows(IllegalArgumentException.class, () -> KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName));
assertThat(exception.getMessage(), CoreMatchers.containsString("Cannot extract Kafka version from image name"));
}

@Test
void testExtractVersionFromImageNameInvalidFormatNoVersion() {
String imageName = "quay.io/strimzi-test-container/test-container:0.107.0-rc1-kafka-";
Exception exception = assertThrows(IllegalArgumentException.class, () -> KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName));
assertThat(exception.getMessage(), CoreMatchers.containsString("Cannot extract Kafka version from image name"));
}

@Test
void testExtractVersionFromImageNameAdditionalSuffix() {
String imageName = "quay.io/strimzi-test-container/test-container:0.107.0-rc1-kafka-3.7.1-suffix";
Exception exception = assertThrows(IllegalArgumentException.class, () -> KafkaVersionService.KafkaVersion.extractVersionFromImageName(imageName));
assertThat(exception.getMessage(), CoreMatchers.containsString("Cannot extract Kafka version from image name"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,46 @@

public class StrimziKafkaContainerMockTest {

private final static String KAFKA_3_9_0 = "3.9.0";

private StrimziKafkaContainer kafkaContainer;

@Test
void testBuildListenersConfigSingleNetwork() {
void testBuildListenersConfigSingleNetworkWithKRaftAndSpecificVersion() {
// Mocking InspectContainerResponse
InspectContainerResponse containerInfo = Mockito.mock(InspectContainerResponse.class);
NetworkSettings networkSettings = Mockito.mock(NetworkSettings.class);
Mockito.when(containerInfo.getNetworkSettings()).thenReturn(networkSettings);

// Mocking network settings with a single network
Map<String, ContainerNetwork> networks = new HashMap<>();
ContainerNetwork containerNetwork = Mockito.mock(ContainerNetwork.class);
Mockito.when(containerNetwork.getIpAddress()).thenReturn("172.17.0.2");
networks.put("bridge", containerNetwork);
Mockito.when(networkSettings.getNetworks()).thenReturn(networks);

// Mocking getBootstrapServers
kafkaContainer = new StrimziKafkaContainer() {
@Override
public String getBootstrapServers() {
return "PLAINTEXT://localhost:9092";
}
};

String[] listenersConfig = kafkaContainer
.withKafkaVersion(KAFKA_3_9_0)
.withKraft()
.buildListenersConfig(containerInfo);

String expectedListeners = "PLAINTEXT://0.0.0.0:9092,BROKER1://0.0.0.0:9091,CONTROLLER://0.0.0.0:9094";
String expectedAdvertisedListeners = "PLAINTEXT://localhost:9092,BROKER1://172.17.0.2:9091,CONTROLLER://localhost:9094";

assertThat(listenersConfig[0], is(expectedListeners));
assertThat(listenersConfig[1], is(expectedAdvertisedListeners));
}

@Test
void testBuildListenersConfigSingleNetworkWithKafka390() {
// Mocking InspectContainerResponse
InspectContainerResponse containerInfo = Mockito.mock(InspectContainerResponse.class);
NetworkSettings networkSettings = Mockito.mock(NetworkSettings.class);
Expand Down Expand Up @@ -105,7 +141,7 @@ void testBuildListenersConfigWithKRaft() {
Mockito.when(networkSettings.getNetworks()).thenReturn(networks);

// Enabling KRaft mode
kafkaContainer = new StrimziKafkaContainer() {
kafkaContainer = new StrimziKafkaContainer("quay.io/strimzi-test-container/test-container:0.109.0-kafka-3.9.0") {
@Override
public String getBootstrapServers() {
return "PLAINTEXT://localhost:9092";
Expand All @@ -115,8 +151,8 @@ public String getBootstrapServers() {

String[] listenersConfig = kafkaContainer.buildListenersConfig(containerInfo);

String expectedListeners = "PLAINTEXT://0.0.0.0:9092,BROKER1://0.0.0.0:9091,CONTROLLER://broker-0:9094";
String expectedAdvertisedListeners = "PLAINTEXT://localhost:9092,BROKER1://172.17.0.2:9091";
String expectedListeners = "PLAINTEXT://0.0.0.0:9092,BROKER1://0.0.0.0:9091,CONTROLLER://0.0.0.0:9094";
String expectedAdvertisedListeners = "PLAINTEXT://localhost:9092,BROKER1://172.17.0.2:9091,CONTROLLER://localhost:9094";

assertThat(listenersConfig[0], is(expectedListeners));
assertThat(listenersConfig[1], is(expectedAdvertisedListeners));
Expand Down
Loading