Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ST: Deflake testPauseReconciliationInKafkaRebalanceAndTopic and testKafkaCCAndRebalanceWithMultipleCOs #10375

Merged
merged 6 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalance;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceList;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.ResourceType;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaRebalanceUtils;
Expand Down Expand Up @@ -47,8 +46,8 @@ public void update(KafkaRebalance resource) {

@Override
public boolean waitForReadiness(KafkaRebalance resource) {
return KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(resource.getMetadata().getNamespace(), resource.getMetadata().getName(), KafkaRebalanceState.PendingProposal);
}
return KafkaRebalanceUtils.waitForKafkaRebalanceReadiness(resource);
};

public static MixedOperation<KafkaRebalance, KafkaRebalanceList, Resource<KafkaRebalance>> kafkaRebalanceClient() {
return Crds.kafkaRebalanceOperation(ResourceManager.kubeClient().getClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceStatus;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.ResourceOperation;
import io.strimzi.systemtest.resources.crd.KafkaRebalanceResource;
import io.strimzi.test.TestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -27,11 +27,11 @@

public class KafkaRebalanceUtils {

private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaRebalanceUtils.class);
private static final Logger LOGGER = LogManager.getLogger(KafkaRebalanceUtils.class);

private KafkaRebalanceUtils() {}

private static Condition rebalanceStateCondition(Reconciliation reconciliation, String namespaceName, String resourceName) {
private static Condition rebalanceStateCondition(String namespaceName, String resourceName) {

List<Condition> statusConditions = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName)
.withName(resourceName).get().getStatus().getConditions().stream()
Expand All @@ -42,10 +42,10 @@ private static Condition rebalanceStateCondition(Reconciliation reconciliation,
if (statusConditions.size() == 1) {
return statusConditions.get(0);
} else if (statusConditions.size() > 1) {
LOGGER.warnCr(reconciliation, "Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status");
LOGGER.warn("Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status");
throw new RuntimeException("Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status");
} else {
LOGGER.warnCr(reconciliation, "No KafkaRebalance State Conditions were present in the KafkaRebalance status");
LOGGER.warn("No KafkaRebalance State Conditions were present in the KafkaRebalance status");
throw new RuntimeException("No KafkaRebalance State Conditions were present in the KafkaRebalance status");
}
}
Expand All @@ -59,68 +59,68 @@ public static boolean waitForKafkaRebalanceCustomResourceState(String resourceNa
return waitForKafkaRebalanceCustomResourceState(kubeClient().getNamespace(), resourceName, state);
}

public static String annotateKafkaRebalanceResource(Reconciliation reconciliation, String namespaceName, String resourceName, KafkaRebalanceAnnotation annotation) {
LOGGER.infoCr(reconciliation, "Annotating KafkaRebalance: {} with annotation: {}", resourceName, annotation.toString());
public static String annotateKafkaRebalanceResource(String namespaceName, String resourceName, KafkaRebalanceAnnotation annotation) {
LOGGER.info("Annotating KafkaRebalance: {} with annotation: {}", resourceName, annotation.toString());
return ResourceManager.cmdKubeClient().namespace(namespaceName)
.execInCurrentNamespace("annotate", "kafkarebalance", resourceName, Annotations.ANNO_STRIMZI_IO_REBALANCE + "=" + annotation.toString())
.out()
.trim();
}

public static String annotateKafkaRebalanceResource(Reconciliation reconciliation, String resourceName, KafkaRebalanceAnnotation annotation) {
return annotateKafkaRebalanceResource(reconciliation, kubeClient().getNamespace(), resourceName, annotation);
public static String annotateKafkaRebalanceResource(String resourceName, KafkaRebalanceAnnotation annotation) {
return annotateKafkaRebalanceResource(kubeClient().getNamespace(), resourceName, annotation);
}

public static void doRebalancingProcessWithAutoApproval(Reconciliation reconciliation, String namespaceName, String rebalanceName) {
doRebalancingProcess(reconciliation, namespaceName, rebalanceName, true);
public static void doRebalancingProcessWithAutoApproval(String namespaceName, String rebalanceName) {
doRebalancingProcess(namespaceName, rebalanceName, true);
}

public static void doRebalancingProcess(Reconciliation reconciliation, String namespaceName, String rebalanceName) {
doRebalancingProcess(reconciliation, namespaceName, rebalanceName, false);
public static void doRebalancingProcess(String namespaceName, String rebalanceName) {
doRebalancingProcess(namespaceName, rebalanceName, false);
}

public static void doRebalancingProcess(Reconciliation reconciliation, String namespaceName, String rebalanceName, boolean autoApproval) {
LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "=")));
LOGGER.infoCr(reconciliation, KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType());
LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "=")));
public static void doRebalancingProcess(String namespaceName, String rebalanceName, boolean autoApproval) {
LOGGER.info(String.join("", Collections.nCopies(76, "=")));
LOGGER.info(KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType());
LOGGER.info(String.join("", Collections.nCopies(76, "=")));

if (!autoApproval) {
// it can sometimes happen that KafkaRebalance is already in the ProposalReady state -> race condition prevention
if (!rebalanceStateCondition(reconciliation, namespaceName, rebalanceName).getType().equals(KafkaRebalanceState.ProposalReady.name())) {
LOGGER.infoCr(reconciliation, "Verifying that KafkaRebalance resource is in {} state", KafkaRebalanceState.ProposalReady);
if (!rebalanceStateCondition(namespaceName, rebalanceName).getType().equals(KafkaRebalanceState.ProposalReady.name())) {
LOGGER.info("Verifying that KafkaRebalance resource is in {} state", KafkaRebalanceState.ProposalReady);

waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.ProposalReady);
}

LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "=")));
LOGGER.infoCr(reconciliation, KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType());
LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "=")));
LOGGER.info(String.join("", Collections.nCopies(76, "=")));
LOGGER.info(KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType());
LOGGER.info(String.join("", Collections.nCopies(76, "=")));

