Clean-up the KafkaClusterCreator class after ZooKeeper removal #10997

Merged
@@ -6,7 +6,6 @@

import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
@@ -120,9 +119,9 @@ public Future<KafkaCluster> prepareKafkaCluster(
this.scalingDownBlockedNodes.addAll(kafka.removedNodes());
// We have a failure, and should try to fix issues
// Once we fix it, we call this method again, but this time with tryToFixProblems set to false
return revertScaleDown(kafka, kafkaCr, nodePools)
.compose(kafkaAndNodePools -> revertRoleChange(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePoolCrs()))
.compose(kafkaAndNodePools -> prepareKafkaCluster(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePoolCrs(), oldStorage, versionChange, kafkaStatus, false));
return revertScaleDown(nodePools)
.compose(revertedNodePools -> revertRoleChange(revertedNodePools))
.compose(revertedNodePools -> prepareKafkaCluster(kafkaCr, revertedNodePools, oldStorage, versionChange, kafkaStatus, false));
} else if (checkFailed()) {
// We have a failure, but we should not try to fix it
List<String> errors = new ArrayList<>();
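The simplified chain above passes only the node pool list between the revert steps; the Kafka CR is no longer rewritten along the way. Below is a minimal, self-contained sketch of the same retry-once composition with Vert.x Futures, using hypothetical stand-in types rather than the real Strimzi signatures:

import io.vertx.core.Future;

import java.util.List;

// Retry-once sketch of the composition used in prepareKafkaCluster():
// revert the node pools, then re-run the preparation with tryToFixProblems=false
// so that the second pass reports any remaining problems instead of looping.
// All types here are hypothetical stand-ins, not the real Strimzi classes.
class RetryOnceSketch {
    Future<String> prepareCluster(String kafkaCr, List<String> nodePools, boolean tryToFixProblems) {
        boolean checkFailed = nodePools.isEmpty();   // stand-in for the scale-down and role checks
        if (checkFailed && tryToFixProblems) {
            return revertScaleDown(nodePools)
                    .compose(reverted -> revertRoleChange(reverted))
                    .compose(reverted -> prepareCluster(kafkaCr, reverted, false));   // second pass never retries
        } else if (checkFailed) {
            return Future.failedFuture("Kafka cluster configuration is not valid and cannot be fixed automatically");
        } else {
            return Future.succeededFuture("kafka-cluster-model");
        }
    }

    Future<List<String>> revertScaleDown(List<String> nodePools) {
        // Pretend the revert restored the removed pool
        return Future.succeededFuture(List.of("pool-a"));
    }

    Future<List<String>> revertRoleChange(List<String> nodePools) {
        return Future.succeededFuture(nodePools);
    }
}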
@@ -212,70 +211,49 @@ private Future<KafkaCluster> brokerRemovalCheck(Kafka kafkaCr, KafkaCluster kafk
/**
* Reverts the broker scale down if it is not allowed because the brokers are not empty
*
* @param kafka Instance of the Kafka cluster model that contains information needed to revert the changes
* @param kafkaCr Kafka custom resource
* @param nodePoolCrs List with KafkaNodePool custom resources
*
* @return Future with KafkaAndNodePools record containing the fixed Kafka and KafkaNodePool CRs
*/
@SuppressWarnings("deprecation") // Replicas in Kafka CR are deprecated and its use here will be removed in the future in a separate PR
private Future<KafkaAndNodePools> revertScaleDown(KafkaCluster kafka, Kafka kafkaCr, List<KafkaNodePool> nodePoolCrs) {
private Future<List<KafkaNodePool>> revertScaleDown(List<KafkaNodePool> nodePoolCrs) {
if (scaleDownCheckFailed) {
if (nodePoolCrs == null || nodePoolCrs.isEmpty()) {
// There are no node pools => the Kafka CR is used
int newReplicasCount = kafkaCr.getSpec().getKafka().getReplicas() + kafka.removedNodes().size();
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting scale-down of Kafka " + kafkaCr.getMetadata().getName() + " by changing number of replicas to " + newReplicasCount));
LOGGER.warnCr(reconciliation, "Reverting scale-down of Kafka {} by changing number of replicas to {}", kafkaCr.getMetadata().getName(), newReplicasCount);

Kafka newKafkaCr = new KafkaBuilder(kafkaCr)
.editSpec()
.editKafka()
.withReplicas(newReplicasCount)
.endKafka()
.endSpec()
.build();

return Future.succeededFuture(new KafkaAndNodePools(newKafkaCr, nodePoolCrs));
} else {
// Node pools are used -> we have to fix scale down in the KafkaNodePools
List<KafkaNodePool> newNodePools = new ArrayList<>();
// Node pools are used -> we have to fix scale down in the KafkaNodePools
List<KafkaNodePool> newNodePools = new ArrayList<>();

for (KafkaNodePool nodePool : nodePoolCrs) {
if (nodePool.getStatus() != null
&& nodePool.getStatus().getRoles().contains(ProcessRoles.BROKER)
&& nodePool.getStatus().getNodeIds() != null
&& nodePool.getSpec().getReplicas() < nodePool.getStatus().getNodeIds().size()) {
int newReplicasCount = nodePool.getStatus().getNodeIds().size();
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting scale-down of KafkaNodePool " + nodePool.getMetadata().getName() + " by changing number of replicas to " + newReplicasCount));
LOGGER.warnCr(reconciliation, "Reverting scale-down of KafkaNodePool {} by changing number of replicas to {}", nodePool.getMetadata().getName(), newReplicasCount);
newNodePools.add(
new KafkaNodePoolBuilder(nodePool)
.editSpec()
.withReplicas(newReplicasCount)
.endSpec()
.build());
} else {
newNodePools.add(nodePool);
}
for (KafkaNodePool nodePool : nodePoolCrs) {
if (nodePool.getStatus() != null
&& nodePool.getStatus().getRoles().contains(ProcessRoles.BROKER)
&& nodePool.getStatus().getNodeIds() != null
&& nodePool.getSpec().getReplicas() < nodePool.getStatus().getNodeIds().size()) {
int newReplicasCount = nodePool.getStatus().getNodeIds().size();
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting scale-down of KafkaNodePool " + nodePool.getMetadata().getName() + " by changing number of replicas to " + newReplicasCount));
LOGGER.warnCr(reconciliation, "Reverting scale-down of KafkaNodePool {} by changing number of replicas to {}", nodePool.getMetadata().getName(), newReplicasCount);
newNodePools.add(
new KafkaNodePoolBuilder(nodePool)
.editSpec()
.withReplicas(newReplicasCount)
.endSpec()
.build());
} else {
newNodePools.add(nodePool);
}

return Future.succeededFuture(new KafkaAndNodePools(kafkaCr, newNodePools));
}

return Future.succeededFuture(newNodePools);
} else {
// The scale-down check did not fail => return the original resources
return Future.succeededFuture(new KafkaAndNodePools(kafkaCr, nodePoolCrs));
return Future.succeededFuture(nodePoolCrs);
}
}

/**
* Reverts the role change when the broker role is removed from a node that still has partition replicas assigned
*
* @param kafkaCr Kafka custom resource
* @param nodePoolCrs List with KafkaNodePool custom resources
*
* @return Future with KafkaAndNodePools record containing the fixed Kafka and KafkaNodePool CRs
*/
private Future<KafkaAndNodePools> revertRoleChange(Kafka kafkaCr, List<KafkaNodePool> nodePoolCrs) {
private Future<List<KafkaNodePool>> revertRoleChange(List<KafkaNodePool> nodePoolCrs) {
if (usedToBeBrokersCheckFailed) {
List<KafkaNodePool> newNodePools = new ArrayList<>();

@@ -296,10 +274,10 @@
}
}

return Future.succeededFuture(new KafkaAndNodePools(kafkaCr, newNodePools));
return Future.succeededFuture(newNodePools);
} else {
// The used-to-be-brokers check did not fail => return the original resources
return Future.succeededFuture(new KafkaAndNodePools(kafkaCr, nodePoolCrs));
return Future.succeededFuture(nodePoolCrs);
}
}

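The body of the role-change revert is collapsed above. Conceptually it mirrors the scale-down revert: the broker role is put back into the pool's spec when the status shows the nodes used to be brokers. The sketch below is illustrative only and is not the actual Strimzi implementation; the real check also takes into account which nodes still hold partition replicas:

import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;

import java.util.ArrayList;
import java.util.List;

public class RoleChangeRevertSketch {
    // Adds the broker role back to the spec of a pool whose status indicates it used to run brokers
    static KafkaNodePool restoreBrokerRole(KafkaNodePool nodePool) {
        List<ProcessRoles> roles = new ArrayList<>(nodePool.getSpec().getRoles());
        if (!roles.contains(ProcessRoles.BROKER)) {
            roles.add(ProcessRoles.BROKER);
        }

        return new KafkaNodePoolBuilder(nodePool)
                .editSpec()
                    .withRoles(roles)
                .endSpec()
                .build();
    }
}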
@@ -353,12 +331,4 @@ public static KafkaCluster createKafkaCluster(
String clusterId = NodePoolUtils.getOrGenerateKRaftClusterId(kafkaCr, nodePoolCrs);
return KafkaCluster.fromCrd(reconciliation, kafkaCr, pools, versions, versionChange, clusterId, sharedEnvironmentProvider);
}

/**
* Utility record to pass fixed custom resources between methods
*
* @param kafkaCr Kafka custom resource
* @param nodePoolCrs List of KafkaNodePool resources
*/
record KafkaAndNodePools(Kafka kafkaCr, List<KafkaNodePool> nodePoolCrs) { }
}
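For context on the getOrGenerateKRaftClusterId call shown in createKafkaCluster above: KRaft cluster IDs are random UUIDs in Kafka's 22-character base64 encoding. A minimal sketch of generating one with the Kafka clients library, assuming org.apache.kafka:kafka-clients is on the classpath; this is not the actual NodePoolUtils implementation:

import org.apache.kafka.common.Uuid;

public class ClusterIdSketch {
    public static void main(String[] args) {
        // Produces a random UUID encoded as a 22-character base64 string
        String clusterId = Uuid.randomUuid().toString();
        System.out.println("Generated KRaft cluster ID: " + clusterId);
    }
}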