diff --git a/.azure/build-pipeline.yaml b/.azure/build-pipeline.yaml index 6d641e3..2f0dc65 100644 --- a/.azure/build-pipeline.yaml +++ b/.azure/build-pipeline.yaml @@ -46,6 +46,13 @@ jobs: parameters: JDK_VERSION: $(JDK_VERSION) + # Run mutation testing before the integration tests + - bash: mvn clean test-compile org.pitest:pitest-maven:mutationCoverage + displayName: 'Run Mutation Testing' + env: + # Test container optimization + TESTCONTAINERS_RYUK_DISABLED: TRUE + TESTCONTAINERS_CHECKS_DISABLE: TRUE - bash: mvn -Dfailsafe.rerunFailingTestsCount=3 clean verify env: # Test container optimization diff --git a/README.md b/README.md index de67b60..899f134 100644 --- a/README.md +++ b/README.md @@ -235,6 +235,26 @@ StrimziKafkaCluster kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBu kafkaCluster.start(); ``` +### Running Mutation Testing + +To run mutation testing and assess your code’s robustness against small changes, use the following Maven command: +```bash +mvn clean test-compile org.pitest:pitest-maven:mutationCoverage +``` +This command executes mutation tests via the pitest-maven plugin and prints the results directly to the console. + +#### Viewing Mutation Testing Results + +After running the command, an HTML report is generated in `target/pit-reports/`. +Open the `index.html` file in a browser to view detailed information about the mutation coverage. + +#### Using `@DoNotMutate` Annotation + +For parts of the code covered primarily by integration tests, you can use the `@DoNotMutate` annotation. +Applying this annotation to code ensures that mutation testing ignores it. +This is particularly useful for code that is hard to test at the unit level but well covered by integration tests. +Using `@DoNotMutate` keeps mutation coverage metrics meaningful by excluding areas where mutation detection would not add value. + ### Additional tips 1. In case you are using `Azure pipelines` Ryuk needs to be turned off, since Azure does not allow starting privileged containers.
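To make the annotation's use concrete, here is a minimal sketch of how `@DoNotMutate` is applied. The class and methods below are hypothetical; only the annotation and its `com.groupcdg.pitest.annotations` package come from this change:

```java
import com.groupcdg.pitest.annotations.DoNotMutate;

// Hypothetical class, for illustration only
public class ExampleLifecycle {

    // Exercised only by integration tests, so PIT is told not to mutate it
    @DoNotMutate
    public void start() {
        // startup logic verified by integration tests
    }

    // Unit-tested logic stays eligible for mutation
    public int defaultPort() {
        return 9092;
    }
}
```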
diff --git a/pom.xml b/pom.xml index bffa7d2..282222f 100644 --- a/pom.xml +++ b/pom.xml @@ -105,6 +105,7 @@ 2.18.0 2.18.0 2.1.7 + 1.3.3 5.11.0 @@ -124,6 +125,8 @@ 3.5.0 1.6 1.7.0 + 1.2.1 + 1.17.0 1.26.1 @@ -190,6 +193,12 @@ toxiproxy-java ${toxiproxy.java.version} + + + com.arcmutate + pitest-annotations + ${pitest-annotations.version} + @@ -241,6 +250,13 @@ ${mockito.version} test + + + org.junit.jupiter + junit-jupiter-engine + ${jupiter.version} + test + @@ -398,6 +414,39 @@ + + org.pitest + pitest-maven + ${pit-plugin.version} + + + org.pitest + pitest-junit5-plugin + ${pit-junit-plugin.version} + + + + + + io.strimzi.test.container.Strimzi* + io.strimzi.test.container.KafkaContainer + + + + io.strimzi.test.container.Strimzi*Test + + + java.util.logging + org.apache.log4j + org.slf4j + org.apache.commons.logging + + 80 + 71 + true + + diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java index 479ad38..c6a6aa5 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java @@ -4,6 +4,7 @@ */ package io.strimzi.test.container; +import com.groupcdg.pitest.annotations.DoNotMutate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Container; @@ -263,12 +264,14 @@ public Collection<KafkaContainer> getBrokers() { } @Override + @DoNotMutate public boolean hasKraftOrExternalZooKeeperConfigured() { KafkaContainer broker0 = brokers.iterator().next(); return broker0.hasKraftOrExternalZooKeeperConfigured() ? true : false; } @Override + @DoNotMutate public String getInternalZooKeeperConnect() { if (hasKraftOrExternalZooKeeperConfigured()) { throw new IllegalStateException("Connect string is not available when using KRaft or external ZooKeeper"); } @@ -322,15 +325,25 @@ private void configureQuorumVoters(final Map additionalKafkaConf additionalKafkaConfiguration.put("controller.quorum.voters", quorumVoters); } - @SuppressWarnings({"CyclomaticComplexity"}) + @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) @Override + @DoNotMutate public void start() { Stream<KafkaContainer> startables = this.brokers.stream(); try { Startables.deepStart(startables).get(60, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException("Failed to start Kafka containers", e); + throw new RuntimeException("Interrupted while starting Kafka containers", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof UnsupportedKraftKafkaVersionException) { + throw (UnsupportedKraftKafkaVersionException) cause; + } else { + throw new RuntimeException("Failed to start Kafka containers", e); + } + } catch (TimeoutException e) { + throw new RuntimeException("Timed out while starting Kafka containers", e); } if (this.isZooKeeperBasedKafkaCluster()) { @@ -400,6 +413,7 @@ public void start() { } @Override + @DoNotMutate public void stop() { if (this.isZooKeeperBasedKafkaCluster()) { // firstly we shut-down zookeeper -> reason: 'On the command line if I kill ZK first it sometimes prevents a broker from shutting down quickly.'
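The reworked `start()` above unwraps the cause of an `ExecutionException` so that a domain-specific failure raised inside a container's startup propagates under its own type instead of a generic `RuntimeException`. A standalone sketch of that pattern, with a hypothetical `DomainException` standing in for `UnsupportedKraftKafkaVersionException`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class UnwrapExample {
    static class DomainException extends RuntimeException {
        DomainException(String message) { super(message); }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> startup = CompletableFuture.runAsync(() -> {
            throw new DomainException("unsupported Kafka version");
        });
        try {
            startup.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while starting", e);
        } catch (ExecutionException e) {
            // get() wraps the task's failure; rethrow the domain exception as itself
            Throwable cause = e.getCause();
            if (cause instanceof DomainException) {
                throw (DomainException) cause;
            }
            throw new RuntimeException("Failed to start", e);
        }
    }
}
```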
@@ -419,4 +433,8 @@ public void stop() { public StrimziZookeeperContainer getZookeeper() { return zookeeper; } + + protected Network getNetwork() { + return network; + } } diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java index 2652a33..66510c0 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java @@ -6,6 +6,7 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import com.github.dockerjava.api.model.ContainerNetwork; +import com.groupcdg.pitest.annotations.DoNotMutate; import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import org.apache.logging.log4j.Level; @@ -144,6 +145,7 @@ private StrimziKafkaContainer(CompletableFuture imageName) { @Override @SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"}) + @DoNotMutate protected void doStart() { if (this.proxyContainer != null && !this.proxyContainer.isRunning()) { this.proxyContainer.start(); @@ -188,6 +190,7 @@ protected void doStart() { } @Override + @DoNotMutate public void stop() { if (proxyContainer != null && proxyContainer.isRunning()) { proxyContainer.stop(); @@ -213,6 +216,7 @@ protected String runStarterScript() { * * @return StrimziKafkaContainer instance */ + @DoNotMutate public StrimziKafkaContainer waitForRunning() { if (this.useKraft) { super.waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1)); @@ -223,6 +227,7 @@ public StrimziKafkaContainer waitForRunning() { } @Override + @DoNotMutate protected void containerIsStarting(final InspectContainerResponse containerInfo, final boolean reused) { super.containerIsStarting(containerInfo, reused); @@ -286,6 +291,7 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo, } @Override + @DoNotMutate public boolean hasKraftOrExternalZooKeeperConfigured() { return this.useKraft || this.externalZookeeperConnect != null; } @@ -371,6 +377,7 @@ protected String[] buildListenersConfig(final InspectContainerResponse container * In order to avoid any compile dependency on kafka-clients' Uuid specific class, * we implement our own uuid generator by replicating the Kafka's base64 encoded uuid generation logic. 
*/ + @DoNotMutate private String randomUuid() { final UUID metadataTopicIdInternal = new UUID(0L, 1L); final UUID zeroIdImpactInternal = new UUID(0L, 0L); @@ -570,6 +577,7 @@ public String getInternalZooKeeperConnect() { * @return the bootstrap servers URL */ @Override + @DoNotMutate public String getBootstrapServers() { if (proxyContainer != null) { // returning the proxy host and port for indirect connection diff --git a/src/main/java/io/strimzi/test/container/StrimziZookeeperContainer.java b/src/main/java/io/strimzi/test/container/StrimziZookeeperContainer.java index ce5a6e3..b120978 100644 --- a/src/main/java/io/strimzi/test/container/StrimziZookeeperContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziZookeeperContainer.java @@ -5,6 +5,7 @@ package io.strimzi.test.container; import com.github.dockerjava.api.command.InspectContainerResponse; +import com.groupcdg.pitest.annotations.DoNotMutate; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,7 @@ public StrimziZookeeperContainer(String dockerImageName) { /** * Image name is lazily set in {@link #doStart()} method */ + @DoNotMutate private StrimziZookeeperContainer(CompletableFuture imageName) { super(imageName); this.imageNameProvider = imageName; @@ -76,6 +78,7 @@ private StrimziZookeeperContainer(CompletableFuture imageName) { } @Override + @DoNotMutate protected void doStart() { if (!imageNameProvider.isDone()) { imageNameProvider.complete(KafkaVersionService.strimziTestContainerImageName(kafkaVersion)); @@ -86,6 +89,7 @@ protected void doStart() { } @Override + @DoNotMutate protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { super.containerIsStarting(containerInfo, reused); @@ -111,6 +115,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole * @param zooKeeperPropertiesFile the mountable config file * @return StrimziZookeeperContainer instance */ + @DoNotMutate public StrimziZookeeperContainer withZooKeeperPropertiesFile(final MountableFile zooKeeperPropertiesFile) { Utils.asTransferableBytes(zooKeeperPropertiesFile) .ifPresent(properties -> withCopyToContainer(properties, "/opt/kafka/config/zookeeper.properties")); @@ -123,6 +128,7 @@ public StrimziZookeeperContainer withZooKeeperPropertiesFile(final MountableFile * @param kafkaVersion kafka version * @return StrimziKafkaContainer instance */ + @DoNotMutate public StrimziZookeeperContainer withKafkaVersion(final String kafkaVersion) { this.kafkaVersion = kafkaVersion; return this; @@ -133,6 +139,7 @@ public StrimziZookeeperContainer withKafkaVersion(final String kafkaVersion) { * * @return zookeeper connect string `host:port` */ + @DoNotMutate public String getConnectString() { return this.getHost() + ":" + this.getMappedPort(ZOOKEEPER_PORT); } diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java index e4fb2e6..20f5dea 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaClusterTest.java @@ -5,13 +5,16 @@ package io.strimzi.test.container; import org.hamcrest.CoreMatchers; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; import org.testcontainers.containers.ToxiproxyContainer; import java.util.HashMap; import java.util.Map; import static 
org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.emptyString; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -158,4 +161,212 @@ void testKafkaClusterWithKafkaVersionAndAdditionalConfigs() { assertThat(((StrimziKafkaContainer) cluster.getBrokers().iterator().next()).getKafkaVersion(), CoreMatchers.is("3.7.1")); assertThat(cluster.getAdditionalKafkaConfiguration().get("log.retention.bytes"), CoreMatchers.is("10485760")); } + + @Test + void testNetworkAssignmentBasedOnSharedNetworkFlag() { + // Cluster with shared network enabled + StrimziKafkaCluster sharedNetworkCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .withSharedNetwork() + .build(); + + assertThat(sharedNetworkCluster.isSharedNetworkEnabled(), CoreMatchers.is(true)); + assertThat(sharedNetworkCluster.getNetwork(), CoreMatchers.is(Network.SHARED)); + + // Cluster with shared network disabled + StrimziKafkaCluster isolatedNetworkCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build(); + + assertThat(isolatedNetworkCluster.isSharedNetworkEnabled(), CoreMatchers.is(false)); + assertThat(isolatedNetworkCluster.getNetwork(), CoreMatchers.is(CoreMatchers.not(Network.SHARED))); + } + + @Test + void testBrokerNumValidation() { + // Test with brokersNum = 0 + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(0) + .build() + ); + assertThat(exception.getMessage(), CoreMatchers.containsString("brokersNum '0' must be greater than 0")); + + // Test with brokersNum = -1 + exception = assertThrows(IllegalArgumentException.class, () -> + new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(-1) + .build() + ); + assertThat(exception.getMessage(), CoreMatchers.containsString("brokersNum '-1' must be greater than 0")); + } + + @Test + void testZooKeeperBasedKafkaClusterSetup() { + // ZooKeeper-based cluster + StrimziKafkaCluster zookeeperCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .build(); + + assertThat(zookeeperCluster.isZooKeeperBasedKafkaCluster(), CoreMatchers.is(true)); + assertThat(zookeeperCluster.getZookeeper(), CoreMatchers.is(CoreMatchers.notNullValue())); + + // KRaft-based cluster + StrimziKafkaCluster kraftCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .withKraft() + .build(); + + assertThat(kraftCluster.isZooKeeperBasedKafkaCluster(), CoreMatchers.is(false)); + assertThat(kraftCluster.getZookeeper(), CoreMatchers.is(CoreMatchers.nullValue())); + } + + @Test + void testAdditionalKafkaConfigurationHandling() { + // Null additional config + StrimziKafkaCluster clusterWithNullConfig = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .withInternalTopicReplicationFactor(3) + .withAdditionalKafkaConfiguration(null) + .build(); + + assertThat(clusterWithNullConfig.getAdditionalKafkaConfiguration().isEmpty(), CoreMatchers.is(true)); // should be empty + + // Non-null additional config + Map<String, String> additionalConfig = new HashMap<>(); + additionalConfig.put("log.retention.ms", "60000"); + + StrimziKafkaCluster clusterWithConfig = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .withInternalTopicReplicationFactor(3) + .withAdditionalKafkaConfiguration(additionalConfig)
+ .build(); + + assertThat(clusterWithConfig.getAdditionalKafkaConfiguration(), Matchers.hasEntry("log.retention.ms", "60000")); + } + + @Test + void testClusterModeFlags() { + // ZooKeeper-based cluster + StrimziKafkaCluster zookeeperCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build(); + + assertThat(zookeeperCluster.isZooKeeperBasedKafkaCluster(), CoreMatchers.is(true)); + assertThat(zookeeperCluster.isKraftKafkaCluster(), CoreMatchers.is(false)); + + // KRaft-based cluster + StrimziKafkaCluster kraftCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .withKraft() + .build(); + + assertThat(kraftCluster.isZooKeeperBasedKafkaCluster(), CoreMatchers.is(false)); + assertThat(kraftCluster.isKraftKafkaCluster(), CoreMatchers.is(true)); + } + + @Test + void testValidateBrokerNumBoundary() { + // Test with brokersNum = 0 (should fail) + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(0) + .build() + ); + assertThat(exception.getMessage(), CoreMatchers.containsString("brokersNum '0' must be greater than 0")); + + // Test with brokersNum = 1 (should pass) + assertDoesNotThrow(() -> + new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build() + ); + } + + @Test + void testConfigureQuorumVotersIsCalledInKRaftMode() { + Map<String, String> additionalConfig = new HashMap<>(); + additionalConfig.put("some.config", "someValue"); + + StrimziKafkaCluster cluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .withKraft() + .withAdditionalKafkaConfiguration(additionalConfig) + .build(); + + Map<String, String> expectedConfig = new HashMap<>(additionalConfig); + expectedConfig.put("controller.quorum.voters", "0@broker-0:9094,1@broker-1:9094,2@broker-2:9094"); + + assertThat(cluster.getAdditionalKafkaConfiguration(), CoreMatchers.is(expectedConfig)); + } + + @Test + void testQuorumVotersConfigurationIsNotEmpty() { + StrimziKafkaCluster cluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .withKraft() + .build(); + + String quorumVoters = cluster.getAdditionalKafkaConfiguration().get("controller.quorum.voters"); + assertThat(quorumVoters, CoreMatchers.is(CoreMatchers.notNullValue())); + assertThat(quorumVoters, CoreMatchers.is("0@broker-0:9094,1@broker-1:9094,2@broker-2:9094")); + } + + @Test + void testAdditionalKafkaConfigurationsAreApplied() { + Map<String, String> additionalConfig = new HashMap<>(); + additionalConfig.put("log.retention.ms", "60000"); + additionalConfig.put("auto.create.topics.enable", "false"); + + StrimziKafkaCluster cluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .withAdditionalKafkaConfiguration(additionalConfig) + .build(); + + assertThat(cluster.getAdditionalKafkaConfiguration(), CoreMatchers.is(additionalConfig)); + } + + @Test + void testValidateInternalTopicReplicationFactorBoundaries() { + // Test with internalTopicReplicationFactor = -1 (should fail) + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .withInternalTopicReplicationFactor(-1) + .build() + ); + assertThat(exception.getMessage(), CoreMatchers.containsString("must be less than brokersNum and greater than 0"));
+ + // Test with internalTopicReplicationFactor = 3 (equal to brokersNum, should pass) + assertDoesNotThrow(() -> + new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .withInternalTopicReplicationFactor(3) + .build() + ); + + // Test with internalTopicReplicationFactor = 4 (greater than brokersNum, should fail) + exception = assertThrows(IllegalArgumentException.class, () -> + new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .withInternalTopicReplicationFactor(4) + .build() + ); + assertThat(exception.getMessage(), CoreMatchers.containsString("must be less than brokersNum and greater than 0")); + } + + @Test + void testGetBootstrapServers() { + StrimziKafkaCluster cluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(3) + .build(); + + String bootstrapServers = cluster.getBootstrapServers(); + assertThat(bootstrapServers, CoreMatchers.is(CoreMatchers.notNullValue())); + assertThat(bootstrapServers, CoreMatchers.is(CoreMatchers.not(emptyString()))); + String[] servers = bootstrapServers.split(","); + assertThat(servers.length, CoreMatchers.is(3)); + } } diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java index 6705691..d44acba 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerIT.java @@ -100,8 +100,6 @@ void testStartContainerWithSomeConfiguration(final String imageName) { assertThat(logsFromKafka, CoreMatchers.containsString("log.cleaner.backoff.ms = 1000")); assertThat(logsFromKafka, CoreMatchers.containsString("ssl.enabled.protocols = [TLSv1]")); assertThat(logsFromKafka, CoreMatchers.containsString("log.index.interval.bytes = 2048")); - - systemUnderTest.stop(); } @ParameterizedTest(name = "testStartContainerWithFixedExposedPort-{0}") @@ -114,8 +112,6 @@ void testStartContainerWithFixedExposedPort(final String imageName) { systemUnderTest.start(); assertThat(systemUnderTest.getMappedPort(9092), equalTo(9092)); - - systemUnderTest.stop(); } @ParameterizedTest(name = "testStartContainerWithSSLBootstrapServers-{0}") @@ -128,8 +124,6 @@ void testStartContainerWithSSLBootstrapServers(final String imageName) { assertThat(systemUnderTest.getBootstrapServers(), is("SSL://" + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092))); - - systemUnderTest.stop(); } @ParameterizedTest(name = "testStartContainerWithServerProperties-{0}") @@ -147,8 +141,6 @@ void testStartContainerWithServerProperties(final String imageName) { assertThat(systemUnderTest.getBootstrapServers(), is("PLAINTEXT://" + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092))); - - systemUnderTest.stop(); } @Test @@ -166,7 +158,6 @@ void testStartContainerWithStrimziKafkaImage() { + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092))); assertThat(systemUnderTest.getDockerImageName(), is(imageName)); - systemUnderTest.stop(); // empty System.setProperty("strimzi.test-container.kafka.custom.image", ""); @@ -184,7 +175,6 @@ void testStartContainerWithCustomImage() { + systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092))); assertThat(systemUnderTest.getDockerImageName(), is(imageName)); - systemUnderTest.stop(); } @Test @@ -201,71 +191,58 @@ void testStartContainerWithCustomNetwork() { + 
systemUnderTest.getContainerIpAddress() + ":" + systemUnderTest.getMappedPort(9092))); assertThat(systemUnderTest.getNetwork().getId(), is(network.getId())); - systemUnderTest.stop(); } @Test void testUnsupportedKafkaVersion() { - try { - systemUnderTest = new StrimziKafkaContainer() - .withKafkaVersion("2.4.0") - .waitForRunning(); + systemUnderTest = new StrimziKafkaContainer() + .withKafkaVersion("2.4.0") + .waitForRunning(); - assertThrows(UnknownKafkaVersionException.class, () -> systemUnderTest.start()); - } finally { - systemUnderTest.stop(); - } + assertThrows(UnknownKafkaVersionException.class, () -> systemUnderTest.start()); } @ParameterizedTest(name = "testKafkaContainerConnectFromOutsideToInternalZooKeeper-{0}") @MethodSource("retrieveKafkaVersionsFile") void testKafkaContainerConnectFromOutsideToInternalZooKeeper() { - try { - systemUnderTest = new StrimziKafkaContainer() - .waitForRunning(); - systemUnderTest.start(); + systemUnderTest = new StrimziKafkaContainer() + .waitForRunning(); + systemUnderTest.start(); - // Creates a socket address from a hostname and a port number - final String[] hostNameWithPort = systemUnderTest.getInternalZooKeeperConnect().split(":"); - - SocketAddress socketAddress = new InetSocketAddress(hostNameWithPort[0], Integer.parseInt(hostNameWithPort[1])); - - try (Socket socket = new Socket()) { - LOGGER.info("Hostname: {}, and port: {}", hostNameWithPort[0], hostNameWithPort[1]); - socket.connect(socketAddress, 5000); - } catch (SocketTimeoutException exception) { - LOGGER.error("SocketTimeoutException " + hostNameWithPort[0] + ":" + hostNameWithPort[1] + ". " + exception.getMessage()); - fail(); - } catch (IOException exception) { - LOGGER.error( - "IOException - Unable to connect to " + hostNameWithPort[0] + ":" + hostNameWithPort[1] + ". " + exception.getMessage()); - fail(); - } - } finally { - systemUnderTest.stop(); + // Creates a socket address from a hostname and a port number + final String[] hostNameWithPort = systemUnderTest.getInternalZooKeeperConnect().split(":"); + + SocketAddress socketAddress = new InetSocketAddress(hostNameWithPort[0], Integer.parseInt(hostNameWithPort[1])); + + try (Socket socket = new Socket()) { + LOGGER.info("Hostname: {}, and port: {}", hostNameWithPort[0], hostNameWithPort[1]); + socket.connect(socketAddress, 5000); + } catch (SocketTimeoutException exception) { + LOGGER.error("SocketTimeoutException " + hostNameWithPort[0] + ":" + hostNameWithPort[1] + ". " + exception.getMessage()); + fail(); + } catch (IOException exception) { + LOGGER.error( + "IOException - Unable to connect to " + hostNameWithPort[0] + ":" + hostNameWithPort[1] + ". 
" + exception.getMessage()); + fail(); } } @ParameterizedTest(name = "testKafkaContainerInternalCommunicationWithInternalZooKeeper-{0}") @MethodSource("retrieveKafkaVersionsFile") void testKafkaContainerInternalCommunicationWithInternalZooKeeper() throws IOException, InterruptedException { - try { - systemUnderTest = new StrimziKafkaContainer() - .waitForRunning(); - systemUnderTest.start(); + systemUnderTest = new StrimziKafkaContainer() + .waitForRunning(); + systemUnderTest.start(); - final Container.ExecResult result = this.systemUnderTest.execInContainer( - "sh", "-c", - "bin/zookeeper-shell.sh localhost:" + StrimziZookeeperContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1" - ); + final Container.ExecResult result = this.systemUnderTest.execInContainer( + "sh", "-c", + "bin/zookeeper-shell.sh localhost:" + StrimziZookeeperContainer.ZOOKEEPER_PORT + " ls /brokers/ids | tail -n 1" + ); - final String brokers = result.getStdout(); + final String brokers = result.getStdout(); - assertThat(result.getExitCode(), is(0)); // 0 -> success - assertThat(brokers, CoreMatchers.containsString("[0]")); - } finally { - systemUnderTest.stop(); - } + assertThat(result.getExitCode(), is(0)); // 0 -> success + assertThat(brokers, CoreMatchers.containsString("[0]")); } @ParameterizedTest(name = "testIllegalStateUsingInternalZooKeeperWithKraft-{0}") @@ -303,8 +280,6 @@ void testStartBrokerWithProxyContainer(final String imageName) { assertThat(systemUnderTest.getBootstrapServers(), is(String.format("PLAINTEXT://%s", systemUnderTest.getProxy().getListen()))); - - systemUnderTest.stop(); } @ParameterizedTest(name = "testGetProxyWithNoContainer-{0}") diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerTest.java b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerTest.java index 622088d..7ef620e 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerTest.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerTest.java @@ -42,14 +42,6 @@ void testDefaultInitialization() { assertThat(kafkaContainer.getExposedPorts(), hasItem(StrimziKafkaContainer.KAFKA_PORT)); } - @Test - void testInitializationWithImageName() { - String imageName = "strimzi/kafka:latest"; - kafkaContainer = new StrimziKafkaContainer(imageName); - assertThat(kafkaContainer, is(notNullValue())); - assertThat(kafkaContainer.getDockerImageName(), containsString(imageName)); - } - @Test void testOAuthConfiguration() { kafkaContainer = new StrimziKafkaContainer() @@ -139,14 +131,6 @@ void testExtractListenerNameThrowsExceptionOnInvalidBootstrap() { assertThat(exception.getMessage(), containsString("must be prefixed with a listener name")); } - @Test - void testWithProxyContainerSetsNetworkAndAlias() { - ToxiproxyContainer proxy = new ToxiproxyContainer(); - kafkaContainer.withProxyContainer(proxy); - assertThat(proxy.getNetwork().getId(), notNullValue()); - assertThat(proxy.getNetworkAliases(), hasItem("toxiproxy")); - } - @Test void testRunStarterScriptReturnsCorrectScript() { String script = kafkaContainer.runStarterScript(); @@ -205,19 +189,6 @@ void testWithBrokerIdThrowsExceptionWhenKraftAndIdsMismatch() { assertThat(exception.getMessage(), containsString("`broker.id` and `node.id` must have same value!")); } - @Test - void testDoStartThrowsExceptionForUnsupportedKraftVersion() { - kafkaContainer.withKafkaVersion("2.8.2"); - kafkaContainer.withKraft(); - - UnsupportedKraftKafkaVersionException exception = assertThrows( - UnsupportedKraftKafkaVersionException.class, - 
kafkaContainer::doStart, - "Expected doStart() to throw an exception for unsupported Kafka version in KRaft mode." - ); - assertThat(exception.getMessage(), containsString("is not supported in KRaft mode")); - } - @Test void testWithNodeIdReturnsSelf() { StrimziKafkaContainer result = kafkaContainer.withNodeId(1); @@ -388,16 +359,7 @@ void testConfigureOAuthBearer() { assertThat(properties.getProperty("principal.builder.class"), is("io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder")); String expectedJaasConfig = String.format( - "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + - "oauth.client.id=\"%s\" " + - "oauth.client.secret=\"%s\" " + - "oauth.token.endpoint.uri=\"%s\" " + - "oauth.username.claim=\"%s\";", - kafkaContainer.getClientId(), - kafkaContainer.getClientSecret(), - kafkaContainer.getOauthUri() + "/realms/" + kafkaContainer.getRealm() + "/protocol/openid-connect/token", - kafkaContainer.getUsernameClaim() - ); + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;"); String listenerNameLowerCase = "plaintext"; @@ -549,6 +511,114 @@ void testBuildDefaultServerPropertiesWithAuthenticationTypeButOAuthNotEnabled() assertThat(exception.getMessage(), containsString("OAuth2 is not enabled")); } + @Test + void testBuildDefaultServerPropertiesWithOAuthPlain() { + String listeners = "SASL_PLAINTEXT://0.0.0.0:9092"; + String advertisedListeners = "SASL_PLAINTEXT://localhost:9092"; + kafkaContainer.listenerNames.add("SASL_PLAINTEXT"); + kafkaContainer + .withBrokerId(1) + .withNodeId(1) + .withSaslUsername("admin") + .withSaslPassword("password") + .withKraft() + .withOAuthConfig("test-realm", "test-client", "test-secret", "http://oauth-uri", "preferred_username") + .withAuthenticationType(AuthenticationType.OAUTH_OVER_PLAIN); + + Properties properties = kafkaContainer.buildDefaultServerProperties(listeners, advertisedListeners); + + assertThat(properties, notNullValue()); + assertThat(properties.getProperty("listeners"), is(listeners)); + assertThat(properties.getProperty("advertised.listeners"), is(advertisedListeners)); + assertThat(properties.getProperty("inter.broker.listener.name"), is("BROKER1")); + assertThat(properties.getProperty("broker.id"), is("1")); + assertThat(properties.getProperty("listener.security.protocol.map"), is(kafkaContainer.configureListenerSecurityProtocolMap("SASL_PLAINTEXT"))); + assertThat(properties.getProperty("num.network.threads"), is("3")); + assertThat(properties.getProperty("num.io.threads"), is("8")); + assertThat(properties.getProperty("socket.send.buffer.bytes"), is("102400")); + assertThat(properties.getProperty("socket.receive.buffer.bytes"), is("102400")); + assertThat(properties.getProperty("socket.request.max.bytes"), is("104857600")); + assertThat(properties.getProperty("log.dirs"), is("/tmp/default-log-dir")); + assertThat(properties.getProperty("num.partitions"), is("1")); + assertThat(properties.getProperty("num.recovery.threads.per.data.dir"), is("1")); + assertThat(properties.getProperty("offsets.topic.replication.factor"), is("1")); + assertThat(properties.getProperty("transaction.state.log.replication.factor"), is("1")); + assertThat(properties.getProperty("transaction.state.log.min.isr"), is("1")); + assertThat(properties.getProperty("log.retention.hours"), is("168")); + assertThat(properties.getProperty("log.retention.check.interval.ms"), is("300000")); + + // KRaft-specific assertions + 
assertThat(properties.getProperty("node.id"), is("1")); + String expectedQuorumVoters = String.format("%d@%s%d:9094", 1, "broker-", 1); + assertThat(properties.getProperty("controller.quorum.voters"), is(expectedQuorumVoters)); + assertThat(properties.getProperty("controller.listener.names"), is("CONTROLLER")); + + // OAuth-specific assertions + assertThat(properties.getProperty("sasl.enabled.mechanisms"), is("PLAIN")); + assertThat(properties.getProperty("sasl.mechanism.inter.broker.protocol"), is("PLAIN")); + + assertThat(properties.getProperty("listener.name.sasl_plaintext.plain.sasl.jaas.config"), + is("org.apache.kafka.common.security.plain.PlainLoginModule required " + + "username=\"admin\" " + + "password=\"password\";")); + assertThat(properties.getProperty("listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class"), + is("io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler")); + } + + @Test + void testBuildDefaultServerPropertiesWithOAuthBearer() { + String listeners = "SASL_PLAINTEXT://0.0.0.0:9092"; + String advertisedListeners = "SASL_PLAINTEXT://localhost:9092"; + kafkaContainer.listenerNames.add("SASL_PLAINTEXT"); + kafkaContainer + .withBrokerId(1) + .withNodeId(1) + .withKraft() + .withOAuthConfig("test-realm", "test-client", "test-secret", "http://oauth-uri", "preferred_username") + .withAuthenticationType(AuthenticationType.OAUTH_BEARER); + + Properties properties = kafkaContainer.buildDefaultServerProperties(listeners, advertisedListeners); + + assertThat(properties, notNullValue()); + assertThat(properties.getProperty("listeners"), is(listeners)); + assertThat(properties.getProperty("advertised.listeners"), is(advertisedListeners)); + assertThat(properties.getProperty("inter.broker.listener.name"), is("BROKER1")); + assertThat(properties.getProperty("broker.id"), is("1")); + assertThat(properties.getProperty("listener.security.protocol.map"), is(kafkaContainer.configureListenerSecurityProtocolMap("SASL_PLAINTEXT"))); + assertThat(properties.getProperty("num.network.threads"), is("3")); + assertThat(properties.getProperty("num.io.threads"), is("8")); + assertThat(properties.getProperty("socket.send.buffer.bytes"), is("102400")); + assertThat(properties.getProperty("socket.receive.buffer.bytes"), is("102400")); + assertThat(properties.getProperty("socket.request.max.bytes"), is("104857600")); + assertThat(properties.getProperty("log.dirs"), is("/tmp/default-log-dir")); + assertThat(properties.getProperty("num.partitions"), is("1")); + assertThat(properties.getProperty("num.recovery.threads.per.data.dir"), is("1")); + assertThat(properties.getProperty("offsets.topic.replication.factor"), is("1")); + assertThat(properties.getProperty("transaction.state.log.replication.factor"), is("1")); + assertThat(properties.getProperty("transaction.state.log.min.isr"), is("1")); + assertThat(properties.getProperty("log.retention.hours"), is("168")); + assertThat(properties.getProperty("log.retention.check.interval.ms"), is("300000")); + + // KRaft-specific assertions + assertThat(properties.getProperty("process.roles"), is("broker,controller")); + assertThat(properties.getProperty("node.id"), is("1")); + String expectedQuorumVoters = String.format("%d@%s%d:9094", 1, "broker-", 1); + assertThat(properties.getProperty("controller.quorum.voters"), is(expectedQuorumVoters)); + assertThat(properties.getProperty("controller.listener.names"), is("CONTROLLER")); + + // OAuth-specific assertions + 
assertThat(properties.getProperty("sasl.enabled.mechanisms"), is("OAUTHBEARER")); + assertThat(properties.getProperty("sasl.mechanism.inter.broker.protocol"), is("OAUTHBEARER")); + + assertThat(properties.getProperty("listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config"), + is("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;")); + assertThat(properties.getProperty("listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class"), + is("io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")); + assertThat(properties.getProperty("listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class"), + is("io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler")); + } + @Test void testRandomUuidGeneratesValidUuid() throws Exception { // Use reflection to access the private randomUuid method diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftOauthIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftOauthIT.java index b37579a..9d6705c 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftOauthIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaKRaftOauthIT.java @@ -68,6 +68,9 @@ void testIsOAuthEnabledReturnsTrueWhenOAuthConfiguredAndOAuthEnvsAreSet() { assertThat(envMap.get("OAUTH_TOKEN_ENDPOINT_URI"), CoreMatchers.is(oauthUri + "/realms/" + realmName + "/protocol/openid-connect/token")); assertThat(envMap.get("OAUTH_USERNAME_CLAIM"), CoreMatchers.is("preferred_username")); } finally { + if (this.keycloakContainer != null) { + this.keycloakContainer.stop(); + } if (this.systemUnderTest != null) { this.systemUnderTest.stop(); } @@ -148,6 +151,11 @@ void testOAuthOverPlain() { assertThat(record.value(), CoreMatchers.is("value")); } + for (ConsumerRecord record : records) { + assertThat(record.key(), CoreMatchers.is("key")); + assertThat(record.value(), CoreMatchers.is("value")); + } + consumer.close(); } finally { if (this.keycloakContainer != null) { diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java b/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java index 83b9036..07588c0 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaKraftContainerIT.java @@ -163,6 +163,8 @@ void testWithKafkaLog() { assertThat(systemUnderTest.getLogs(), CoreMatchers.containsString("INFO")); assertThat(systemUnderTest.getLogs(), CoreMatchers.containsString("DEBUG")); assertThat(systemUnderTest.getLogs(), CoreMatchers.containsString("TRACE")); + + systemUnderTest.stop(); } private void verify() throws InterruptedException, ExecutionException, TimeoutException {