// using automatic-approval annotation
final KafkaRebalance kr = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get();
if (kr.getMetadata().getAnnotations().containsKey(Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL) &&
kr.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL).equals("true")) {
LOGGER.infoCr(reconciliation, "Triggering the rebalance automatically (because Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL is set to true) " +
LOGGER.info("Triggering the rebalance automatically (because Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL is set to true) " +
"without an annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve");
} else {
LOGGER.infoCr(reconciliation, "Triggering the rebalance with annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve");
LOGGER.info("Triggering the rebalance with annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve");

String response = annotateKafkaRebalanceResource(reconciliation, namespaceName, rebalanceName, KafkaRebalanceAnnotation.approve);
String response = annotateKafkaRebalanceResource(namespaceName, rebalanceName, KafkaRebalanceAnnotation.approve);

LOGGER.infoCr(reconciliation, "Response from the annotation process {}", response);
LOGGER.info("Response from the annotation process {}", response);
}
}

LOGGER.infoCr(reconciliation, "Verifying that annotation triggers the {} state", KafkaRebalanceState.Rebalancing);
LOGGER.info("Verifying that annotation triggers the {} state", KafkaRebalanceState.Rebalancing);

waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.Rebalancing);

LOGGER.infoCr(reconciliation, "Verifying that KafkaRebalance is in the {} state", KafkaRebalanceState.Ready);
LOGGER.info("Verifying that KafkaRebalance is in the {} state", KafkaRebalanceState.Ready);

waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.Ready);
}

public static void waitForRebalanceStatusStability(Reconciliation reconciliation, String namespaceName, String resourceName) {
public static void waitForRebalanceStatusStability(String namespaceName, String resourceName) {
int[] stableCounter = {0};

KafkaRebalanceStatus oldStatus = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(resourceName).get().getStatus();
Expand All @@ -129,16 +129,39 @@ public static void waitForRebalanceStatusStability(Reconciliation reconciliation
if (KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(resourceName).get().getStatus().equals(oldStatus)) {
stableCounter[0]++;
if (stableCounter[0] == TestConstants.GLOBAL_STABILITY_OFFSET_COUNT) {
LOGGER.infoCr(reconciliation, "KafkaRebalance status is stable for: {} poll intervals", stableCounter[0]);
LOGGER.info("KafkaRebalance status is stable for: {} poll intervals", stableCounter[0]);
return true;
}
} else {
LOGGER.infoCr(reconciliation, "KafkaRebalance status is not stable. Going to set the counter to zero");
LOGGER.info("KafkaRebalance status is not stable. Going to set the counter to zero");
stableCounter[0] = 0;
return false;
}
LOGGER.infoCr(reconciliation, "KafkaRebalance status gonna be stable in {} polls", TestConstants.GLOBAL_STABILITY_OFFSET_COUNT - stableCounter[0]);
LOGGER.info("KafkaRebalance status gonna be stable in {} polls", TestConstants.GLOBAL_STABILITY_OFFSET_COUNT - stableCounter[0]);
return false;
});
}

public static boolean waitForKafkaRebalanceReadiness(KafkaRebalance resource) {
List<String> readyConditions = Arrays.asList(KafkaRebalanceState.PendingProposal.toString(), KafkaRebalanceState.ProposalReady.toString(), KafkaRebalanceState.Ready.toString());
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "Waiting for {}: {}/{} will have desired ready state ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), readyConditions);

TestUtils.waitFor(String.format("%s: %s#%s will have desired state: %s", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), KafkaRebalanceState.Ready),
TestConstants.GLOBAL_POLL_INTERVAL, ResourceOperation.getTimeoutForKafkaRebalanceState(KafkaRebalanceState.Ready),
() -> {
List<String> actualConditions = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(resource.getMetadata().getNamespace()).resource(resource).get().getStatus().getConditions().stream().map(Condition::getType).toList();

for (String condition: actualConditions) {
if (readyConditions.contains(condition)) {
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is in ready state ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), condition);
return true;
}
}
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is not ready ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), actualConditions);

return false;
});

return true;
}
}
Loading
Loading