Skip to content

Commit

Permalink
Change wait for KafkaRebalance to allow Ready, ProposalReady, Pending…
Browse files Browse the repository at this point in the history
…Proposal states

Signed-off-by: Jakub Stejskal <xstejs24@gmail.com>
  • Loading branch information
Frawless committed Jul 24, 2024
1 parent d66d7b4 commit aa260ea
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalance;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceList;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.ResourceType;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaRebalanceUtils;
Expand Down Expand Up @@ -47,8 +46,8 @@ public void update(KafkaRebalance resource) {

@Override
public boolean waitForReadiness(KafkaRebalance resource) {
return KafkaRebalanceUtils.waitForKafkaRebalanceCustomResourceState(resource.getMetadata().getNamespace(), resource.getMetadata().getName(), KafkaRebalanceState.PendingProposal);
}
return KafkaRebalanceUtils.waitForKafkaRebalanceReadiness(resource);
};

public static MixedOperation<KafkaRebalance, KafkaRebalanceList, Resource<KafkaRebalance>> kafkaRebalanceClient() {
return Crds.kafkaRebalanceOperation(ResourceManager.kubeClient().getClient());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceState;
import io.strimzi.api.kafka.model.rebalance.KafkaRebalanceStatus;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.ResourceOperation;
import io.strimzi.systemtest.resources.crd.KafkaRebalanceResource;
import io.strimzi.test.TestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -27,11 +27,11 @@

public class KafkaRebalanceUtils {

private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(KafkaRebalanceUtils.class);
private static final Logger LOGGER = LogManager.getLogger(KafkaRebalanceUtils.class);

private KafkaRebalanceUtils() {}

private static Condition rebalanceStateCondition(Reconciliation reconciliation, String namespaceName, String resourceName) {
private static Condition rebalanceStateCondition(String namespaceName, String resourceName) {

List<Condition> statusConditions = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName)
.withName(resourceName).get().getStatus().getConditions().stream()
Expand All @@ -42,10 +42,10 @@ private static Condition rebalanceStateCondition(Reconciliation reconciliation,
if (statusConditions.size() == 1) {
return statusConditions.get(0);
} else if (statusConditions.size() > 1) {
LOGGER.warnCr(reconciliation, "Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status");
LOGGER.warn("Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status");
throw new RuntimeException("Multiple KafkaRebalance State Conditions were present in the KafkaRebalance status");
} else {
LOGGER.warnCr(reconciliation, "No KafkaRebalance State Conditions were present in the KafkaRebalance status");
LOGGER.warn("No KafkaRebalance State Conditions were present in the KafkaRebalance status");
throw new RuntimeException("No KafkaRebalance State Conditions were present in the KafkaRebalance status");
}
}
Expand All @@ -59,68 +59,68 @@ public static boolean waitForKafkaRebalanceCustomResourceState(String resourceNa
return waitForKafkaRebalanceCustomResourceState(kubeClient().getNamespace(), resourceName, state);
}

public static String annotateKafkaRebalanceResource(Reconciliation reconciliation, String namespaceName, String resourceName, KafkaRebalanceAnnotation annotation) {
LOGGER.infoCr(reconciliation, "Annotating KafkaRebalance: {} with annotation: {}", resourceName, annotation.toString());
public static String annotateKafkaRebalanceResource(String namespaceName, String resourceName, KafkaRebalanceAnnotation annotation) {
LOGGER.info("Annotating KafkaRebalance: {} with annotation: {}", resourceName, annotation.toString());
return ResourceManager.cmdKubeClient().namespace(namespaceName)
.execInCurrentNamespace("annotate", "kafkarebalance", resourceName, Annotations.ANNO_STRIMZI_IO_REBALANCE + "=" + annotation.toString())
.out()
.trim();
}

public static String annotateKafkaRebalanceResource(Reconciliation reconciliation, String resourceName, KafkaRebalanceAnnotation annotation) {
return annotateKafkaRebalanceResource(reconciliation, kubeClient().getNamespace(), resourceName, annotation);
public static String annotateKafkaRebalanceResource(String resourceName, KafkaRebalanceAnnotation annotation) {
return annotateKafkaRebalanceResource(kubeClient().getNamespace(), resourceName, annotation);
}

public static void doRebalancingProcessWithAutoApproval(Reconciliation reconciliation, String namespaceName, String rebalanceName) {
doRebalancingProcess(reconciliation, namespaceName, rebalanceName, true);
public static void doRebalancingProcessWithAutoApproval(String namespaceName, String rebalanceName) {
doRebalancingProcess(namespaceName, rebalanceName, true);
}

public static void doRebalancingProcess(Reconciliation reconciliation, String namespaceName, String rebalanceName) {
doRebalancingProcess(reconciliation, namespaceName, rebalanceName, false);
public static void doRebalancingProcess(String namespaceName, String rebalanceName) {
doRebalancingProcess(namespaceName, rebalanceName, false);
}

public static void doRebalancingProcess(Reconciliation reconciliation, String namespaceName, String rebalanceName, boolean autoApproval) {
LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "=")));
LOGGER.infoCr(reconciliation, KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType());
LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "=")));
public static void doRebalancingProcess(String namespaceName, String rebalanceName, boolean autoApproval) {
LOGGER.info(String.join("", Collections.nCopies(76, "=")));
LOGGER.info(KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType());
LOGGER.info(String.join("", Collections.nCopies(76, "=")));

if (!autoApproval) {
// it can sometimes happen that KafkaRebalance is already in the ProposalReady state -> race condition prevention
if (!rebalanceStateCondition(reconciliation, namespaceName, rebalanceName).getType().equals(KafkaRebalanceState.ProposalReady.name())) {
LOGGER.infoCr(reconciliation, "Verifying that KafkaRebalance resource is in {} state", KafkaRebalanceState.ProposalReady);
if (!rebalanceStateCondition(namespaceName, rebalanceName).getType().equals(KafkaRebalanceState.ProposalReady.name())) {
LOGGER.info("Verifying that KafkaRebalance resource is in {} state", KafkaRebalanceState.ProposalReady);

waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.ProposalReady);
}

LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "=")));
LOGGER.infoCr(reconciliation, KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType());
LOGGER.infoCr(reconciliation, String.join("", Collections.nCopies(76, "=")));
LOGGER.info(String.join("", Collections.nCopies(76, "=")));
LOGGER.info(KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get().getStatus().getConditions().get(0).getType());
LOGGER.info(String.join("", Collections.nCopies(76, "=")));

// using automatic-approval annotation
final KafkaRebalance kr = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(rebalanceName).get();
if (kr.getMetadata().getAnnotations().containsKey(Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL) &&
kr.getMetadata().getAnnotations().get(Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL).equals("true")) {
LOGGER.infoCr(reconciliation, "Triggering the rebalance automatically (because Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL is set to true) " +
LOGGER.info("Triggering the rebalance automatically (because Annotations.ANNO_STRIMZI_IO_REBALANCE_AUTOAPPROVAL is set to true) " +
"without an annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve");
} else {
LOGGER.infoCr(reconciliation, "Triggering the rebalance with annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve");
LOGGER.info("Triggering the rebalance with annotation {} of KafkaRebalance resource", "strimzi.io/rebalance=approve");

String response = annotateKafkaRebalanceResource(reconciliation, namespaceName, rebalanceName, KafkaRebalanceAnnotation.approve);
String response = annotateKafkaRebalanceResource(namespaceName, rebalanceName, KafkaRebalanceAnnotation.approve);

LOGGER.infoCr(reconciliation, "Response from the annotation process {}", response);
LOGGER.info("Response from the annotation process {}", response);
}
}

LOGGER.infoCr(reconciliation, "Verifying that annotation triggers the {} state", KafkaRebalanceState.Rebalancing);
LOGGER.info("Verifying that annotation triggers the {} state", KafkaRebalanceState.Rebalancing);

waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.Rebalancing);

