Skip to content

Commit

Permalink
Add env var to disable creation of PodDisruptionBudget
Browse files Browse the repository at this point in the history
Closes #6996

Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
  • Loading branch information
katheris committed Sep 19, 2024
1 parent 926a850 commit 005e902
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ public class ClusterOperatorConfig {
*/
/* test */ static final ConfigParameter<String> POD_SECURITY_PROVIDER_RESTRICTED_CLASS = new ConfigParameter<>("POD_SECURITY_PROVIDER_RESTRICTED_CLASS", STRING, "io.strimzi.plugin.security.profiles.impl.RestrictedPodSecurityProvider", CONFIG_VALUES);

/**
* Set true to generate Pod Disruption Budgets
*/
public static final ConfigParameter<Boolean> POD_DISRUPTION_BUDGET_GENERATION = new ConfigParameter<>("STRIMZI_POD_DISRUPTION_BUDGET_GENERATION", BOOLEAN, "true", CONFIG_VALUES);

/**
* The configured Kafka versions
*/
Expand Down Expand Up @@ -598,6 +603,13 @@ public LeaderElectionManagerConfig getLeaderElectionConfig() {
}
}

/**
* @return Indicates whether Pod Disruption Budgets should be generated
*/
public boolean isPodDisruptionBudgetGeneration() {
return get(POD_DISRUPTION_BUDGET_GENERATION);
}

