diff --git a/api/leaderworkerset/v1/leaderworkerset_types.go b/api/leaderworkerset/v1/leaderworkerset_types.go
index dd581beb..3e259929 100644
--- a/api/leaderworkerset/v1/leaderworkerset_types.go
+++ b/api/leaderworkerset/v1/leaderworkerset_types.go
@@ -175,13 +175,13 @@ type LeaderWorkerSetStatus struct {
 	// Conditions track the condition of the leaderworkerset.
 	Conditions []metav1.Condition `json:"conditions,omitempty"`
 
-	// ReadyReplicas track the number of groups that are in ready state.
+	// ReadyReplicas track the number of groups that are in ready state (updated or not).
 	ReadyReplicas int32 `json:"readyReplicas,omitempty"`
 
-	// UpdatedReplicas track the number of groups that have been updated.
+	// UpdatedReplicas track the number of groups that have been updated (ready or not).
 	UpdatedReplicas int32 `json:"updatedReplicas,omitempty"`
 
-	// Replicas track the active total number of groups.
+	// Replicas track the total number of groups that have been created (updated or not, ready or not)
 	Replicas int32 `json:"replicas,omitempty"`
 
 	// HPAPodSelector for pods that belong to the LeaderWorkerSet object, this is
diff --git a/config/crd/bases/leaderworkerset.x-k8s.io_leaderworkersets.yaml b/config/crd/bases/leaderworkerset.x-k8s.io_leaderworkersets.yaml
index 75d44d07..6965ceb5 100644
--- a/config/crd/bases/leaderworkerset.x-k8s.io_leaderworkersets.yaml
+++ b/config/crd/bases/leaderworkerset.x-k8s.io_leaderworkersets.yaml
@@ -15392,16 +15392,17 @@ spec:
                 type: string
               readyReplicas:
                 description: ReadyReplicas track the number of groups that are in
-                  ready state.
+                  ready state (updated or not).
                 format: int32
                 type: integer
               replicas:
-                description: Replicas track the active total number of groups.
+                description: Replicas track the total number of groups that have been
+                  created (updated or not, ready or not)
                 format: int32
                 type: integer
               updatedReplicas:
                 description: UpdatedReplicas track the number of groups that have
-                  been updated.
+                  been updated (ready or not).
                 format: int32
                 type: integer
             type: object
diff --git a/pkg/controllers/leaderworkerset_controller.go b/pkg/controllers/leaderworkerset_controller.go
index aa3cc07f..03a6eaa1 100644
--- a/pkg/controllers/leaderworkerset_controller.go
+++ b/pkg/controllers/leaderworkerset_controller.go
@@ -354,8 +354,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
 	}
 
 	updateStatus := false
-	readyCount := 0
-	updatedCount := 0
+	readyCount, updatedCount, updatedAndReadyCount := 0, 0, 0
 	templateHash := utils.LeaderWorkerTemplateHash(lws)
 
 	// Iterate through all statefulsets.
@@ -364,22 +363,21 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
 			continue
 		}
 
-		// this is the worker statefulset.
-		if statefulsetutils.StatefulsetReady(sts) {
-
-			// the worker pods are OK.
-			// need to check leader pod for this group.
-			var leaderPod corev1.Pod
-			if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: sts.Name}, &leaderPod); err != nil {
-				log.Error(err, "Fetching leader pod")
-				return false, err
-			}
-			if podutils.PodRunningAndReady(leaderPod) {
-				readyCount++
+		var leaderPod corev1.Pod
+		if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: sts.Name}, &leaderPod); err != nil {
+			log.Error(err, "Fetching leader pod")
+			return false, err
+		}
 
-				if sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash && leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash {
-					updatedCount++
-				}
+		var ready bool
+		if statefulsetutils.StatefulsetReady(sts) && podutils.PodRunningAndReady(leaderPod) {
+			ready = true
+			readyCount++
+		}
+		if sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash && leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash {
+			updatedCount++
+			if ready {
+				updatedAndReadyCount++
 			}
 		}
 	}
@@ -394,7 +392,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
 		updateStatus = true
 	}
 
