diff --git a/Makefile b/Makefile index 75771d2f..2b06a749 100644 --- a/Makefile +++ b/Makefile @@ -295,4 +295,3 @@ toc-verify: .PHONY: generate-apiref generate-apiref: genref cd $(PROJECT_DIR)/hack/genref/ && $(GENREF) -o $(PROJECT_DIR)/docs/reference - \ No newline at end of file diff --git a/pkg/controllers/leaderworkerset_controller.go b/pkg/controllers/leaderworkerset_controller.go index 049da38c..07d87abf 100644 --- a/pkg/controllers/leaderworkerset_controller.go +++ b/pkg/controllers/leaderworkerset_controller.go @@ -337,28 +337,24 @@ func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws // updates the condition of the leaderworkerset to either Progressing or Available. func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (bool, error) { log := ctrl.LoggerFrom(ctx) - stsSelector := client.MatchingLabels(map[string]string{ - leaderworkerset.SetNameLabelKey: lws.Name, + podSelector := client.MatchingLabels(map[string]string{ + leaderworkerset.SetNameLabelKey: lws.Name, + leaderworkerset.WorkerIndexLabelKey: "0", }) - - // update the condition based on the status of all statefulsets owned by the lws. - var lwssts appsv1.StatefulSetList - if err := r.List(ctx, &lwssts, stsSelector, client.InNamespace(lws.Namespace)); err != nil { - log.Error(err, "Fetching statefulsets managed by leaderworkerset instance") + leaderPodList := &corev1.PodList{} + if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil { + log.Error(err, "Fetching leaderPods") return false, err } updateStatus := false readyCount, updatedCount, updatedNonBurstWorkerCount, currentNonBurstWorkerCount, updatedAndReadyCount := 0, 0, 0, 0, 0 templateHash := utils.LeaderWorkerTemplateHash(lws) + noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1 - // Iterate through all statefulsets. - for _, sts := range lwssts.Items { - if sts.Name == lws.Name { - continue - } - - index, err := strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey]) + // Iterate through all leaderPods. + for _, pod := range leaderPodList.Items { + index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey]) if err != nil { return false, err } @@ -366,18 +362,20 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l currentNonBurstWorkerCount++ } - var leaderPod corev1.Pod - if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: sts.Name}, &leaderPod); err != nil { - log.Error(err, "Fetching leader pod") - return false, err + var sts appsv1.StatefulSet + if !noWorkerSts { + if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil { + log.Error(err, "Fetching worker statefulSet") + return false, err + } } var ready, updated bool - if statefulsetutils.StatefulsetReady(sts) && podutils.PodRunningAndReady(leaderPod) { + if (noWorkerSts || statefulsetutils.StatefulsetReady(sts)) && podutils.PodRunningAndReady(pod) { ready = true readyCount++ } - if sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash && leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash { + if (noWorkerSts || sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash) && pod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash { updated = true updatedCount++ if index < int(*lws.Spec.Replicas) { @@ -424,7 +422,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l return updateStatus || updateCondition, nil } -// Updates status and condition of LeaderWorkerSet and returns whether or not an upate actually occurred. +// Updates status and condition of LeaderWorkerSet and returns whether or not an update actually occurred. func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error { updateStatus := false log := ctrl.LoggerFrom(ctx) @@ -502,17 +500,18 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le if err := r.List(ctx, &stsList, stsSelector, client.InNamespace(lws.Namespace)); err != nil { return 0, 0, err } - sortedSts := utils.SortByIndex(func(sts appsv1.StatefulSet) (int, error) { return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey]) }, stsList.Items, int(stsReplicas)) templateHash := utils.LeaderWorkerTemplateHash(lws) + // Once size==1, no worker statefulSets will be created. + noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1 processReplica := func(index int32) (ready bool) { nominatedName := fmt.Sprintf("%s-%d", lws.Name, index) // It can happen that the leader pod or the worker statefulset hasn't created yet // or under rebuilding, which also indicates not ready. - if nominatedName != sortedPods[index].Name || nominatedName != sortedSts[index].Name { + if nominatedName != sortedPods[index].Name || (!noWorkerSts && nominatedName != sortedSts[index].Name) { return false } @@ -521,6 +520,10 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le return false } + if noWorkerSts { + return true + } + stsTemplateHash := sortedSts[index].Labels[leaderworkerset.TemplateRevisionHashKey] return stsTemplateHash == templateHash && statefulsetutils.StatefulsetReady(sortedSts[index]) } diff --git a/pkg/controllers/pod_controller.go b/pkg/controllers/pod_controller.go index 1aae734b..7b4d5b73 100644 --- a/pkg/controllers/pod_controller.go +++ b/pkg/controllers/pod_controller.go @@ -102,6 +102,11 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R return ctrl.Result{}, nil } + // Once size = 1, no need to create worker statefulSets. + if *leaderWorkerSet.Spec.LeaderWorkerTemplate.Size == 1 { + return ctrl.Result{}, nil + } + // logic for handling leader pod if leaderWorkerSet.Spec.StartupPolicy == leaderworkerset.LeaderReadyStartupPolicy && !k8spodutils.IsPodReady(&pod) { log.V(2).Info("defer the creation of the worker statefulset because leader pod is not ready.") diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index d4d2fa09..8d51a390 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -91,6 +91,23 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() { } }) + ginkgo.It("Can create/update a lws with size=1", func() { + lws = testing.BuildLeaderWorkerSet(ns.Name).Replica(4).MaxSurge(1).Size(1).RestartPolicy(v1.RecreateGroupOnPodRestart).Obj() + testing.MustCreateLws(ctx, k8sClient, lws) + + testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4) + testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true) + testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready") + + testing.UpdateWorkerTemplate(ctx, k8sClient, lws) + // Happen during rolling update. + testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 5) + + testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4) + testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true) + testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready") + }) + ginkgo.It("Can perform a rolling update", func() { lws := testing.BuildLeaderWorkerSet(ns.Name).Obj() testing.MustCreateLws(ctx, k8sClient, lws) diff --git a/test/integration/controllers/leaderworkerset_test.go b/test/integration/controllers/leaderworkerset_test.go index 9d3211c8..593f7af8 100644 --- a/test/integration/controllers/leaderworkerset_test.go +++ b/test/integration/controllers/leaderworkerset_test.go @@ -162,13 +162,21 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() { }), ginkgo.Entry("group size is 1", &testCase{ makeLeaderWorkerSet: func(nsName string) *testing.LeaderWorkerSetWrapper { - return testing.BuildLeaderWorkerSet(nsName).Size(1) + return testing.BuildLeaderWorkerSet(nsName).Name("iiiiiiiiiiiiiiiii").Size(1) }, updates: []*update{ { - checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) { - testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, deployment, 2) - testing.ExpectValidWorkerStatefulSets(ctx, deployment, k8sClient, true) + checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) { + testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2) + testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true) + }, + }, + { + lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) { + testing.SetLeaderPodsToReady(ctx, k8sClient, lws, 0, int(*lws.Spec.Replicas)) + }, + checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) { + testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready") }, }, }, @@ -1511,7 +1519,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() { }, }, }), - ginkgo.Entry("create a leaderworkerset with spec.startupPolicyy=LeaderReady", &testCase{ + ginkgo.Entry("create a leaderworkerset with spec.startupPolicy=LeaderReady", &testCase{ makeLeaderWorkerSet: func(nsName string) *testing.LeaderWorkerSetWrapper { return testing.BuildLeaderWorkerSet(nsName).Replica(4).StartupPolicy(leaderworkerset.LeaderReadyStartupPolicy) }, @@ -1540,7 +1548,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() { }, }, }), - ginkgo.Entry("create a leaderworkerset with spec.startupPolicyy=LeaderCreated", &testCase{ + ginkgo.Entry("create a leaderworkerset with spec.startupPolicy=LeaderCreated", &testCase{ makeLeaderWorkerSet: func(nsName string) *testing.LeaderWorkerSetWrapper { return testing.BuildLeaderWorkerSet(nsName).Replica(4).StartupPolicy(leaderworkerset.LeaderCreatedStartupPolicy) }, diff --git a/test/testutils/util.go b/test/testutils/util.go index 4c749025..e5ef4448 100644 --- a/test/testutils/util.go +++ b/test/testutils/util.go @@ -510,4 +510,17 @@ func SetLeaderPodsToReady(ctx context.Context, k8sClient client.Client, lws *lea for i := start; i < end; i++ { SetLeaderPodToReady(ctx, k8sClient, fmt.Sprintf("%s-%d", leaderSts.Name, i), lws) } + + // If size=1, we should trigger the leader sts update or the controller will not run reconciliation. + gomega.Eventually(func() error { + var sts appsv1.StatefulSet + if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &sts); err != nil { + return err + } + sts.Status.ReadyReplicas = *sts.Spec.Replicas + sts.Status.Replicas = *sts.Spec.Replicas + sts.Status.CurrentRevision = "" + sts.Status.UpdateRevision = "" + return k8sClient.Status().Update(ctx, &sts) + }, Timeout, Interval).Should(gomega.Succeed()) } diff --git a/test/testutils/validators.go b/test/testutils/validators.go index 73e1cd72..34030ce1 100644 --- a/test/testutils/validators.go +++ b/test/testutils/validators.go @@ -182,8 +182,12 @@ func ExpectValidWorkerStatefulSets(ctx context.Context, leaderWorkerSet *leaderw if err := k8sClient.List(ctx, &statefulSetList, client.InNamespace(lws.Namespace), &client.MatchingLabels{leaderworkerset.SetNameLabelKey: lws.Name}); err != nil { return err } - if leaderPodScheduled && int(*leaderSts.Spec.Replicas) != len(statefulSetList.Items)-1 { - return fmt.Errorf("running worker statefulsets replicas not right, want %d, got %d", *leaderSts.Spec.Replicas, len(statefulSetList.Items)-1) + stsNumber := *leaderSts.Spec.Replicas + if *lws.Spec.LeaderWorkerTemplate.Size == 1 { + stsNumber = 0 + } + if leaderPodScheduled && len(statefulSetList.Items)-1 != int(stsNumber) { + return fmt.Errorf("running worker statefulsets replicas not right, want %d, got %d", len(statefulSetList.Items)-1, stsNumber) } if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" && !leaderPodScheduled && len(statefulSetList.Items) != 1 { return fmt.Errorf("when exclusive placement is enabled, only expect sts count to be 1") diff --git a/test/testutils/wrappers.go b/test/testutils/wrappers.go index 43048ff5..3740c8b5 100644 --- a/test/testutils/wrappers.go +++ b/test/testutils/wrappers.go @@ -34,6 +34,10 @@ func (lwsWrapper *LeaderWorkerSetWrapper) Obj() *leaderworkerset.LeaderWorkerSet return &lwsWrapper.LeaderWorkerSet } +func (lwsWrapper *LeaderWorkerSetWrapper) Name(name string) *LeaderWorkerSetWrapper { + lwsWrapper.ObjectMeta.Name = name + return lwsWrapper +} func (lwsWrapper *LeaderWorkerSetWrapper) Replica(count int) *LeaderWorkerSetWrapper { lwsWrapper.Spec.Replicas = ptr.To[int32](int32(count)) return lwsWrapper