@Override
public String toString() {
return "ClusterOperatorConfig{" +
Expand All @@ -620,6 +632,7 @@ public String toString() {
"\n\toperatorName='" + getOperatorName() + '\'' +
"\n\tpodSecurityProviderClass='" + getPodSecurityProviderClass() + '\'' +
"\n\tleaderElectionConfig='" + getLeaderElectionConfig() + '\'' +
"\n\tpodDisruptionBudgetGeneration=" + isPodDisruptionBudgetGeneration() +
"}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public abstract class AbstractAssemblyOperator<C extends KubernetesClient, T ext
protected final List<LocalObjectReference> imagePullSecrets;
protected final KafkaVersion.Lookup versions;
protected long operationTimeoutMs;
protected final boolean isPodDisruptionBudgetGeneration;

/**
* @param vertx The Vertx instance
Expand Down Expand Up @@ -93,6 +94,7 @@ protected AbstractAssemblyOperator(Vertx vertx, PlatformFeaturesAvailability pfa
this.imagePullSecrets = config.getImagePullSecrets();
this.versions = config.versions();
this.operationTimeoutMs = config.getOperationTimeoutMs();
this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration();
}

protected Future<Boolean> delete(Reconciliation reconciliation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public abstract class AbstractConnectOperator<C extends KubernetesClient, T exte

private final boolean isNetworkPolicyGeneration;
private final boolean continueOnManualRUFailure;
private final boolean isPodDisruptionBudgetGeneration;

protected final Function<Vertx, KafkaConnectApi> connectClientProvider;
protected final ImagePullPolicy imagePullPolicy;
Expand Down Expand Up @@ -161,6 +162,7 @@ public AbstractConnectOperator(Vertx vertx, PlatformFeaturesAvailability pfa, St
this.sharedEnvironmentProvider = supplier.sharedEnvironmentProvider;
this.port = port;
this.continueOnManualRUFailure = config.featureGates().continueOnManualRUFailureEnabled();
this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration();
}

@Override
Expand Down Expand Up @@ -257,6 +259,24 @@ protected Future<ConfigMap> generateMetricsAndLoggingConfigMap(Reconciliation re
.compose(metricsAndLoggingCm -> Future.succeededFuture(kafkaConnectCluster.generateMetricsAndLogConfigMap(metricsAndLoggingCm)));
}

/**
* Reconciles the PodDisruptionBudget for the Connect cluster.
*
* @param reconciliation The reconciliation
* @param namespace Namespace of the Connect cluster
* @param connect KafkaConnectCluster object
*
* @return Future for tracking the asynchronous result of reconciling the PodDisruptionBudget
*/
protected Future<Void> connectPodDisruptionBudget(Reconciliation reconciliation, String namespace, KafkaConnectCluster connect) {
if (isPodDisruptionBudgetGeneration) {
return podDisruptionBudgetOperator.reconcile(reconciliation, namespace, connect.getComponentName(), connect.generatePodDisruptionBudget())
.mapEmpty();
} else {
return Future.succeededFuture();
}
}

/**
* Dynamically updates loggers in the Kafka Connect cluster.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected Future<KafkaBridgeStatus> createOrUpdate(Reconciliation reconciliation
.compose(scale -> serviceOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.serviceName(bridge.getCluster()), bridge.generateService()))
.compose(i -> MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperations, bridge.logging(), null))
.compose(metricsAndLogging -> configMapOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.metricsAndLogConfigMapName(reconciliation.name()), bridge.generateMetricsAndLogConfigMap(metricsAndLogging)))
.compose(i -> podDisruptionBudgetOperator.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generatePodDisruptionBudget()))
.compose(i -> isPodDisruptionBudgetGeneration ? podDisruptionBudgetOperator.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generatePodDisruptionBudget()) : Future.succeededFuture())
.compose(i -> VertxUtil.authTlsHash(secretOperations, namespace, auth, trustedCertificates))
.compose(hash -> deploymentOperations.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generateDeployment(Collections.singletonMap(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(hash)), pfa.isOpenshift(), imagePullPolicy, imagePullSecrets)))
.compose(i -> deploymentOperations.scaleUp(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas(), operationTimeoutMs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected Future<KafkaConnectStatus> createOrUpdate(Reconciliation reconciliatio
return configMapOperations.reconcile(reconciliation, namespace, logAndMetricsConfigMap.getMetadata().getName(), logAndMetricsConfigMap);
})
.compose(i -> ReconcilerUtils.reconcileJmxSecret(reconciliation, secretOperations, connect))
.compose(i -> podDisruptionBudgetOperator.reconcile(reconciliation, namespace, connect.getComponentName(), connect.generatePodDisruptionBudget()))
.compose(i -> connectPodDisruptionBudget(reconciliation, namespace, connect))
.compose(i -> generateAuthHash(namespace, kafkaConnect.getSpec()))
.compose(hash -> {
podAnnotations.put(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(hash));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ protected Future<KafkaMirrorMaker2Status> createOrUpdate(Reconciliation reconcil
return configMapOperations.reconcile(reconciliation, namespace, logAndMetricsConfigMap.getMetadata().getName(), logAndMetricsConfigMap);
})
.compose(i -> ReconcilerUtils.reconcileJmxSecret(reconciliation, secretOperations, mirrorMaker2Cluster))
.compose(i -> podDisruptionBudgetOperator.reconcile(reconciliation, namespace, mirrorMaker2Cluster.getComponentName(), mirrorMaker2Cluster.generatePodDisruptionBudget()))
.compose(i -> connectPodDisruptionBudget(reconciliation, namespace, mirrorMaker2Cluster))
.compose(i -> generateAuthHash(namespace, kafkaMirrorMaker2.getSpec()))
.compose(hash -> {
podAnnotations.put(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(hash));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ protected Future<KafkaMirrorMakerStatus> createOrUpdate(Reconciliation reconcili
annotations.put(Annotations.ANNO_STRIMZI_LOGGING_HASH, Util.hashStub(logAndMetricsConfigMap.getData().get(mirror.logging().configMapKey())));
return configMapOperations.reconcile(reconciliation, namespace, KafkaMirrorMakerResources.metricsAndLogConfigMapName(reconciliation.name()), logAndMetricsConfigMap);
})
.compose(i -> podDisruptionBudgetOperator.reconcile(reconciliation, namespace, mirror.getComponentName(), mirror.generatePodDisruptionBudget()))
.compose(i -> isPodDisruptionBudgetGeneration ? podDisruptionBudgetOperator.reconcile(reconciliation, namespace, mirror.getComponentName(), mirror.generatePodDisruptionBudget()) : Future.succeededFuture())
.compose(i -> Future.join(VertxUtil.authTlsHash(secretOperations, namespace, authConsumer, trustedCertificatesConsumer),
VertxUtil.authTlsHash(secretOperations, namespace, authProducer, trustedCertificatesProducer)))
.compose(hashFut -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public class KafkaReconciler {
private final ImagePullPolicy imagePullPolicy;
private final List<LocalObjectReference> imagePullSecrets;
private final List<Integer> previousNodeIds;
private final boolean isPodDisruptionBudgetGeneration;

// Objects used during the reconciliation
/* test */ final Reconciliation reconciliation;
Expand Down Expand Up @@ -213,6 +214,7 @@ public KafkaReconciler(
this.imagePullPolicy = config.getImagePullPolicy();
this.imagePullSecrets = config.getImagePullSecrets();
this.previousNodeIds = kafkaCr.getStatus() != null ? kafkaCr.getStatus().getRegisteredNodeIds() : null;
this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration();

this.stsOperator = supplier.stsOperations;
this.strimziPodSetOperator = supplier.strimziPodSetOperator;
Expand Down Expand Up @@ -765,9 +767,13 @@ protected Future<Void> jmxSecret() {
* @return Completes when the PDB was successfully created or updated
*/
protected Future<Void> podDisruptionBudget() {
return podDisruptionBudgetOperator
if (isPodDisruptionBudgetGeneration) {
return podDisruptionBudgetOperator
.reconcile(reconciliation, reconciliation.namespace(), KafkaResources.kafkaComponentName(reconciliation.name()), kafka.generatePodDisruptionBudget())
.map((Void) null);
.mapEmpty();
} else {
return Future.succeededFuture();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class ZooKeeperReconciler {
private final int adminSessionTimeoutMs;
private final ImagePullPolicy imagePullPolicy;
private final List<LocalObjectReference> imagePullSecrets;
private final boolean isPodDisruptionBudgetGeneration;

private final StatefulSetOperator stsOperator;
private final StrimziPodSetOperator strimziPodSetOperator;
Expand Down Expand Up @@ -161,6 +162,7 @@ public ZooKeeperReconciler(
this.imagePullPolicy = config.getImagePullPolicy();
this.imagePullSecrets = config.getImagePullSecrets();
this.isKRaftMigrationRollback = isKRaftMigrationRollback;
this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration();

this.stsOperator = supplier.stsOperations;
this.strimziPodSetOperator = supplier.strimziPodSetOperator;
Expand Down Expand Up @@ -492,9 +494,13 @@ protected Future<Void> loggingAndMetricsConfigMap() {
* @return Completes when the PDB was successfully created or updated
*/
protected Future<Void> podDisruptionBudget() {
return podDisruptionBudgetOperator
.reconcile(reconciliation, reconciliation.namespace(), KafkaResources.zookeeperComponentName(reconciliation.name()), zk.generatePodDisruptionBudget())
.map((Void) null);
if (isPodDisruptionBudgetGeneration) {
return podDisruptionBudgetOperator
.reconcile(reconciliation, reconciliation.namespace(), KafkaResources.zookeeperComponentName(reconciliation.name()), zk.generatePodDisruptionBudget())
.mapEmpty();
} else {
return Future.succeededFuture();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ClusterOperatorConfigTest {
ENV_VARS.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-ContinueReconciliationOnManualRollingUpdateFailure");
ENV_VARS.put(ClusterOperatorConfig.DNS_CACHE_TTL.key(), "10");
ENV_VARS.put(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.key(), "my.package.CustomPodSecurityProvider");
ENV_VARS.put(ClusterOperatorConfig.POD_DISRUPTION_BUDGET_GENERATION.key(), "false");
}

@Test
Expand All @@ -57,6 +58,7 @@ public void testDefaultConfig() {
envVars.remove(ClusterOperatorConfig.CONNECT_BUILD_TIMEOUT_MS.key());
envVars.remove(ClusterOperatorConfig.FEATURE_GATES.key());
envVars.remove(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.key());
envVars.remove(ClusterOperatorConfig.POD_DISRUPTION_BUDGET_GENERATION.key());

ClusterOperatorConfig config = ClusterOperatorConfig.buildFromMap(envVars, KafkaVersionTestUtils.getKafkaVersionLookup());

Expand All @@ -71,6 +73,7 @@ public void testDefaultConfig() {
assertThat(config.isPodSetReconciliationOnly(), is(false));
assertThat(config.getPodSecurityProviderClass(), is(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.defaultValue()));
assertThat(config.getLeaderElectionConfig(), is(nullValue()));
assertThat(config.isPodDisruptionBudgetGeneration(), is(true));
}

@Test
Expand Down Expand Up @@ -103,6 +106,7 @@ public void testEnvVars() {
assertThat(config.featureGates().continueOnManualRUFailureEnabled(), is(false));
assertThat(config.getDnsCacheTtlSec(), is(10));
assertThat(config.getPodSecurityProviderClass(), is("my.package.CustomPodSecurityProvider"));
assertThat(config.isPodDisruptionBudgetGeneration(), is(false));
}

@Test
Expand All @@ -120,6 +124,7 @@ public void testEnvVarsDefault() {
assertThat(config.featureGates().continueOnManualRUFailureEnabled(), is(true));
assertThat(config.getDnsCacheTtlSec(), is(Integer.parseInt(ClusterOperatorConfig.DNS_CACHE_TTL.defaultValue())));
assertThat(config.getPodSecurityProviderClass(), is(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.defaultValue()));
assertThat(config.isPodDisruptionBudgetGeneration(), is(true));
}

private Map<String, String> envWithImages() {
Expand Down

0 comments on commit 005e902

Please sign in to comment.