-	condition := makeCondition(updatedCount == int(*lws.Spec.Replicas))
+	condition := makeCondition(updatedAndReadyCount == int(*lws.Spec.Replicas))
 	updateCondition := setCondition(lws, condition)
 	// if condition changed, record events
 	if updateCondition {
@@ -416,7 +414,7 @@ func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leade
 	}
 
 	// retrieve the current number of replicas -- the number of leaders
-	replicas := int(*sts.Spec.Replicas)
+	replicas := int(sts.Status.Replicas)
 	if lws.Status.Replicas != int32(replicas) {
 		lws.Status.Replicas = int32(replicas)
 		updateStatus = true
diff --git a/test/integration/controllers/leaderworkerset_test.go b/test/integration/controllers/leaderworkerset_test.go
index d8375be2..bef70d8b 100644
--- a/test/integration/controllers/leaderworkerset_test.go
+++ b/test/integration/controllers/leaderworkerset_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	appsv1 "k8s.io/api/apps/v1"
+	autoscalingv1 "k8s.io/api/autoscaling/v1"
 	v1 "k8s.io/api/autoscaling/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -91,12 +92,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 			updates: []*update{
 				{
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
-						var leaderworkerset leaderworkerset.LeaderWorkerSet
-						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
-						testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
+						testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
 						var leaderSts appsv1.StatefulSet
-						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
-						gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, &leaderworkerset, 2, 3)).To(gomega.Succeed())
+						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
+						gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 2, 3)).To(gomega.Succeed())
 					},
 					checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
 						testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -113,12 +112,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 			updates: []*update{
 				{
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
-						var leaderworkerset leaderworkerset.LeaderWorkerSet
-						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
-						testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
+						testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
 						var leaderSts appsv1.StatefulSet
-						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
-						testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
+						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
+						testing.DeleteLeaderPods(ctx, k8sClient, lws)
 					},
 					checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
 						testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -135,12 +132,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 			updates: []*update{
 				{
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
-						var leaderworkerset leaderworkerset.LeaderWorkerSet
-						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
-						testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(0))
+						testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(0))
 						var leaderSts appsv1.StatefulSet
-						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
-						testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
+						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
+						testing.DeleteLeaderPods(ctx, k8sClient, lws)
 					},
 					checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
 						testing.ExpectValidReplicasCount(ctx, deployment, 0, k8sClient)
@@ -157,12 +152,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 			updates: []*update{
 				{
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
-						var leaderworkerset leaderworkerset.LeaderWorkerSet
-						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
-						testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
+						testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
 						var leaderSts appsv1.StatefulSet
-						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
-						gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, &leaderworkerset, 0, 3)).To(gomega.Succeed())
+						gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
+						gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 0, 3)).To(gomega.Succeed())
 					},
 					checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
 						testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -298,7 +291,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 			updates: []*update{
 				{
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-						var scale v1.Scale
+						var scale autoscalingv1.Scale
 						gomega.Expect(k8sClient.SubResource("scale").Get(ctx, lws, &scale)).To(gomega.Succeed())
 						gomega.Expect(int32(scale.Spec.Replicas)).To(gomega.Equal(*lws.Spec.Replicas))
 						gomega.Expect(int32(scale.Status.Replicas)).To(gomega.Equal(lws.Status.Replicas))
@@ -307,24 +300,17 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 				},
 				{
 					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
-						var scale v1.Scale
-						gomega.Expect(k8sClient.SubResource("scale").Get(ctx, lws, &scale)).To(gomega.Succeed())
-						scale.Spec.Replicas = 3
-						lwsUnstructed, _ := ToUnstructured(lws)
-						lwsUnstructed.SetAPIVersion("leaderworkerset.x-k8s.io/v1")
-						lwsUnstructed.SetKind("LeaderWorkerSet")
-						scaleUnstructed, _ := ToUnstructured(scale.DeepCopy())
-						scaleUnstructed.SetAPIVersion("autoscaling/v1")
-						scaleUnstructed.SetKind("Scale")
-						gomega.Expect(k8sClient.SubResource("scale").Update(ctx, lwsUnstructed, client.WithSubResourceBody(scaleUnstructed))).To(gomega.Succeed())
+						dep := &leaderworkerset.LeaderWorkerSet{ObjectMeta: metav1.ObjectMeta{Namespace: lws.Namespace, Name: lws.Name}}
+						scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 3}}
+						gomega.Expect(k8sClient.SubResource("scale").Update(ctx, dep, client.WithSubResourceBody(scale))).To(gomega.Succeed())
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
 						gomega.Eventually(func() (int32, error) {
 							var leaderWorkerSet leaderworkerset.LeaderWorkerSet
 							if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet); err != nil {
-								return -1, err
+								return 0, err
 							}
-							return leaderWorkerSet.Status.Replicas, nil
+							return *leaderWorkerSet.Spec.Replicas, nil
 						}, testing.Timeout, testing.Interval).Should(gomega.Equal(int32(3)))
 					},
 				},
