diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/ClusterOperatorConfig.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/ClusterOperatorConfig.java index f1a3b9b7a8..df00e87294 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/ClusterOperatorConfig.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/ClusterOperatorConfig.java @@ -249,6 +249,11 @@ public class ClusterOperatorConfig { */ /* test */ static final ConfigParameter POD_SECURITY_PROVIDER_RESTRICTED_CLASS = new ConfigParameter<>("POD_SECURITY_PROVIDER_RESTRICTED_CLASS", STRING, "io.strimzi.plugin.security.profiles.impl.RestrictedPodSecurityProvider", CONFIG_VALUES); + /** + * Set true to generate Pod Disruption Budgets + */ + public static final ConfigParameter POD_DISRUPTION_BUDGET_GENERATION = new ConfigParameter<>("STRIMZI_POD_DISRUPTION_BUDGET_GENERATION", BOOLEAN, "true", CONFIG_VALUES); + /** * The configured Kafka versions */ @@ -598,6 +603,13 @@ public LeaderElectionManagerConfig getLeaderElectionConfig() { } } + /** + * @return Indicates whether Pod Disruption Budgets should be generated + */ + public boolean isPodDisruptionBudgetGeneration() { + return get(POD_DISRUPTION_BUDGET_GENERATION); + } + @Override public String toString() { return "ClusterOperatorConfig{" + @@ -620,6 +632,7 @@ public String toString() { "\n\toperatorName='" + getOperatorName() + '\'' + "\n\tpodSecurityProviderClass='" + getPodSecurityProviderClass() + '\'' + "\n\tleaderElectionConfig='" + getLeaderElectionConfig() + '\'' + + "\n\tpodDisruptionBudgetGeneration=" + isPodDisruptionBudgetGeneration() + "}"; } } diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractAssemblyOperator.java index b9ace39f44..44f1276d3f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractAssemblyOperator.java @@ -63,6 +63,7 @@ public abstract class AbstractAssemblyOperator imagePullSecrets; protected final KafkaVersion.Lookup versions; protected long operationTimeoutMs; + protected final boolean isPodDisruptionBudgetGeneration; /** * @param vertx The Vertx instance @@ -93,6 +94,7 @@ protected AbstractAssemblyOperator(Vertx vertx, PlatformFeaturesAvailability pfa this.imagePullSecrets = config.getImagePullSecrets(); this.versions = config.versions(); this.operationTimeoutMs = config.getOperationTimeoutMs(); + this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration(); } protected Future delete(Reconciliation reconciliation) { diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java index 5be006abe8..8b16d5103f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java @@ -98,6 +98,7 @@ public abstract class AbstractConnectOperator connectClientProvider; protected final ImagePullPolicy imagePullPolicy; @@ -161,6 +162,7 @@ public AbstractConnectOperator(Vertx vertx, PlatformFeaturesAvailability pfa, St this.sharedEnvironmentProvider = supplier.sharedEnvironmentProvider; this.port = port; this.continueOnManualRUFailure = config.featureGates().continueOnManualRUFailureEnabled(); + this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration(); } @Override @@ -257,6 +259,24 @@ protected Future generateMetricsAndLoggingConfigMap(Reconciliation re .compose(metricsAndLoggingCm -> Future.succeededFuture(kafkaConnectCluster.generateMetricsAndLogConfigMap(metricsAndLoggingCm))); } + /** + * Reconciles the PodDisruptionBudget for the Connect cluster. + * + * @param reconciliation The reconciliation + * @param namespace Namespace of the Connect cluster + * @param connect KafkaConnectCluster object + * + * @return Future for tracking the asynchronous result of reconciling the PodDisruptionBudget + */ + protected Future connectPodDisruptionBudget(Reconciliation reconciliation, String namespace, KafkaConnectCluster connect) { + if (isPodDisruptionBudgetGeneration) { + return podDisruptionBudgetOperator.reconcile(reconciliation, namespace, connect.getComponentName(), connect.generatePodDisruptionBudget()) + .mapEmpty(); + } else { + return Future.succeededFuture(); + } + } + /** * Dynamically updates loggers in the Kafka Connect cluster. * diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java index a5b6ddc5b3..0594d2090f 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java @@ -97,7 +97,7 @@ protected Future createOrUpdate(Reconciliation reconciliation .compose(scale -> serviceOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.serviceName(bridge.getCluster()), bridge.generateService())) .compose(i -> MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperations, bridge.logging(), null)) .compose(metricsAndLogging -> configMapOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.metricsAndLogConfigMapName(reconciliation.name()), bridge.generateMetricsAndLogConfigMap(metricsAndLogging))) - .compose(i -> podDisruptionBudgetOperator.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generatePodDisruptionBudget())) + .compose(i -> isPodDisruptionBudgetGeneration ? podDisruptionBudgetOperator.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generatePodDisruptionBudget()) : Future.succeededFuture()) .compose(i -> VertxUtil.authTlsHash(secretOperations, namespace, auth, trustedCertificates)) .compose(hash -> deploymentOperations.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generateDeployment(Collections.singletonMap(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(hash)), pfa.isOpenshift(), imagePullPolicy, imagePullSecrets))) .compose(i -> deploymentOperations.scaleUp(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas(), operationTimeoutMs)) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java index be270f6efa..3993355381 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java @@ -204,7 +204,7 @@ protected Future createOrUpdate(Reconciliation reconciliatio return configMapOperations.reconcile(reconciliation, namespace, logAndMetricsConfigMap.getMetadata().getName(), logAndMetricsConfigMap); }) .compose(i -> ReconcilerUtils.reconcileJmxSecret(reconciliation, secretOperations, connect)) - .compose(i -> podDisruptionBudgetOperator.reconcile(reconciliation, namespace, connect.getComponentName(), connect.generatePodDisruptionBudget())) + .compose(i -> connectPodDisruptionBudget(reconciliation, namespace, connect)) .compose(i -> generateAuthHash(namespace, kafkaConnect.getSpec())) .compose(hash -> { podAnnotations.put(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(hash)); diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java index 0c105bb05f..250642ec2c 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java @@ -142,7 +142,7 @@ protected Future createOrUpdate(Reconciliation reconcil return configMapOperations.reconcile(reconciliation, namespace, logAndMetricsConfigMap.getMetadata().getName(), logAndMetricsConfigMap); }) .compose(i -> ReconcilerUtils.reconcileJmxSecret(reconciliation, secretOperations, mirrorMaker2Cluster)) - .compose(i -> podDisruptionBudgetOperator.reconcile(reconciliation, namespace, mirrorMaker2Cluster.getComponentName(), mirrorMaker2Cluster.generatePodDisruptionBudget())) + .compose(i -> connectPodDisruptionBudget(reconciliation, namespace, mirrorMaker2Cluster)) .compose(i -> generateAuthHash(namespace, kafkaMirrorMaker2.getSpec())) .compose(hash -> { podAnnotations.put(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(hash)); diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java index ff4ca450e6..a0755206a2 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java @@ -110,7 +110,7 @@ protected Future createOrUpdate(Reconciliation reconcili annotations.put(Annotations.ANNO_STRIMZI_LOGGING_HASH, Util.hashStub(logAndMetricsConfigMap.getData().get(mirror.logging().configMapKey()))); return configMapOperations.reconcile(reconciliation, namespace, KafkaMirrorMakerResources.metricsAndLogConfigMapName(reconciliation.name()), logAndMetricsConfigMap); }) - .compose(i -> podDisruptionBudgetOperator.reconcile(reconciliation, namespace, mirror.getComponentName(), mirror.generatePodDisruptionBudget())) + .compose(i -> isPodDisruptionBudgetGeneration ? podDisruptionBudgetOperator.reconcile(reconciliation, namespace, mirror.getComponentName(), mirror.generatePodDisruptionBudget()) : Future.succeededFuture()) .compose(i -> Future.join(VertxUtil.authTlsHash(secretOperations, namespace, authConsumer, trustedCertificatesConsumer), VertxUtil.authTlsHash(secretOperations, namespace, authProducer, trustedCertificatesProducer))) .compose(hashFut -> { diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java index 25b1cd991d..e2802c8979 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java @@ -122,6 +122,7 @@ public class KafkaReconciler { private final ImagePullPolicy imagePullPolicy; private final List imagePullSecrets; private final List previousNodeIds; + private final boolean isPodDisruptionBudgetGeneration; // Objects used during the reconciliation /* test */ final Reconciliation reconciliation; @@ -213,6 +214,7 @@ public KafkaReconciler( this.imagePullPolicy = config.getImagePullPolicy(); this.imagePullSecrets = config.getImagePullSecrets(); this.previousNodeIds = kafkaCr.getStatus() != null ? kafkaCr.getStatus().getRegisteredNodeIds() : null; + this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration(); this.stsOperator = supplier.stsOperations; this.strimziPodSetOperator = supplier.strimziPodSetOperator; @@ -765,9 +767,13 @@ protected Future jmxSecret() { * @return Completes when the PDB was successfully created or updated */ protected Future podDisruptionBudget() { - return podDisruptionBudgetOperator + if (isPodDisruptionBudgetGeneration) { + return podDisruptionBudgetOperator .reconcile(reconciliation, reconciliation.namespace(), KafkaResources.kafkaComponentName(reconciliation.name()), kafka.generatePodDisruptionBudget()) - .map((Void) null); + .mapEmpty(); + } else { + return Future.succeededFuture(); + } } /** diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ZooKeeperReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ZooKeeperReconciler.java index 8330f130e5..426b7d8cf7 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ZooKeeperReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ZooKeeperReconciler.java @@ -87,6 +87,7 @@ public class ZooKeeperReconciler { private final int adminSessionTimeoutMs; private final ImagePullPolicy imagePullPolicy; private final List imagePullSecrets; + private final boolean isPodDisruptionBudgetGeneration; private final StatefulSetOperator stsOperator; private final StrimziPodSetOperator strimziPodSetOperator; @@ -161,6 +162,7 @@ public ZooKeeperReconciler( this.imagePullPolicy = config.getImagePullPolicy(); this.imagePullSecrets = config.getImagePullSecrets(); this.isKRaftMigrationRollback = isKRaftMigrationRollback; + this.isPodDisruptionBudgetGeneration = config.isPodDisruptionBudgetGeneration(); this.stsOperator = supplier.stsOperations; this.strimziPodSetOperator = supplier.strimziPodSetOperator; @@ -492,9 +494,13 @@ protected Future loggingAndMetricsConfigMap() { * @return Completes when the PDB was successfully created or updated */ protected Future podDisruptionBudget() { - return podDisruptionBudgetOperator - .reconcile(reconciliation, reconciliation.namespace(), KafkaResources.zookeeperComponentName(reconciliation.name()), zk.generatePodDisruptionBudget()) - .map((Void) null); + if (isPodDisruptionBudgetGeneration) { + return podDisruptionBudgetOperator + .reconcile(reconciliation, reconciliation.namespace(), KafkaResources.zookeeperComponentName(reconciliation.name()), zk.generatePodDisruptionBudget()) + .mapEmpty(); + } else { + return Future.succeededFuture(); + } } /** diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java index b758139513..b50910235f 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorConfigTest.java @@ -47,6 +47,7 @@ public class ClusterOperatorConfigTest { ENV_VARS.put(ClusterOperatorConfig.FEATURE_GATES.key(), "-ContinueReconciliationOnManualRollingUpdateFailure"); ENV_VARS.put(ClusterOperatorConfig.DNS_CACHE_TTL.key(), "10"); ENV_VARS.put(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.key(), "my.package.CustomPodSecurityProvider"); + ENV_VARS.put(ClusterOperatorConfig.POD_DISRUPTION_BUDGET_GENERATION.key(), "false"); } @Test @@ -57,6 +58,7 @@ public void testDefaultConfig() { envVars.remove(ClusterOperatorConfig.CONNECT_BUILD_TIMEOUT_MS.key()); envVars.remove(ClusterOperatorConfig.FEATURE_GATES.key()); envVars.remove(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.key()); + envVars.remove(ClusterOperatorConfig.POD_DISRUPTION_BUDGET_GENERATION.key()); ClusterOperatorConfig config = ClusterOperatorConfig.buildFromMap(envVars, KafkaVersionTestUtils.getKafkaVersionLookup()); @@ -71,6 +73,7 @@ public void testDefaultConfig() { assertThat(config.isPodSetReconciliationOnly(), is(false)); assertThat(config.getPodSecurityProviderClass(), is(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.defaultValue())); assertThat(config.getLeaderElectionConfig(), is(nullValue())); + assertThat(config.isPodDisruptionBudgetGeneration(), is(true)); } @Test @@ -103,6 +106,7 @@ public void testEnvVars() { assertThat(config.featureGates().continueOnManualRUFailureEnabled(), is(false)); assertThat(config.getDnsCacheTtlSec(), is(10)); assertThat(config.getPodSecurityProviderClass(), is("my.package.CustomPodSecurityProvider")); + assertThat(config.isPodDisruptionBudgetGeneration(), is(false)); } @Test @@ -120,6 +124,7 @@ public void testEnvVarsDefault() { assertThat(config.featureGates().continueOnManualRUFailureEnabled(), is(true)); assertThat(config.getDnsCacheTtlSec(), is(Integer.parseInt(ClusterOperatorConfig.DNS_CACHE_TTL.defaultValue()))); assertThat(config.getPodSecurityProviderClass(), is(ClusterOperatorConfig.POD_SECURITY_PROVIDER_CLASS.defaultValue())); + assertThat(config.isPodDisruptionBudgetGeneration(), is(true)); } private Map envWithImages() {