diff --git a/.azure/templates/default_variables.yaml b/.azure/templates/default_variables.yaml index 47bfb1101c1..900fbd4c258 100644 --- a/.azure/templates/default_variables.yaml +++ b/.azure/templates/default_variables.yaml @@ -14,7 +14,7 @@ variables: branch: $(Build.SourceBranchName) commit_message: $(Build.SourceVersionMessage) test_cluster: minikube - test_kubectl_version: v1.15.0 + test_kubectl_version: v1.16.0 test_nsenter_version: 2.32 test_helm_version: v2.16.3 test_minikube_version: v1.2.0 diff --git a/.travis.yml b/.travis.yml index 41bf79af771..04213ea2c41 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,7 @@ env: - DOCKER_ORG=strimzici - DOCKER_REGISTRY=docker.io - TEST_CLUSTER=minikube - - TEST_KUBECTL_VERSION=v1.15.0 + - TEST_KUBECTL_VERSION=v1.16.0 - TEST_NSENTER_VERSION=2.32 - TEST_HELM_VERSION=v2.16.3 - TEST_MINIKUBE_VERSION=v1.2.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d31c831ed8..51ce8f2d025 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,13 @@ ## 0.19.0 +* Add support for scale subresource to make scaling of following resources easier: + * KafkaConnect + * KafkaConnectS2I + * KafkaBridge + * KafkaMirrorMaker + * KafkaMirrorMaker2 + * KafkaConnector * Remove deprecated `Kafka.spec.topicOperator` classes and deployment logic * Use Java 11 as the Java runtime diff --git a/api/src/main/java/io/strimzi/api/kafka/model/Kafka.java b/api/src/main/java/io/strimzi/api/kafka/model/Kafka.java index 774ecb11f93..d0195500a8d 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/Kafka.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/Kafka.java @@ -48,7 +48,7 @@ @Crd.Spec.Version(name = Kafka.V1ALPHA1, served = true, storage = false) }, subresources = @Crd.Spec.Subresources( - status = @Crd.Spec.Subresources.Status() + status = @Crd.Spec.Subresources.Status() ), additionalPrinterColumns = { @Crd.Spec.AdditionalPrinterColumn( diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaBridge.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaBridge.java index 3c3c11bb71e..7c9e3180ad6 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaBridge.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaBridge.java @@ -51,7 +51,12 @@ ) }, subresources = @Crd.Spec.Subresources( - status = @Crd.Spec.Subresources.Status() + status = @Crd.Spec.Subresources.Status(), + scale = @Crd.Spec.Subresources.Scale( + specReplicasPath = KafkaBridge.SPEC_REPLICAS_PATH, + statusReplicasPath = KafkaBridge.STATUS_REPLICAS_PATH, + labelSelectorPath = KafkaBridge.LABEL_SELECTOR_PATH + ) ), additionalPrinterColumns = { @Crd.Spec.AdditionalPrinterColumn( @@ -95,6 +100,9 @@ public class KafkaBridge extends CustomResource implements UnknownPropertyPreser public static final String CRD_NAME = RESOURCE_PLURAL + "." + RESOURCE_GROUP; public static final String SHORT_NAME = "kb"; public static final List RESOURCE_SHORTNAMES = singletonList(SHORT_NAME); + public static final String SPEC_REPLICAS_PATH = ".spec.replicas"; + public static final String STATUS_REPLICAS_PATH = ".status.replicas"; + public static final String LABEL_SELECTOR_PATH = ".status.selector"; private String apiVersion; private ObjectMeta metadata; diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnect.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnect.java index 32d46bcfb78..b3f7ae65448 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnect.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnect.java @@ -56,7 +56,12 @@ ) }, subresources = @Crd.Spec.Subresources( - status = @Crd.Spec.Subresources.Status() + status = @Crd.Spec.Subresources.Status(), + scale = @Crd.Spec.Subresources.Scale( + specReplicasPath = KafkaConnect.SPEC_REPLICAS_PATH, + statusReplicasPath = KafkaConnect.STATUS_REPLICAS_PATH, + labelSelectorPath = KafkaConnect.LABEL_SELECTOR_PATH + ) ), additionalPrinterColumns = { @Crd.Spec.AdditionalPrinterColumn( @@ -93,6 +98,9 @@ public class KafkaConnect extends CustomResource implements UnknownPropertyPrese public static final String CRD_NAME = RESOURCE_PLURAL + "." + RESOURCE_GROUP; public static final String SHORT_NAME = "kc"; public static final List RESOURCE_SHORTNAMES = singletonList(SHORT_NAME); + public static final String SPEC_REPLICAS_PATH = ".spec.replicas"; + public static final String STATUS_REPLICAS_PATH = ".status.replicas"; + public static final String LABEL_SELECTOR_PATH = ".status.selector"; private String apiVersion; private KafkaConnectSpec spec; diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnectS2I.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnectS2I.java index ba4af079e1a..0437e7909d5 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnectS2I.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnectS2I.java @@ -59,7 +59,12 @@ ) }, subresources = @Crd.Spec.Subresources( - status = @Crd.Spec.Subresources.Status() + status = @Crd.Spec.Subresources.Status(), + scale = @Crd.Spec.Subresources.Scale( + specReplicasPath = KafkaConnectS2I.SPEC_REPLICAS_PATH, + statusReplicasPath = KafkaConnectS2I.STATUS_REPLICAS_PATH, + labelSelectorPath = KafkaConnectS2I.LABEL_SELECTOR_PATH + ) ), additionalPrinterColumns = { @Crd.Spec.AdditionalPrinterColumn( @@ -96,6 +101,9 @@ public class KafkaConnectS2I extends CustomResource implements UnknownPropertyPr public static final String CRD_NAME = RESOURCE_PLURAL + "." + RESOURCE_GROUP; public static final String SHORT_NAME = "kcs2i"; public static final List RESOURCE_SHORTNAMES = singletonList(SHORT_NAME); + public static final String SPEC_REPLICAS_PATH = ".spec.replicas"; + public static final String STATUS_REPLICAS_PATH = ".status.replicas"; + public static final String LABEL_SELECTOR_PATH = ".status.selector"; private String apiVersion; private ObjectMeta metadata; diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnector.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnector.java index da44a79acd9..cbcd1f05b80 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnector.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaConnector.java @@ -46,7 +46,11 @@ storage = true) }, subresources = @Crd.Spec.Subresources( - status = @Crd.Spec.Subresources.Status() + status = @Crd.Spec.Subresources.Status(), + scale = @Crd.Spec.Subresources.Scale( + specReplicasPath = KafkaConnector.SPEC_REPLICAS_PATH, + statusReplicasPath = KafkaConnector.STATUS_REPLICAS_PATH + ) ) ) ) @@ -71,6 +75,8 @@ public class KafkaConnector extends CustomResource implements UnknownPropertyPre public static final String RESOURCE_KIND = "KafkaConnector"; public static final String RESOURCE_LIST_KIND = RESOURCE_KIND + "List"; public static final String SHORT_NAME = "kctr"; + public static final String SPEC_REPLICAS_PATH = ".spec.tasksMax"; + public static final String STATUS_REPLICAS_PATH = ".status.tasksMax"; private KafkaConnectorSpec spec; private KafkaConnectorStatus status; diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker.java index a68ecebda1b..e73bb3265e2 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker.java @@ -56,7 +56,12 @@ ) }, subresources = @Crd.Spec.Subresources( - status = @Crd.Spec.Subresources.Status() + status = @Crd.Spec.Subresources.Status(), + scale = @Crd.Spec.Subresources.Scale( + specReplicasPath = KafkaMirrorMaker.SPEC_REPLICAS_PATH, + statusReplicasPath = KafkaMirrorMaker.STATUS_REPLICAS_PATH, + labelSelectorPath = KafkaMirrorMaker.LABEL_SELECTOR_PATH + ) ), additionalPrinterColumns = { @Crd.Spec.AdditionalPrinterColumn( @@ -108,6 +113,9 @@ public class KafkaMirrorMaker extends CustomResource implements UnknownPropertyP public static final String CRD_NAME = RESOURCE_PLURAL + "." + RESOURCE_GROUP; public static final String SHORT_NAME = "kmm"; public static final List RESOURCE_SHORTNAMES = singletonList(SHORT_NAME); + public static final String SPEC_REPLICAS_PATH = ".spec.replicas"; + public static final String STATUS_REPLICAS_PATH = ".status.replicas"; + public static final String LABEL_SELECTOR_PATH = ".status.selector"; private String apiVersion; private ObjectMeta metadata; diff --git a/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2.java b/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2.java index c1de162b96c..4e769d93b4a 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2.java @@ -50,7 +50,12 @@ ) }, subresources = @Crd.Spec.Subresources( - status = @Crd.Spec.Subresources.Status() + status = @Crd.Spec.Subresources.Status(), + scale = @Crd.Spec.Subresources.Scale( + specReplicasPath = KafkaMirrorMaker2.SPEC_REPLICAS_PATH, + statusReplicasPath = KafkaMirrorMaker2.STATUS_REPLICAS_PATH, + labelSelectorPath = KafkaMirrorMaker2.LABEL_SELECTOR_PATH + ) ), additionalPrinterColumns = { @Crd.Spec.AdditionalPrinterColumn( @@ -87,6 +92,9 @@ public class KafkaMirrorMaker2 extends CustomResource implements UnknownProperty public static final String CRD_NAME = RESOURCE_PLURAL + "." + RESOURCE_GROUP; public static final String SHORT_NAME = "kmm2"; public static final List RESOURCE_SHORTNAMES = singletonList(SHORT_NAME); + public static final String SPEC_REPLICAS_PATH = ".spec.replicas"; + public static final String STATUS_REPLICAS_PATH = ".status.replicas"; + public static final String LABEL_SELECTOR_PATH = ".status.selector"; private String apiVersion; private KafkaMirrorMaker2Spec spec; diff --git a/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaBridgeStatus.java b/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaBridgeStatus.java index bd6e65dca45..6c1d8730ee9 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaBridgeStatus.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaBridgeStatus.java @@ -6,8 +6,10 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.fabric8.kubernetes.api.model.LabelSelector; import io.strimzi.api.kafka.model.Constants; import io.strimzi.crdgenerator.annotations.Description; +import io.strimzi.crdgenerator.annotations.KubeLink; import io.sundr.builder.annotations.Buildable; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -27,6 +29,8 @@ public class KafkaBridgeStatus extends Status { private static final long serialVersionUID = 1L; private String url; + private int replicas; + private LabelSelector podSelector; @Description("The URL at which external client applications can access the Kafka Bridge.") public String getUrl() { @@ -36,4 +40,25 @@ public String getUrl() { public void setUrl(String url) { this.url = url; } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("The current number of pods being used to provide this resource.") + public int getReplicas() { + return replicas; + } + + public void setReplicas(int replicas) { + this.replicas = replicas; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @KubeLink(group = "meta", version = "v1", kind = "labelselector") + @Description("Label selector for pods providing this resource.") + public LabelSelector getPodSelector() { + return podSelector; + } + + public void setPodSelector(LabelSelector podSelector) { + this.podSelector = podSelector; + } } diff --git a/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaConnectStatus.java b/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaConnectStatus.java index 73f18a8c889..f88771f584b 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaConnectStatus.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaConnectStatus.java @@ -6,9 +6,11 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.fabric8.kubernetes.api.model.LabelSelector; import io.strimzi.api.kafka.model.Constants; import io.strimzi.api.kafka.model.connect.ConnectorPlugin; import io.strimzi.crdgenerator.annotations.Description; +import io.strimzi.crdgenerator.annotations.KubeLink; import io.sundr.builder.annotations.Buildable; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -31,6 +33,8 @@ public class KafkaConnectStatus extends Status { private String url; private List connectorPlugins; + private int replicas; + private LabelSelector podSelector; @Description("The URL of the REST API endpoint for managing and monitoring Kafka Connect connectors.") public String getUrl() { @@ -50,4 +54,25 @@ public List getConnectorPlugins() { public void setConnectorPlugins(List connectorPlugins) { this.connectorPlugins = connectorPlugins; } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("The current number of pods being used to provide this resource.") + public int getReplicas() { + return replicas; + } + + public void setReplicas(int replicas) { + this.replicas = replicas; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @KubeLink(group = "meta", version = "v1", kind = "labelselector") + @Description("Label selector for pods providing this resource.") + public LabelSelector getPodSelector() { + return podSelector; + } + + public void setPodSelector(LabelSelector podSelector) { + this.podSelector = podSelector; + } } diff --git a/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaConnectorStatus.java b/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaConnectorStatus.java index c5f68f2c67a..1f3816534ed 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaConnectorStatus.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaConnectorStatus.java @@ -29,6 +29,7 @@ public class KafkaConnectorStatus extends Status { private static final long serialVersionUID = 1L; private Map connectorStatus; + private int tasksMax; @JsonInclude(JsonInclude.Include.NON_NULL) @Description("The connector status, as reported by the Kafka Connect REST API.") @@ -39,4 +40,14 @@ public Map getConnectorStatus() { public void setConnectorStatus(Map connectorStatus) { this.connectorStatus = connectorStatus; } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("The maximum number of tasks for the Kafka Connector.") + public int getTasksMax() { + return tasksMax; + } + + public void setTasksMax(int tasksMax) { + this.tasksMax = tasksMax; + } } diff --git a/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaMirrorMakerStatus.java b/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaMirrorMakerStatus.java index 6929d1ec6c0..fa5c5635cb7 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaMirrorMakerStatus.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/status/KafkaMirrorMakerStatus.java @@ -6,7 +6,10 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.fabric8.kubernetes.api.model.LabelSelector; import io.strimzi.api.kafka.model.Constants; +import io.strimzi.crdgenerator.annotations.Description; +import io.strimzi.crdgenerator.annotations.KubeLink; import io.sundr.builder.annotations.Buildable; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -24,4 +27,28 @@ @ToString(callSuper = true) public class KafkaMirrorMakerStatus extends Status { private static final long serialVersionUID = 1L; + + private int replicas; + private LabelSelector podSelector; + + @JsonInclude(JsonInclude.Include.NON_NULL) + @Description("The current number of pods being used to provide this resource.") + public int getReplicas() { + return replicas; + } + + public void setReplicas(int replicas) { + this.replicas = replicas; + } + + @JsonInclude(JsonInclude.Include.NON_NULL) + @KubeLink(group = "meta", version = "v1", kind = "labelselector") + @Description("Label selector for pods providing this resource.") + public LabelSelector getPodSelector() { + return podSelector; + } + + public void setPodSelector(LabelSelector podSelector) { + this.podSelector = podSelector; + } } diff --git a/api/src/test/java/io/strimzi/api/kafka/model/AbstractCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/AbstractCrdIT.java index cca8601830a..57413d54271 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/AbstractCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/AbstractCrdIT.java @@ -4,6 +4,7 @@ */ package io.strimzi.api.kafka.model; +import com.fasterxml.jackson.databind.JsonNode; import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.VersionInfo; @@ -32,12 +33,15 @@ protected void assumeKube1_11Plus() { && Integer.parseInt(version.getMinor().split("\\D")[0]) >= 11); } - protected void createDelete(Class resourceClass, String resource) { + private T loadResource(Class resourceClass, String resource) { String ssStr = TestUtils.readResource(resourceClass, resource); assertThat("Class path resource " + resource + " was missing", ssStr, is(notNullValue())); createDelete(ssStr); - T model = TestUtils.fromYaml(resource, resourceClass, false); + return TestUtils.fromYaml(resource, resourceClass, false); + } + protected void createDelete(Class resourceClass, String resource) { + T model = loadResource(resourceClass, resource); String modelStr = TestUtils.toYamlString(model); assertDoesNotThrow(() -> createDelete(modelStr), "Create delete failed after first round-trip -- maybe a problem with a defaulted value?\nApplied string: " + modelStr); } @@ -68,6 +72,41 @@ private void createDelete(String ssStr) { } } + protected void createScaleDelete(Class resourceClass, String resource) { + T model = loadResource(resourceClass, resource); + String modelKind = model.getKind(); + String modelName = model.getMetadata().getName(); + String modelStr = TestUtils.toYamlString(model); + createScaleDelete(modelKind, modelName, modelStr); + } + + private void createScaleDelete(String kind, String name, String ssStr) { + RuntimeException creationOrScaleException = null; + RuntimeException deletionException = null; + try { + try { + cmdKubeClient().applyContent(ssStr); + cmdKubeClient().scaleByName(kind, name, 10); + } catch (RuntimeException t) { + creationOrScaleException = t; + } + } finally { + try { + cmdKubeClient().deleteContent(ssStr); + } catch (RuntimeException t) { + deletionException = t; + } + } + if (creationOrScaleException != null) { + if (deletionException != null) { + creationOrScaleException.addSuppressed(deletionException); + } + throw creationOrScaleException; + } else if (deletionException != null) { + throw deletionException; + } + } + protected void assertMissingRequiredPropertiesMessage(String message, String... requiredProperties) { for (String requiredProperty: requiredProperties) { assertThat("Could not find" + requiredProperty + " in message: " + message, message, anyOf( @@ -77,6 +116,19 @@ protected void assertMissingRequiredPropertiesMessage(String message, String... } } + protected void waitForCrd(String resource, String name) { + cluster.cmdClient().waitFor(resource, name, crd -> { + JsonNode json = (JsonNode) crd; + if (json != null + && json.hasNonNull("status") + && json.get("status").hasNonNull("conditions")) { + return true; + } + + return false; + }); + } + @BeforeEach public void setupTests() { cluster.before(); diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaBridgeCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaBridgeCrdIT.java index f1f964cb277..054218d740d 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaBridgeCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaBridgeCrdIT.java @@ -31,6 +31,11 @@ void testKafkaBridgeV1alpha1() { createDelete(KafkaBridge.class, "KafkaBridgeV1alpha1.yaml"); } + @Test + void testKafkaBridgeScaling() { + createScaleDelete(KafkaBridge.class, "KafkaBridge.yaml"); + } + @Test void testKafkaBridgeMinimal() { createDelete(KafkaBridge.class, "KafkaBridge-minimal.yaml"); @@ -109,7 +114,7 @@ void testKafkaBridgeWithMissingTracingType() { void setupEnvironment() { cluster.createNamespace(NAMESPACE); cluster.createCustomResources(TestUtils.CRD_KAFKA_BRIDGE); - cluster.cmdClient().waitForResourceCreation("crd", "kafkabridges.kafka.strimzi.io"); + waitForCrd("crd", "kafkabridges.kafka.strimzi.io"); } @AfterAll diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaConnectCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaConnectCrdIT.java index a4b56b887d9..15bc4bb2d0c 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaConnectCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaConnectCrdIT.java @@ -29,6 +29,11 @@ void testKafkaConnectV1alpha1() { createDelete(KafkaConnect.class, "KafkaConnectV1alpha1.yaml"); } + @Test + void testKafkaConnectScaling() { + createScaleDelete(KafkaConnect.class, "KafkaConnect.yaml"); + } + @Test void testKafkaConnectV1beta1() { createDelete(KafkaConnect.class, "KafkaConnectV1beta1.yaml"); @@ -110,7 +115,7 @@ public void testKafkaConnectWithInvalidExternalConfiguration() { void setupEnvironment() { cluster.createNamespace(NAMESPACE); cluster.createCustomResources(TestUtils.CRD_KAFKA_CONNECT); - cluster.cmdClient().waitForResourceCreation("crd", "kafkaconnects.kafka.strimzi.io"); + waitForCrd("crd", "kafkaconnects.kafka.strimzi.io"); } @AfterAll diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaConnectorCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaConnectorCrdIT.java new file mode 100644 index 00000000000..a0f97b6c66e --- /dev/null +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaConnectorCrdIT.java @@ -0,0 +1,43 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.api.kafka.model; + +import io.strimzi.test.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * The purpose of this test is to confirm that we can create a + * resource from the POJOs, serialize it and create the resource in K8S. + * I.e. that such instance resources obtained from POJOs are valid according to the schema + * validation done by K8S. + */ +public class KafkaConnectorCrdIT extends AbstractCrdIT { + public static final String NAMESPACE = "kafkaconnector-crd-it"; + + @Test + void testKafkaConnector() { + createDelete(KafkaConnector.class, "KafkaConnector.yaml"); + } + + @Test + void testKafkaConnectorScaling() { + createScaleDelete(KafkaConnector.class, "KafkaConnector.yaml"); + } + + @BeforeAll + void setupEnvironment() { + cluster.createNamespace(NAMESPACE); + cluster.createCustomResources(TestUtils.CRD_KAFKA_CONNECTOR); + waitForCrd("crd", "kafkaconnectors.kafka.strimzi.io"); + } + + @AfterAll + void teardownEnvironment() { + cluster.deleteCustomResources(); + cluster.deleteNamespaces(); + } +} diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java index 49a21aae679..ea244a237c6 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaCrdIT.java @@ -32,6 +32,11 @@ void testKafkaV1alpha1() { createDelete(Kafka.class, "KafkaV1alpha1.yaml"); } + @Test + void testKafkaIsNotScaling() { + assertThrows(KubeClusterException.class, () -> createScaleDelete(Kafka.class, "Kafka.yaml")); + } + @Test void testKafkaV1Beta1() { createDelete(Kafka.class, "KafkaV1beta1.yaml"); @@ -174,7 +179,7 @@ void testJmxOptionsWithoutRequiredQueryKeys() { void setupEnvironment() { cluster.createNamespace(NAMESPACE); cluster.createCustomResources(TestUtils.CRD_KAFKA); - cluster.cmdClient().waitForResourceCreation("crd", "kafkas.kafka.strimzi.io"); + waitForCrd("crd", "kafkas.kafka.strimzi.io"); } @AfterAll diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2CrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2CrdIT.java index be55e41ec06..72d569021f7 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2CrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaMirrorMaker2CrdIT.java @@ -30,6 +30,11 @@ void testKafkaMirrorMaker2V1alpha1() { createDelete(KafkaMirrorMaker2.class, "KafkaMirrorMaker2V1alpha1.yaml"); } + @Test + void testKafkaMirrorMaker2Scaling() { + createScaleDelete(KafkaMirrorMaker2.class, "KafkaMirrorMaker2.yaml"); + } + @Test void testKafkaMirrorMaker2Minimal() { createDelete(KafkaMirrorMaker2.class, "KafkaMirrorMaker2-minimal.yaml"); @@ -106,7 +111,7 @@ public void testKafkaMirrorMaker2WithInvalidExternalConfiguration() { void setupEnvironment() { cluster.createNamespace(NAMESPACE); cluster.createCustomResources(TestUtils.CRD_KAFKA_MIRROR_MAKER_2); - cluster.cmdClient().waitForResourceCreation("crd", "kafkamirrormaker2s.kafka.strimzi.io"); + waitForCrd("crd", "kafkamirrormaker2s.kafka.strimzi.io"); } @AfterAll diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaMirrorMakerCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaMirrorMakerCrdIT.java index 5c70286b23a..5b381253e29 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaMirrorMakerCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaMirrorMakerCrdIT.java @@ -28,6 +28,11 @@ void testKafkaMirrorMakerV1alpha1() { createDelete(KafkaMirrorMaker.class, "KafkaMirrorMakerV1alpha1.yaml"); } + @Test + void testKafkaMirrorMakerScaling() { + createScaleDelete(KafkaMirrorMaker.class, "KafkaMirrorMaker.yaml"); + } + @Test void testKafkaMirrorMakerV1beta1() { createDelete(KafkaMirrorMaker.class, "KafkaMirrorMakerV1beta1.yaml"); @@ -98,7 +103,7 @@ void testKafkaMirrorMakerWithCommitAndAbort() { void setupEnvironment() { cluster.createNamespace(NAMESPACE); cluster.createCustomResources(TestUtils.CRD_KAFKA_MIRROR_MAKER); - cluster.cmdClient().waitForResourceCreation("crd", "kafkamirrormakers.kafka.strimzi.io"); + waitForCrd("crd", "kafkamirrormakers.kafka.strimzi.io"); } @AfterAll diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaRebalanceCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaRebalanceCrdIT.java index f080265d65c..83f0a4bcdc0 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaRebalanceCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaRebalanceCrdIT.java @@ -5,10 +5,13 @@ package io.strimzi.api.kafka.model; import io.strimzi.test.TestUtils; +import io.strimzi.test.k8s.exceptions.KubeClusterException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertThrows; + /** * The purpose of this test is to confirm that we can create a * resource from the POJOs, serialize it and create the resource in K8S. @@ -19,6 +22,11 @@ public class KafkaRebalanceCrdIT extends AbstractCrdIT { public static final String NAMESPACE = "kafkarebalance-crd-it"; + @Test + void testKafkaRebalanceIsNotScaling() { + assertThrows(KubeClusterException.class, () -> createScaleDelete(KafkaRebalance.class, "KafkaRebalance.yaml")); + } + @Test void testKafkaRebalanceMinimal() { createDelete(KafkaRebalance.class, "KafkaRebalance-minimal.yaml"); @@ -38,6 +46,7 @@ void testKafkaRebalanceWithGoalsSkipHardGoalCheck() { void setupEnvironment() { cluster.createNamespace(NAMESPACE); cluster.createCustomResources(TestUtils.CRD_KAFKA_REBALANCE); + waitForCrd("crd", "kafkarebalances.kafka.strimzi.io"); } @AfterAll diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaTopicCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaTopicCrdIT.java index f492c660ddb..25ea2c6d583 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaTopicCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaTopicCrdIT.java @@ -27,6 +27,12 @@ void testKafkaTopicV1alpha1() { assumeKube1_11Plus(); createDelete(KafkaTopic.class, "KafkaTopicV1alpha1.yaml"); } + + @Test + void testKafkaTopicIsNotScaling() { + assertThrows(KubeClusterException.class, () -> createScaleDelete(KafkaTopic.class, "KafkaTopic.yaml")); + } + @Test void testKafkaTopicV1beta1() { createDelete(KafkaTopic.class, "KafkaTopicV1beta1.yaml"); @@ -55,7 +61,7 @@ void testKafkaTopicWithMissingProperty() { void setupEnvironment() { cluster.createNamespace(NAMESPACE); cluster.createCustomResources(TestUtils.CRD_TOPIC); - cluster.cmdClient().waitForResourceCreation("crd", "kafkatopics.kafka.strimzi.io"); + waitForCrd("crd", "kafkatopics.kafka.strimzi.io"); } @AfterAll diff --git a/api/src/test/java/io/strimzi/api/kafka/model/KafkaUserCrdIT.java b/api/src/test/java/io/strimzi/api/kafka/model/KafkaUserCrdIT.java index 04c2324e804..1f61fe491df 100644 --- a/api/src/test/java/io/strimzi/api/kafka/model/KafkaUserCrdIT.java +++ b/api/src/test/java/io/strimzi/api/kafka/model/KafkaUserCrdIT.java @@ -5,10 +5,13 @@ package io.strimzi.api.kafka.model; import io.strimzi.test.TestUtils; +import io.strimzi.test.k8s.exceptions.KubeClusterException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertThrows; + /** * The purpose of this test is to confirm that we can create a * resource from the POJOs, serialize it and create the resource in K8S. @@ -24,6 +27,11 @@ void testKafkaUserV1alpha1() { createDelete(KafkaUser.class, "KafkaUserV1alpha1.yaml"); } + @Test + void testKafkaUserIsNotScaling() { + assertThrows(KubeClusterException.class, () -> createScaleDelete(KafkaUser.class, "KafkaUser.yaml")); + } + @Test void testKafkaUserV1beta1() { createDelete(KafkaUser.class, "KafkaUserV1beta1.yaml"); @@ -43,7 +51,7 @@ void testKafkaUserWithExtraProperty() { void setupEnvironment() { cluster.createNamespace(NAMESPACE); cluster.createCustomResources(TestUtils.CRD_KAFKA_USER); - cluster.cmdClient().waitForResourceCreation("crd", "kafkausers.kafka.strimzi.io"); + waitForCrd("crd", "kafkausers.kafka.strimzi.io"); } @AfterAll diff --git a/api/src/test/resources/io/strimzi/api/kafka/model/KafkaConnector.out.yaml b/api/src/test/resources/io/strimzi/api/kafka/model/KafkaConnector.out.yaml index dd5cc196c61..db936233d87 100644 --- a/api/src/test/resources/io/strimzi/api/kafka/model/KafkaConnector.out.yaml +++ b/api/src/test/resources/io/strimzi/api/kafka/model/KafkaConnector.out.yaml @@ -25,4 +25,5 @@ status: trace: "org.apache.kafka.common.errors.RecordTooLargeException\n" - id: 1 state: "PAUSED" - worker_id: "whatever:8083" \ No newline at end of file + worker_id: "whatever:8083" + tasksMax: 0 \ No newline at end of file diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java index 7f6fe4f4224..ceb44b7d60a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java @@ -566,12 +566,36 @@ Future maybeUpdateConnectorStatus(Reconciliation reconciliation, KafkaConn StatusUtils.setStatusConditionAndObservedGeneration(connector, status, error != null ? Future.failedFuture(error) : Future.succeededFuture()); status.setConnectorStatus(statusResult); + status.setTasksMax(getActualTaskCount(connector, statusResult)); + return maybeUpdateStatusCommon(connectorOperator, connector, reconciliation, status, (connector1, status1) -> { return new KafkaConnectorBuilder(connector1).withStatus(status1).build(); }); } + /** + * The tasksMax are mirrored in the KafkaConnector.status where they are used by the scale subresource. + * However, .spec.tasksMax is not always set and has no default value in Strimzi (only in Kafka Connect). So when + * it is not set, we try to count the tasks from the status. And if these are missing as well, we just set it to 0. + * + * @param connector The KafkaConnector instance of the reconciled connector + * @param statusResult The status from the Connect REST API + * @return Number of tasks which should be set in the status + */ + protected int getActualTaskCount(KafkaConnector connector, Map statusResult) { + if (connector.getSpec() != null + && connector.getSpec().getTasksMax() != null) { + return connector.getSpec().getTasksMax(); + } else if (statusResult != null + && statusResult.containsKey("tasks") + && statusResult.get("tasks") instanceof List) { + return ((List) statusResult.get("tasks")).size(); + } else { + return 0; + } + } + protected JsonObject asJson(KafkaConnectorSpec spec) { JsonObject connectorConfigJson = new JsonObject(); if (spec.getConfig() != null) { diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java index 60156c48bce..0e2bcdf7241 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java @@ -5,6 +5,7 @@ package io.strimzi.operator.cluster.operator.assembly; import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.fabric8.kubernetes.api.model.ServiceAccount; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.Resource; @@ -112,6 +113,9 @@ protected Future createOrUpdate(Reconciliation reconciliation, KafkaBridge kafkaBridgeStatus.setUrl(KafkaBridgeResources.url(bridge.getCluster(), namespace, port)); } + kafkaBridgeStatus.setReplicas(bridge.getReplicas()); + kafkaBridgeStatus.setPodSelector(new LabelSelectorBuilder().withMatchLabels(bridge.getSelectorLabels().toMap()).build()); + updateStatus(assemblyResource, reconciliation, kafkaBridgeStatus).onComplete(statusResult -> { // If both features succeeded, createOrUpdate succeeded as well // If one or both of them failed, we prefer the reconciliation failure as the main error diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java index 6a147af11a4..93e9a29a50a 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java @@ -5,6 +5,7 @@ package io.strimzi.operator.cluster.operator.assembly; import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.fabric8.kubernetes.api.model.ServiceAccount; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.Resource; @@ -154,6 +155,9 @@ protected Future createOrUpdate(Reconciliation reconciliation, KafkaConnec kafkaConnectStatus.setUrl(KafkaConnectResources.url(connect.getCluster(), namespace, KafkaConnectCluster.REST_API_PORT)); } + kafkaConnectStatus.setReplicas(connect.getReplicas()); + kafkaConnectStatus.setPodSelector(new LabelSelectorBuilder().withMatchLabels(connect.getSelectorLabels().toMap()).build()); + this.maybeUpdateStatusCommon(resourceOperator, kafkaConnect, reconciliation, kafkaConnectStatus, (connect1, status) -> new KafkaConnectBuilder(connect1).withStatus(status).build()).onComplete(statusResult -> { // If both features succeeded, createOrUpdate succeeded as well diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectS2IAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectS2IAssemblyOperator.java index e7a2e0d4bd4..b62ba634841 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectS2IAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectS2IAssemblyOperator.java @@ -5,6 +5,7 @@ package io.strimzi.operator.cluster.operator.assembly; import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.fabric8.kubernetes.api.model.ServiceAccount; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.Resource; @@ -164,6 +165,9 @@ public Future createOrUpdate(Reconciliation reconciliation, KafkaConnectS2 kafkaConnectS2Istatus.setBuildConfigName(KafkaConnectS2IResources.buildConfigName(connect.getCluster())); } + kafkaConnectS2Istatus.setReplicas(connect.getReplicas()); + kafkaConnectS2Istatus.setPodSelector(new LabelSelectorBuilder().withMatchLabels(connect.getSelectorLabels().toMap()).build()); + updateStatus(kafkaConnectS2I, reconciliation, kafkaConnectS2Istatus).onComplete(statusResult -> { // If both features succeeded, createOrUpdate succeeded as well // If one or both of them failed, we prefer the reconciliation failure as the main error diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java index 5c51fd55ea1..ea118ccf670 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java @@ -13,6 +13,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.strimzi.operator.common.Annotations; import io.strimzi.operator.common.operator.resource.NetworkPolicyOperator; @@ -160,6 +161,9 @@ protected Future createOrUpdate(Reconciliation reconciliation, KafkaMirror StatusUtils.setStatusConditionAndObservedGeneration(kafkaMirrorMaker2, kafkaMirrorMaker2Status, reconciliationResult); kafkaMirrorMaker2Status.setUrl(KafkaMirrorMaker2Resources.url(mirrorMaker2Cluster.getCluster(), namespace, KafkaMirrorMaker2Cluster.REST_API_PORT)); + kafkaMirrorMaker2Status.setReplicas(mirrorMaker2Cluster.getReplicas()); + kafkaMirrorMaker2Status.setPodSelector(new LabelSelectorBuilder().withMatchLabels(mirrorMaker2Cluster.getSelectorLabels().toMap()).build()); + this.maybeUpdateStatusCommon(resourceOperator, kafkaMirrorMaker2, reconciliation, kafkaMirrorMaker2Status, (mirrormaker2, status) -> new KafkaMirrorMaker2Builder(mirrormaker2).withStatus(status).build()).onComplete(statusResult -> { // If both features succeeded, createOrUpdate succeeded as well diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java index 27dc1586b53..3c10e0dac25 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperator.java @@ -5,6 +5,7 @@ package io.strimzi.operator.cluster.operator.assembly; import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.fabric8.kubernetes.api.model.ServiceAccount; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.Resource; @@ -106,6 +107,9 @@ protected Future createOrUpdate(Reconciliation reconciliation, KafkaMirror .onComplete(reconciliationResult -> { StatusUtils.setStatusConditionAndObservedGeneration(assemblyResource, kafkaMirrorMakerStatus, reconciliationResult); + kafkaMirrorMakerStatus.setReplicas(mirror.getReplicas()); + kafkaMirrorMakerStatus.setPodSelector(new LabelSelectorBuilder().withMatchLabels(mirror.getSelectorLabels().toMap()).build()); + updateStatus(assemblyResource, reconciliation, kafkaMirrorMakerStatus).onComplete(statusResult -> { // If both features succeeded, createOrUpdate succeeded as well // If one or both of them failed, we prefer the reconciliation failure as the main error diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java index ddd69e1c793..8a9ff74df0e 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperatorTest.java @@ -180,6 +180,8 @@ public void testCreateOrUpdateCreatesCluster(VertxTestContext context) { // Verify status List capturedStatuses = bridgeCaptor.getAllValues(); assertThat(capturedStatuses.get(0).getStatus().getUrl(), is("http://foo-bridge-service.test.svc:8080")); + assertThat(capturedStatuses.get(0).getStatus().getReplicas(), is(bridge.getReplicas())); + assertThat(capturedStatuses.get(0).getStatus().getPodSelector().getMatchLabels(), is(bridge.getSelectorLabels().toMap())); assertThat(capturedStatuses.get(0).getStatus().getConditions().get(0).getStatus(), is("True")); assertThat(capturedStatuses.get(0).getStatus().getConditions().get(0).getType(), is("Ready")); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperatorTest.java index e30c8f20bdd..b31b0760151 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperatorTest.java @@ -191,6 +191,8 @@ public void createKafkaConnectCluster(VertxTestContext context, KafkaConnect clu KafkaConnectStatus connectStatus = capturedConnects.get(0).getStatus(); assertThat(connectStatus.getUrl(), is("http://foo-connect-api.test.svc:8083")); + assertThat(connectStatus.getReplicas(), is(connect.getReplicas())); + assertThat(connectStatus.getPodSelector().getMatchLabels(), is(connect.getSelectorLabels().toMap())); assertThat(connectStatus.getConditions().get(0).getStatus(), is("True")); assertThat(connectStatus.getConditions().get(0).getType(), is("Ready")); @@ -851,6 +853,8 @@ public void assertCreateClusterWitDuplicateOlderConnect(VertxTestContext context assertThat(capturedConnects, hasSize(1)); KafkaConnectStatus connectStatus = capturedConnects.get(0).getStatus(); assertThat(connectStatus.getUrl(), is("http://foo-connect-api.test.svc:8083")); + assertThat(connectStatus.getReplicas(), is(connect.getReplicas())); + assertThat(connectStatus.getPodSelector().getMatchLabels(), is(connect.getSelectorLabels().toMap())); assertThat(connectStatus.getConditions().get(0).getStatus(), is("True")); assertThat(connectStatus.getConditions().get(0).getType(), is("Ready")); if (connectorOperator) { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectorIT.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectorIT.java index 3c70507e793..ea1e11cbfaf 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectorIT.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectorIT.java @@ -223,6 +223,7 @@ private void assertConnectorIsRunning(VertxTestContext context, KubernetesClient KafkaConnector kafkaConnector = Crds.kafkaConnectorOperation(client).inNamespace(namespace).withName(connectorName).get(); assertThat(kafkaConnector, notNullValue()); assertThat(kafkaConnector.getStatus(), notNullValue()); + assertThat(kafkaConnector.getStatus().getTasksMax(), is(1)); assertThat(kafkaConnector.getStatus().getConnectorStatus(), notNullValue()); assertThat(kafkaConnector.getStatus().getConnectorStatus().get("connector"), instanceOf(Map.class)); assertThat(((Map) kafkaConnector.getStatus().getConnectorStatus().get("connector")).get("state"), is("RUNNING")); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperatorTest.java index 2af11895e64..4bd515ae726 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperatorTest.java @@ -167,6 +167,8 @@ public void testCreateCluster(VertxTestContext context) { // Verify status List capturedMirrorMaker2s = mirrorMaker2Captor.getAllValues(); assertThat(capturedMirrorMaker2s.get(0).getStatus().getUrl(), is("http://foo-mirrormaker2-api.test.svc:8083")); + assertThat(capturedMirrorMaker2s.get(0).getStatus().getReplicas(), is(mirrorMaker2.getReplicas())); + assertThat(capturedMirrorMaker2s.get(0).getStatus().getPodSelector().getMatchLabels(), is(mirrorMaker2.getSelectorLabels().toMap())); assertThat(capturedMirrorMaker2s.get(0).getStatus().getConditions().get(0).getStatus(), is("True")); assertThat(capturedMirrorMaker2s.get(0).getStatus().getConditions().get(0).getType(), is("Ready")); async.flag(); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperatorTest.java index 3681e362fd3..b1ec2d86667 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMakerAssemblyOperatorTest.java @@ -173,6 +173,8 @@ public void testCreateCluster(VertxTestContext context) { List capturedMM = statusCaptor.getAllValues(); assertThat(capturedMM, hasSize(1)); KafkaMirrorMaker mm = capturedMM.get(0); + assertThat(mm.getStatus().getReplicas(), is(mirror.getReplicas())); + assertThat(mm.getStatus().getPodSelector().getMatchLabels(), is(mirror.getSelectorLabels().toMap())); assertThat(mm.getStatus().getConditions().get(0).getType(), is("Ready")); assertThat(mm.getStatus().getConditions().get(0).getStatus(), is("True")); diff --git a/crd-generator/src/main/java/io/strimzi/crdgenerator/CrdGenerator.java b/crd-generator/src/main/java/io/strimzi/crdgenerator/CrdGenerator.java index 384d714818d..4d84c4bba95 100644 --- a/crd-generator/src/main/java/io/strimzi/crdgenerator/CrdGenerator.java +++ b/crd-generator/src/main/java/io/strimzi/crdgenerator/CrdGenerator.java @@ -254,11 +254,31 @@ private ObjectNode buildSpec(Crd.Spec crd, Class crdCl result.set("additionalPrinterColumns", cols); } if (crd.subresources().status().length != 0) { - ObjectNode statusNode = nf.objectNode(); - if (crd.subresources().status().length > 0) { - statusNode.set("status", nf.objectNode()); + ObjectNode subresources = nf.objectNode(); + + if (crd.subresources().status().length == 1) { + subresources.set("status", nf.objectNode()); + } else if (crd.subresources().status().length > 1) { + throw new RuntimeException("Each custom resource definition can have only one status sub-resource."); + } + + if (crd.subresources().scale().length == 1) { + Crd.Spec.Subresources.Scale scale = crd.subresources().scale()[0]; + + ObjectNode scaleNode = nf.objectNode(); + scaleNode.put("specReplicasPath", scale.specReplicasPath()); + scaleNode.put("statusReplicasPath", scale.statusReplicasPath()); + + if (!scale.labelSelectorPath().isEmpty()) { + scaleNode.put("labelSelectorPath", scale.labelSelectorPath()); + } + + subresources.set("scale", scaleNode); + } else if (crd.subresources().scale().length > 1) { + throw new RuntimeException("Each custom resource definition can have only one scale sub-resource."); } - result.set("subresources", statusNode); + + result.set("subresources", subresources); } result.set("validation", buildValidation(crdClass)); return result; diff --git a/crd-generator/src/main/java/io/strimzi/crdgenerator/annotations/Crd.java b/crd-generator/src/main/java/io/strimzi/crdgenerator/annotations/Crd.java index dad1c48436c..0cccda495d5 100644 --- a/crd-generator/src/main/java/io/strimzi/crdgenerator/annotations/Crd.java +++ b/crd-generator/src/main/java/io/strimzi/crdgenerator/annotations/Crd.java @@ -4,6 +4,8 @@ */ package io.strimzi.crdgenerator.annotations; +import io.fabric8.kubernetes.api.model.extensions.Scale; + import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @@ -108,7 +110,10 @@ * @return The subresources of a custom resources that this is the definition for. * @see Kubernetes 1.11 API documtation */ - Subresources subresources() default @Subresources(status = {}); + Subresources subresources() default @Subresources( + status = {}, + scale = {} + ); /** * The subresources of a custom resources that this is the definition for. @@ -116,9 +121,20 @@ */ @interface Subresources { Status[] status(); + Scale[] scale() default {}; @interface Status { } + + /** + * The scale subresource of a custom resources that this is the definition for. + * @see Kubernetes 1.18 API documtation + */ + @interface Scale { + String specReplicasPath(); + String statusReplicasPath(); + String labelSelectorPath() default ""; + } } /** @@ -165,4 +181,4 @@ String type(); } } -} \ No newline at end of file +} diff --git a/crd-generator/src/test/java/io/strimzi/crdgenerator/CrdGeneratorTest.java b/crd-generator/src/test/java/io/strimzi/crdgenerator/CrdGeneratorTest.java index 9329c7c52b4..5a5e378399c 100644 --- a/crd-generator/src/test/java/io/strimzi/crdgenerator/CrdGeneratorTest.java +++ b/crd-generator/src/test/java/io/strimzi/crdgenerator/CrdGeneratorTest.java @@ -26,6 +26,15 @@ public void simpleTest() throws IOException, URISyntaxException { assertEquals(CrdTestUtils.readResource("simpleTest.yaml"), s); } + @Test + public void simpleTestWithSubresources() throws IOException, URISyntaxException { + CrdGenerator crdGenerator = new CrdGenerator(new YAMLMapper().configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)); + StringWriter w = new StringWriter(); + crdGenerator.generate(ExampleWithSubresourcesCrd.class, w); + String s = w.toString(); + assertEquals(CrdTestUtils.readResource("simpleTestWithSubresources.yaml"), s); + } + @Test public void generateHelmMetadataLabels() throws IOException { Map labels = new LinkedHashMap<>(); diff --git a/crd-generator/src/test/java/io/strimzi/crdgenerator/ExampleWithSubresourcesCrd.java b/crd-generator/src/test/java/io/strimzi/crdgenerator/ExampleWithSubresourcesCrd.java new file mode 100644 index 00000000000..43533ca2e89 --- /dev/null +++ b/crd-generator/src/test/java/io/strimzi/crdgenerator/ExampleWithSubresourcesCrd.java @@ -0,0 +1,51 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.crdgenerator; + +import io.fabric8.kubernetes.client.CustomResource; +import io.strimzi.crdgenerator.annotations.Crd; + +@Crd( + apiVersion = "apiextensions.k8s.io/v1beta1", + spec = @Crd.Spec( + group = "crdgenerator.strimzi.io", + names = @Crd.Spec.Names( + kind = "ExampleWithSubresources", + plural = "exampleswithsubresources", + categories = {"strimzi"}), + scope = "Namespaced", + version = "v1alpha1", + versions = { + @Crd.Spec.Version(name = "v1alpha1", served = true, storage = true), + @Crd.Spec.Version(name = "v1beta1", served = true, storage = false) + }, + subresources = @Crd.Spec.Subresources( + status = @Crd.Spec.Subresources.Status(), + scale = @Crd.Spec.Subresources.Scale( + specReplicasPath = ".spec.replicas", + statusReplicasPath = ".status.replicas", + labelSelectorPath = ".status.selector" + ) + ), + additionalPrinterColumns = { + @Crd.Spec.AdditionalPrinterColumn( + name = "Foo", + description = "The foo", + jsonPath = "...", + type = "integer" + ) + } + )) +public class ExampleWithSubresourcesCrd extends CustomResource { + private String replicas; + + public String getReplicas() { + return replicas; + } + + public void setReplicas(String replicas) { + this.replicas = replicas; + } +} diff --git a/crd-generator/src/test/resources/io/strimzi/crdgenerator/simpleTestWithSubresources.yaml b/crd-generator/src/test/resources/io/strimzi/crdgenerator/simpleTestWithSubresources.yaml new file mode 100644 index 00000000000..db9acd49ad7 --- /dev/null +++ b/crd-generator/src/test/resources/io/strimzi/crdgenerator/simpleTestWithSubresources.yaml @@ -0,0 +1,38 @@ +apiVersion: "apiextensions.k8s.io/v1beta1" +kind: "CustomResourceDefinition" +metadata: + name: "exampleswithsubresources.crdgenerator.strimzi.io" +spec: + group: "crdgenerator.strimzi.io" + versions: + - name: "v1alpha1" + served: true + storage: true + - name: "v1beta1" + served: true + storage: false + version: "v1alpha1" + scope: "Namespaced" + names: + kind: "ExampleWithSubresources" + listKind: "ExampleWithSubresourcesList" + singular: "examplewithsubresources" + plural: "exampleswithsubresources" + categories: + - "strimzi" + additionalPrinterColumns: + - name: "Foo" + description: "The foo" + JSONPath: "..." + type: "integer" + subresources: + status: {} + scale: + specReplicasPath: ".spec.replicas" + statusReplicasPath: ".status.replicas" + labelSelectorPath: ".status.selector" + validation: + openAPIV3Schema: + properties: + replicas: + type: "string" diff --git a/documentation/modules/appendix_crds.adoc b/documentation/modules/appendix_crds.adoc index 98d7a3bb410..255777c12d6 100644 --- a/documentation/modules/appendix_crds.adoc +++ b/documentation/modules/appendix_crds.adoc @@ -1938,6 +1938,12 @@ Used in: xref:type-KafkaConnect-{context}[`KafkaConnect`] |string |connectorPlugins 1.2+<.<|The list of connector plugins available in this Kafka Connect deployment. |xref:type-ConnectorPlugin-{context}[`ConnectorPlugin`] array +|podSelector 1.2+<.<|Label selector for pods providing this resource. See external documentation of https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[meta/v1 labelselector]. + + +|https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[LabelSelector] +|replicas 1.2+<.<|The current number of pods being used to provide this resource. +|integer |==== [id='type-ConnectorPlugin-{context}'] @@ -2048,6 +2054,12 @@ Used in: xref:type-KafkaConnectS2I-{context}[`KafkaConnectS2I`] |xref:type-ConnectorPlugin-{context}[`ConnectorPlugin`] array |buildConfigName 1.2+<.<|The name of the build configuration. |string +|podSelector 1.2+<.<|Label selector for pods providing this resource. See external documentation of https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[meta/v1 labelselector]. + + +|https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[LabelSelector] +|replicas 1.2+<.<|The current number of pods being used to provide this resource. +|integer |==== [id='type-KafkaTopic-{context}'] @@ -2458,6 +2470,12 @@ Used in: xref:type-KafkaMirrorMaker-{context}[`KafkaMirrorMaker`] |xref:type-Condition-{context}[`Condition`] array |observedGeneration 1.2+<.<|The generation of the CRD that was last reconciled by the operator. |integer +|podSelector 1.2+<.<|Label selector for pods providing this resource. See external documentation of https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[meta/v1 labelselector]. + + +|https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[LabelSelector] +|replicas 1.2+<.<|The current number of pods being used to provide this resource. +|integer |==== [id='type-KafkaBridge-{context}'] @@ -2623,6 +2641,12 @@ Used in: xref:type-KafkaBridge-{context}[`KafkaBridge`] |integer |url 1.2+<.<|The URL at which external client applications can access the Kafka Bridge. |string +|podSelector 1.2+<.<|Label selector for pods providing this resource. See external documentation of https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[meta/v1 labelselector]. + + +|https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[LabelSelector] +|replicas 1.2+<.<|The current number of pods being used to provide this resource. +|integer |==== [id='type-KafkaConnector-{context}'] @@ -2672,6 +2696,8 @@ Used in: xref:type-KafkaConnector-{context}[`KafkaConnector`] |integer |connectorStatus 1.2+<.<|The connector status, as reported by the Kafka Connect REST API. |map +|tasksMax 1.2+<.<|The maximum number of tasks for the Kafka Connector. +|integer |==== [id='type-KafkaMirrorMaker2-{context}'] @@ -2838,6 +2864,12 @@ Used in: xref:type-KafkaMirrorMaker2-{context}[`KafkaMirrorMaker2`] |xref:type-ConnectorPlugin-{context}[`ConnectorPlugin`] array |connectors 1.2+<.<|List of MirrorMaker 2.0 connector statuses, as reported by the Kafka Connect REST API. |map array +|podSelector 1.2+<.<|Label selector for pods providing this resource. See external documentation of https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[meta/v1 labelselector]. + + +|https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#labelselector-v1-meta[LabelSelector] +|replicas 1.2+<.<|The current number of pods being used to provide this resource. +|integer |==== [id='type-KafkaRebalance-{context}'] diff --git a/helm-charts/strimzi-kafka-operator/templates/041-Crd-kafkaconnect.yaml b/helm-charts/strimzi-kafka-operator/templates/041-Crd-kafkaconnect.yaml index f3978637b87..b90166f43c2 100644 --- a/helm-charts/strimzi-kafka-operator/templates/041-Crd-kafkaconnect.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/041-Crd-kafkaconnect.yaml @@ -37,6 +37,10 @@ spec: type: integer subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -1196,5 +1200,27 @@ spec: description: The class of the connector plugin. description: The list of connector plugins available in this Kafka Connect deployment. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of the Kafka Connect cluster. {{- end -}} diff --git a/helm-charts/strimzi-kafka-operator/templates/042-Crd-kafkaconnects2i.yaml b/helm-charts/strimzi-kafka-operator/templates/042-Crd-kafkaconnects2i.yaml index 92c8b798b9e..05b40850541 100644 --- a/helm-charts/strimzi-kafka-operator/templates/042-Crd-kafkaconnects2i.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/042-Crd-kafkaconnects2i.yaml @@ -37,6 +37,10 @@ spec: type: integer subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -1213,5 +1217,27 @@ spec: buildConfigName: type: string description: The name of the build configuration. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of the Kafka Connect Source-to-Image (S2I) cluster. {{- end -}} diff --git a/helm-charts/strimzi-kafka-operator/templates/045-Crd-kafkamirrormaker.yaml b/helm-charts/strimzi-kafka-operator/templates/045-Crd-kafkamirrormaker.yaml index 2d34c114281..406cf219f7f 100644 --- a/helm-charts/strimzi-kafka-operator/templates/045-Crd-kafkamirrormaker.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/045-Crd-kafkamirrormaker.yaml @@ -47,6 +47,10 @@ spec: priority: 1 subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -1286,5 +1290,27 @@ spec: type: integer description: The generation of the CRD that was last reconciled by the operator. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of Kafka MirrorMaker. {{- end -}} diff --git a/helm-charts/strimzi-kafka-operator/templates/046-Crd-kafkabridge.yaml b/helm-charts/strimzi-kafka-operator/templates/046-Crd-kafkabridge.yaml index a0af114eec1..1c581ec65a1 100644 --- a/helm-charts/strimzi-kafka-operator/templates/046-Crd-kafkabridge.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/046-Crd-kafkabridge.yaml @@ -39,6 +39,10 @@ spec: priority: 1 subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -888,5 +892,27 @@ spec: type: string description: The URL at which external client applications can access the Kafka Bridge. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of the Kafka Bridge. {{- end -}} diff --git a/helm-charts/strimzi-kafka-operator/templates/047-Crd-kafkaconnector.yaml b/helm-charts/strimzi-kafka-operator/templates/047-Crd-kafkaconnector.yaml index 441360b9771..83d101093a8 100644 --- a/helm-charts/strimzi-kafka-operator/templates/047-Crd-kafkaconnector.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/047-Crd-kafkaconnector.yaml @@ -29,6 +29,9 @@ spec: - strimzi subresources: status: {} + scale: + specReplicasPath: .spec.tasksMax + statusReplicasPath: .status.tasksMax validation: openAPIV3Schema: properties: @@ -88,5 +91,8 @@ spec: type: object description: The connector status, as reported by the Kafka Connect REST API. + tasksMax: + type: integer + description: The maximum number of tasks for the Kafka Connector. description: The status of the Kafka Connector. {{- end -}} diff --git a/helm-charts/strimzi-kafka-operator/templates/048-Crd-kafkamirrormaker2.yaml b/helm-charts/strimzi-kafka-operator/templates/048-Crd-kafkamirrormaker2.yaml index 16fd84cd5f5..3e564491d54 100644 --- a/helm-charts/strimzi-kafka-operator/templates/048-Crd-kafkamirrormaker2.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/048-Crd-kafkamirrormaker2.yaml @@ -34,6 +34,10 @@ spec: type: integer subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -1308,5 +1312,27 @@ spec: type: object description: List of MirrorMaker 2.0 connector statuses, as reported by the Kafka Connect REST API. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of the Kafka MirrorMaker 2.0 cluster. {{- end -}} diff --git a/install/cluster-operator/041-Crd-kafkaconnect.yaml b/install/cluster-operator/041-Crd-kafkaconnect.yaml index 9e4381abe26..89987d278bb 100644 --- a/install/cluster-operator/041-Crd-kafkaconnect.yaml +++ b/install/cluster-operator/041-Crd-kafkaconnect.yaml @@ -32,6 +32,10 @@ spec: type: integer subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -1191,4 +1195,26 @@ spec: description: The class of the connector plugin. description: The list of connector plugins available in this Kafka Connect deployment. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of the Kafka Connect cluster. diff --git a/install/cluster-operator/042-Crd-kafkaconnects2i.yaml b/install/cluster-operator/042-Crd-kafkaconnects2i.yaml index 2d7ec624026..7b00704dedf 100644 --- a/install/cluster-operator/042-Crd-kafkaconnects2i.yaml +++ b/install/cluster-operator/042-Crd-kafkaconnects2i.yaml @@ -32,6 +32,10 @@ spec: type: integer subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -1208,4 +1212,26 @@ spec: buildConfigName: type: string description: The name of the build configuration. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of the Kafka Connect Source-to-Image (S2I) cluster. diff --git a/install/cluster-operator/045-Crd-kafkamirrormaker.yaml b/install/cluster-operator/045-Crd-kafkamirrormaker.yaml index ad36e4cef71..211a59d1a58 100644 --- a/install/cluster-operator/045-Crd-kafkamirrormaker.yaml +++ b/install/cluster-operator/045-Crd-kafkamirrormaker.yaml @@ -42,6 +42,10 @@ spec: priority: 1 subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -1281,4 +1285,26 @@ spec: type: integer description: The generation of the CRD that was last reconciled by the operator. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of Kafka MirrorMaker. diff --git a/install/cluster-operator/046-Crd-kafkabridge.yaml b/install/cluster-operator/046-Crd-kafkabridge.yaml index 86bcf4514af..826e9b3db19 100644 --- a/install/cluster-operator/046-Crd-kafkabridge.yaml +++ b/install/cluster-operator/046-Crd-kafkabridge.yaml @@ -34,6 +34,10 @@ spec: priority: 1 subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -883,4 +887,26 @@ spec: type: string description: The URL at which external client applications can access the Kafka Bridge. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of the Kafka Bridge. diff --git a/install/cluster-operator/047-Crd-kafkaconnector.yaml b/install/cluster-operator/047-Crd-kafkaconnector.yaml index 9c7e7ee7cf3..506dc39447d 100644 --- a/install/cluster-operator/047-Crd-kafkaconnector.yaml +++ b/install/cluster-operator/047-Crd-kafkaconnector.yaml @@ -24,6 +24,9 @@ spec: - strimzi subresources: status: {} + scale: + specReplicasPath: .spec.tasksMax + statusReplicasPath: .status.tasksMax validation: openAPIV3Schema: properties: @@ -83,4 +86,7 @@ spec: type: object description: The connector status, as reported by the Kafka Connect REST API. + tasksMax: + type: integer + description: The maximum number of tasks for the Kafka Connector. description: The status of the Kafka Connector. diff --git a/install/cluster-operator/048-Crd-kafkamirrormaker2.yaml b/install/cluster-operator/048-Crd-kafkamirrormaker2.yaml index a254c585fe4..a4c42ca1649 100644 --- a/install/cluster-operator/048-Crd-kafkamirrormaker2.yaml +++ b/install/cluster-operator/048-Crd-kafkamirrormaker2.yaml @@ -29,6 +29,10 @@ spec: type: integer subresources: status: {} + scale: + specReplicasPath: .spec.replicas + statusReplicasPath: .status.replicas + labelSelectorPath: .status.selector validation: openAPIV3Schema: properties: @@ -1303,4 +1307,26 @@ spec: type: object description: List of MirrorMaker 2.0 connector statuses, as reported by the Kafka Connect REST API. + podSelector: + type: object + properties: + matchExpressions: + type: array + items: + type: object + properties: + key: + type: string + operator: + type: string + values: + type: array + items: + type: string + matchLabels: + type: object + description: Label selector for pods providing this resource. + replicas: + type: integer + description: The current number of pods being used to provide this resource. description: The status of the Kafka MirrorMaker 2.0 cluster. diff --git a/install/strimzi-admin/010-ClusterRole-strimzi-admin.yaml b/install/strimzi-admin/010-ClusterRole-strimzi-admin.yaml index 24c3662232a..1d40fddb9bb 100644 --- a/install/strimzi-admin/010-ClusterRole-strimzi-admin.yaml +++ b/install/strimzi-admin/010-ClusterRole-strimzi-admin.yaml @@ -13,13 +13,19 @@ rules: resources: - kafkas - kafkaconnects + - kafkaconnects/scale - kafkaconnects2is + - kafkaconnects2is/scale - kafkamirrormakers + - kafkamirrormakers/scale - kafkausers - kafkatopics - kafkabridges + - kafkabridges/scale - kafkaconnectors + - kafkaconnectors/scale - kafkamirrormaker2s + - kafkamirrormaker2s/scale - kafkarebalances verbs: - get diff --git a/test/src/main/java/io/strimzi/test/k8s/cmdClient/BaseCmdKubeClient.java b/test/src/main/java/io/strimzi/test/k8s/cmdClient/BaseCmdKubeClient.java index 5b21ff9161f..144ae027e81 100644 --- a/test/src/main/java/io/strimzi/test/k8s/cmdClient/BaseCmdKubeClient.java +++ b/test/src/main/java/io/strimzi/test/k8s/cmdClient/BaseCmdKubeClient.java @@ -213,6 +213,15 @@ public K deleteNamespace(String name) { return (K) this; } + @Override + @SuppressWarnings("unchecked") + public K scaleByName(String kind, String name, int replicas) { + try (Context context = defaultContext()) { + Exec.exec(null, namespacedCommand("scale", kind, name, "--replicas", Integer.toString(replicas))); + return (K) this; + } + } + @Override public ExecResult execInPod(String pod, String... command) { List cmd = namespacedCommand("exec", pod, "--"); @@ -265,7 +274,7 @@ enum ExType { } @SuppressWarnings("unchecked") - private K waitFor(String resource, String name, Predicate ready) { + public K waitFor(String resource, String name, Predicate condition) { long timeoutMs = 570_000L; long pollMs = 1_000L; ObjectMapper mapper = new ObjectMapper(); @@ -274,7 +283,7 @@ private K waitFor(String resource, String name, Predicate ready) { String jsonString = Exec.exec(namespacedCommand("get", resource, name, "-o", "json")).out(); LOGGER.trace("{}", jsonString); JsonNode actualObj = mapper.readTree(jsonString); - return ready.test(actualObj); + return condition.test(actualObj); } catch (KubeClusterException.NotFound e) { return false; } catch (IOException e) { diff --git a/test/src/main/java/io/strimzi/test/k8s/cmdClient/KubeCmdClient.java b/test/src/main/java/io/strimzi/test/k8s/cmdClient/KubeCmdClient.java index 0277b5966fa..8cc291f7ec8 100644 --- a/test/src/main/java/io/strimzi/test/k8s/cmdClient/KubeCmdClient.java +++ b/test/src/main/java/io/strimzi/test/k8s/cmdClient/KubeCmdClient.java @@ -4,12 +4,14 @@ */ package io.strimzi.test.k8s.cmdClient; +import com.fasterxml.jackson.databind.JsonNode; import io.strimzi.test.executor.ExecResult; import java.io.File; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import static java.util.Arrays.asList; import static java.util.stream.Collectors.toList; @@ -66,6 +68,16 @@ default K delete(String... files) { K deleteNamespace(String name); + /** + * Scale resource using the scale subresource + * + * @param kind Kind of the resource which should be scaled + * @param name Name of the resource which should be scaled + * @param replicas Number of replicas to which the resource should be scaled + * @return This kube client + */ + K scaleByName(String kind, String name, int replicas); + /** * Execute the given {@code command} in the given {@code pod}. * @param pod The pod @@ -111,6 +123,15 @@ default K delete(String... files) { */ ExecResult exec(boolean throwError, boolean logToOutput, String... command); + /** + * Wait for the resource with the given {@code name} to be reach the state defined by the predicate. + * @param resource The resource type. + * @param name The resource name. + * @param condition Predicate to test if the desired state was achieved + * @return This kube client. + */ + K waitFor(String resource, String name, Predicate condition); + /** * Wait for the resource with the given {@code name} to be created. * @param resourceType The resource type.