From f3257f30cdcf947bd0f9034702bc268a53ec7398 Mon Sep 17 00:00:00 2001 From: Jakub Stejskal Date: Tue, 23 Jul 2024 19:26:42 +0200 Subject: [PATCH 1/6] Try to fix testPauseReconciliationInKafkaRebalanceAndTopic Signed-off-by: Jakub Stejskal --- .../java/io/strimzi/systemtest/operators/ReconciliationST.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/ReconciliationST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/ReconciliationST.java index c06ba2f142f..f39e0ab28e2 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/ReconciliationST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/ReconciliationST.java @@ -152,7 +152,7 @@ void testPauseReconciliationInKafkaRebalanceAndTopic() { ) ); resourceManager.createResourceWithWait( - KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 3, 1).build(), + KafkaTemplates.kafkaWithCruiseControlTunedForFastModelGeneration(testStorage.getClusterName(), 3, 1).build(), ScraperTemplates.scraperPod(testStorage.getNamespaceName(), testStorage.getScraperName()).build() ); From 13a9af0549a7e952c079379a8dd3002a32164522 Mon Sep 17 00:00:00 2001 From: Jakub Stejskal Date: Wed, 24 Jul 2024 10:36:54 +0200 Subject: [PATCH 2/6] Change wait for KafkaRebalance to allow Ready, ProposalReady, PendingProposal states Signed-off-by: Jakub Stejskal --- .../resources/crd/KafkaRebalanceResource.java | 5 +- .../utils/kafkaUtils/KafkaRebalanceUtils.java | 89 ++++++++++++------- .../cruisecontrol/CruiseControlST.java | 19 ++-- .../operators/MultipleClusterOperatorsST.java | 3 +- .../operators/ReconciliationST.java | 8 +- 5 files changed, 70 insertions(+), 54 deletions(-) diff --git a/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaRebalanceResource.java b/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaRebalanceResource.java index 9b262eccccc..251829d72e6 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaRebalanceResource.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/resources/crd/KafkaRebalanceResource.java @@ -10,7 +10,6 @@ import io.strimzi.api.kafka.Crds; import io.strimzi.api.kafka.model.rebalance.KafkaRebalance; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceList; -import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState; import io.strimzi.systemtest.resources.ResourceManager; import io.strimzi.systemtest.resources.ResourceType; import io.strimzi.systemtest.utils.kafkaUtils.KafkaRebalanceUtils; @@ -47,8 +46,8 @@ public void update(KafkaRebalance resource) { @Override public boolean waitForReadiness(KafkaRebalance resource) { - return KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(resource.getMetadata().getNamespace(), resource.getMetadata().getName(), KafkaRebalanceState.PendingProposal); - } + return KafkaRebalanceUtils.waitForKafkaRebalanceReadiness(resource); + }; public static MixedOperation> kafkaRebalanceClient() { return Crds.kafkaRebalanceOperation(ResourceManager.kubeClient().getClient()); diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaRebalanceUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaRebalanceUtils.java index ef10681f9e7..6b29e62b47c 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaRebalanceUtils.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaRebalanceUtils.java @@ -10,13 +10,13 @@ import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceStatus; import io.strimzi.operator.common.Annotations; -import io.strimzi.operator.common.Reconciliation; -import io.strimzi.operator.common.ReconciliationLogger; import io.strimzi.systemtest.TestConstants; import io.strimzi.systemtest.resources.ResourceManager; import io.strimzi.systemtest.resources.ResourceOperation; import io.strimzi.systemtest.resources.crd.KafkaRebalanceResource; import io.strimzi.test.TestUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Arrays; import java.util.Collections; @@ -27,11 +27,11 @@ public class KafkaRebalanceUtils { - private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaRebalanceUtils.class); + private static final Logger LOGGER = LogManager.getLogger(KafkaRebalanceUtils.class); private KafkaRebalanceUtils() {} - private static Condition rebalanceStateCondition(Reconciliation reconciliation, String namespaceName, String resourceName) { + private static Condition rebalanceStateCondition(String namespaceName, String resourceName) { List statusConditions = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName) .withName(resourceName).get().getStatus().getConditions().stream() @@ -42,10 +42,10 @@ private static Condition rebalanceStateCondition(Reconciliation reconciliation, if (statusConditions.size() == 1) { return statusConditions.get(0); } else if (statusConditions.size() > 1) { - LOGGER.warnCr(reconciliation, "Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status"); + LOGGER.warn("Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status"); throw new RuntimeException("Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status"); } else { - LOGGER.warnCr(reconciliation, "No KafkaRebalance State Conditions were present in the KafkaRebalance status"); + LOGGER.warn("No KafkaRebalance State Conditions were present in the KafkaRebalance status"); throw new RuntimeException("No KafkaRebalance State Conditions were present in the KafkaRebalance status"); } } @@ -59,68 +59,68 @@ public static boolean waitForKafkaRebalanceCustomResourceState(String resourceNa return waitForKafkaRebalanceCustomResourceState(kubeClient().getNamespace(), resourceName, state); } - public static String annotateKafkaRebalanceResource(Reconciliation reconciliation, String namespaceName, String resourceName, KafkaRebalanceAnnotation annotation) { - LOGGER.infoCr(reconciliation, "Annotating KafkaRebalance: {} with annotation: {}", resourceName, annotation.toString()); + public static String annotateKafkaRebalanceResource(String namespaceName, String resourceName, KafkaRebalanceAnnotation annotation) { + LOGGER.info("Annotating KafkaRebalance: {} with annotation: {}", resourceName, annotation.toString()); return ResourceManager.cmdKubeClient().namespace(namespaceName) .execInCurrentNamespace("annotate", "kafkarebalance", resourceName, Annotations.ANNO_STRIMZI_IO_REBALANCE + "=" + annotation.toString()) .out() .trim(); } - public static String annotateKafkaRebalanceResource(Reconciliation reconciliation, String resourceName, KafkaRebalanceAnnotation annotation) { - return annotateKafkaRebalanceResource(reconciliation, kubeClient().getNamespace(), resourceName, annotation); + public static String annotateKafkaRebalanceResource(String resourceName, KafkaRebalanceAnnotation annotation) { + return annotateKafkaRebalanceResource(kubeClient().getNamespace(), resourceName, annotation); } - public static void doRebalancingProcessWithAutoApproval(Reconciliation reconciliation, String namespaceName, String rebalanceName) { - doRebalancingProcess(reconciliation, namespaceName, rebalanceName, true); + public static void doRebalancingProcessWithAutoApproval(String namespaceName, String rebalanceName) { + doRebalancingProcess(namespaceName, rebalanceName, true); } - public static void doRebalancingProcess(Reconciliation reconciliation, String namespaceName, String rebalanceName) { - doRebalancingProcess(reconciliation, namespaceName, rebalanceName, false); + public static void doRebalancingProcess(String namespaceName, String rebalanceName) { + doRebalancingProcess(namespaceName, rebalanceName, false); } - public static void doRebalancingProcess(Reconciliation reconciliation, String namespaceName, String rebalanceName, boolean autoApproval) { - LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "="))); - LOGGER.infoCr(reconciliation, KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType()); - LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "="))); + public static void doRebalancingProcess(String namespaceName, String rebalanceName, boolean autoApproval) { + LOGGER.info(String.join("", Collections.nCopies(76, "="))); + LOGGER.info(KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType()); + LOGGER.info(String.join("", Collections.nCopies(76, "="))); if (!autoApproval) { // it can sometimes happen that KafkaRebalance is already in the ProposalReady state -> race condition prevention - if (!rebalanceStateCondition(reconciliation, namespaceName, rebalanceName).getType().equals(KafkaRebalanceState.ProposalReady.name())) { - LOGGER.infoCr(reconciliation, "Verifying that KafkaRebalance resource is in {} state", KafkaRebalanceState.ProposalReady); + if (!rebalanceStateCondition(namespaceName, rebalanceName).getType().equals(KafkaRebalanceState.ProposalReady.name())) { + LOGGER.info("Verifying that KafkaRebalance resource is in {} state", KafkaRebalanceState.ProposalReady); waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.ProposalReady); } - LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "="))); - LOGGER.infoCr(reconciliation, KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType()); - LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "="))); + LOGGER.info(String.join("", Collections.nCopies(76, "="))); + LOGGER.info(KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType()); + LOGGER.info(String.join("", Collections.nCopies(76, "="))); // using automatic-approval annotation final KafkaRebalance kr = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get(); if (kr.getMetadata().getAnnotations().containsKey(Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL) && kr.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL).equals("true")) { - LOGGER.infoCr(reconciliation, "Triggering the rebalance automatically (because Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL is set to true) " + + LOGGER.info("Triggering the rebalance automatically (because Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL is set to true) " + "without an annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve"); } else { - LOGGER.infoCr(reconciliation, "Triggering the rebalance with annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve"); + LOGGER.info("Triggering the rebalance with annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve"); - String response = annotateKafkaRebalanceResource(reconciliation, namespaceName, rebalanceName, KafkaRebalanceAnnotation.approve); + String response = annotateKafkaRebalanceResource(namespaceName, rebalanceName, KafkaRebalanceAnnotation.approve); - LOGGER.infoCr(reconciliation, "Response from the annotation process {}", response); + LOGGER.info("Response from the annotation process {}", response); } } - LOGGER.infoCr(reconciliation, "Verifying that annotation triggers the {} state", KafkaRebalanceState.Rebalancing); + LOGGER.info("Verifying that annotation triggers the {} state", KafkaRebalanceState.Rebalancing); waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.Rebalancing); - LOGGER.infoCr(reconciliation, "Verifying that KafkaRebalance is in the {} state", KafkaRebalanceState.Ready); + LOGGER.info("Verifying that KafkaRebalance is in the {} state", KafkaRebalanceState.Ready); waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.Ready); } - public static void waitForRebalanceStatusStability(Reconciliation reconciliation, String namespaceName, String resourceName) { + public static void waitForRebalanceStatusStability(String namespaceName, String resourceName) { int[] stableCounter = {0}; KafkaRebalanceStatus oldStatus = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(resourceName).get().getStatus(); @@ -129,16 +129,39 @@ public static void waitForRebalanceStatusStability(Reconciliation reconciliation if (KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(resourceName).get().getStatus().equals(oldStatus)) { stableCounter[0]++; if (stableCounter[0] == TestConstants.GLOBAL_STABILITY_OFFSET_COUNT) { - LOGGER.infoCr(reconciliation, "KafkaRebalance status is stable for: {} poll intervals", stableCounter[0]); + LOGGER.info("KafkaRebalance status is stable for: {} poll intervals", stableCounter[0]); return true; } } else { - LOGGER.infoCr(reconciliation, "KafkaRebalance status is not stable. Going to set the counter to zero"); + LOGGER.info("KafkaRebalance status is not stable. Going to set the counter to zero"); stableCounter[0] = 0; return false; } - LOGGER.infoCr(reconciliation, "KafkaRebalance status gonna be stable in {} polls", TestConstants.GLOBAL_STABILITY_OFFSET_COUNT - stableCounter[0]); + LOGGER.info("KafkaRebalance status gonna be stable in {} polls", TestConstants.GLOBAL_STABILITY_OFFSET_COUNT - stableCounter[0]); return false; }); } + + public static boolean waitForKafkaRebalanceReadiness(KafkaRebalance resource) { + List readyConditions = Arrays.asList(KafkaRebalanceState.PendingProposal.toString(), KafkaRebalanceState.ProposalReady.toString(), KafkaRebalanceState.Ready.toString()); + LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "Waiting for {}: {}/{} will have desired ready state ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), readyConditions); + + TestUtils.waitFor(String.format("%s: %s#%s will have desired state: %s", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), KafkaRebalanceState.Ready), + TestConstants.GLOBAL_POLL_INTERVAL, ResourceOperation.getTimeoutForKafkaRebalanceState(KafkaRebalanceState.Ready), + () -> { + List actualConditions = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(resource.getMetadata().getNamespace()).resource(resource).get().getStatus().getConditions().stream().map(Condition::getType).toList(); + + for (String condition: actualConditions) { + if (readyConditions.contains(condition)) { + LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is in ready state ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), condition); + return true; + } + } + LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is not ready ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), actualConditions); + + return false; + }); + + return true; + } } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java index 134c0648c03..02cb62d4836 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java @@ -14,13 +14,11 @@ import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlResources; import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; -import io.strimzi.api.kafka.model.rebalance.KafkaRebalance; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceAnnotation; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceStatus; import io.strimzi.operator.common.Annotations; -import io.strimzi.operator.common.Reconciliation; import io.strimzi.systemtest.AbstractST; import io.strimzi.systemtest.Environment; import io.strimzi.systemtest.TestConstants; @@ -202,14 +200,14 @@ void testCruiseControlWithRebalanceResourceAndRefreshAnnotation() { RollingUpdateUtils.waitTillComponentHasRolled(Environment.TEST_SUITE_NAMESPACE, testStorage.getBrokerSelector(), 3, brokerPods); - KafkaRebalanceUtils.doRebalancingProcess(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()), Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()); + KafkaRebalanceUtils.doRebalancingProcess(Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()); LOGGER.info("Annotating KafkaRebalance: {} with 'refresh' anno", testStorage.getClusterName()); - KafkaRebalanceUtils.annotateKafkaRebalanceResource(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()), Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName(), KafkaRebalanceAnnotation.refresh); + KafkaRebalanceUtils.annotateKafkaRebalanceResource(Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName(), KafkaRebalanceAnnotation.refresh); KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName(), KafkaRebalanceState.ProposalReady); LOGGER.info("Trying rebalancing process again"); - KafkaRebalanceUtils.doRebalancingProcess(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()), Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()); + KafkaRebalanceUtils.doRebalancingProcess(Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()); } @IsolatedTest @@ -233,7 +231,7 @@ void testCruiseControlChangesFromRebalancingtoProposalReadyWhenSpecUpdated() { KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(clusterOperator.getDeploymentNamespace(), testStorage.getClusterName(), KafkaRebalanceState.ProposalReady); LOGGER.info("Annotating KafkaRebalance: {} with 'approve' anno", testStorage.getClusterName()); - KafkaRebalanceUtils.annotateKafkaRebalanceResource(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, clusterOperator.getDeploymentNamespace(), testStorage.getClusterName()), clusterOperator.getDeploymentNamespace(), testStorage.getClusterName(), KafkaRebalanceAnnotation.approve); + KafkaRebalanceUtils.annotateKafkaRebalanceResource(clusterOperator.getDeploymentNamespace(), testStorage.getClusterName(), KafkaRebalanceAnnotation.approve); // updating the KafkaRebalance resource by configuring replication throttle KafkaRebalanceResource.replaceKafkaRebalanceResourceInSpecificNamespace(testStorage.getClusterName(), kafkaRebalance -> kafkaRebalance.getSpec().setReplicationThrottle(100000), clusterOperator.getDeploymentNamespace()); @@ -314,7 +312,7 @@ void testCruiseControlTopicExclusion() { assertThat(kafkaRebalanceStatus.getOptimizationResult().get("excludedTopics").toString(), containsString(excludedTopic2)); assertThat(kafkaRebalanceStatus.getOptimizationResult().get("excludedTopics").toString(), not(containsString(includedTopic))); - KafkaRebalanceUtils.annotateKafkaRebalanceResource(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceAnnotation.approve); + KafkaRebalanceUtils.annotateKafkaRebalanceResource(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceAnnotation.approve); KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceState.Ready); } @@ -467,7 +465,7 @@ void testCruiseControlDuringBrokerScaleUpAndDown() { ); KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceState.ProposalReady); - KafkaRebalanceUtils.doRebalancingProcess(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, testStorage.getNamespaceName(), testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName()); + KafkaRebalanceUtils.doRebalancingProcess(testStorage.getNamespaceName(), testStorage.getClusterName()); KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(testStorage.getNamespaceName()).withName(testStorage.getClusterName()).delete(); LOGGER.info("Checking that Topic: {} has replicas on one of the new brokers (or both)", testStorage.getTopicName()); @@ -487,7 +485,7 @@ void testCruiseControlDuringBrokerScaleUpAndDown() { ); KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceState.ProposalReady); - KafkaRebalanceUtils.doRebalancingProcess(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, testStorage.getNamespaceName(), testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName()); + KafkaRebalanceUtils.doRebalancingProcess(testStorage.getNamespaceName(), testStorage.getClusterName()); LOGGER.info("Checking that Topic: {} has replicas only on first 3 brokers", testStorage.getTopicName()); topicReplicas = KafkaTopicUtils.getKafkaTopicReplicasForEachPartition(testStorage.getNamespaceName(), testStorage.getTopicName(), scraperPodName, KafkaResources.plainBootstrapAddress(testStorage.getClusterName())); @@ -525,8 +523,7 @@ void testKafkaRebalanceAutoApprovalMechanism() { .build() ); - KafkaRebalanceUtils.doRebalancingProcessWithAutoApproval(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, - testStorage.getNamespaceName(), testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName()); + KafkaRebalanceUtils.doRebalancingProcessWithAutoApproval(testStorage.getNamespaceName(), testStorage.getClusterName()); } @BeforeAll diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java index 603c75935db..2b52db12435 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java @@ -13,7 +13,6 @@ import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlResources; import io.strimzi.api.kafka.model.rebalance.KafkaRebalance; import io.strimzi.operator.common.Annotations; -import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.Labels; import io.strimzi.systemtest.AbstractST; import io.strimzi.systemtest.Environment; @@ -348,7 +347,7 @@ void testKafkaCCAndRebalanceWithMultipleCOs() { KafkaUtils.waitForClusterStability(testStorage.getNamespaceName(), testStorage.getClusterName()); - KafkaRebalanceUtils.doRebalancingProcess(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, testStorage.getNamespaceName(), testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName()); + KafkaRebalanceUtils.doRebalancingProcess(testStorage.getNamespaceName(), testStorage.getClusterName()); LOGGER.info("Verifying that operands are operated by expected Cluster Operator {}", FIRST_CO_NAME); MetricsUtils.assertMetricResourcesHigherThanOrEqualTo(secondCoMetricsCollector, KafkaRebalance.RESOURCE_KIND, 1); diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/ReconciliationST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/ReconciliationST.java index f39e0ab28e2..132193f1033 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/ReconciliationST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/ReconciliationST.java @@ -7,11 +7,9 @@ import io.strimzi.api.kafka.model.connect.KafkaConnect; import io.strimzi.api.kafka.model.connect.KafkaConnectResources; import io.strimzi.api.kafka.model.kafka.KafkaResources; -import io.strimzi.api.kafka.model.rebalance.KafkaRebalance; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceAnnotation; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState; import io.strimzi.operator.common.Annotations; -import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.model.Labels; import io.strimzi.systemtest.AbstractST; import io.strimzi.systemtest.Environment; @@ -188,11 +186,11 @@ void testPauseReconciliationInKafkaRebalanceAndTopic() { KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceState.ReconciliationPaused); - KafkaRebalanceUtils.annotateKafkaRebalanceResource(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, testStorage.getNamespaceName(), testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceAnnotation.approve); + KafkaRebalanceUtils.annotateKafkaRebalanceResource(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceAnnotation.approve); // unfortunately we don't have any option to check, if something is changed when reconciliations are paused // so we will check stability of status - KafkaRebalanceUtils.waitForRebalanceStatusStability(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, testStorage.getNamespaceName(), testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName()); + KafkaRebalanceUtils.waitForRebalanceStatusStability(testStorage.getNamespaceName(), testStorage.getClusterName()); LOGGER.info("Setting annotation to \"false\" and waiting for KafkaRebalance to be in {} state", KafkaRebalanceState.Ready); KafkaRebalanceResource.replaceKafkaRebalanceResourceInSpecificNamespace(testStorage.getClusterName(), @@ -201,7 +199,7 @@ void testPauseReconciliationInKafkaRebalanceAndTopic() { KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceState.ProposalReady); // because approve annotation wasn't reflected, approving again - KafkaRebalanceUtils.annotateKafkaRebalanceResource(new Reconciliation("test", KafkaRebalance.RESOURCE_KIND, testStorage.getNamespaceName(), testStorage.getClusterName()), testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceAnnotation.approve); + KafkaRebalanceUtils.annotateKafkaRebalanceResource(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceAnnotation.approve); KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(testStorage.getNamespaceName(), testStorage.getClusterName(), KafkaRebalanceState.Ready); } From 114cea317f2831d078e30e9f3ea0e4746ab1a185 Mon Sep 17 00:00:00 2001 From: Jakub Stejskal Date: Wed, 24 Jul 2024 10:49:31 +0200 Subject: [PATCH 3/6] Add cc with tuning also to testKafkaCCAndRebalanceWithMultipleCOs Signed-off-by: Jakub Stejskal --- .../systemtest/operators/MultipleClusterOperatorsST.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java b/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java index 2b52db12435..4a7572093e9 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/operators/MultipleClusterOperatorsST.java @@ -282,7 +282,7 @@ void testKafkaCCAndRebalanceWithMultipleCOs() { KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build() ) ); - resourceManager.createResourceWithWait(KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 3, 3) + resourceManager.createResourceWithWait(KafkaTemplates.kafkaWithCruiseControlTunedForFastModelGeneration(testStorage.getClusterName(), 3, 3) .editOrNewMetadata() .addToLabels(FIRST_CO_SELECTOR) .withNamespace(testStorage.getNamespaceName()) From e76e088fd637421110ba6174f687fe33033ac806 Mon Sep 17 00:00:00 2001 From: Jakub Stejskal Date: Thu, 25 Jul 2024 21:56:58 +0200 Subject: [PATCH 4/6] Try to fix another CC tests Signed-off-by: Jakub Stejskal --- .../io/strimzi/systemtest/cruisecontrol/CruiseControlST.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java index 02cb62d4836..18e780a808d 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java @@ -194,7 +194,7 @@ void testCruiseControlWithRebalanceResourceAndRefreshAnnotation() { // CruiseControl spec is now enabled KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), kafka -> { // Get default CC spec with tune options and set it to existing Kafka - CruiseControlSpec cruiseControlSpec = KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 3, 3).build().getSpec().getCruiseControl(); + CruiseControlSpec cruiseControlSpec = KafkaTemplates.kafkaWithCruiseControlTunedForFastModelGeneration(testStorage.getClusterName(), 3, 3).build().getSpec().getCruiseControl(); kafka.getSpec().setCruiseControl(cruiseControlSpec); }, Environment.TEST_SUITE_NAMESPACE); @@ -435,7 +435,7 @@ void testCruiseControlDuringBrokerScaleUpAndDown() { ); resourceManager.createResourceWithWait( - KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), initialReplicas, initialReplicas).build(), + KafkaTemplates.kafkaWithCruiseControlTunedForFastModelGeneration(testStorage.getClusterName(), initialReplicas, initialReplicas).build(), KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 10, 3, testStorage.getNamespaceName()).build(), ScraperTemplates.scraperPod(testStorage.getNamespaceName(), testStorage.getScraperName()).build() ); From bd363293fdf657b6a21714bf3b097d00f659e777 Mon Sep 17 00:00:00 2001 From: Jakub Stejskal Date: Fri, 26 Jul 2024 08:19:50 +0200 Subject: [PATCH 5/6] Change also Kafka CC related configs Signed-off-by: Jakub Stejskal --- .../strimzi/systemtest/cruisecontrol/CruiseControlST.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java index 18e780a808d..5da5001ff72 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java @@ -9,6 +9,7 @@ import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; import io.strimzi.api.kafka.model.kafka.JbodStorage; import io.strimzi.api.kafka.model.kafka.JbodStorageBuilder; +import io.strimzi.api.kafka.model.kafka.Kafka; import io.strimzi.api.kafka.model.kafka.KafkaResources; import io.strimzi.api.kafka.model.kafka.KafkaStatus; import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; @@ -194,8 +195,9 @@ void testCruiseControlWithRebalanceResourceAndRefreshAnnotation() { // CruiseControl spec is now enabled KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), kafka -> { // Get default CC spec with tune options and set it to existing Kafka - CruiseControlSpec cruiseControlSpec = KafkaTemplates.kafkaWithCruiseControlTunedForFastModelGeneration(testStorage.getClusterName(), 3, 3).build().getSpec().getCruiseControl(); - kafka.getSpec().setCruiseControl(cruiseControlSpec); + Kafka kafkaUpdated = KafkaTemplates.kafkaWithCruiseControlTunedForFastModelGeneration(testStorage.getClusterName(), 3, 3).build(); + kafka.getSpec().setCruiseControl(kafkaUpdated.getSpec().getCruiseControl()); + kafka.getSpec().setKafka(kafkaUpdated.getSpec().getKafka()); }, Environment.TEST_SUITE_NAMESPACE); RollingUpdateUtils.waitTillComponentHasRolled(Environment.TEST_SUITE_NAMESPACE, testStorage.getBrokerSelector(), 3, brokerPods); From 1f6c81f8cec8495be75729c42f892ec01f9d4127 Mon Sep 17 00:00:00 2001 From: Jakub Stejskal Date: Fri, 26 Jul 2024 08:46:51 +0200 Subject: [PATCH 6/6] Fix build Signed-off-by: Jakub Stejskal --- .../io/strimzi/systemtest/cruisecontrol/CruiseControlST.java | 1 - 1 file changed, 1 deletion(-) diff --git a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java index 5da5001ff72..617959280ff 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/cruisecontrol/CruiseControlST.java @@ -14,7 +14,6 @@ import io.strimzi.api.kafka.model.kafka.KafkaStatus; import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder; import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlResources; -import io.strimzi.api.kafka.model.kafka.cruisecontrol.CruiseControlSpec; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceAnnotation; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceMode; import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;