diff --git a/CHANGELOG.md b/CHANGELOG.md index a15b0df2..a79e9c84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti ## unreleased * [CHANGE] [#354](https://github.com/k8ssandra/cass-operator/issues/354) Remove oldDefunctLabel support since we recreate StS. Fix #335 created-by value to match expected value. +* [CHANGE] [#385](https://github.com/k8ssandra/cass-operator/issues/385) Deprecate CassandraDatacenter's RollingRestartRequested. Use CassandraTask instead. +* [ENHANCEMENT] [#385](https://github.com/k8ssandra/cass-operator/issues/385) Add rolling restart as a CassandraTask action. * [CHANGE] [#397](https://github.com/k8ssandra/cass-operator/issues/397) Remove direct dependency to k8s.io/kubernetes * [FEATURE] [#384](https://github.com/k8ssandra/cass-operator/issues/384) Add a new CassandraTask operation "replacenode" that removes the existing PVCs from the pod, deletes the pod and starts a replacement process. * [FEATURE] [#387](https://github.com/k8ssandra/cass-operator/issues/387) Add a new CassandraTask operation "upgradesstables" that allows to do SSTable upgrades after Cassandra version upgrade. diff --git a/apis/cassandra/v1beta1/cassandradatacenter_types.go b/apis/cassandra/v1beta1/cassandradatacenter_types.go index 8bda912a..711e8d9f 100644 --- a/apis/cassandra/v1beta1/cassandradatacenter_types.go +++ b/apis/cassandra/v1beta1/cassandradatacenter_types.go @@ -187,7 +187,7 @@ type CassandraDatacenterSpec struct { // The k8s service account to use for the server pods ServiceAccount string `json:"serviceAccount,omitempty"` - // Whether to do a rolling restart at the next opportunity. The operator will set this back + // DEPRECATED. Use CassandraTask for rolling restarts. Whether to do a rolling restart at the next opportunity. The operator will set this back // to false once the restart is in progress. RollingRestartRequested bool `json:"rollingRestartRequested,omitempty"` diff --git a/apis/control/v1alpha1/cassandratask_types.go b/apis/control/v1alpha1/cassandratask_types.go index e7665db4..cf2572b0 100644 --- a/apis/control/v1alpha1/cassandratask_types.go +++ b/apis/control/v1alpha1/cassandratask_types.go @@ -22,6 +22,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + RestartedAtAnnotation = "control.k8ssandra.io/restartedAt" +) + // CassandraTaskSpec defines the desired state of CassandraTask type CassandraTaskSpec struct { @@ -69,6 +73,12 @@ const ( CommandScrub CassandraCommand = "scrub" ) +const ( + KeyspaceArgument string = "keyspace_name" + RackArgument string = "rack" + SourceDatacenterArgument string = "source_datacenter" +) + type CassandraJob struct { Name string `json:"name"` diff --git a/config/crd/bases/cassandra.datastax.com_cassandradatacenters.yaml b/config/crd/bases/cassandra.datastax.com_cassandradatacenters.yaml index d8c9bf51..e364d48f 100644 --- a/config/crd/bases/cassandra.datastax.com_cassandradatacenters.yaml +++ b/config/crd/bases/cassandra.datastax.com_cassandradatacenters.yaml @@ -7452,9 +7452,9 @@ spec: type: object type: object rollingRestartRequested: - description: Whether to do a rolling restart at the next opportunity. - The operator will set this back to false once the restart is in - progress. + description: DEPRECATED. Use CassandraTask for rolling restarts. Whether + to do a rolling restart at the next opportunity. The operator will + set this back to false once the restart is in progress. 
type: boolean serverImage: description: 'Cassandra server image name. Use of ImageConfig to match diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 7621efda..6766f6e5 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -14,4 +14,4 @@ kind: Kustomization images: - name: controller newName: k8ssandra/cass-operator - newTag: latest + newTag: v1.13.0-dev.0bb3086-20220819 diff --git a/controllers/control/cassandratask_controller.go b/controllers/control/cassandratask_controller.go index 29b9a0ae..60cbf2b1 100644 --- a/controllers/control/cassandratask_controller.go +++ b/controllers/control/cassandratask_controller.go @@ -24,6 +24,7 @@ import ( "strconv" "time" + appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -256,6 +257,7 @@ func (r *CassandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques var err error var failed, completed int +JobDefinition: for _, job := range cassTask.Spec.Jobs { taskConfig := &TaskConfiguration{ RestartPolicy: cassTask.Spec.RestartPolicy, @@ -273,7 +275,17 @@ func (r *CassandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques case api.CommandCleanup: cleanup(taskConfig) case api.CommandRestart: - r.restart(taskConfig) + // This job is targeting StatefulSets and not Pods + sts, err := r.getDatacenterStatefulSets(ctx, dc) + if err != nil { + return ctrl.Result{}, err + } + + res, err = r.restartSts(ctx, sts, taskConfig) + if err != nil { + return ctrl.Result{}, err + } + break JobDefinition case api.CommandReplaceNode: r.replace(taskConfig) case "forceupgraderacks": @@ -333,6 +345,7 @@ func (r *CassandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques cassTask.Status.Active = 0 cassTask.Status.CompletionTime = &timeNow + SetCondition(&cassTask, api.JobComplete, corev1.ConditionTrue) // Requeue for deletion later deletionTime := calculateDeletionTime(&cassTask) @@ -344,8 +357,6 @@ func (r *CassandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques cassTask.Status.Succeeded = completed cassTask.Status.Failed = failed - SetCondition(&cassTask, api.JobComplete, corev1.ConditionTrue) - if err = r.Client.Status().Update(ctx, &cassTask); err != nil { return ctrl.Result{}, err } @@ -498,6 +509,16 @@ func (r *CassandraTaskReconciler) getDatacenterPods(ctx context.Context, dc *cas return pods.Items, nil } +func (r *CassandraTaskReconciler) getDatacenterStatefulSets(ctx context.Context, dc *cassapi.CassandraDatacenter) ([]appsv1.StatefulSet, error) { + var sts appsv1.StatefulSetList + + if err := r.Client.List(ctx, &sts, client.InNamespace(dc.Namespace), client.MatchingLabels(dc.GetDatacenterLabels())); err != nil { + return nil, err + } + + return sts.Items, nil +} + // cleanupJobAnnotations removes the job annotations from the pod once it has finished func (r *CassandraTaskReconciler) cleanupJobAnnotations(ctx context.Context, dc *cassapi.CassandraDatacenter, taskId string) error { logger := log.FromContext(ctx) diff --git a/controllers/control/cassandratask_controller_test.go b/controllers/control/cassandratask_controller_test.go index 31c18213..ee36b784 100644 --- a/controllers/control/cassandratask_controller_test.go +++ b/controllers/control/cassandratask_controller_test.go @@ -13,6 +13,7 @@ import ( api "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -25,12 +26,16 @@ var ( callDetails *httphelper.CallDetails // testNamespaceName = "" testDatacenterName = "dc1" + testDc *cassdcapi.CassandraDatacenter + clusterName = "" + nodeCount = 9 + rackCount = 3 ) func createDatacenter(dcName, namespace string) func() { return func() { By("Create Datacenter, pods and set dc status to Ready") - clusterName := fmt.Sprintf("test-%s", dcName) + clusterName = fmt.Sprintf("test-%s", dcName) testNamespace := &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: namespace, @@ -43,7 +48,7 @@ func createDatacenter(dcName, namespace string) func() { Namespace: namespace, } - testDc := &cassdcapi.CassandraDatacenter{ + testDc = &cassdcapi.CassandraDatacenter{ ObjectMeta: metav1.ObjectMeta{ Name: cassdcKey.Name, Namespace: cassdcKey.Namespace, @@ -52,12 +57,19 @@ func createDatacenter(dcName, namespace string) func() { Spec: cassdcapi.CassandraDatacenterSpec{ ClusterName: clusterName, ServerType: "cassandra", - ServerVersion: "4.0.1", - Size: 3, + ServerVersion: "4.0.5", + Size: int32(nodeCount), }, Status: cassdcapi.CassandraDatacenterStatus{}, } + testDc.Spec.Racks = make([]cassdcapi.Rack, 3) + for i := 0; i < rackCount; i++ { + testDc.Spec.Racks[i] = cassdcapi.Rack{ + Name: fmt.Sprintf("r%d", i), + } + } + Expect(k8sClient.Create(context.Background(), testDc)).Should(Succeed()) patchCassdc := client.MergeFrom(testDc.DeepCopy()) @@ -70,16 +82,62 @@ func createDatacenter(dcName, namespace string) func() { } Expect(k8sClient.Status().Patch(context.Background(), testDc, patchCassdc)).Should(Succeed()) - for i := 0; i < int(testDc.Spec.Size); i++ { - createPod(namespace, clusterName, dcName, i) + createStatefulSets(cassdcKey.Namespace) + podsPerRack := nodeCount / rackCount + for _, rack := range testDc.Spec.Racks { + for j := 0; j < podsPerRack; j++ { + createPod(namespace, clusterName, dcName, rack.Name, j) + } + } + } +} + +func createStatefulSets(namespace string) { + podsPerRack := int32(nodeCount / rackCount) + + for _, rack := range testDc.Spec.Racks { + name := fmt.Sprintf("%s-%s-%s-sts", clusterName, testDc.Name, rack.Name) + stsKey := types.NamespacedName{Name: name, Namespace: namespace} + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: stsKey.Name, + Namespace: stsKey.Namespace, + Labels: testDc.GetRackLabels(rack.Name), + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: testDc.GetRackLabels(rack.Name), + }, + Replicas: &podsPerRack, + ServiceName: testDc.GetAllPodsServiceName(), + PodManagementPolicy: appsv1.ParallelPodManagement, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: testDc.GetRackLabels(rack.Name), + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "cassandra", + Image: "k8ssandra/cassandra-nothere:latest", + }, + }, + }, + }, + }, } + Expect(k8sClient.Create(context.Background(), sts)).Should(Succeed()) + + Expect(k8sClient.Get(context.TODO(), stsKey, sts)).Should(Succeed()) + sts.Status.CurrentRevision = "0" + Expect(k8sClient.Status().Update(context.TODO(), sts)) } } -func createPod(namespace, clusterName, dcName string, i int) { +func createPod(namespace, clusterName, dcName, rackName string, ordinal int) { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("test-cassdc-%s-pod%d", dcName, i), + Name: fmt.Sprintf("%s-%s-%s-sts-%d", 
clusterName, dcName, rackName, ordinal), Namespace: namespace, Labels: map[string]string{ cassdcapi.ClusterLabel: clusterName, @@ -111,6 +169,13 @@ func createPod(namespace, clusterName, dcName string, i int) { Expect(k8sClient.Status().Patch(context.Background(), pod, patchPod)).Should(Succeed()) } +func deleteDatacenter(namespace string) { + // Delete pods, statefulsets and dc + Expect(k8sClient.DeleteAllOf(context.TODO(), &appsv1.StatefulSet{}, client.InNamespace(namespace))).Should(Succeed()) + Expect(k8sClient.DeleteAllOf(context.TODO(), &corev1.Pod{}, client.InNamespace(namespace))).Should(Succeed()) + Expect(k8sClient.DeleteAllOf(context.TODO(), testDc, client.InNamespace(namespace))).Should(Succeed()) +} + func buildTask(command api.CassandraCommand, namespace string) (types.NamespacedName, *api.CassandraTask) { taskKey := types.NamespacedName{ Name: fmt.Sprintf("test-%s-task-%d", command, rand.Int31()), @@ -158,254 +223,365 @@ func waitForTaskCompletion(taskKey types.NamespacedName) *api.CassandraTask { return emptyTask } -var _ = Describe("Execute jobs against all pods", func() { - jobRunningRequeue = time.Duration(1 * time.Millisecond) - taskRunningRequeue = time.Duration(1 * time.Millisecond) - Context("Async jobs", func() { - var testNamespaceName string - BeforeEach(func() { - By("Creating a fake mgmt-api server") - var err error - callDetails = httphelper.NewCallDetails() - mockServer, err = httphelper.FakeExecutorServerWithDetails(callDetails) - testNamespaceName = fmt.Sprintf("test-task-%d", rand.Int31()) - Expect(err).ToNot(HaveOccurred()) - mockServer.Start() - By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName)) - }) +var _ = Describe("CassandraTask controller tests", func() { + Describe("Execute jobs against all pods", func() { + jobRunningRequeue = time.Duration(1 * time.Millisecond) + taskRunningRequeue = time.Duration(1 * time.Millisecond) + Context("Async jobs", func() { + var testNamespaceName string + BeforeEach(func() { + By("Creating a fake mgmt-api server") + var err error + callDetails = httphelper.NewCallDetails() + mockServer, err = httphelper.FakeExecutorServerWithDetails(callDetails) + testNamespaceName = fmt.Sprintf("test-task-%d", rand.Int31()) + Expect(err).ToNot(HaveOccurred()) + mockServer.Start() + By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName)) + }) - AfterEach(func() { - mockServer.Close() - }) + AfterEach(func() { + mockServer.Close() + deleteDatacenter(testNamespaceName) + }) + + When("Running rebuild in datacenter", func() { + It("Runs a rebuild task against the datacenter pods", func() { + By("Creating a task for rebuild") + taskKey := createTask(api.CommandRebuild, testNamespaceName) + + completedTask := waitForTaskCompletion(taskKey) - When("Running rebuild in datacenter", func() { - It("Runs a rebuild task against the datacenter pods", func() { + Expect(callDetails.URLCounts["/api/v1/ops/node/rebuild"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + }) + It("Runs a UpgradeSSTables task against the datacenter pods", func() { By("Creating a task for rebuild") - taskKey := createTask(api.CommandRebuild, testNamespaceName) + taskKey := 
createTask(api.CommandUpgradeSSTables, testNamespaceName) completedTask := waitForTaskCompletion(taskKey) - Expect(callDetails.URLCounts["/api/v1/ops/node/rebuild"]).To(Equal(3)) - Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 3)) - Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 3)) + Expect(callDetails.URLCounts["/api/v1/ops/tables/sstables/upgrade"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) }) - }) - It("Runs a UpgradeSSTables task against the datacenter pods", func() { - By("Creating a task for rebuild") - taskKey := createTask(api.CommandUpgradeSSTables, testNamespaceName) + When("Running cleanup twice in the same datacenter", func() { + It("Runs a cleanup task against the datacenter pods", func() { + By("Creating a task for cleanup") + taskKey := createTask(api.CommandCleanup, testNamespaceName) + + completedTask := waitForTaskCompletion(taskKey) + + Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) - completedTask := waitForTaskCompletion(taskKey) + // This is hacky approach to run two jobs twice in the same test - resetting the callDetails + callDetails.URLCounts = make(map[string]int) + By("Creating a task for second cleanup") + taskKey = createTask("cleanup", testNamespaceName) - Expect(callDetails.URLCounts["/api/v1/ops/tables/sstables/upgrade"]).To(Equal(3)) - Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 3)) - Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 3)) + completedTask = waitForTaskCompletion(taskKey) - // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + }) + }) + Context("Failing jobs", func() { + When("In a datacenter", func() { + It("Should fail once when no retryPolicy is set", func() { + By("Creating fake mgmt-api server") + callDetails := httphelper.NewCallDetails() + mockServer, err := httphelper.FakeExecutorServerWithDetailsFails(callDetails) + testFailedNamespaceName := fmt.Sprintf("test-task-failed-%d", rand.Int31()) + Expect(err).ToNot(HaveOccurred()) + mockServer.Start() + defer mockServer.Close() + + By("create datacenter", createDatacenter("dc1", testFailedNamespaceName)) + By("Create a task for cleanup") + taskKey := createTask(api.CommandCleanup, testFailedNamespaceName) + + completedTask := waitForTaskCompletion(taskKey) + + 
Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount)) + + Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount)) + }) + It("If retryPolicy is set, we should see a retry", func() { + By("Creating fake mgmt-api server") + callDetails := httphelper.NewCallDetails() + mockServer, err := httphelper.FakeExecutorServerWithDetailsFails(callDetails) + testFailedNamespaceName := fmt.Sprintf("test-task-failed-%d", rand.Int31()) + Expect(err).ToNot(HaveOccurred()) + mockServer.Start() + defer mockServer.Close() + + By("create datacenter", createDatacenter("dc1", testFailedNamespaceName)) + By("Creating a task for cleanup") + taskKey, task := buildTask(api.CommandCleanup, testFailedNamespaceName) + task.Spec.RestartPolicy = corev1.RestartPolicyOnFailure + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + completedTask := waitForTaskCompletion(taskKey) + + // Due to retry, we have double the amount of calls + Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(nodeCount * 2)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount*2)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount*2)) + + Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount)) + }) + }) }) - When("Running cleanup twice in the same datacenter", func() { + Context("Sync jobs", func() { + var testNamespaceName string + BeforeEach(func() { + By("Creating fake synchronous mgmt-api server") + var err error + callDetails = httphelper.NewCallDetails() + mockServer, err = httphelper.FakeServerWithoutFeaturesEndpoint(callDetails) + testNamespaceName = fmt.Sprintf("test-sync-task-%d", rand.Int31()) + Expect(err).ToNot(HaveOccurred()) + mockServer.Start() + By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName)) + }) + + AfterEach(func() { + mockServer.Close() + deleteDatacenter(testNamespaceName) + }) + It("Runs a cleanup task against the datacenter pods", func() { By("Creating a task for cleanup") taskKey := createTask(api.CommandCleanup, testNamespaceName) completedTask := waitForTaskCompletion(taskKey) - Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(3)) - Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 3)) - Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 3)) + Expect(callDetails.URLCounts["/api/v0/ops/keyspace/cleanup"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) - // This is hacky approach to run two jobs twice in the same test - resetting the callDetails - callDetails.URLCounts = make(map[string]int) - By("Creating a task for second cleanup") - taskKey = createTask("cleanup", testNamespaceName) + It("Runs a upgradesstables task against the datacenter pods", func() { + By("Creating a task for upgradesstables") + time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new + taskKey := createTask(api.CommandUpgradeSSTables, 
testNamespaceName) - completedTask = waitForTaskCompletion(taskKey) + completedTask := waitForTaskCompletion(taskKey) - Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(3)) - Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 3)) - Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 3)) + Expect(callDetails.URLCounts["/api/v0/ops/tables/sstables/upgrade"]).To(Equal(nodeCount)) + Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) + Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) }) - }) - }) - Context("Failing jobs", func() { - When("In a datacenter", func() { - It("Should fail once when no retryPolicy is set", func() { - By("Creating fake mgmt-api server") - callDetails := httphelper.NewCallDetails() - mockServer, err := httphelper.FakeExecutorServerWithDetailsFails(callDetails) - testFailedNamespaceName := fmt.Sprintf("test-task-failed-%d", rand.Int31()) - Expect(err).ToNot(HaveOccurred()) - mockServer.Start() - defer mockServer.Close() - By("create datacenter", createDatacenter("dc1", testFailedNamespaceName)) - By("Create a task for cleanup") - taskKey := createTask(api.CommandCleanup, testFailedNamespaceName) + It("Replaces a node in the datacenter", func() { + By("Creating a task for replacenode") + time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new + taskKey, task := buildTask(api.CommandReplaceNode, testNamespaceName) - completedTask := waitForTaskCompletion(taskKey) + podKey := types.NamespacedName{ + Name: fmt.Sprintf("%s-%s-r1-sts-%d", clusterName, testDatacenterName, 2), + Namespace: testNamespaceName, + } - Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(3)) - Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 3)) - Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 3)) + task.Spec.Jobs[0].Arguments = map[string]string{"pod_name": podKey.Name} + Expect(k8sClient.Create(context.TODO(), task)).Should(Succeed()) - Expect(completedTask.Status.Failed).To(BeNumerically(">=", 3)) - }) - It("If retryPolicy is set, we should see a retry", func() { - By("Creating fake mgmt-api server") - callDetails := httphelper.NewCallDetails() - mockServer, err := httphelper.FakeExecutorServerWithDetailsFails(callDetails) - testFailedNamespaceName := fmt.Sprintf("test-task-failed-%d", rand.Int31()) - Expect(err).ToNot(HaveOccurred()) - mockServer.Start() - defer mockServer.Close() + // Verify the pod2 was deleted + Eventually(func() bool { + pod := &corev1.Pod{} + err := k8sClient.Get(context.TODO(), podKey, pod) + return err != nil && errors.IsNotFound(err) + }, time.Duration(3*time.Second)).Should(BeTrue()) - By("create datacenter", createDatacenter("dc1", testFailedNamespaceName)) - By("Creating a task for cleanup") - taskKey, task := buildTask(api.CommandCleanup, testFailedNamespaceName) - task.Spec.RestartPolicy = corev1.RestartPolicyOnFailure - Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + // Recreate it so the process "finishes" + createPod(testNamespaceName, clusterName, testDatacenterName, "r1", 2) completedTask := waitForTaskCompletion(taskKey) - // Due to retry, we have double the amount of calls - 
Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(6)) - Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 6)) - Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 6)) + // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) + Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + }) + }) + Context("Task TTL", func() { + var testNamespaceName string + BeforeEach(func() { + testNamespaceName = fmt.Sprintf("test-task-%d", rand.Int31()) + testNamespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: testNamespaceName, + }, + } + Expect(k8sClient.Create(context.Background(), testNamespace)).Should(Succeed()) + }) + It("Ensures task is deleted after TTL has expired", func() { + taskKey, task := buildTask(api.CommandCleanup, testNamespaceName) + metav1.SetMetaDataLabel(&task.ObjectMeta, taskStatusLabel, completedTaskLabelValue) + ttlTime := new(int32) + *ttlTime = 1 + task.Spec.TTLSecondsAfterFinished = ttlTime + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + timeNow := metav1.Now() + task.Status.CompletionTime = &timeNow + Expect(k8sClient.Status().Update(context.TODO(), task)).Should(Succeed()) + + Eventually(func() bool { + deletedTask := api.CassandraTask{} + err := k8sClient.Get(context.TODO(), taskKey, &deletedTask) + return err != nil && errors.IsNotFound(err) + }).Should(BeTrue()) + }) + It("Ensures task is not deleted if TTL is set to 0", func() { + taskKey, task := buildTask(api.CommandCleanup, testNamespaceName) + metav1.SetMetaDataLabel(&task.ObjectMeta, taskStatusLabel, completedTaskLabelValue) + ttlTime := new(int32) + *ttlTime = 0 + task.Spec.TTLSecondsAfterFinished = ttlTime + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) + + timeNow := metav1.Now() + task.Status.CompletionTime = &timeNow - Expect(completedTask.Status.Failed).To(BeNumerically(">=", 3)) + Expect(k8sClient.Status().Update(context.TODO(), task)).Should(Succeed()) + Consistently(func() bool { + deletedTask := api.CassandraTask{} + err := k8sClient.Get(context.TODO(), taskKey, &deletedTask) + return err == nil + }).Should(BeTrue()) }) }) }) - Context("Sync jobs", func() { + Describe("Execute jobs against all StatefulSets", func() { var testNamespaceName string BeforeEach(func() { - By("Creating fake synchronous mgmt-api server") - var err error - callDetails = httphelper.NewCallDetails() - mockServer, err = httphelper.FakeServerWithoutFeaturesEndpoint(callDetails) - testNamespaceName = fmt.Sprintf("test-sync-task-%d", rand.Int31()) - Expect(err).ToNot(HaveOccurred()) - mockServer.Start() + testNamespaceName = fmt.Sprintf("test-task-%d", rand.Int31()) By("create datacenter", createDatacenter(testDatacenterName, testNamespaceName)) }) AfterEach(func() { - mockServer.Close() - }) - - It("Runs a cleanup task against the datacenter pods", func() { - By("Creating a task for cleanup") - taskKey := createTask(api.CommandCleanup, testNamespaceName) - - completedTask := waitForTaskCompletion(taskKey) - - Expect(callDetails.URLCounts["/api/v0/ops/keyspace/cleanup"]).To(Equal(3)) - Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) - Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) - - // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) - }) - - It("Runs a upgradesstables task against the datacenter pods", 
func() { - By("Creating a task for upgradesstables") - time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new - taskKey := createTask(api.CommandUpgradeSSTables, testNamespaceName) - - completedTask := waitForTaskCompletion(taskKey) - - Expect(callDetails.URLCounts["/api/v0/ops/tables/sstables/upgrade"]).To(Equal(3)) - Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(Equal(0)) - Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 1)) - - // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) + deleteDatacenter(testNamespaceName) + // Expect(k8sClient.Delete(context.TODO(), testDc)).Should(Succeed()) }) - It("Replaces a node in the datacenter", func() { - By("Creating a task for replacenode") - time.Sleep(1 * time.Second) // Otherwise the CreationTimestamp could be too new - taskKey, task := buildTask(api.CommandReplaceNode, testNamespaceName) - - podKey := types.NamespacedName{ - Name: fmt.Sprintf("test-cassdc-%s-pod%d", testDatacenterName, 2), - Namespace: testNamespaceName, - } - - task.Spec.Jobs[0].Arguments = map[string]string{"pod_name": podKey.Name} - Expect(k8sClient.Create(context.TODO(), task)).Should(Succeed()) + Context("Restart", func() { + It("Restarts a single rack", func() { + stsKey := types.NamespacedName{Namespace: testNamespaceName, Name: fmt.Sprintf("%s-%s-r1-sts", clusterName, testDc.Name)} + var sts appsv1.StatefulSet + Expect(k8sClient.Get(context.TODO(), stsKey, &sts)).Should(Succeed()) - // Verify the pod2 was deleted - Eventually(func() bool { - pod := &corev1.Pod{} - err := k8sClient.Get(context.TODO(), podKey, pod) - return err != nil && errors.IsNotFound(err) - }, time.Duration(3*time.Second)).Should(BeTrue()) + // Create task to restart r1 + taskKey, task := buildTask(api.CommandRestart, testNamespaceName) + task.Spec.Jobs[0].Arguments = map[string]string{"rack": "r1"} + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) - // Recreate it so the process "finishes" - createPod(testNamespaceName, fmt.Sprintf("test-%s", testDatacenterName), testDatacenterName, 2) + Eventually(func() bool { + Expect(k8sClient.Get(context.TODO(), stsKey, &sts)).Should(Succeed()) + _, found := sts.Spec.Template.ObjectMeta.Annotations[api.RestartedAtAnnotation] + return found + }).Should(BeTrue()) + + Expect(k8sClient.Get(context.TODO(), taskKey, task)).To(Succeed()) + Expect(task.Status.CompletionTime).To(BeNil()) + + // Imitate statefulset_controller + Expect(k8sClient.Get(context.TODO(), stsKey, &sts)).Should(Succeed()) + sts.Status.UpdatedReplicas = sts.Status.Replicas + sts.Status.ReadyReplicas = sts.Status.Replicas + sts.Status.CurrentReplicas = sts.Status.Replicas + sts.Status.UpdateRevision = "1" + sts.Status.CurrentRevision = sts.Status.UpdateRevision + sts.Status.ObservedGeneration = sts.GetObjectMeta().GetGeneration() + + Expect(k8sClient.Status().Update(context.TODO(), &sts)).Should(Succeed()) + + // Set StatefulSet properties here so that the task completes.. 
verify first that there's been a change (but only to r1) + _ = waitForTaskCompletion(taskKey) + + // Verify other racks haven't been modified + var stsAll appsv1.StatefulSetList + Expect(k8sClient.List(context.TODO(), &stsAll, client.MatchingLabels(map[string]string{cassdcapi.DatacenterLabel: testDc.Name}), client.InNamespace(testNamespaceName))).To(Succeed()) + Expect(len(stsAll.Items)).To(Equal(rackCount)) + + for _, sts := range stsAll.Items { + if sts.Name == stsKey.Name { + continue + } + _, found := sts.Spec.Template.ObjectMeta.Annotations[api.RestartedAtAnnotation] + Expect(found).ToNot(BeTrue()) + } + }) + It("Restarts datacenter", func() { + var stsAll appsv1.StatefulSetList + Expect(k8sClient.List(context.TODO(), &stsAll, client.MatchingLabels(map[string]string{cassdcapi.DatacenterLabel: testDc.Name}), client.InNamespace(testNamespaceName))).To(Succeed()) + Expect(len(stsAll.Items)).To(Equal(rackCount)) - completedTask := waitForTaskCompletion(taskKey) + // Create task to restart all + taskKey, task := buildTask(api.CommandRestart, testNamespaceName) + Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) - // verifyPodsHaveAnnotations(testNamespaceName, string(task.UID)) - Expect(completedTask.Status.Succeeded).To(BeNumerically(">=", 1)) - }) - }) - Context("Task TTL", func() { - var testNamespaceName string - BeforeEach(func() { - testNamespaceName = fmt.Sprintf("test-task-%d", rand.Int31()) - testNamespace := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: testNamespaceName, - }, - } - Expect(k8sClient.Create(context.Background(), testNamespace)).Should(Succeed()) - }) - It("Ensures task is deleted after TTL has expired", func() { - taskKey, task := buildTask(api.CommandCleanup, testNamespaceName) - metav1.SetMetaDataLabel(&task.ObjectMeta, taskStatusLabel, completedTaskLabelValue) - ttlTime := new(int32) - *ttlTime = 1 - task.Spec.TTLSecondsAfterFinished = ttlTime - Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) - - timeNow := metav1.Now() - task.Status.CompletionTime = &timeNow - Expect(k8sClient.Status().Update(context.TODO(), task)).Should(Succeed()) - - Eventually(func() bool { - deletedTask := api.CassandraTask{} - err := k8sClient.Get(context.TODO(), taskKey, &deletedTask) - return err != nil && errors.IsNotFound(err) - }).Should(BeTrue()) - }) - It("Ensures task is not deleted if TTL is set to 0", func() { - taskKey, task := buildTask(api.CommandCleanup, testNamespaceName) - metav1.SetMetaDataLabel(&task.ObjectMeta, taskStatusLabel, completedTaskLabelValue) - ttlTime := new(int32) - *ttlTime = 0 - task.Spec.TTLSecondsAfterFinished = ttlTime - Expect(k8sClient.Create(context.Background(), task)).Should(Succeed()) - - timeNow := metav1.Now() - task.Status.CompletionTime = &timeNow - - Expect(k8sClient.Status().Update(context.TODO(), task)).Should(Succeed()) - Consistently(func() bool { - deletedTask := api.CassandraTask{} - err := k8sClient.Get(context.TODO(), taskKey, &deletedTask) - return err == nil - }).Should(BeTrue()) + Eventually(func() bool { + Expect(k8sClient.List(context.TODO(), &stsAll, client.MatchingLabels(map[string]string{cassdcapi.DatacenterLabel: testDc.Name}), client.InNamespace(testNamespaceName))).To(Succeed()) + + inflight := 0 + + for _, sts := range stsAll.Items { + if _, found := sts.Spec.Template.ObjectMeta.Annotations[api.RestartedAtAnnotation]; found { + if sts.Status.UpdateRevision == "" { + inflight++ + } + } + } + + Expect(inflight).To(BeNumerically("<=", 1)) + + for _, sts := range 
stsAll.Items { + if _, found := sts.Spec.Template.ObjectMeta.Annotations[api.RestartedAtAnnotation]; found { + // Imitate statefulset_controller + if sts.Status.UpdateRevision != "1" { + sts.Status.UpdatedReplicas = sts.Status.Replicas + sts.Status.ReadyReplicas = sts.Status.Replicas + sts.Status.CurrentReplicas = sts.Status.Replicas + sts.Status.UpdateRevision = "1" + sts.Status.CurrentRevision = sts.Status.UpdateRevision + sts.Status.ObservedGeneration = sts.GetObjectMeta().GetGeneration() + + Expect(k8sClient.Status().Update(context.TODO(), &sts)).Should(Succeed()) + } + } else if !found { + return false + } + } + return true + }, "5s", "50ms").Should(BeTrue()) + + _ = waitForTaskCompletion(taskKey) + }) }) }) }) diff --git a/controllers/control/jobs.go b/controllers/control/jobs.go index 096426a5..a295ee5f 100644 --- a/controllers/control/jobs.go +++ b/controllers/control/jobs.go @@ -3,14 +3,19 @@ package control import ( "context" "fmt" + "sort" + "time" "github.com/k8ssandra/cass-operator/pkg/httphelper" "github.com/k8ssandra/cass-operator/pkg/utils" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" cassapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + api "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" ) // Cleanup functionality @@ -18,7 +23,7 @@ import ( func callCleanup(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { // TODO Add more arguments configurations keyspaceName := "" - if keyspace, found := taskConfig.Arguments["keyspace_name"]; found { + if keyspace, found := taskConfig.Arguments[api.KeyspaceArgument]; found { keyspaceName = keyspace } return nodeMgmtClient.CallKeyspaceCleanup(pod, -1, keyspaceName, nil) @@ -27,7 +32,7 @@ func callCleanup(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, task func callCleanupSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { // TODO Add more arguments configurations keyspaceName := "" - if keyspace, found := taskConfig.Arguments["keyspace_name"]; found { + if keyspace, found := taskConfig.Arguments[api.KeyspaceArgument]; found { keyspaceName = keyspace } return nodeMgmtClient.CallKeyspaceCleanupEndpoint(pod, -1, keyspaceName, nil) @@ -42,7 +47,7 @@ func cleanup(taskConfig *TaskConfiguration) { // Rebuild functionality func callRebuild(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) (string, error) { - return nodeMgmtClient.CallDatacenterRebuild(pod, taskConfig.Arguments["source_datacenter"]) + return nodeMgmtClient.CallDatacenterRebuild(pod, taskConfig.Arguments[api.SourceDatacenterArgument]) } func rebuild(taskConfig *TaskConfiguration) { @@ -52,33 +57,55 @@ func rebuild(taskConfig *TaskConfiguration) { // Rolling restart functionality -func (r *CassandraTaskReconciler) callRestartSync(nodeMgmtClient httphelper.NodeMgmtClient, pod *corev1.Pod, taskConfig *TaskConfiguration) error { - podStartTime := pod.GetCreationTimestamp() - // TODO How do we verify the previous pod has actually restarted before we move on to restart the next one? - // cass-operator used to take care of this. 
- if podStartTime.Before(taskConfig.TaskStartTime) { - // TODO Our taskController needs events - // rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.RestartingCassandra, - // "Restarting Cassandra for pod %s", pod.Name) +func (r *CassandraTaskReconciler) restartSts(ctx context.Context, sts []appsv1.StatefulSet, taskConfig *TaskConfiguration) (ctrl.Result, error) { + // Sort to ensure we don't process StatefulSets in wrong order and restart multiple racks at the same time + sort.Slice(sts, func(i, j int) bool { + return sts[i].Name < sts[j].Name + }) + + restartTime := taskConfig.TaskStartTime.Format(time.RFC3339) + + if rackFilter, found := taskConfig.Arguments[api.RackArgument]; found { + singleSts := make([]appsv1.StatefulSet, 1) + for _, st := range sts { + if st.ObjectMeta.Labels[cassapi.RackLabel] == rackFilter { + singleSts[0] = st + sts = singleSts + break + } + } + } - // Drain the node - err := nodeMgmtClient.CallDrainEndpoint(pod) - if err != nil { - return err + for _, st := range sts { + if st.Spec.Template.ObjectMeta.Annotations == nil { + st.Spec.Template.ObjectMeta.Annotations = make(map[string]string) } + if st.Spec.Template.ObjectMeta.Annotations[api.RestartedAtAnnotation] == restartTime { + // This one has been called to restart already - is it ready? + + status := st.Status + if status.CurrentRevision == status.UpdateRevision && + status.UpdatedReplicas == status.Replicas && + status.CurrentReplicas == status.Replicas && + status.ReadyReplicas == status.Replicas && + status.ObservedGeneration == st.GetObjectMeta().GetGeneration() { + // This one has been updated, move on to the next one + continue + } - // Delete the pod - err = r.Client.Delete(context.Background(), pod) - if err != nil { - return err + // This is still restarting + return ctrl.Result{RequeueAfter: jobRunningRequeue}, nil + } + st.Spec.Template.ObjectMeta.Annotations[api.RestartedAtAnnotation] = restartTime + if err := r.Client.Update(ctx, &st); err != nil { + return ctrl.Result{}, err } - } - return nil -} + return ctrl.Result{RequeueAfter: jobRunningRequeue}, nil + } -func (r *CassandraTaskReconciler) restart(taskConfig *TaskConfiguration) { - taskConfig.SyncFunc = r.callRestartSync + // We're done + return ctrl.Result{}, nil } // UpgradeSSTables functionality diff --git a/tests/rolling_restart/rolling_restart_suite_test.go b/tests/rolling_restart/rolling_restart_suite_test.go index 71b7e0eb..fd4f8743 100644 --- a/tests/rolling_restart/rolling_restart_suite_test.go +++ b/tests/rolling_restart/rolling_restart_suite_test.go @@ -22,6 +22,7 @@ var ( namespace = "test-rolling-restart" dcName = "dc2" dcYaml = "../testdata/default-single-rack-2-node-dc.yaml" + taskYaml = "../testdata/tasks/rolling_restart.yaml" dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName) ns = ginkgo_util.NewWrapper(testName, namespace) ) @@ -48,7 +49,7 @@ func TestLifecycle(t *testing.T) { var _ = Describe(testName, func() { Context("when in a new cluster", func() { - Specify("the operator can perform a rolling restart", func() { + Specify("operator is installed and cluster is created", func() { By("deploy cass-operator with kustomize") err := kustomize.Deploy(namespace) Expect(err).ToNot(HaveOccurred()) @@ -61,9 +62,11 @@ var _ = Describe(testName, func() { ns.WaitForDatacenterReady(dcName) - step = "trigger restart" + }) + Specify("the operator can perform a rolling restart with rollingRestartRequested spec change", func() { + step := "trigger restart" json := `{"spec": {"rollingRestartRequested": true}}` - k 
= kubectl.PatchMerge(dcResource, json) + k := kubectl.PatchMerge(dcResource, json) ns.ExecAndLog(step, k) // Ensure we actually set the condition @@ -85,5 +88,21 @@ var _ = Describe(testName, func() { ns.WaitForDatacenterReady(dcName) }) + Specify("cassandratask can be used to do rolling restart of the cluster", func() { + step := "creating a cassandra task to do rolling restart" + k := kubectl.ApplyFiles(taskYaml) + ns.ExecAndLog(step, k) + + // Wait for the task to be completed + ns.WaitForCompleteTask("rolling-restart") + + // Verify each pod does have the annotation.. + json := `jsonpath={.items[0].metadata.annotations.control\.k8ssandra\.io/restartedAt}` + k = kubectl.Get("pods"). + WithLabel(fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName)). + WithFlag("field-selector", "status.phase=Running"). + FormatOutput(json) + ns.WaitForOutputPatternAndLog(step, k, `^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$`, 360) + }) }) }) diff --git a/tests/testdata/tasks/rolling_restart.yaml b/tests/testdata/tasks/rolling_restart.yaml new file mode 100644 index 00000000..bd5efe68 --- /dev/null +++ b/tests/testdata/tasks/rolling_restart.yaml @@ -0,0 +1,11 @@ +apiVersion: control.k8ssandra.io/v1alpha1 +kind: CassandraTask +metadata: + name: rolling-restart +spec: + datacenter: + name: dc2 + namespace: test-rolling-restart + jobs: + - name: restart-run + command: restart
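
For reference, a minimal usage sketch of the new "restart" command, assuming the CassandraTask is created in the same namespace as the target datacenter (this mirrors tests/testdata/tasks/rolling_restart.yaml added above; the kubectl invocation is illustrative and not part of the patch):

    # CassandraTask asking cass-operator to roll-restart datacenter dc2,
    # one rack StatefulSet at a time.
    apiVersion: control.k8ssandra.io/v1alpha1
    kind: CassandraTask
    metadata:
      name: rolling-restart
    spec:
      datacenter:
        name: dc2
        namespace: test-rolling-restart
      jobs:
        - name: restart-run
          command: restart

Applied with `kubectl apply -n test-rolling-restart -f rolling_restart.yaml`, the controller stamps the control.k8ssandra.io/restartedAt annotation into each rack's StatefulSet pod template in name-sorted order, waits for each restarted StatefulSet to report fully updated and ready before moving on to the next rack, and finally sets the task's JobComplete condition. A restart can also be scoped to a single rack by setting the job's "rack" argument (api.RackArgument), as exercised in the controller tests above.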