From 3686fd112e1363b1d1160664d92c2a54b368906f Mon Sep 17 00:00:00 2001 From: respringer Date: Mon, 5 Oct 2020 13:33:08 -0500 Subject: [PATCH] Termination grace time and prestop drain (#269) * default DefaultTerminationGracePeriodSeconds to 120 * adding test for prestop * upgraded the sleep test * comment fixes * Extracted function BuildMgmtApiWgetAction * Set prestop hook * WIP - introduced secure and insecure methods for making wget mgmt api actions * WIP - default to insecure mgmt api if nothing is specified * WIP - insecure node drain in prestop works * WIP - insecure node drain in prestop works * Test now looks for successful drain * WIP - working on secure node drain call * WIP - updating test * rename * WIP - fixing tests * disabled drain check for now * introduced new oss test file * WIP - moving to oss * removed test-jks * always remove test-jks --- mage/kubectl/lib.go | 4 + operator/pkg/httphelper/security.go | 115 ++++++++++++----- .../construct_podtemplatespec.go | 30 ++++- operator/pkg/utils/crypto_test.go | 17 ++- tests/terminate/terminate_suite_test.go | 9 +- .../test_mtls_mgmt_api_suite_test.go | 12 +- ...lt-single-rack-single-node-prestop-dc.yaml | 36 ++++++ ...ls.yaml => oss-one-node-dc-with-mtls.yaml} | 6 +- .../oss-one-node-dc-without-mtls.yaml | 25 ++++ .../timeout_prestop_termination_suite_test.go | 122 ++++++++++++++++++ 10 files changed, 330 insertions(+), 46 deletions(-) create mode 100644 tests/testdata/default-single-rack-single-node-prestop-dc.yaml rename tests/testdata/{dse-one-node-dc-with-mtls.yaml => oss-one-node-dc-with-mtls.yaml} (89%) create mode 100644 tests/testdata/oss-one-node-dc-without-mtls.yaml create mode 100644 tests/timeout_prestop_termination/timeout_prestop_termination_suite_test.go diff --git a/mage/kubectl/lib.go b/mage/kubectl/lib.go index 93c2b3525..2a6a54909 100644 --- a/mage/kubectl/lib.go +++ b/mage/kubectl/lib.go @@ -163,6 +163,10 @@ func CreateFromFiles(paths ...string) KCmd { return KCmd{Command: "create", Args: args} } +func Logs(args ...string) KCmd { + return KCmd{Command: "logs", Args: args} +} + func Get(args ...string) KCmd { return KCmd{Command: "get", Args: args} } diff --git a/operator/pkg/httphelper/security.go b/operator/pkg/httphelper/security.go index c86e3c907..0122ab81d 100644 --- a/operator/pkg/httphelper/security.go +++ b/operator/pkg/httphelper/security.go @@ -19,6 +19,18 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +const ( + WgetNodeDrainEndpoint = "localhost:8080/api/v0/ops/node/drain" + // TODO: Get endpoint from configured HTTPGet probe + livenessEndpoint = "localhost:8080/api/v0/probes/liveness" + // TODO: Get endpoint from configured HTTPGet probe + readinessEndpoint = "localhost:8080/api/v0/probes/readiness" + + caCertPath = "/management-api-certs/ca.crt" + tlsCrt = "/management-api-certs/tls.crt" + tlsKey = "/management-api-certs/tls.key" +) + // API for Node Management mAuth Config func GetManagementApiProtocol(dc *api.CassandraDatacenter) (string, error) { provider, err := BuildManagmenetApiSecurityProvider(dc) @@ -84,6 +96,8 @@ func ValidateManagementApiConfig(dc *api.CassandraDatacenter, client client.Clie // SPI for adding new mechanisms for securing the management API type ManagementApiSecurityProvider interface { BuildHttpClient(client client.Client, ctx context.Context) (HttpClient, error) + BuildMgmtApiWgetAction(endpoint string) *corev1.ExecAction + BuildMgmtApiWgetPostAction(endpoint string, postData string) *corev1.ExecAction AddServerSecurity(pod *corev1.PodTemplateSpec) error GetProtocol() string ValidateConfig(client client.Client, ctx context.Context) []error @@ -93,7 +107,8 @@ type InsecureManagementApiSecurityProvider struct { } func buildInsecureManagementApiSecurityProvider(dc *api.CassandraDatacenter) (ManagementApiSecurityProvider, error) { - if dc.Spec.ManagementApiAuth.Insecure != nil { + // If both are nil, then default to insecure + if dc.Spec.ManagementApiAuth.Insecure != nil || (dc.Spec.ManagementApiAuth.Manual == nil && dc.Spec.ManagementApiAuth.Insecure == nil) { return &InsecureManagementApiSecurityProvider{}, nil } return nil, nil @@ -134,10 +149,75 @@ func (provider *ManualManagementApiSecurityProvider) GetProtocol() string { return "https" } +func GetMgmtApiWgetAction(dc *api.CassandraDatacenter, endpoint string) (*corev1.ExecAction, error) { + provider, err := BuildManagmenetApiSecurityProvider(dc) + if err != nil { + return nil, err + } + return provider.BuildMgmtApiWgetAction(endpoint), nil +} + +func GetMgmtApiWgetPostAction(dc *api.CassandraDatacenter, endpoint string, postData string) (*corev1.ExecAction, error) { + provider, err := BuildManagmenetApiSecurityProvider(dc) + if err != nil { + return nil, err + } + return provider.BuildMgmtApiWgetPostAction(endpoint, postData), nil +} + +func (provider *InsecureManagementApiSecurityProvider) BuildMgmtApiWgetAction(endpoint string) *corev1.ExecAction { + return &corev1.ExecAction{ + Command: []string{ + "wget", + "--output-document", "/dev/null", + "--no-check-certificate", + fmt.Sprintf("http://%s", endpoint), + }, + } +} + +func (provider *ManualManagementApiSecurityProvider) BuildMgmtApiWgetAction(endpoint string) *corev1.ExecAction { + return &corev1.ExecAction{ + Command: []string{ + "wget", + "--output-document", "/dev/null", + "--no-check-certificate", + "--certificate", tlsCrt, + "--private-key", tlsKey, + "--ca-certificate", caCertPath, + fmt.Sprintf("https://%s", endpoint), + }, + } +} + +func (provider *InsecureManagementApiSecurityProvider) BuildMgmtApiWgetPostAction(endpoint string, postData string) *corev1.ExecAction { + return &corev1.ExecAction{ + Command: []string{ + "wget", + "--output-document", "/dev/null", + "--no-check-certificate", + fmt.Sprintf("--post-data='%s'", postData), + fmt.Sprintf("http://%s", endpoint), + }, + } +} + +func (provider *ManualManagementApiSecurityProvider) BuildMgmtApiWgetPostAction(endpoint string, postData string) *corev1.ExecAction { + return &corev1.ExecAction{ + Command: []string{ + "wget", + "--output-document", "/dev/null", + "--no-check-certificate", + "--certificate", tlsCrt, + "--private-key", tlsKey, + "--ca-certificate", caCertPath, + fmt.Sprintf("--post-data='%s'", postData), + fmt.Sprintf("https://%s", endpoint), + }, + } +} + func (provider *ManualManagementApiSecurityProvider) AddServerSecurity(pod *corev1.PodTemplateSpec) error { - caCertPath := "/management-api-certs/ca.crt" - tlsCrt := "/management-api-certs/tls.crt" - tlsKey := "/management-api-certs/tls.key" // find the container var container *corev1.Container = nil @@ -217,8 +297,6 @@ func (provider *ManualManagementApiSecurityProvider) AddServerSecurity(pod *core container.Env = append(envVars, container.Env...) // Update Liveness probe to account for mutual auth (can't just use HTTP probe now) - // TODO: Get endpoint from configured HTTPGet probe - livenessEndpoint := "https://localhost:8080/api/v0/probes/liveness" if container.LivenessProbe == nil { container.LivenessProbe = &corev1.Probe{ Handler: corev1.Handler{}, @@ -226,21 +304,10 @@ func (provider *ManualManagementApiSecurityProvider) AddServerSecurity(pod *core } container.LivenessProbe.Handler.HTTPGet = nil container.LivenessProbe.Handler.TCPSocket = nil - container.LivenessProbe.Handler.Exec = &corev1.ExecAction{ - Command: []string{ - "wget", - "--output-document", "/dev/null", - "--no-check-certificate", - "--certificate", tlsCrt, - "--private-key", tlsKey, - "--ca-certificate", caCertPath, - livenessEndpoint, - }, - } + container.LivenessProbe.Handler.Exec = provider.BuildMgmtApiWgetAction(livenessEndpoint) // Update Readiness probe to account for mutual auth (can't just use HTTP probe now) // TODO: Get endpoint from configured HTTPGet probe - readinessEndpoint := "https://localhost:8080/api/v0/probes/readiness" if container.ReadinessProbe == nil { container.ReadinessProbe = &corev1.Probe{ Handler: corev1.Handler{}, @@ -248,17 +315,7 @@ func (provider *ManualManagementApiSecurityProvider) AddServerSecurity(pod *core } container.ReadinessProbe.Handler.HTTPGet = nil container.ReadinessProbe.Handler.TCPSocket = nil - container.ReadinessProbe.Handler.Exec = &corev1.ExecAction{ - Command: []string{ - "wget", - "--output-document", "/dev/null", - "--no-check-certificate", - "--certificate", tlsCrt, - "--private-key", tlsKey, - "--ca-certificate", caCertPath, - readinessEndpoint, - }, - } + container.ReadinessProbe.Handler.Exec = provider.BuildMgmtApiWgetAction(readinessEndpoint) return nil } diff --git a/operator/pkg/reconciliation/construct_podtemplatespec.go b/operator/pkg/reconciliation/construct_podtemplatespec.go index e1f9b1054..f6357316a 100644 --- a/operator/pkg/reconciliation/construct_podtemplatespec.go +++ b/operator/pkg/reconciliation/construct_podtemplatespec.go @@ -10,6 +10,7 @@ import ( "reflect" api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1" + "github.com/datastax/cass-operator/operator/pkg/httphelper" "github.com/datastax/cass-operator/operator/pkg/images" "github.com/datastax/cass-operator/operator/pkg/oplabels" "github.com/datastax/cass-operator/operator/pkg/utils" @@ -20,10 +21,11 @@ import ( ) const ( - ServerConfigContainerName = "server-config-init" - CassandraContainerName = "cassandra" - PvcName = "server-data" - SystemLoggerContainerName = "server-system-logger" + DefaultTerminationGracePeriodSeconds = 120 + ServerConfigContainerName = "server-config-init" + CassandraContainerName = "cassandra" + PvcName = "server-data" + SystemLoggerContainerName = "server-system-logger" ) // calculateNodeAffinity provides a way to pin all pods within a statefulset to the same zone @@ -316,6 +318,20 @@ func buildContainers(dc *api.CassandraDatacenter, baseTemplate *corev1.PodTempla cassContainer.ReadinessProbe = probe(8080, "/api/v0/probes/readiness", 20, 10) } + if cassContainer.Lifecycle == nil { + cassContainer.Lifecycle = &corev1.Lifecycle{} + } + + if cassContainer.Lifecycle.PreStop == nil { + action, err := httphelper.GetMgmtApiWgetPostAction(dc, httphelper.WgetNodeDrainEndpoint, "") + if err != nil { + return err + } + cassContainer.Lifecycle.PreStop = &corev1.Handler{ + Exec: action, + } + } + // Combine env vars envDefaults := []corev1.EnvVar{ @@ -437,6 +453,12 @@ func buildPodTemplateSpec(dc *api.CassandraDatacenter, zone string, rackName str baseTemplate.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet } + if baseTemplate.Spec.TerminationGracePeriodSeconds == nil { + // Note: we cannot take the address of a constant + gracePeriodSeconds := int64(DefaultTerminationGracePeriodSeconds) + baseTemplate.Spec.TerminationGracePeriodSeconds = &gracePeriodSeconds + } + // Adds custom registry pull secret if needed _ = images.AddDefaultRegistryImagePullSecrets(&baseTemplate.Spec) diff --git a/operator/pkg/utils/crypto_test.go b/operator/pkg/utils/crypto_test.go index a33a29382..18bb745f6 100644 --- a/operator/pkg/utils/crypto_test.go +++ b/operator/pkg/utils/crypto_test.go @@ -2,18 +2,20 @@ package utils import ( "crypto/rsa" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "crypto/x509" "encoding/pem" "fmt" - "testing" "io/ioutil" + "os" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func Test_newCA(t *testing.T) { var verify_key *rsa.PrivateKey - pem_key, cert, err := GetNewCAandKey("cassandradatacenter-webhook-service","somenamespace") + pem_key, cert, err := GetNewCAandKey("cassandradatacenter-webhook-service", "somenamespace") certpool := x509.NewCertPool() key, _ := pem.Decode([]byte(pem_key)) if block, _ := pem.Decode([]byte(cert)); block != nil { @@ -42,7 +44,6 @@ func Test_newCA(t *testing.T) { if verify_cert_key, ok := cert.PublicKey.(*rsa.PublicKey); !ok { t.Errorf("Error: couldn't typecast cert key") - } else { verify_key_public, _ := verify_key.Public().(*rsa.PublicKey) if fmt.Sprintf("%+v", verify_key_public) != fmt.Sprintf("%+v", verify_cert_key) { @@ -79,6 +80,10 @@ func Test_GetJKS(t *testing.T) { if len(jks) == 0 { t.Errorf("JKS blob too small") } - ioutil.WriteFile("test-jks", jks, 0644) + defer func() { + _ = os.Remove("test-jks") + }() + + ioutil.WriteFile("test-jks", jks, 0644) } diff --git a/tests/terminate/terminate_suite_test.go b/tests/terminate/terminate_suite_test.go index ae93f952e..a4e7f7142 100644 --- a/tests/terminate/terminate_suite_test.go +++ b/tests/terminate/terminate_suite_test.go @@ -17,8 +17,8 @@ import ( var ( testName = "Cluster resource cleanup after termination" namespace = "test-terminate-cleanup" - dcName = "dc2" - dcYaml = "../testdata/default-single-rack-single-node-dc.yaml" + dcName = "dc1" + dcYaml = "../testdata/oss-one-node-dc-without-mtls.yaml" operatorYaml = "../testdata/operator.yaml" dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName) dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName) @@ -77,6 +77,11 @@ var _ = Describe(testName, func() { k = kubectl.DeleteFromFiles(dcYaml) ns.ExecAndLog(step, k) + k = kubectl.Logs(). + WithLabel("statefulset.kubernetes.io/pod-name=cluster1-dc1-r1-sts-0"). + WithFlag("container", "cassandra") + ns.WaitForOutputContainsAndLog(step, k, "node/drain status=200 OK", 30) + step = "checking that the dc no longer exists" json = "jsonpath={.items}" k = kubectl.Get("CassandraDatacenter"). diff --git a/tests/test_mtls_mgmt_api/test_mtls_mgmt_api_suite_test.go b/tests/test_mtls_mgmt_api/test_mtls_mgmt_api_suite_test.go index 252f67666..c930fab6e 100644 --- a/tests/test_mtls_mgmt_api/test_mtls_mgmt_api_suite_test.go +++ b/tests/test_mtls_mgmt_api/test_mtls_mgmt_api_suite_test.go @@ -18,7 +18,7 @@ var ( testName = "test mtls protecting mgmt api" namespace = "test-mtls-for-mgmt-api" dcName = "dc1" - dcYaml = "../testdata/dse-one-node-dc-with-mtls.yaml" + dcYaml = "../testdata/oss-one-node-dc-with-mtls.yaml" operatorYaml = "../testdata/operator.yaml" dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName) dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName) @@ -61,7 +61,8 @@ var _ = Describe(testName, func() { k = kubectl.ApplyFiles(dcYaml) ns.ExecAndLog(step, k) - ns.WaitForDatacenterReady(dcName) + // This takes a while sometimes in my dev environment + ns.WaitForDatacenterReadyWithTimeouts(dcName, 600, 120) step = "scale up to 2 nodes" json := "{\"spec\": {\"size\": 2}}" @@ -75,6 +76,13 @@ var _ = Describe(testName, func() { k = kubectl.DeleteFromFiles(dcYaml) ns.ExecAndLog(step, k) + // TODO FIXME: re-enable this when the following issue is fixed: + // https://github.com/datastax/management-api-for-apache-cassandra/issues/42 + // k = kubectl.Logs(). + // WithLabel("statefulset.kubernetes.io/pod-name=cluster1-dc1-r1-sts-0"). + // WithFlag("container", "cassandra") + // ns.WaitForOutputContainsAndLog(step, k, "node/drain status=200 OK", 30) + step = "checking that the dc no longer exists" json = "jsonpath={.items}" k = kubectl.Get("CassandraDatacenter"). diff --git a/tests/testdata/default-single-rack-single-node-prestop-dc.yaml b/tests/testdata/default-single-rack-single-node-prestop-dc.yaml new file mode 100644 index 000000000..3c349a508 --- /dev/null +++ b/tests/testdata/default-single-rack-single-node-prestop-dc.yaml @@ -0,0 +1,36 @@ +apiVersion: cassandra.datastax.com/v1beta1 +kind: CassandraDatacenter +metadata: + name: dc2 +spec: + clusterName: cluster2 + serverType: dse + serverVersion: "6.8.4" + managementApiAuth: + insecure: {} + size: 1 + podTemplateSpec: + spec: + containers: + - name: "cassandra" + lifecycle: + preStop: + exec: + command: ["/bin/sleep", "6000s"] + storageConfig: + cassandraDataVolumeClaimSpec: + storageClassName: server-storage + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + racks: + - name: r1 + config: + jvm-server-options: + initial_heap_size: "800m" + max_heap_size: "800m" + cassandra-yaml: + file_cache_size_in_mb: 100 + memtable_space_in_mb: 100 diff --git a/tests/testdata/dse-one-node-dc-with-mtls.yaml b/tests/testdata/oss-one-node-dc-with-mtls.yaml similarity index 89% rename from tests/testdata/dse-one-node-dc-with-mtls.yaml rename to tests/testdata/oss-one-node-dc-with-mtls.yaml index 23ac0448d..a25e825c4 100644 --- a/tests/testdata/dse-one-node-dc-with-mtls.yaml +++ b/tests/testdata/oss-one-node-dc-with-mtls.yaml @@ -4,8 +4,8 @@ metadata: name: dc1 spec: clusterName: cluster1 - serverType: dse - serverVersion: "6.8.4" + serverType: cassandra + serverVersion: "3.11.7" managementApiAuth: manual: clientSecretName: mgmt-api-client-credentials @@ -22,6 +22,6 @@ spec: racks: - name: r1 config: - jvm-server-options: + jvm-options: initial_heap_size: "800m" max_heap_size: "800m" diff --git a/tests/testdata/oss-one-node-dc-without-mtls.yaml b/tests/testdata/oss-one-node-dc-without-mtls.yaml new file mode 100644 index 000000000..ea11f17f6 --- /dev/null +++ b/tests/testdata/oss-one-node-dc-without-mtls.yaml @@ -0,0 +1,25 @@ +apiVersion: cassandra.datastax.com/v1beta1 +kind: CassandraDatacenter +metadata: + name: dc1 +spec: + clusterName: cluster1 + serverType: cassandra + serverVersion: "3.11.7" + managementApiAuth: + insecure: {} + size: 1 + storageConfig: + cassandraDataVolumeClaimSpec: + storageClassName: server-storage + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + racks: + - name: r1 + config: + jvm-options: + initial_heap_size: "800m" + max_heap_size: "800m" diff --git a/tests/timeout_prestop_termination/timeout_prestop_termination_suite_test.go b/tests/timeout_prestop_termination/timeout_prestop_termination_suite_test.go new file mode 100644 index 000000000..6f707af88 --- /dev/null +++ b/tests/timeout_prestop_termination/timeout_prestop_termination_suite_test.go @@ -0,0 +1,122 @@ +// Copyright DataStax, Inc. +// Please see the included license file for details. + +// This test ensures that the timeout grace period of 120 seconds correctly fires. +// Note that it overrides the default prestop hook in order to force the termination to take too long. + +package timeout_prestop_termination + +import ( + "fmt" + "testing" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + ginkgo_util "github.com/datastax/cass-operator/mage/ginkgo" + "github.com/datastax/cass-operator/mage/kubectl" +) + +var ( + testName = "Test termination timeout" + namespace = "test-terminate-timeout" + dcName = "dc2" + dcYaml = "../testdata/default-single-rack-single-node-prestop-dc.yaml" + operatorYaml = "../testdata/operator.yaml" + dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName) + dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName) + ns = ginkgo_util.NewWrapper(testName, namespace) +) + +func TestLifecycle(t *testing.T) { + AfterSuite(func() { + logPath := fmt.Sprintf("%s/aftersuite", ns.LogDir) + kubectl.DumpAllLogs(logPath).ExecV() + fmt.Printf("\n\tPost-run logs dumped at: %s\n\n", logPath) + ns.Terminate() + }) + + RegisterFailHandler(Fail) + RunSpecs(t, testName) +} + +var _ = Describe(testName, func() { + Context("when in a new cluster", func() { + Specify("the operator times out cluster termination after 120 seconds", func() { + By("creating a namespace") + err := kubectl.CreateNamespace(namespace).ExecV() + Expect(err).ToNot(HaveOccurred()) + + step := "setting up cass-operator resources via helm chart" + ns.HelmInstall("../../charts/cass-operator-chart") + + step = "waiting for the operator to become ready" + json := "jsonpath={.items[0].status.containerStatuses[0].ready}" + k := kubectl.Get("pods"). + WithLabel("name=cass-operator"). + WithFlag("field-selector", "status.phase=Running"). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "true", 120) + + step = "creating a datacenter resource with 1 racks/1 nodes" + k = kubectl.ApplyFiles(dcYaml) + ns.ExecAndLog(step, k) + + step = "waiting for the node to become ready" + json = "jsonpath={.items[*].status.containerStatuses[0].ready}" + k = kubectl.Get("pods"). + WithLabel(dcLabel). + WithFlag("field-selector", "status.phase=Running"). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "true", 1200) + + step = "checking the cassandra operator progress status is set to Ready" + json = "jsonpath={.status.cassandraOperatorProgress}" + k = kubectl.Get(dcResource). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "Ready", 30) + + // The dc has a prestop hook to wait for 100 minutes, + // but the termination grace period of 120 seconds should override it + + startTime := time.Now() + + step = "deleting the dc" + k = kubectl.DeleteFromFiles(dcYaml) + ns.ExecAndLog(step, k) + + step = "checking that the dc no longer exists" + json = "jsonpath={.items}" + k = kubectl.Get("CassandraDatacenter"). + WithLabel(dcLabel). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "[]", 300) + + elapsedTime := startTime.Sub(time.Now()) + + Expect(elapsedTime.Minutes() <= 3).To(BeTrue(), "Stop should have completed in under 3 minutes") + + step = "checking that no dc pods remain" + json = "jsonpath={.items}" + k = kubectl.Get("pods"). + WithLabel(dcLabel). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "[]", 300) + + step = "checking that no dc services remain" + json = "jsonpath={.items}" + k = kubectl.Get("services"). + WithLabel(dcLabel). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "[]", 300) + + step = "checking that no dc stateful sets remain" + json = "jsonpath={.items}" + k = kubectl.Get("statefulsets"). + WithLabel(dcLabel). + FormatOutput(json) + ns.WaitForOutputAndLog(step, k, "[]", 300) + }) + }) +})