From 1dd713799ecb6602b8d790e2b580cb79c6de2ecf Mon Sep 17 00:00:00 2001 From: Jakub Scholz Date: Thu, 8 Aug 2024 16:40:11 +0200 Subject: [PATCH 1/2] Update of unit tests to use KRaft - part 5 Signed-off-by: Jakub Scholz --- .../assembly/KafkaClusterCreatorTest.java | 418 +----------- .../KafkaClusterCreatorZooBasedTest.java | 530 +++++++++++++++ .../KafkaListenerReconcilerClusterIPTest.java | 395 ----------- ...concilerSkipBootstrapLoadBalancerTest.java | 342 ---------- .../assembly/KafkaListenerReconcilerTest.java | 622 ++++++++++++++++++ 5 files changed, 1181 insertions(+), 1126 deletions(-) create mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorZooBasedTest.java delete mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerClusterIPTest.java delete mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerSkipBootstrapLoadBalancerTest.java create mode 100644 cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerTest.java diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorTest.java index 6cce714abbb..e04a8be6aae 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorTest.java @@ -17,8 +17,6 @@ import io.strimzi.operator.cluster.KafkaVersionTestUtils; import io.strimzi.operator.cluster.ResourceUtils; import io.strimzi.operator.cluster.model.KafkaMetadataConfigurationState; -import io.strimzi.operator.cluster.model.KafkaVersion; -import io.strimzi.operator.cluster.model.KafkaVersionChange; import io.strimzi.operator.cluster.model.NodeRef; import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; import io.strimzi.operator.common.Annotations; @@ -56,46 +54,9 @@ public class KafkaClusterCreatorTest { private final static String NAMESPACE = "my-ns"; private final static String CLUSTER_NAME = "my-cluster"; private final static Reconciliation RECONCILIATION = new Reconciliation("test", "kind", NAMESPACE, CLUSTER_NAME); - private final static KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); - private final static KafkaVersionChange VERSION_CHANGE = new KafkaVersionChange( - VERSIONS.defaultVersion(), - VERSIONS.defaultVersion(), - VERSIONS.defaultVersion().protocolVersion(), - VERSIONS.defaultVersion().messageVersion(), - VERSIONS.defaultVersion().metadataVersion() - ); private final static ClusterOperatorConfig CO_CONFIG = ResourceUtils.dummyClusterOperatorConfig(); private final static Kafka KAFKA = new KafkaBuilder() - .withNewMetadata() - .withName(CLUSTER_NAME) - .withNamespace(NAMESPACE) - .endMetadata() - .withNewSpec() - .withNewKafka() - .withReplicas(3) - .withListeners(new GenericKafkaListenerBuilder() - .withName("tls") - .withPort(9092) - .withType(KafkaListenerType.INTERNAL) - .withTls(true) - .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() - .endSpec() - .build(); - private final static Kafka KAFKA_WITH_POOLS = new KafkaBuilder(KAFKA) - .withNewMetadata() - .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled") - .endMetadata() - .build(); - private final static Kafka KAFKA_WITH_KRAFT = new KafkaBuilder() .withNewMetadata() .withName(CLUSTER_NAME) .withNamespace(NAMESPACE) @@ -222,7 +183,6 @@ public class KafkaClusterCreatorTest { .endStatus() .build(); - private static final Map> CURRENT_PODS_3_NODES = Map.of("my-cluster-kafka", List.of("my-cluster-kafka-0", "my-cluster-kafka-1", "my-cluster-kafka-2")); private static final Map> CURRENT_PODS_5_NODES = Map.of("my-cluster-kafka", List.of("my-cluster-kafka-0", "my-cluster-kafka-1", "my-cluster-kafka-2", "my-cluster-kafka-3", "my-cluster-kafka-4")); private static Vertx vertx; @@ -241,49 +201,23 @@ public static void afterAll() { } ////////////////////////////////////////////////// - // Regular Kafka cluster tests + // KRaft tests ////////////////////////////////////////////////// @Test - public void testNewClusterWithoutNodePools(VertxTestContext context) { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA, null, Map.of(), Map.of(), VERSION_CHANGE, kafkaStatus, true) - .onComplete(context.succeeding(kc -> context.verify(() -> { - // Kafka cluster is created - assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(3)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2))); - assertThat(kc.removedNodes(), is(Set.of())); - - // Check the status conditions - assertThat(kafkaStatus.getConditions(), is(nullValue())); - - // No scale-down => scale-down check is not done - verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - @Test - public void testExistingClusterWithoutNodePools(VertxTestContext context) { + public void testNewClusterWithKRaft(VertxTestContext context) { ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA, null, Map.of(), CURRENT_PODS_3_NODES, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_CONTROLLERS, POOL_A, POOL_B), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(3)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2))); + assertThat(kc.nodes().size(), is(9)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2, 3, 4, 5, 6, 7, 8))); assertThat(kc.removedNodes(), is(Set.of())); // Check the status conditions @@ -296,178 +230,20 @@ public void testExistingClusterWithoutNodePools(VertxTestContext context) { }))); } - @Test - public void testRevertScaleDownWithoutNodePools(VertxTestContext context) { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock brokers-in-use check - BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; - when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(0, 1, 2, 3, 4))); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA, null, Map.of(), CURRENT_PODS_5_NODES, VERSION_CHANGE, kafkaStatus, true) - .onComplete(context.succeeding(kc -> context.verify(() -> { - // Kafka cluster is created - assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(5)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2, 3, 4))); - assertThat(kc.removedNodes(), is(Set.of())); - - // Check the status conditions - assertThat(kafkaStatus.getConditions().size(), is(1)); - assertThat(kafkaStatus.getConditions().get(0).getStatus(), is("True")); - assertThat(kafkaStatus.getConditions().get(0).getType(), is("Warning")); - assertThat(kafkaStatus.getConditions().get(0).getReason(), is("ScaleDownPreventionCheck")); - assertThat(kafkaStatus.getConditions().get(0).getMessage(), is("Reverting scale-down of Kafka my-cluster by changing number of replicas to 5")); - - // Scale-down reverted => should be called once - verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - @Test - public void testCorrectScaleDownWithoutNodePools(VertxTestContext context) { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock brokers-in-use check - BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; - when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(0, 1, 2))); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA, null, Map.of(), CURRENT_PODS_5_NODES, VERSION_CHANGE, kafkaStatus, true) - .onComplete(context.succeeding(kc -> context.verify(() -> { - // Kafka cluster is created - assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(3)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2))); - assertThat(kc.removedNodes(), is(Set.of(3, 4))); - - // Check the status conditions - assertThat(kafkaStatus.getConditions(), is(nullValue())); - - // Scale-down reverted => should be called once - verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - @Test - public void testThrowsRevertScaleDownFailsWithoutNodePools(VertxTestContext context) { + @Test + public void testNewClusterWithMixedNodesKRaft(VertxTestContext context) { ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - // Mock brokers-in-use check - BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; - when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(0, 1, 2, 3, 4))); - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA, null, Map.of(), CURRENT_PODS_5_NODES, VERSION_CHANGE, kafkaStatus, false) - .onComplete(context.failing(ex -> context.verify(() -> { - // Check exception - assertThat(ex, instanceOf(InvalidResourceException.class)); - assertThat(ex.getMessage(), is("Following errors were found when processing the Kafka custom resource: [Cannot scale-down Kafka brokers [3, 4] because they have assigned partition-replicas.]")); - - // Check the status conditions - assertThat(kafkaStatus.getConditions(), is(nullValue())); - - // Scale-down failed => should be called once - verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - @Test - public void tesSkipScaleDownCheckWithoutNodePools(VertxTestContext context) { - Kafka kafka = new KafkaBuilder(KAFKA) - .editMetadata() - .addToAnnotations(Annotations.ANNO_STRIMZI_IO_SKIP_BROKER_SCALEDOWN_CHECK, "true") - .endMetadata() - .build(); - - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock brokers-in-use check - BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; - when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(0, 1, 2, 3, 4))); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(kafka, null, Map.of(), CURRENT_PODS_5_NODES, VERSION_CHANGE, kafkaStatus, false) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_MIXED), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); assertThat(kc.nodes().size(), is(3)); assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2))); - assertThat(kc.removedNodes(), is(Set.of(3, 4))); - - // Check the status conditions - assertThat(kafkaStatus.getConditions(), is(nullValue())); - - // Scale-down check skipped => should be never called - verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - ////////////////////////////////////////////////// - // Kafka cluster with node pools tests - ////////////////////////////////////////////////// - - @Test - public void testNewClusterWithNodePools(VertxTestContext context) { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A, POOL_B), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) - .onComplete(context.succeeding(kc -> context.verify(() -> { - // Kafka cluster is created - assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(6)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2, 3, 4, 5))); - assertThat(kc.removedNodes(), is(Set.of())); - - // Check the status conditions - assertThat(kafkaStatus.getConditions(), is(nullValue())); - - // No scale-down => scale-down check is not done - verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - @Test - public void testExistingClusterWithNodePools(VertxTestContext context) { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) - .onComplete(context.succeeding(kc -> context.verify(() -> { - // Kafka cluster is created - assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(6)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002))); assertThat(kc.removedNodes(), is(Set.of())); // Check the status conditions @@ -481,155 +257,19 @@ public void testExistingClusterWithNodePools(VertxTestContext context) { } @Test - public void testRevertScaleDownWithNodePools(VertxTestContext context) { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock brokers-in-use check - BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; - when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 1003, 2004))); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) - .onComplete(context.succeeding(kc -> context.verify(() -> { - // Kafka cluster is created - assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(10)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 1003, 1004, 2000, 2001, 2002, 2003, 2004))); - assertThat(kc.removedNodes(), is(Set.of())); - - // Check the status conditions - assertThat(kafkaStatus.getConditions().size(), is(2)); - assertThat(kafkaStatus.getConditions().get(0).getStatus(), is("True")); - assertThat(kafkaStatus.getConditions().get(0).getType(), is("Warning")); - assertThat(kafkaStatus.getConditions().get(0).getReason(), is("ScaleDownPreventionCheck")); - assertThat(kafkaStatus.getConditions().get(0).getMessage(), is("Reverting scale-down of KafkaNodePool pool-a by changing number of replicas to 5")); - assertThat(kafkaStatus.getConditions().get(1).getStatus(), is("True")); - assertThat(kafkaStatus.getConditions().get(1).getType(), is("Warning")); - assertThat(kafkaStatus.getConditions().get(1).getReason(), is("ScaleDownPreventionCheck")); - assertThat(kafkaStatus.getConditions().get(1).getMessage(), is("Reverting scale-down of KafkaNodePool pool-b by changing number of replicas to 5")); - - // Scale-down reverted => should be called once - verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - @Test - public void testCorrectScaleDownWithNodePools(VertxTestContext context) { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock brokers-in-use check - BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; - when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 2000, 2001, 2002))); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), CURRENT_PODS_5_NODES, VERSION_CHANGE, kafkaStatus, true) - .onComplete(context.succeeding(kc -> context.verify(() -> { - // Kafka cluster is created - assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(6)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002))); - assertThat(kc.removedNodes(), is(Set.of(1003, 1004, 2003, 2004))); - - // Check the status conditions - assertThat(kafkaStatus.getConditions(), is(nullValue())); - - // Scale-down reverted => should be called once - verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - @Test - public void testThrowsRevertScaleDownFailsWithNodePools(VertxTestContext context) { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock brokers-in-use check - BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; - when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 1003, 1004, 2003, 2004))); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, VERSION_CHANGE, kafkaStatus, false) - .onComplete(context.failing(ex -> context.verify(() -> { - // Check exception - assertThat(ex, instanceOf(InvalidResourceException.class)); - assertThat(ex.getMessage(), is("Following errors were found when processing the Kafka custom resource: [Cannot scale-down Kafka brokers [1003, 1004, 2003, 2004] because they have assigned partition-replicas.]")); - - // Check the status conditions - assertThat(kafkaStatus.getConditions(), is(nullValue())); - - // Scale-down failed => should be called once - verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - @Test - public void tesSkipScaleDownCheckWithNodePools(VertxTestContext context) { - Kafka kafka = new KafkaBuilder(KAFKA_WITH_POOLS) - .editMetadata() - .addToAnnotations(Annotations.ANNO_STRIMZI_IO_SKIP_BROKER_SCALEDOWN_CHECK, "true") - .endMetadata() - .build(); - - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock brokers-in-use check - BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; - when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 1003, 1004, 2003, 2004))); - - KafkaStatus kafkaStatus = new KafkaStatus(); - KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); - - Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(kafka, List.of(POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, VERSION_CHANGE, kafkaStatus, false) - .onComplete(context.succeeding(kc -> context.verify(() -> { - // Kafka cluster is created - assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(6)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002))); - assertThat(kc.removedNodes(), is(Set.of(1003, 1004, 2003, 2004))); - - // Check the status conditions - assertThat(kafkaStatus.getConditions(), is(nullValue())); - - // Scale-down check skipped => should be never called - verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); - - async.flag(); - }))); - } - - ////////////////////////////////////////////////// - // KRaft tests - ////////////////////////////////////////////////// - - @Test - public void testNewClusterWithKRaft(VertxTestContext context) { + public void testExistingClusterWithKRaft(VertxTestContext context) { ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); KafkaStatus kafkaStatus = new KafkaStatus(); KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_CONTROLLERS, POOL_A, POOL_B), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_CONTROLLERS_WITH_STATUS, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); assertThat(kc.nodes().size(), is(9)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2, 3, 4, 5, 6, 7, 8))); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002))); assertThat(kc.removedNodes(), is(Set.of())); // Check the status conditions @@ -643,19 +283,19 @@ public void testNewClusterWithKRaft(VertxTestContext context) { } @Test - public void testExistingClusterWithKRaft(VertxTestContext context) { + public void testExistingClusterWithMixedNodesKRaft(VertxTestContext context) { ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); KafkaStatus kafkaStatus = new KafkaStatus(); KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_CONTROLLERS_WITH_STATUS, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_MIXED_WITH_STATUS), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); - assertThat(kc.nodes().size(), is(9)); - assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002))); + assertThat(kc.nodes().size(), is(3)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(3000, 3001, 3002))); assertThat(kc.removedNodes(), is(Set.of())); // Check the status conditions @@ -680,7 +320,7 @@ public void testRevertScaleDownWithKRaft(VertxTestContext context) { KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_CONTROLLERS_WITH_STATUS_5_NODES, POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_CONTROLLERS_WITH_STATUS_5_NODES, POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); @@ -718,7 +358,7 @@ public void testRevertScaleDownWithKRaftMixedNodes(VertxTestContext context) { KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_MIXED_WITH_STATUS_5_NODES), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_MIXED_WITH_STATUS_5_NODES), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); @@ -752,7 +392,7 @@ public void testCorrectScaleDownWithKRaft(VertxTestContext context) { KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_CONTROLLERS_WITH_STATUS_5_NODES, POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), CURRENT_PODS_5_NODES, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_CONTROLLERS_WITH_STATUS_5_NODES, POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), CURRENT_PODS_5_NODES, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); @@ -782,7 +422,7 @@ public void testThrowsRevertScaleDownFailsWithKRaft(VertxTestContext context) { KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_CONTROLLERS_WITH_STATUS_5_NODES, POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, VERSION_CHANGE, kafkaStatus, false) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_CONTROLLERS_WITH_STATUS_5_NODES, POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, false) .onComplete(context.failing(ex -> context.verify(() -> { // Check exception assertThat(ex, instanceOf(InvalidResourceException.class)); @@ -800,7 +440,7 @@ public void testThrowsRevertScaleDownFailsWithKRaft(VertxTestContext context) { @Test public void testSkipScaleDownCheckWithKRaft(VertxTestContext context) { - Kafka kafka = new KafkaBuilder(KAFKA_WITH_KRAFT) + Kafka kafka = new KafkaBuilder(KAFKA) .editMetadata() .addToAnnotations(Annotations.ANNO_STRIMZI_IO_SKIP_BROKER_SCALEDOWN_CHECK, "true") .endMetadata() @@ -812,7 +452,7 @@ public void testSkipScaleDownCheckWithKRaft(VertxTestContext context) { KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(kafka, List.of(POOL_CONTROLLERS_WITH_STATUS_5_NODES, POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, VERSION_CHANGE, kafkaStatus, false) + creator.prepareKafkaCluster(kafka, List.of(POOL_CONTROLLERS_WITH_STATUS_5_NODES, POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, false) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); @@ -842,7 +482,7 @@ public void testRevertRoleChangeWithKRaftMixedNodes(VertxTestContext context) { KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_MIXED_NOT_MIXED_ANYMORE, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_MIXED_NOT_MIXED_ANYMORE, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); @@ -885,7 +525,7 @@ public void testRevertRoleChangeWithKRaftDedicatedNodes(VertxTestContext context KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_MIXED_WITH_STATUS, POOL_A_WITH_STATUS, poolBFromBrokerToControllerOnly), Map.of(), null, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_MIXED_WITH_STATUS, POOL_A_WITH_STATUS, poolBFromBrokerToControllerOnly), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); @@ -922,7 +562,7 @@ public void testCorrectRoleChangeWithKRaft(VertxTestContext context) { KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_MIXED_NOT_MIXED_ANYMORE, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), CURRENT_PODS_5_NODES, VERSION_CHANGE, kafkaStatus, true) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_MIXED_NOT_MIXED_ANYMORE, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), CURRENT_PODS_5_NODES, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, true) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); @@ -953,7 +593,7 @@ public void testThrowsRevertBrokerChangeFailsWithKRaft(VertxTestContext context) KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(KAFKA_WITH_KRAFT, List.of(POOL_MIXED_NOT_MIXED_ANYMORE, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, VERSION_CHANGE, kafkaStatus, false) + creator.prepareKafkaCluster(KAFKA, List.of(POOL_MIXED_NOT_MIXED_ANYMORE, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, false) .onComplete(context.failing(ex -> context.verify(() -> { // Check exception assertThat(ex, instanceOf(InvalidResourceException.class)); @@ -971,7 +611,7 @@ public void testThrowsRevertBrokerChangeFailsWithKRaft(VertxTestContext context) @Test public void testSkipRoleChangeCheckWithKRaft(VertxTestContext context) { - Kafka kafka = new KafkaBuilder(KAFKA_WITH_KRAFT) + Kafka kafka = new KafkaBuilder(KAFKA) .editMetadata() .addToAnnotations(Annotations.ANNO_STRIMZI_IO_SKIP_BROKER_SCALEDOWN_CHECK, "true") .endMetadata() @@ -983,7 +623,7 @@ public void testSkipRoleChangeCheckWithKRaft(VertxTestContext context) { KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.KRAFT, supplier); Checkpoint async = context.checkpoint(); - creator.prepareKafkaCluster(kafka, List.of(POOL_MIXED_NOT_MIXED_ANYMORE, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, VERSION_CHANGE, kafkaStatus, false) + creator.prepareKafkaCluster(kafka, List.of(POOL_MIXED_NOT_MIXED_ANYMORE, POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, kafkaStatus, false) .onComplete(context.succeeding(kc -> context.verify(() -> { // Kafka cluster is created assertThat(kc, is(notNullValue())); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorZooBasedTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorZooBasedTest.java new file mode 100644 index 00000000000..c1472b60bef --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorZooBasedTest.java @@ -0,0 +1,530 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.operator.assembly; + +import io.strimzi.api.kafka.model.kafka.Kafka; +import io.strimzi.api.kafka.model.kafka.KafkaBuilder; +import io.strimzi.api.kafka.model.kafka.KafkaStatus; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; +import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; +import io.strimzi.api.kafka.model.nodepool.KafkaNodePool; +import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder; +import io.strimzi.api.kafka.model.nodepool.ProcessRoles; +import io.strimzi.operator.cluster.ClusterOperatorConfig; +import io.strimzi.operator.cluster.KafkaVersionTestUtils; +import io.strimzi.operator.cluster.ResourceUtils; +import io.strimzi.operator.cluster.model.KafkaMetadataConfigurationState; +import io.strimzi.operator.cluster.model.NodeRef; +import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; +import io.strimzi.operator.common.Annotations; +import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.model.InvalidResourceException; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.WorkerExecutor; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(VertxExtension.class) +public class KafkaClusterCreatorZooBasedTest { + private final static String NAMESPACE = "my-ns"; + private final static String CLUSTER_NAME = "my-cluster"; + private final static Reconciliation RECONCILIATION = new Reconciliation("test", "kind", NAMESPACE, CLUSTER_NAME); + private final static ClusterOperatorConfig CO_CONFIG = ResourceUtils.dummyClusterOperatorConfig(); + + private final static Kafka KAFKA = new KafkaBuilder() + .withNewMetadata() + .withName(CLUSTER_NAME) + .withNamespace(NAMESPACE) + .endMetadata() + .withNewSpec() + .withNewKafka() + .withReplicas(3) + .withListeners(new GenericKafkaListenerBuilder() + .withName("tls") + .withPort(9092) + .withType(KafkaListenerType.INTERNAL) + .withTls(true) + .build()) + .withNewEphemeralStorage() + .endEphemeralStorage() + .endKafka() + .withNewZookeeper() + .withReplicas(3) + .withNewEphemeralStorage() + .endEphemeralStorage() + .endZookeeper() + .endSpec() + .build(); + private final static Kafka KAFKA_WITH_POOLS = new KafkaBuilder(KAFKA) + .withNewMetadata() + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled") + .endMetadata() + .build(); + + private final static KafkaNodePool POOL_A = new KafkaNodePoolBuilder() + .withNewMetadata() + .withName("pool-a") + .withNamespace(NAMESPACE) + .endMetadata() + .withNewSpec() + .withReplicas(3) + .withNewJbodStorage() + .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build()) + .endJbodStorage() + .withRoles(ProcessRoles.BROKER) + .endSpec() + .build(); + private final static KafkaNodePool POOL_A_WITH_STATUS = new KafkaNodePoolBuilder(POOL_A) + .withNewStatus() + .withRoles(ProcessRoles.BROKER) + .withNodeIds(1000, 1001, 1002) + .endStatus() + .build(); + private final static KafkaNodePool POOL_A_WITH_STATUS_5_NODES = new KafkaNodePoolBuilder(POOL_A) + .withNewStatus() + .withRoles(ProcessRoles.BROKER) + .withNodeIds(1000, 1001, 1002, 1003, 1004) + .endStatus() + .build(); + + private final static KafkaNodePool POOL_B = new KafkaNodePoolBuilder() + .withNewMetadata() + .withName("pool-b") + .withNamespace(NAMESPACE) + .endMetadata() + .withNewSpec() + .withReplicas(3) + .withNewJbodStorage() + .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("200Gi").build()) + .endJbodStorage() + .withRoles(ProcessRoles.BROKER) + .endSpec() + .build(); + private final static KafkaNodePool POOL_B_WITH_STATUS = new KafkaNodePoolBuilder(POOL_B) + .withNewStatus() + .withRoles(ProcessRoles.BROKER) + .withNodeIds(2000, 2001, 2002) + .endStatus() + .build(); + private final static KafkaNodePool POOL_B_WITH_STATUS_5_NODES = new KafkaNodePoolBuilder(POOL_B) + .withNewStatus() + .withRoles(ProcessRoles.BROKER) + .withNodeIds(2000, 2001, 2002, 2003, 2004) + .endStatus() + .build(); + + private static final Map> CURRENT_PODS_3_NODES = Map.of("my-cluster-kafka", List.of("my-cluster-kafka-0", "my-cluster-kafka-1", "my-cluster-kafka-2")); + private static final Map> CURRENT_PODS_5_NODES = Map.of("my-cluster-kafka", List.of("my-cluster-kafka-0", "my-cluster-kafka-1", "my-cluster-kafka-2", "my-cluster-kafka-3", "my-cluster-kafka-4")); + + private static Vertx vertx; + private static WorkerExecutor sharedWorkerExecutor; + + @BeforeAll + public static void beforeAll() { + vertx = Vertx.vertx(); + sharedWorkerExecutor = vertx.createSharedWorkerExecutor("kubernetes-ops-pool"); + } + + @AfterAll + public static void afterAll() { + sharedWorkerExecutor.close(); + vertx.close(); + } + + ////////////////////////////////////////////////// + // Regular Kafka cluster tests + ////////////////////////////////////////////////// + + @Test + public void testNewClusterWithoutNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA, null, Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, true) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(3)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2))); + assertThat(kc.removedNodes(), is(Set.of())); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // No scale-down => scale-down check is not done + verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void testExistingClusterWithoutNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA, null, Map.of(), CURRENT_PODS_3_NODES, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, true) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(3)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2))); + assertThat(kc.removedNodes(), is(Set.of())); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // No scale-down => scale-down check is not done + verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void testRevertScaleDownWithoutNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock brokers-in-use check + BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; + when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(0, 1, 2, 3, 4))); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA, null, Map.of(), CURRENT_PODS_5_NODES, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, true) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(5)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2, 3, 4))); + assertThat(kc.removedNodes(), is(Set.of())); + + // Check the status conditions + assertThat(kafkaStatus.getConditions().size(), is(1)); + assertThat(kafkaStatus.getConditions().get(0).getStatus(), is("True")); + assertThat(kafkaStatus.getConditions().get(0).getType(), is("Warning")); + assertThat(kafkaStatus.getConditions().get(0).getReason(), is("ScaleDownPreventionCheck")); + assertThat(kafkaStatus.getConditions().get(0).getMessage(), is("Reverting scale-down of Kafka my-cluster by changing number of replicas to 5")); + + // Scale-down reverted => should be called once + verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void testCorrectScaleDownWithoutNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock brokers-in-use check + BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; + when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(0, 1, 2))); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA, null, Map.of(), CURRENT_PODS_5_NODES, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, true) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(3)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2))); + assertThat(kc.removedNodes(), is(Set.of(3, 4))); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // Scale-down reverted => should be called once + verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void testThrowsRevertScaleDownFailsWithoutNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock brokers-in-use check + BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; + when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(0, 1, 2, 3, 4))); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA, null, Map.of(), CURRENT_PODS_5_NODES, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, false) + .onComplete(context.failing(ex -> context.verify(() -> { + // Check exception + assertThat(ex, instanceOf(InvalidResourceException.class)); + assertThat(ex.getMessage(), is("Following errors were found when processing the Kafka custom resource: [Cannot scale-down Kafka brokers [3, 4] because they have assigned partition-replicas.]")); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // Scale-down failed => should be called once + verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void tesSkipScaleDownCheckWithoutNodePools(VertxTestContext context) { + Kafka kafka = new KafkaBuilder(KAFKA) + .editMetadata() + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_SKIP_BROKER_SCALEDOWN_CHECK, "true") + .endMetadata() + .build(); + + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock brokers-in-use check + BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; + when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(0, 1, 2, 3, 4))); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(kafka, null, Map.of(), CURRENT_PODS_5_NODES, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, false) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(3)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2))); + assertThat(kc.removedNodes(), is(Set.of(3, 4))); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // Scale-down check skipped => should be never called + verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + ////////////////////////////////////////////////// + // Kafka cluster with node pools tests + ////////////////////////////////////////////////// + + @Test + public void testNewClusterWithNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A, POOL_B), Map.of(), null, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, true) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(6)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2, 3, 4, 5))); + assertThat(kc.removedNodes(), is(Set.of())); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // No scale-down => scale-down check is not done + verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void testExistingClusterWithNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A_WITH_STATUS, POOL_B_WITH_STATUS), Map.of(), null, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, true) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(6)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002))); + assertThat(kc.removedNodes(), is(Set.of())); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // No scale-down => scale-down check is not done + verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void testRevertScaleDownWithNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock brokers-in-use check + BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; + when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 1003, 2004))); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, true) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(10)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 1003, 1004, 2000, 2001, 2002, 2003, 2004))); + assertThat(kc.removedNodes(), is(Set.of())); + + // Check the status conditions + assertThat(kafkaStatus.getConditions().size(), is(2)); + assertThat(kafkaStatus.getConditions().get(0).getStatus(), is("True")); + assertThat(kafkaStatus.getConditions().get(0).getType(), is("Warning")); + assertThat(kafkaStatus.getConditions().get(0).getReason(), is("ScaleDownPreventionCheck")); + assertThat(kafkaStatus.getConditions().get(0).getMessage(), is("Reverting scale-down of KafkaNodePool pool-a by changing number of replicas to 5")); + assertThat(kafkaStatus.getConditions().get(1).getStatus(), is("True")); + assertThat(kafkaStatus.getConditions().get(1).getType(), is("Warning")); + assertThat(kafkaStatus.getConditions().get(1).getReason(), is("ScaleDownPreventionCheck")); + assertThat(kafkaStatus.getConditions().get(1).getMessage(), is("Reverting scale-down of KafkaNodePool pool-b by changing number of replicas to 5")); + + // Scale-down reverted => should be called once + verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void testCorrectScaleDownWithNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock brokers-in-use check + BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; + when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 2000, 2001, 2002))); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), CURRENT_PODS_5_NODES, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, true) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(6)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002))); + assertThat(kc.removedNodes(), is(Set.of(1003, 1004, 2003, 2004))); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // Scale-down reverted => should be called once + verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void testThrowsRevertScaleDownFailsWithNodePools(VertxTestContext context) { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock brokers-in-use check + BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; + when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 1003, 1004, 2003, 2004))); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(KAFKA_WITH_POOLS, List.of(POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, false) + .onComplete(context.failing(ex -> context.verify(() -> { + // Check exception + assertThat(ex, instanceOf(InvalidResourceException.class)); + assertThat(ex.getMessage(), is("Following errors were found when processing the Kafka custom resource: [Cannot scale-down Kafka brokers [1003, 1004, 2003, 2004] because they have assigned partition-replicas.]")); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // Scale-down failed => should be called once + verify(supplier.brokersInUseCheck, times(1)).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } + + @Test + public void tesSkipScaleDownCheckWithNodePools(VertxTestContext context) { + Kafka kafka = new KafkaBuilder(KAFKA_WITH_POOLS) + .editMetadata() + .addToAnnotations(Annotations.ANNO_STRIMZI_IO_SKIP_BROKER_SCALEDOWN_CHECK, "true") + .endMetadata() + .build(); + + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock brokers-in-use check + BrokersInUseCheck brokersInUseOps = supplier.brokersInUseCheck; + when(brokersInUseOps.brokersInUse(any(), any(), any(), any())).thenReturn(Future.succeededFuture(Set.of(1000, 1001, 1002, 1003, 1004, 2003, 2004))); + + KafkaStatus kafkaStatus = new KafkaStatus(); + KafkaClusterCreator creator = new KafkaClusterCreator(vertx, RECONCILIATION, CO_CONFIG, KafkaMetadataConfigurationState.ZK, supplier); + + Checkpoint async = context.checkpoint(); + creator.prepareKafkaCluster(kafka, List.of(POOL_A_WITH_STATUS_5_NODES, POOL_B_WITH_STATUS_5_NODES), Map.of(), null, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, kafkaStatus, false) + .onComplete(context.succeeding(kc -> context.verify(() -> { + // Kafka cluster is created + assertThat(kc, is(notNullValue())); + assertThat(kc.nodes().size(), is(6)); + assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002))); + assertThat(kc.removedNodes(), is(Set.of(1003, 1004, 2003, 2004))); + + // Check the status conditions + assertThat(kafkaStatus.getConditions(), is(nullValue())); + + // Scale-down check skipped => should be never called + verify(supplier.brokersInUseCheck, never()).brokersInUse(any(), any(), any(), any()); + + async.flag(); + }))); + } +} diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerClusterIPTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerClusterIPTest.java deleted file mode 100644 index 63552c96f58..00000000000 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerClusterIPTest.java +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.cluster.operator.assembly; - -import io.fabric8.kubernetes.api.model.Service; -import io.strimzi.api.kafka.model.kafka.Kafka; -import io.strimzi.api.kafka.model.kafka.KafkaBuilder; -import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; -import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerConfigurationBroker; -import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerConfigurationBrokerBuilder; -import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; -import io.strimzi.api.kafka.model.kafka.listener.ListenerStatus; -import io.strimzi.operator.cluster.KafkaVersionTestUtils; -import io.strimzi.operator.cluster.PlatformFeaturesAvailability; -import io.strimzi.operator.cluster.ResourceUtils; -import io.strimzi.operator.cluster.model.KafkaCluster; -import io.strimzi.operator.cluster.model.KafkaMetadataConfigurationState; -import io.strimzi.operator.cluster.model.KafkaPool; -import io.strimzi.operator.cluster.model.KafkaVersion; -import io.strimzi.operator.cluster.model.MockSharedEnvironmentProvider; -import io.strimzi.operator.cluster.model.SharedEnvironmentProvider; -import io.strimzi.operator.cluster.model.nodepools.NodePoolUtils; -import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; -import io.strimzi.operator.cluster.operator.resource.kubernetes.IngressOperator; -import io.strimzi.operator.cluster.operator.resource.kubernetes.RouteOperator; -import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator; -import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceOperator; -import io.strimzi.operator.common.Reconciliation; -import io.strimzi.operator.common.model.Labels; -import io.strimzi.operator.common.operator.resource.ReconcileResult; -import io.strimzi.platform.KubernetesVersion; -import io.vertx.core.Future; -import io.vertx.junit5.Checkpoint; -import io.vertx.junit5.VertxExtension; -import io.vertx.junit5.VertxTestContext; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.hamcrest.CoreMatchers.hasItems; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.notNull; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(VertxExtension.class) -public class KafkaListenerReconcilerClusterIPTest { - private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); - private static final SharedEnvironmentProvider SHARED_ENV_PROVIDER = new MockSharedEnvironmentProvider(); - - public static final String NAMESPACE = "test"; - public static final String CLUSTER_NAME = "my-kafka"; - public static final String DNS_NAME_FOR_BROKER_0 = "broker-0.test.dns.name"; - public static final String DNS_NAME_FOR_BROKER_1 = "broker-1.test.dns.name"; - public static final String DNS_NAME_FOR_BROKER_2 = "broker-2.test.dns.name"; - public static final String DNS_NAME_FOR_BOOTSTRAP_SERVICE = "my-kafka-kafka-external-bootstrap.test.svc"; - public static final int LISTENER_PORT = 9094; - - @Test - public void testClusterIpWithoutTLS(VertxTestContext context) { - Kafka kafka = new KafkaBuilder() - .withNewMetadata() - .withName(CLUSTER_NAME) - .withNamespace(NAMESPACE) - .endMetadata() - .withNewSpec() - .withNewKafka() - .withReplicas(3) - .withListeners(new GenericKafkaListenerBuilder() - .withName("external") - .withPort(LISTENER_PORT) - .withTls(false) - .withType(KafkaListenerType.CLUSTER_IP) - .withNewConfiguration() - .withCreateBootstrapService(true) - .endConfiguration() - .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() - .withNewEntityOperator() - .withNewUserOperator() - .endUserOperator() - .withNewTopicOperator() - .endTopicOperator() - .endEntityOperator() - .endSpec() - .build(); - List pools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafka, null, Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, false, SHARED_ENV_PROVIDER); - KafkaCluster kafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, KafkaMetadataConfigurationState.ZK, null, SHARED_ENV_PROVIDER); - - ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); - - MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( - new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), - kafkaCluster, - new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), - supplier.secretOperations, - supplier.serviceOperations, - supplier.routeOperations, - supplier.ingressOperations - ); - - Checkpoint async = context.checkpoint(); - reconciler.reconcile() - .onComplete(context.succeeding(res -> context.verify(() -> { - // Check status - assertThat(res.listenerStatuses.size(), is(1)); - ListenerStatus listenerStatus = res.listenerStatuses.get(0); - assertThat(listenerStatus.getBootstrapServers(), is("my-kafka-kafka-external-bootstrap.test.svc:9094")); - assertThat(listenerStatus.getAddresses().size(), is(1)); - assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE)); - assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); - - //check hostnames - assertThat(res.bootstrapDnsNames.size(), is(0)); - assertThat(res.brokerDnsNames.size(), is(0)); - Set allBrokersAdvertisedHostNames = res.advertisedHostnames.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()); - assertThat(allBrokersAdvertisedHostNames.size(), is(3)); - assertThat(allBrokersAdvertisedHostNames, hasItems("my-kafka-kafka-1.test.svc", "my-kafka-kafka-2.test.svc", "my-kafka-kafka-0.test.svc")); - assertThat(res.advertisedPorts.size(), is(3)); - assertThat(res.advertisedPorts.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()), hasItems("9094")); - - // Check creation of services - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"), notNull()); - - async.flag(); - }))); - } - - @Test - public void testClusterIpWithTLS(VertxTestContext context) { - Kafka kafka = new KafkaBuilder() - .withNewMetadata() - .withName(CLUSTER_NAME) - .withNamespace(NAMESPACE) - .endMetadata() - .withNewSpec() - .withNewKafka() - .withReplicas(3) - .withListeners(new GenericKafkaListenerBuilder() - .withName("external") - .withPort(LISTENER_PORT) - .withTls(true) - .withType(KafkaListenerType.CLUSTER_IP) - .withNewConfiguration() - .withCreateBootstrapService(true) - .endConfiguration() - .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() - .withNewEntityOperator() - .withNewUserOperator() - .endUserOperator() - .withNewTopicOperator() - .endTopicOperator() - .endEntityOperator() - .endSpec() - .build(); - List pools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafka, null, Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, false, SHARED_ENV_PROVIDER); - KafkaCluster kafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, KafkaMetadataConfigurationState.ZK, null, SHARED_ENV_PROVIDER); - - ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); - - MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( - new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), - kafkaCluster, - new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), - supplier.secretOperations, - supplier.serviceOperations, - supplier.routeOperations, - supplier.ingressOperations - ); - - Checkpoint async = context.checkpoint(); - reconciler.reconcile() - .onComplete(context.succeeding(res -> context.verify(() -> { - // Check status - assertThat(res.listenerStatuses.size(), is(1)); - ListenerStatus listenerStatus = res.listenerStatuses.get(0); - assertThat(listenerStatus.getBootstrapServers(), is("my-kafka-kafka-external-bootstrap.test.svc:9094")); - assertThat(listenerStatus.getAddresses().size(), is(1)); - assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE)); - assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); - - //check hostnames - assertThat(res.bootstrapDnsNames.size(), is(4)); - assertThat(res.bootstrapDnsNames, hasItems("my-kafka-kafka-external-bootstrap", "my-kafka-kafka-external-bootstrap.test", "my-kafka-kafka-external-bootstrap.test.svc", "my-kafka-kafka-external-bootstrap.test.svc.cluster.local")); - Set allBrokersDnsNames = res.brokerDnsNames.values().stream().flatMap(s -> s.stream()).collect(Collectors.toSet()); - assertThat(allBrokersDnsNames.size(), is(3)); - assertThat(allBrokersDnsNames, hasItems("my-kafka-kafka-1.test.svc", "my-kafka-kafka-2.test.svc", "my-kafka-kafka-0.test.svc")); - Set allBrokersAdvertisedHostNames = res.advertisedHostnames.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()); - assertThat(allBrokersAdvertisedHostNames.size(), is(3)); - assertThat(allBrokersAdvertisedHostNames, hasItems("my-kafka-kafka-1.test.svc", "my-kafka-kafka-2.test.svc", "my-kafka-kafka-0.test.svc")); - assertThat(res.advertisedPorts.size(), is(3)); - assertThat(res.advertisedPorts.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()), hasItems("9094")); - - // Check creation of services - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"), notNull()); - - async.flag(); - }))); - } - - - @Test - public void testClusterIpWithCustomBrokerHosts(VertxTestContext context) { - GenericKafkaListenerConfigurationBroker broker0 = new GenericKafkaListenerConfigurationBrokerBuilder() - .withBroker(0) - .withAdvertisedHost("my-address-0") - .build(); - - GenericKafkaListenerConfigurationBroker broker1 = new GenericKafkaListenerConfigurationBrokerBuilder() - .withBroker(1) - .withAdvertisedHost("my-address-1") - .build(); - - GenericKafkaListenerConfigurationBroker broker2 = new GenericKafkaListenerConfigurationBrokerBuilder() - .withBroker(2) - .withAdvertisedHost("my-address-2") - .build(); - - - Kafka kafka = new KafkaBuilder() - .withNewMetadata() - .withName(CLUSTER_NAME) - .withNamespace(NAMESPACE) - .endMetadata() - .withNewSpec() - .withNewKafka() - .withReplicas(3) - .withListeners(new GenericKafkaListenerBuilder() - .withName("external") - .withPort(LISTENER_PORT) - .withTls(true) - .withType(KafkaListenerType.CLUSTER_IP) - .withNewConfiguration() - .withCreateBootstrapService(true) - .withBrokers(broker0, broker1, broker2) - .endConfiguration() - .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() - .withNewEntityOperator() - .withNewUserOperator() - .endUserOperator() - .withNewTopicOperator() - .endTopicOperator() - .endEntityOperator() - .endSpec() - .build(); - List pools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafka, null, Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, false, SHARED_ENV_PROVIDER); - KafkaCluster kafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, KafkaMetadataConfigurationState.ZK, null, SHARED_ENV_PROVIDER); - - ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); - - MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( - new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), - kafkaCluster, - new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), - supplier.secretOperations, - supplier.serviceOperations, - supplier.routeOperations, - supplier.ingressOperations - ); - - Checkpoint async = context.checkpoint(); - reconciler.reconcile() - .onComplete(context.succeeding(res -> context.verify(() -> { - // Check status - assertThat(res.listenerStatuses.size(), is(1)); - ListenerStatus listenerStatus = res.listenerStatuses.get(0); - assertThat(listenerStatus.getBootstrapServers(), is("my-kafka-kafka-external-bootstrap.test.svc:9094")); - assertThat(listenerStatus.getAddresses().size(), is(1)); - assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE)); - assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); - - //check hostnames - assertThat(res.bootstrapDnsNames.size(), is(4)); - assertThat(res.bootstrapDnsNames, hasItems("my-kafka-kafka-external-bootstrap", "my-kafka-kafka-external-bootstrap.test", "my-kafka-kafka-external-bootstrap.test.svc", "my-kafka-kafka-external-bootstrap.test.svc.cluster.local")); - Set allBrokersDnsNames = res.brokerDnsNames.values().stream().flatMap(s -> s.stream()).collect(Collectors.toSet()); - assertThat(allBrokersDnsNames.size(), is(6)); - assertThat(allBrokersDnsNames, hasItems("my-address-0", "my-address-1", "my-address-2", "my-kafka-kafka-1.test.svc", "my-kafka-kafka-2.test.svc", "my-kafka-kafka-0.test.svc")); - Set allBrokersAdvertisedHostNames = res.advertisedHostnames.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()); - assertThat(allBrokersAdvertisedHostNames.size(), is(3)); - assertThat(allBrokersAdvertisedHostNames, hasItems("my-address-0", "my-address-1", "my-address-2")); - assertThat(res.advertisedPorts.size(), is(3)); - assertThat(res.advertisedPorts.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()), hasItems("9094")); - // Check creation of services - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"), notNull()); - - async.flag(); - }))); - } - - private ResourceOperatorSupplier prepareResourceOperatorSupplier() { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock the Services for the kafka. - Service mockServiceBootstrap = mock(Service.class, RETURNS_DEEP_STUBS); - Service mockServiceBroker0 = mock(Service.class, RETURNS_DEEP_STUBS); - Service mockServiceBroker1 = mock(Service.class, RETURNS_DEEP_STUBS); - Service mockServiceBroker2 = mock(Service.class, RETURNS_DEEP_STUBS); - when(mockServiceBootstrap.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BOOTSTRAP_SERVICE); - when(mockServiceBroker0.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_0); - when(mockServiceBroker1.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_1); - when(mockServiceBroker2.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_2); - - // Mock the ServiceOperator for the kafka services. - ServiceOperator mockServiceOperator = supplier.serviceOperations; - - // Delegate the batchReconcile call to the real method which calls the other mocked methods. This allows us to better test the exact behavior. - when(mockServiceOperator.batchReconcile(any(), eq(NAMESPACE), any(), any())).thenCallRealMethod(); - - // Mock getting of services and their readiness - when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"))).thenReturn(Future.succeededFuture(mockServiceBootstrap)); - when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"))).thenReturn(Future.succeededFuture(mockServiceBroker0)); - when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"))).thenReturn(Future.succeededFuture(mockServiceBroker1)); - when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"))).thenReturn(Future.succeededFuture(mockServiceBroker2)); - - // Mock listing of services - when(mockServiceOperator.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); - - // Mock service creation / update - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBootstrap))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker0))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker1))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker2))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-brokers"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(new Service()))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-bootstrap"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(new Service()))); - - return supplier; - } - - /** - * Override KafkaListenersReconciler to only run reconciliation steps that concern the Load balancer resources feature - */ - static class MockKafkaListenersReconciler extends KafkaListenersReconciler { - public MockKafkaListenersReconciler( - Reconciliation reconciliation, - KafkaCluster kafka, - PlatformFeaturesAvailability pfa, - SecretOperator secretOperator, - ServiceOperator serviceOperator, - RouteOperator routeOperator, - IngressOperator ingressOperator) { - super(reconciliation, kafka, null, pfa, 300_000L, secretOperator, serviceOperator, routeOperator, ingressOperator); - } - - @Override - public Future reconcile() { - return services() - .compose(i -> clusterIPServicesReady()) - .compose(i -> Future.succeededFuture(result)); - } - } -} \ No newline at end of file diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerSkipBootstrapLoadBalancerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerSkipBootstrapLoadBalancerTest.java deleted file mode 100644 index b129cfeba2a..00000000000 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerSkipBootstrapLoadBalancerTest.java +++ /dev/null @@ -1,342 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.operator.cluster.operator.assembly; - -import io.fabric8.kubernetes.api.model.Service; -import io.strimzi.api.kafka.model.kafka.Kafka; -import io.strimzi.api.kafka.model.kafka.KafkaBuilder; -import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; -import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; -import io.strimzi.api.kafka.model.kafka.listener.ListenerStatus; -import io.strimzi.operator.cluster.KafkaVersionTestUtils; -import io.strimzi.operator.cluster.PlatformFeaturesAvailability; -import io.strimzi.operator.cluster.ResourceUtils; -import io.strimzi.operator.cluster.model.KafkaCluster; -import io.strimzi.operator.cluster.model.KafkaMetadataConfigurationState; -import io.strimzi.operator.cluster.model.KafkaPool; -import io.strimzi.operator.cluster.model.KafkaVersion; -import io.strimzi.operator.cluster.model.MockSharedEnvironmentProvider; -import io.strimzi.operator.cluster.model.SharedEnvironmentProvider; -import io.strimzi.operator.cluster.model.nodepools.NodePoolUtils; -import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; -import io.strimzi.operator.cluster.operator.resource.kubernetes.IngressOperator; -import io.strimzi.operator.cluster.operator.resource.kubernetes.RouteOperator; -import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator; -import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceOperator; -import io.strimzi.operator.common.Reconciliation; -import io.strimzi.operator.common.model.Labels; -import io.strimzi.operator.common.operator.resource.ReconcileResult; -import io.strimzi.platform.KubernetesVersion; -import io.vertx.core.Future; -import io.vertx.junit5.Checkpoint; -import io.vertx.junit5.VertxExtension; -import io.vertx.junit5.VertxTestContext; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.util.List; -import java.util.Map; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.notNull; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(VertxExtension.class) -public class KafkaListenerReconcilerSkipBootstrapLoadBalancerTest { - private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); - private static final SharedEnvironmentProvider SHARED_ENV_PROVIDER = new MockSharedEnvironmentProvider(); - public static final String NAMESPACE = "test"; - public static final String CLUSTER_NAME = "my-kafka"; - public static final String DNS_NAME_FOR_BROKER_0 = "broker-0.test.dns.name"; - public static final String DNS_NAME_FOR_BROKER_1 = "broker-1.test.dns.name"; - public static final String DNS_NAME_FOR_BROKER_2 = "broker-2.test.dns.name"; - public static final String DNS_NAME_FOR_BOOTSTRAP_SERVICE = "bootstrap-broker.test.dns.name"; - public static final int LISTENER_PORT = 9094; - - @Test - public void testLoadBalancerSkipBootstrapService(VertxTestContext context) { - Kafka kafka = new KafkaBuilder() - .withNewMetadata() - .withName(CLUSTER_NAME) - .withNamespace(NAMESPACE) - .endMetadata() - .withNewSpec() - .withNewKafka() - .withReplicas(3) - .withListeners(new GenericKafkaListenerBuilder() - .withName("external") - .withPort(LISTENER_PORT) - .withTls(true) - .withType(KafkaListenerType.LOADBALANCER) - .withNewConfiguration() - .withCreateBootstrapService(false) - .endConfiguration() - .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() - .withNewEntityOperator() - .withNewUserOperator() - .endUserOperator() - .withNewTopicOperator() - .endTopicOperator() - .endEntityOperator() - .endSpec() - .build(); - List pools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafka, null, Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, false, SHARED_ENV_PROVIDER); - KafkaCluster kafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, KafkaMetadataConfigurationState.ZK, null, SHARED_ENV_PROVIDER); - - ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); - - MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( - new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), - kafkaCluster, - new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), - supplier.secretOperations, - supplier.serviceOperations, - supplier.routeOperations, - supplier.ingressOperations - ); - - Checkpoint async = context.checkpoint(); - reconciler.reconcile() - .onComplete(context.succeeding(res -> context.verify(() -> { - // Check status - assertThat(res.listenerStatuses.size(), is(1)); - ListenerStatus listenerStatus = res.listenerStatuses.get(0); - assertThat(listenerStatus.getBootstrapServers(), is("broker-0.test.dns.name:9094,broker-1.test.dns.name:9094,broker-2.test.dns.name:9094")); - assertThat(listenerStatus.getAddresses().size(), is(3)); - assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BROKER_0)); - assertThat(listenerStatus.getAddresses().get(1).getHost(), is(DNS_NAME_FOR_BROKER_1)); - assertThat(listenerStatus.getAddresses().get(2).getHost(), is(DNS_NAME_FOR_BROKER_2)); - assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); - assertThat(listenerStatus.getAddresses().get(1).getPort(), is(LISTENER_PORT)); - assertThat(listenerStatus.getAddresses().get(2).getPort(), is(LISTENER_PORT)); - - // Check creation of services - verify(supplier.serviceOperations, never()).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"), notNull()); - - async.flag(); - }))); - } - - @Test - public void testLoadBalancerWithBootstrapService(VertxTestContext context) { - Kafka kafka = new KafkaBuilder() - .withNewMetadata() - .withName(CLUSTER_NAME) - .withNamespace(NAMESPACE) - .endMetadata() - .withNewSpec() - .withNewKafka() - .withReplicas(3) - .withListeners(new GenericKafkaListenerBuilder() - .withName("external") - .withPort(LISTENER_PORT) - .withTls(true) - .withType(KafkaListenerType.LOADBALANCER) - .withNewConfiguration() - .withCreateBootstrapService(true) - .endConfiguration() - .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() - .withNewEntityOperator() - .withNewUserOperator() - .endUserOperator() - .withNewTopicOperator() - .endTopicOperator() - .endEntityOperator() - .endSpec() - .build(); - List pools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafka, null, Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, false, SHARED_ENV_PROVIDER); - KafkaCluster kafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, KafkaMetadataConfigurationState.ZK, null, SHARED_ENV_PROVIDER); - - ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); - - MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( - new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), - kafkaCluster, - new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), - supplier.secretOperations, - supplier.serviceOperations, - supplier.routeOperations, - supplier.ingressOperations - ); - - Checkpoint async = context.checkpoint(); - reconciler.reconcile() - .onComplete(context.succeeding(res -> context.verify(() -> { - // Check status - assertThat(res.listenerStatuses.size(), is(1)); - ListenerStatus listenerStatus = res.listenerStatuses.get(0); - assertThat(listenerStatus.getBootstrapServers(), is("bootstrap-broker.test.dns.name:9094")); - assertThat(listenerStatus.getAddresses().size(), is(1)); - assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE)); - assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); - - // Check creation of services - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"), notNull()); - - async.flag(); - }))); - } - - - @Test - public void testLoadBalancer(VertxTestContext context) { - Kafka kafka = new KafkaBuilder() - .withNewMetadata() - .withName(CLUSTER_NAME) - .withNamespace(NAMESPACE) - .endMetadata() - .withNewSpec() - .withNewKafka() - .withReplicas(3) - .withListeners(new GenericKafkaListenerBuilder() - .withName("external") - .withPort(LISTENER_PORT) - .withTls(true) - .withType(KafkaListenerType.LOADBALANCER) - .build()) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endKafka() - .withNewZookeeper() - .withReplicas(3) - .withNewEphemeralStorage() - .endEphemeralStorage() - .endZookeeper() - .withNewEntityOperator() - .withNewUserOperator() - .endUserOperator() - .withNewTopicOperator() - .endTopicOperator() - .endEntityOperator() - .endSpec() - .build(); - List pools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafka, null, Map.of(), Map.of(), KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, false, SHARED_ENV_PROVIDER); - KafkaCluster kafkaCluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafka, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_ZOOKEEPER_VERSION_CHANGE, KafkaMetadataConfigurationState.ZK, null, SHARED_ENV_PROVIDER); - - ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); - - MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( - new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME), - kafkaCluster, - new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), - supplier.secretOperations, - supplier.serviceOperations, - supplier.routeOperations, - supplier.ingressOperations - ); - - Checkpoint async = context.checkpoint(); - reconciler.reconcile() - .onComplete(context.succeeding(res -> context.verify(() -> { - // Check status - assertThat(res.listenerStatuses.size(), is(1)); - ListenerStatus listenerStatus = res.listenerStatuses.get(0); - assertThat(listenerStatus.getBootstrapServers(), is("bootstrap-broker.test.dns.name:9094")); - assertThat(listenerStatus.getAddresses().size(), is(1)); - assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE)); - assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); - - // Check creation of services - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"), notNull()); - verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"), notNull()); - - async.flag(); - }))); - } - - private ResourceOperatorSupplier prepareResourceOperatorSupplier() { - ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); - - // Mock the Services for the kafka. - Service mockServiceBootstrap = mock(Service.class, RETURNS_DEEP_STUBS); - Service mockServiceBroker0 = mock(Service.class, RETURNS_DEEP_STUBS); - Service mockServiceBroker1 = mock(Service.class, RETURNS_DEEP_STUBS); - Service mockServiceBroker2 = mock(Service.class, RETURNS_DEEP_STUBS); - when(mockServiceBootstrap.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BOOTSTRAP_SERVICE); - when(mockServiceBroker0.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_0); - when(mockServiceBroker1.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_1); - when(mockServiceBroker2.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_2); - - // Mock the ServiceOperator for the kafka services. - ServiceOperator mockServiceOperator = supplier.serviceOperations; - - // Delegate the batchReconcile call to the real method which calls the other mocked methods. This allows us to better test the exact behavior. - when(mockServiceOperator.batchReconcile(any(), eq(NAMESPACE), any(), any())).thenCallRealMethod(); - - // Mock getting of services and their readiness - when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"))).thenReturn(Future.succeededFuture(mockServiceBootstrap)); - when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"))).thenReturn(Future.succeededFuture(mockServiceBroker0)); - when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"))).thenReturn(Future.succeededFuture(mockServiceBroker1)); - when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"))).thenReturn(Future.succeededFuture(mockServiceBroker2)); - - // Mock listing of services - when(mockServiceOperator.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); - - // Mock service creation / update - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBootstrap))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-0"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker0))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-1"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker1))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-2"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker2))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-brokers"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(new Service()))); - when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-bootstrap"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(new Service()))); - - return supplier; - } - - /** - * Override KafkaListenersReconciler to only run reconciliation steps that concern the Load balancer resources feature - */ - static class MockKafkaListenersReconciler extends KafkaListenersReconciler { - public MockKafkaListenersReconciler( - Reconciliation reconciliation, - KafkaCluster kafka, - PlatformFeaturesAvailability pfa, - SecretOperator secretOperator, - ServiceOperator serviceOperator, - RouteOperator routeOperator, - IngressOperator ingressOperator) { - super(reconciliation, kafka, null, pfa, 300_000L, secretOperator, serviceOperator, routeOperator, ingressOperator); - } - - @Override - public Future reconcile() { - return services() - .compose(i -> loadBalancerServicesReady()) - .compose(i -> Future.succeededFuture(result)); - } - } -} \ No newline at end of file diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerTest.java new file mode 100644 index 00000000000..2efbaddd554 --- /dev/null +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaListenerReconcilerTest.java @@ -0,0 +1,622 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.cluster.operator.assembly; + +import io.fabric8.kubernetes.api.model.Service; +import io.strimzi.api.kafka.model.kafka.Kafka; +import io.strimzi.api.kafka.model.kafka.KafkaBuilder; +import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerConfigurationBroker; +import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerConfigurationBrokerBuilder; +import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType; +import io.strimzi.api.kafka.model.kafka.listener.ListenerStatus; +import io.strimzi.api.kafka.model.nodepool.KafkaNodePool; +import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder; +import io.strimzi.api.kafka.model.nodepool.ProcessRoles; +import io.strimzi.operator.cluster.KafkaVersionTestUtils; +import io.strimzi.operator.cluster.PlatformFeaturesAvailability; +import io.strimzi.operator.cluster.ResourceUtils; +import io.strimzi.operator.cluster.model.KafkaCluster; +import io.strimzi.operator.cluster.model.KafkaMetadataConfigurationState; +import io.strimzi.operator.cluster.model.KafkaVersion; +import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier; +import io.strimzi.operator.cluster.operator.resource.kubernetes.IngressOperator; +import io.strimzi.operator.cluster.operator.resource.kubernetes.RouteOperator; +import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator; +import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceOperator; +import io.strimzi.operator.common.Annotations; +import io.strimzi.operator.common.Reconciliation; +import io.strimzi.operator.common.model.Labels; +import io.strimzi.operator.common.operator.resource.ReconcileResult; +import io.strimzi.platform.KubernetesVersion; +import io.vertx.core.Future; +import io.vertx.junit5.Checkpoint; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(VertxExtension.class) +public class KafkaListenerReconcilerTest { + private static final KafkaVersion.Lookup VERSIONS = KafkaVersionTestUtils.getKafkaVersionLookup(); + public static final String NAMESPACE = "test"; + public static final String CLUSTER_NAME = "my-kafka"; + public static final String DNS_NAME_FOR_BROKER_10 = "broker-10.test.dns.name"; + public static final String DNS_NAME_FOR_BROKER_11 = "broker-11.test.dns.name"; + public static final String DNS_NAME_FOR_BROKER_12 = "broker-12.test.dns.name"; + public static final String DNS_NAME_FOR_BOOTSTRAP_SERVICE = "bootstrap.test.dns.name"; + public static final int LISTENER_PORT = 9094; + private static final Kafka KAFKA = new KafkaBuilder() + .withNewMetadata() + .withName(CLUSTER_NAME) + .withAnnotations(Map.of( + Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled", + Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled" + )) + .endMetadata() + .withNewSpec() + .withNewKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("plain") + .withPort(9092) + .withType(KafkaListenerType.INTERNAL) + .withTls(false) + .build()) + .endKafka() + .withNewEntityOperator() + .withNewTopicOperator() + .endTopicOperator() + .withNewUserOperator() + .endUserOperator() + .endEntityOperator() + .endSpec() + .build(); + private static final KafkaNodePool POOL_CONTROLLERS = new KafkaNodePoolBuilder() + .withNewMetadata() + .withName("controllers") + .withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, CLUSTER_NAME)) + .withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NEXT_NODE_IDS, "[0-9]")) + .endMetadata() + .withNewSpec() + .withReplicas(3) + .withNewJbodStorage() + .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").withStorageClass("gp99").build()) + .endJbodStorage() + .withRoles(ProcessRoles.CONTROLLER) + .endSpec() + .build(); + private static final KafkaNodePool POOL_BROKERS = new KafkaNodePoolBuilder() + .withNewMetadata() + .withName("brokers") + .withLabels(Map.of(Labels.STRIMZI_CLUSTER_LABEL, CLUSTER_NAME)) + .withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NEXT_NODE_IDS, "[10-99]")) + .endMetadata() + .withNewSpec() + .withReplicas(3) + .withNewJbodStorage() + .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").withStorageClass("gp99").build()) + .endJbodStorage() + .withRoles(ProcessRoles.BROKER) + .endSpec() + .build(); + + @Test + public void testClusterIpWithoutTLS(VertxTestContext context) { + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("external") + .withPort(LISTENER_PORT) + .withTls(false) + .withType(KafkaListenerType.CLUSTER_IP) + .withNewConfiguration() + .withCreateBootstrapService(true) + .endConfiguration() + .build()) + .endKafka() + .endSpec() + .build(); + + ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME); + + KafkaCluster kafkaCluster = KafkaClusterCreator.createKafkaCluster( + reconciliation, + kafka, + List.of(POOL_CONTROLLERS, POOL_BROKERS), + Map.of(), + Map.of(), + KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, + KafkaMetadataConfigurationState.KRAFT, + VERSIONS, + supplier.sharedEnvironmentProvider + ); + + MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( + reconciliation, + kafkaCluster, + new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), + supplier.secretOperations, + supplier.serviceOperations, + supplier.routeOperations, + supplier.ingressOperations + ); + + Checkpoint async = context.checkpoint(); + reconciler.reconcile() + .onComplete(context.succeeding(res -> context.verify(() -> { + // Check status + assertThat(res.listenerStatuses.size(), is(1)); + ListenerStatus listenerStatus = res.listenerStatuses.get(0); + assertThat(listenerStatus.getBootstrapServers(), is("my-kafka-kafka-external-bootstrap.test.svc:9094")); + assertThat(listenerStatus.getAddresses().size(), is(1)); + assertThat(listenerStatus.getAddresses().get(0).getHost(), is("my-kafka-kafka-external-bootstrap.test.svc")); + assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); + + //check hostnames + assertThat(res.bootstrapDnsNames.size(), is(0)); + assertThat(res.brokerDnsNames.size(), is(0)); + Set allBrokersAdvertisedHostNames = res.advertisedHostnames.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()); + assertThat(allBrokersAdvertisedHostNames.size(), is(3)); + assertThat(allBrokersAdvertisedHostNames, hasItems("my-kafka-brokers-11.test.svc", "my-kafka-brokers-12.test.svc", "my-kafka-brokers-10.test.svc")); + assertThat(res.advertisedPorts.size(), is(3)); + assertThat(res.advertisedPorts.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()), hasItems("9094")); + + // Check creation of services + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-10"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-11"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-12"), notNull()); + + async.flag(); + }))); + } + + @Test + public void testClusterIpWithTLS(VertxTestContext context) { + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("external") + .withPort(LISTENER_PORT) + .withTls(true) + .withType(KafkaListenerType.CLUSTER_IP) + .withNewConfiguration() + .withCreateBootstrapService(true) + .endConfiguration() + .build()) + .endKafka() + .endSpec() + .build(); + + ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME); + + KafkaCluster kafkaCluster = KafkaClusterCreator.createKafkaCluster( + reconciliation, + kafka, + List.of(POOL_CONTROLLERS, POOL_BROKERS), + Map.of(), + Map.of(), + KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, + KafkaMetadataConfigurationState.KRAFT, + VERSIONS, + supplier.sharedEnvironmentProvider + ); + + MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( + reconciliation, + kafkaCluster, + new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), + supplier.secretOperations, + supplier.serviceOperations, + supplier.routeOperations, + supplier.ingressOperations + ); + + Checkpoint async = context.checkpoint(); + reconciler.reconcile() + .onComplete(context.succeeding(res -> context.verify(() -> { + // Check status + assertThat(res.listenerStatuses.size(), is(1)); + ListenerStatus listenerStatus = res.listenerStatuses.get(0); + assertThat(listenerStatus.getBootstrapServers(), is("my-kafka-kafka-external-bootstrap.test.svc:9094")); + assertThat(listenerStatus.getAddresses().size(), is(1)); + assertThat(listenerStatus.getAddresses().get(0).getHost(), is("my-kafka-kafka-external-bootstrap.test.svc")); + assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); + + //check hostnames + assertThat(res.bootstrapDnsNames.size(), is(4)); + assertThat(res.bootstrapDnsNames, hasItems("my-kafka-kafka-external-bootstrap", "my-kafka-kafka-external-bootstrap.test", "my-kafka-kafka-external-bootstrap.test.svc", "my-kafka-kafka-external-bootstrap.test.svc.cluster.local")); + Set allBrokersDnsNames = res.brokerDnsNames.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); + assertThat(allBrokersDnsNames.size(), is(3)); + assertThat(allBrokersDnsNames, hasItems("my-kafka-brokers-11.test.svc", "my-kafka-brokers-12.test.svc", "my-kafka-brokers-10.test.svc")); + Set allBrokersAdvertisedHostNames = res.advertisedHostnames.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()); + assertThat(allBrokersAdvertisedHostNames.size(), is(3)); + assertThat(allBrokersAdvertisedHostNames, hasItems("my-kafka-brokers-11.test.svc", "my-kafka-brokers-12.test.svc", "my-kafka-brokers-10.test.svc")); + assertThat(res.advertisedPorts.size(), is(3)); + assertThat(res.advertisedPorts.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()), hasItems("9094")); + + // Check creation of services + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-10"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-11"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-12"), notNull()); + + async.flag(); + }))); + } + + + @Test + public void testClusterIpWithCustomBrokerHosts(VertxTestContext context) { + GenericKafkaListenerConfigurationBroker broker0 = new GenericKafkaListenerConfigurationBrokerBuilder() + .withBroker(10) + .withAdvertisedHost("my-address-0") + .build(); + + GenericKafkaListenerConfigurationBroker broker1 = new GenericKafkaListenerConfigurationBrokerBuilder() + .withBroker(11) + .withAdvertisedHost("my-address-1") + .build(); + + GenericKafkaListenerConfigurationBroker broker2 = new GenericKafkaListenerConfigurationBrokerBuilder() + .withBroker(12) + .withAdvertisedHost("my-address-2") + .build(); + + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("external") + .withPort(LISTENER_PORT) + .withTls(true) + .withType(KafkaListenerType.CLUSTER_IP) + .withNewConfiguration() + .withCreateBootstrapService(true) + .withBrokers(broker0, broker1, broker2) + .endConfiguration() + .build()) + .endKafka() + .endSpec() + .build(); + + ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME); + + KafkaCluster kafkaCluster = KafkaClusterCreator.createKafkaCluster( + reconciliation, + kafka, + List.of(POOL_CONTROLLERS, POOL_BROKERS), + Map.of(), + Map.of(), + KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, + KafkaMetadataConfigurationState.KRAFT, + VERSIONS, + supplier.sharedEnvironmentProvider + ); + + MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( + reconciliation, + kafkaCluster, + new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), + supplier.secretOperations, + supplier.serviceOperations, + supplier.routeOperations, + supplier.ingressOperations + ); + + Checkpoint async = context.checkpoint(); + reconciler.reconcile() + .onComplete(context.succeeding(res -> context.verify(() -> { + // Check status + assertThat(res.listenerStatuses.size(), is(1)); + ListenerStatus listenerStatus = res.listenerStatuses.get(0); + assertThat(listenerStatus.getBootstrapServers(), is("my-kafka-kafka-external-bootstrap.test.svc:9094")); + assertThat(listenerStatus.getAddresses().size(), is(1)); + assertThat(listenerStatus.getAddresses().get(0).getHost(), is("my-kafka-kafka-external-bootstrap.test.svc")); + assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); + + //check hostnames + assertThat(res.bootstrapDnsNames.size(), is(4)); + assertThat(res.bootstrapDnsNames, hasItems("my-kafka-kafka-external-bootstrap", "my-kafka-kafka-external-bootstrap.test", "my-kafka-kafka-external-bootstrap.test.svc", "my-kafka-kafka-external-bootstrap.test.svc.cluster.local")); + Set allBrokersDnsNames = res.brokerDnsNames.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()); + assertThat(allBrokersDnsNames.size(), is(6)); + assertThat(allBrokersDnsNames, hasItems("my-address-0", "my-address-1", "my-address-2", "my-kafka-brokers-11.test.svc", "my-kafka-brokers-12.test.svc", "my-kafka-brokers-10.test.svc")); + Set allBrokersAdvertisedHostNames = res.advertisedHostnames.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()); + assertThat(allBrokersAdvertisedHostNames.size(), is(3)); + assertThat(allBrokersAdvertisedHostNames, hasItems("my-address-0", "my-address-1", "my-address-2")); + assertThat(res.advertisedPorts.size(), is(3)); + assertThat(res.advertisedPorts.values().stream().flatMap(s -> s.values().stream()).collect(Collectors.toSet()), hasItems("9094")); + // Check creation of services + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-10"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-11"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-12"), notNull()); + + async.flag(); + }))); + } + + @Test + public void testLoadBalancerSkipBootstrapService(VertxTestContext context) { + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("external") + .withPort(LISTENER_PORT) + .withTls(true) + .withType(KafkaListenerType.LOADBALANCER) + .withNewConfiguration() + .withCreateBootstrapService(false) + .endConfiguration() + .build()) + .endKafka() + .endSpec() + .build(); + + ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME); + + KafkaCluster kafkaCluster = KafkaClusterCreator.createKafkaCluster( + reconciliation, + kafka, + List.of(POOL_CONTROLLERS, POOL_BROKERS), + Map.of(), + Map.of(), + KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, + KafkaMetadataConfigurationState.KRAFT, + VERSIONS, + supplier.sharedEnvironmentProvider + ); + + MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( + reconciliation, + kafkaCluster, + new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), + supplier.secretOperations, + supplier.serviceOperations, + supplier.routeOperations, + supplier.ingressOperations + ); + + Checkpoint async = context.checkpoint(); + reconciler.reconcile() + .onComplete(context.succeeding(res -> context.verify(() -> { + // Check status + assertThat(res.listenerStatuses.size(), is(1)); + ListenerStatus listenerStatus = res.listenerStatuses.get(0); + assertThat(listenerStatus.getBootstrapServers(), is("broker-10.test.dns.name:9094,broker-11.test.dns.name:9094,broker-12.test.dns.name:9094")); + assertThat(listenerStatus.getAddresses().size(), is(3)); + assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BROKER_10)); + assertThat(listenerStatus.getAddresses().get(1).getHost(), is(DNS_NAME_FOR_BROKER_11)); + assertThat(listenerStatus.getAddresses().get(2).getHost(), is(DNS_NAME_FOR_BROKER_12)); + assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); + assertThat(listenerStatus.getAddresses().get(1).getPort(), is(LISTENER_PORT)); + assertThat(listenerStatus.getAddresses().get(2).getPort(), is(LISTENER_PORT)); + + // Check creation of services + verify(supplier.serviceOperations, never()).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-10"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-11"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-12"), notNull()); + + async.flag(); + }))); + } + + @Test + public void testLoadBalancerWithBootstrapService(VertxTestContext context) { + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("external") + .withPort(LISTENER_PORT) + .withTls(true) + .withType(KafkaListenerType.LOADBALANCER) + .withNewConfiguration() + .withCreateBootstrapService(true) + .endConfiguration() + .build()) + .endKafka() + .endSpec() + .build(); + + ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME); + + KafkaCluster kafkaCluster = KafkaClusterCreator.createKafkaCluster( + reconciliation, + kafka, + List.of(POOL_CONTROLLERS, POOL_BROKERS), + Map.of(), + Map.of(), + KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, + KafkaMetadataConfigurationState.KRAFT, + VERSIONS, + supplier.sharedEnvironmentProvider + ); + + MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( + reconciliation, + kafkaCluster, + new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), + supplier.secretOperations, + supplier.serviceOperations, + supplier.routeOperations, + supplier.ingressOperations + ); + + Checkpoint async = context.checkpoint(); + reconciler.reconcile() + .onComplete(context.succeeding(res -> context.verify(() -> { + // Check status + assertThat(res.listenerStatuses.size(), is(1)); + ListenerStatus listenerStatus = res.listenerStatuses.get(0); + assertThat(listenerStatus.getBootstrapServers(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE + ":9094")); + assertThat(listenerStatus.getAddresses().size(), is(1)); + assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE)); + assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); + + // Check creation of services + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-10"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-11"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-12"), notNull()); + + async.flag(); + }))); + } + + + @Test + public void testLoadBalancer(VertxTestContext context) { + Kafka kafka = new KafkaBuilder(KAFKA) + .editSpec() + .editKafka() + .withListeners(new GenericKafkaListenerBuilder() + .withName("external") + .withPort(LISTENER_PORT) + .withTls(true) + .withType(KafkaListenerType.LOADBALANCER) + .build()) + .endKafka() + .endSpec() + .build(); + + ResourceOperatorSupplier supplier = prepareResourceOperatorSupplier(); + Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, CLUSTER_NAME); + + KafkaCluster kafkaCluster = KafkaClusterCreator.createKafkaCluster( + reconciliation, + kafka, + List.of(POOL_CONTROLLERS, POOL_BROKERS), + Map.of(), + Map.of(), + KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, + KafkaMetadataConfigurationState.KRAFT, + VERSIONS, + supplier.sharedEnvironmentProvider + ); + + MockKafkaListenersReconciler reconciler = new MockKafkaListenersReconciler( + reconciliation, + kafkaCluster, + new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), + supplier.secretOperations, + supplier.serviceOperations, + supplier.routeOperations, + supplier.ingressOperations + ); + + Checkpoint async = context.checkpoint(); + reconciler.reconcile() + .onComplete(context.succeeding(res -> context.verify(() -> { + // Check status + assertThat(res.listenerStatuses.size(), is(1)); + ListenerStatus listenerStatus = res.listenerStatuses.get(0); + assertThat(listenerStatus.getBootstrapServers(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE + ":9094")); + assertThat(listenerStatus.getAddresses().size(), is(1)); + assertThat(listenerStatus.getAddresses().get(0).getHost(), is(DNS_NAME_FOR_BOOTSTRAP_SERVICE)); + assertThat(listenerStatus.getAddresses().get(0).getPort(), is(LISTENER_PORT)); + + // Check creation of services + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-10"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-11"), notNull()); + verify(supplier.serviceOperations, times(1)).reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-12"), notNull()); + + async.flag(); + }))); + } + + private ResourceOperatorSupplier prepareResourceOperatorSupplier() { + ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false); + + // Mock the Services for the kafka. + Service mockServiceBootstrap = mock(Service.class, RETURNS_DEEP_STUBS); + Service mockServiceBroker0 = mock(Service.class, RETURNS_DEEP_STUBS); + Service mockServiceBroker1 = mock(Service.class, RETURNS_DEEP_STUBS); + Service mockServiceBroker2 = mock(Service.class, RETURNS_DEEP_STUBS); + when(mockServiceBootstrap.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BOOTSTRAP_SERVICE); + when(mockServiceBroker0.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_10); + when(mockServiceBroker1.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_11); + when(mockServiceBroker2.getStatus().getLoadBalancer().getIngress().get(0).getHostname()).thenReturn(DNS_NAME_FOR_BROKER_12); + + // Mock the ServiceOperator for the kafka services. + ServiceOperator mockServiceOperator = supplier.serviceOperations; + + // Delegate the batchReconcile call to the real method which calls the other mocked methods. This allows us to better test the exact behavior. + when(mockServiceOperator.batchReconcile(any(), eq(NAMESPACE), any(), any())).thenCallRealMethod(); + + // Mock getting of services and their readiness + when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"))).thenReturn(Future.succeededFuture(mockServiceBootstrap)); + when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-10"))).thenReturn(Future.succeededFuture(mockServiceBroker0)); + when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-11"))).thenReturn(Future.succeededFuture(mockServiceBroker1)); + when(mockServiceOperator.getAsync(eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-12"))).thenReturn(Future.succeededFuture(mockServiceBroker2)); + + // Mock listing of services + when(mockServiceOperator.listAsync(eq(NAMESPACE), any(Labels.class))).thenReturn(Future.succeededFuture(List.of())); + + // Mock service creation / update + when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-external-bootstrap"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBootstrap))); + when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-10"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker0))); + when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-11"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker1))); + when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-brokers-12"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(mockServiceBroker2))); + when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-brokers"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(new Service()))); + when(mockServiceOperator.reconcile(any(), eq(NAMESPACE), eq(CLUSTER_NAME + "-kafka-bootstrap"), any())).thenReturn(Future.succeededFuture(ReconcileResult.created(new Service()))); + + return supplier; + } + + /** + * Override KafkaListenersReconciler to only run reconciliation steps that concern the Load balancer resources feature + */ + static class MockKafkaListenersReconciler extends KafkaListenersReconciler { + public MockKafkaListenersReconciler( + Reconciliation reconciliation, + KafkaCluster kafka, + PlatformFeaturesAvailability pfa, + SecretOperator secretOperator, + ServiceOperator serviceOperator, + RouteOperator routeOperator, + IngressOperator ingressOperator) { + super(reconciliation, kafka, null, pfa, 300_000L, secretOperator, serviceOperator, routeOperator, ingressOperator); + } + + @Override + public Future reconcile() { + return services() + .compose(i -> clusterIPServicesReady()) + .compose(i -> loadBalancerServicesReady()) + .compose(i -> Future.succeededFuture(result)); + } + } +} \ No newline at end of file From fa2afb786b30fd90d599122a173535f1c02f5821 Mon Sep 17 00:00:00 2001 From: Jakub Scholz Date: Thu, 8 Aug 2024 17:17:14 +0200 Subject: [PATCH 2/2] Fix indentation Signed-off-by: Jakub Scholz --- .../cluster/operator/assembly/KafkaClusterCreatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorTest.java index e04a8be6aae..e7ff0e9b179 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaClusterCreatorTest.java @@ -230,7 +230,7 @@ public void testNewClusterWithKRaft(VertxTestContext context) { }))); } - @Test + @Test public void testNewClusterWithMixedNodesKRaft(VertxTestContext context) { ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false);