@@ -451,13 +437,6 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 			updates: []*update{
 				{
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
-						gomega.Eventually(func() (int32, error) {
-							var leaderWorkerSet leaderworkerset.LeaderWorkerSet
-							if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet); err != nil {
-								return -1, err
-							}
-							return leaderWorkerSet.Status.Replicas, nil
-						}, testing.Timeout, testing.Interval).Should(gomega.Equal(int32(2)))
 						testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
 						testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
 						testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
@@ -561,7 +540,8 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 						testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
 						testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 2)
 						testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
-						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 1)
+						// 3-index status is unready but template already updated.
+						testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 2)
 					},
 				},
 				{
@@ -884,7 +864,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 							return k8sClient.Update(ctx, &leaderworkerset)
 						}, testing.Timeout, testing.Interval).Should(gomega.Succeed())
 						// Manually delete leader pods here because we have no statefulset controller.
-						testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
+						testing.DeleteLeaderPods(ctx, k8sClient, &leaderworkerset)
 					},
 					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
 						testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
diff --git a/test/testutils/util.go b/test/testutils/util.go
index 9044f83f..62d5f2ee 100644
--- a/test/testutils/util.go
+++ b/test/testutils/util.go
@@ -76,14 +76,18 @@ func CreateWorkerPodsForLeaderPod(ctx context.Context, leaderPod corev1.Pod, k8s
 	}).Should(gomega.Succeed())
 }
 
-func DeleteLeaderPods(ctx context.Context, k8sClient client.Client, lws leaderworkerset.LeaderWorkerSet) {
+func DeleteLeaderPods(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet) {
 	// delete pods with the highest indexes
 	var leaders corev1.PodList
 	gomega.Expect(k8sClient.List(ctx, &leaders, client.InNamespace(lws.Namespace), &client.MatchingLabels{leaderworkerset.WorkerIndexLabelKey: "0"})).To(gomega.Succeed())
+
+	var leaderWorkerSet leaderworkerset.LeaderWorkerSet
+	gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet)).To(gomega.Succeed())
+
 	// we don't have "slice" package before go1.21, could only manually delete pods with largest index
 	for i := range leaders.Items {
 		index, _ := strconv.Atoi(leaders.Items[i].Name[len(leaders.Items[i].Name)-1:])
-		if index >= int(*lws.Spec.Replicas) {
+		if index >= int(*leaderWorkerSet.Spec.Replicas) {
 			gomega.Expect(k8sClient.Delete(ctx, &leaders.Items[i])).To(gomega.Succeed())
 			// delete worker statefulset on behalf of kube-controller-manager
 			var sts appsv1.StatefulSet
@@ -360,8 +364,13 @@ func ValidatePodExclusivePlacementTerms(pod corev1.Pod) bool {
 
 func UpdateReplicaCount(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, count int32) {
 	gomega.Eventually(func() error {
-		lws.Spec.Replicas = ptr.To[int32](count)
-		return k8sClient.Update(ctx, lws)
+		var leaderworkerset leaderworkerset.LeaderWorkerSet
+		if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset); err != nil {
+			return err
+		}
+
+		leaderworkerset.Spec.Replicas = ptr.To[int32](count)
+		return k8sClient.Update(ctx, &leaderworkerset)
 	}, Timeout, Interval).Should(gomega.Succeed())
 }