LOGGER.infoCr(reconciliation, "Verifying that KafkaRebalance is in the {} state", KafkaRebalanceState.Ready);
LOGGER.info("Verifying that KafkaRebalance is in the {} state", KafkaRebalanceState.Ready);

waitForKafkaRebalanceCustomResourceState(namespaceName, rebalanceName, KafkaRebalanceState.Ready);
}

public static void waitForRebalanceStatusStability(Reconciliation reconciliation, String namespaceName, String resourceName) {
public static void waitForRebalanceStatusStability(String namespaceName, String resourceName) {
int[] stableCounter = {0};

KafkaRebalanceStatus oldStatus = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(resourceName).get().getStatus();
Expand All @@ -129,16 +129,39 @@ public static void waitForRebalanceStatusStability(Reconciliation reconciliation
if (KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(namespaceName).withName(resourceName).get().getStatus().equals(oldStatus)) {
stableCounter[0]++;
if (stableCounter[0] == TestConstants.GLOBAL_STABILITY_OFFSET_COUNT) {
LOGGER.infoCr(reconciliation, "KafkaRebalance status is stable for: {} poll intervals", stableCounter[0]);
LOGGER.info("KafkaRebalance status is stable for: {} poll intervals", stableCounter[0]);
return true;
}
} else {
LOGGER.infoCr(reconciliation, "KafkaRebalance status is not stable. Going to set the counter to zero");
LOGGER.info("KafkaRebalance status is not stable. Going to set the counter to zero");
stableCounter[0] = 0;
return false;
}
LOGGER.infoCr(reconciliation, "KafkaRebalance status gonna be stable in {} polls", TestConstants.GLOBAL_STABILITY_OFFSET_COUNT - stableCounter[0]);
LOGGER.info("KafkaRebalance status gonna be stable in {} polls", TestConstants.GLOBAL_STABILITY_OFFSET_COUNT - stableCounter[0]);
return false;
});
}

public static boolean waitForKafkaRebalanceReadiness(KafkaRebalance resource) {
List<String> readyConditions = Arrays.asList(KafkaRebalanceState.PendingProposal.toString(), KafkaRebalanceState.ProposalReady.toString(), KafkaRebalanceState.Ready.toString());
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "Waiting for {}: {}/{} will have desired ready state ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), readyConditions);

TestUtils.waitFor(String.format("%s: %s#%s will have desired state: %s", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), KafkaRebalanceState.Ready),
TestConstants.GLOBAL_POLL_INTERVAL, ResourceOperation.getTimeoutForKafkaRebalanceState(KafkaRebalanceState.Ready),
() -> {
List<String> actualConditions = KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(resource.getMetadata().getNamespace()).resource(resource).get().getStatus().getConditions().stream().map(Condition::getType).toList();

for (String condition: actualConditions) {
if (readyConditions.contains(condition)) {
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is in ready state ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), condition);
return true;
}
}
LOGGER.log(ResourceManager.getInstance().determineLogLevel(), "{}: {}/{} is not ready ({})", resource.getKind(), resource.getMetadata().getNamespace(), resource.getMetadata().getName(), actualConditions);

return false;
});

return true;
}
}
Loading

0 comments on commit aa260ea

Please sign in to comment.