Merge pull request #111 from kerthcet/fix/updatedReplicas
Change the semantics of leaderWorkerSet Replicas
k8s-ci-robot authored Apr 19, 2024
2 parents 3ad6d2d + 83af7f7 commit 78268be
Showing 5 changed files with 59 additions and 71 deletions.
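In short: status.replicas now counts every group the controller has created (regardless of readiness or template revision), readyReplicas and updatedReplicas are counted independently of each other, and the Available condition is driven by groups that are both updated and ready. A minimal sketch of the new counter semantics, using a local stand-in struct rather than the real LeaderWorkerSetStatus type:

package main

import "fmt"

// Local stand-in for the three counters in LeaderWorkerSetStatus after this
// change; an illustration only, not the real API type.
type groupCounts struct {
    Replicas        int32 // groups created (updated or not, ready or not)
    ReadyReplicas   int32 // groups in ready state (updated or not)
    UpdatedReplicas int32 // groups on the new template hash (ready or not)
}

func main() {
    // Hypothetical mid-rollout snapshot: 4 groups created, 3 ready, 2 already updated.
    c := groupCounts{Replicas: 4, ReadyReplicas: 3, UpdatedReplicas: 2}
    fmt.Printf("created=%d ready=%d updated=%d still-to-update=%d\n",
        c.Replicas, c.ReadyReplicas, c.UpdatedReplicas, c.Replicas-c.UpdatedReplicas)
}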
6 changes: 3 additions & 3 deletions api/leaderworkerset/v1/leaderworkerset_types.go
@@ -175,13 +175,13 @@ type LeaderWorkerSetStatus struct {
// Conditions track the condition of the leaderworkerset.
Conditions []metav1.Condition `json:"conditions,omitempty"`

// ReadyReplicas track the number of groups that are in ready state.
// ReadyReplicas track the number of groups that are in ready state (updated or not).
ReadyReplicas int32 `json:"readyReplicas,omitempty"`

// UpdatedReplicas track the number of groups that have been updated.
// UpdatedReplicas track the number of groups that have been updated (ready or not).
UpdatedReplicas int32 `json:"updatedReplicas,omitempty"`

// Replicas track the active total number of groups.
// Replicas track the total number of groups that have been created (updated or not, ready or not)
Replicas int32 `json:"replicas,omitempty"`

// HPAPodSelector for pods that belong to the LeaderWorkerSet object, this is
(generated CRD manifest)
@@ -15392,16 +15392,17 @@ spec:
type: string
readyReplicas:
description: ReadyReplicas track the number of groups that are in
ready state.
ready state (updated or not).
format: int32
type: integer
replicas:
description: Replicas track the active total number of groups.
description: Replicas track the total number of groups that have been
created (updated or not, ready or not)
format: int32
type: integer
updatedReplicas:
description: UpdatedReplicas track the number of groups that have
been updated.
been updated (ready or not).
format: int32
type: integer
type: object
36 changes: 17 additions & 19 deletions pkg/controllers/leaderworkerset_controller.go
@@ -354,8 +354,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
}

updateStatus := false
readyCount := 0
updatedCount := 0
readyCount, updatedCount, updatedAndReadyCount := 0, 0, 0
templateHash := utils.LeaderWorkerTemplateHash(lws)

// Iterate through all statefulsets.
@@ -364,22 +363,21 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
continue
}

// this is the worker statefulset.
if statefulsetutils.StatefulsetReady(sts) {

// the worker pods are OK.
// need to check leader pod for this group.
var leaderPod corev1.Pod
if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: sts.Name}, &leaderPod); err != nil {
log.Error(err, "Fetching leader pod")
return false, err
}
if podutils.PodRunningAndReady(leaderPod) {
readyCount++
var leaderPod corev1.Pod
if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: sts.Name}, &leaderPod); err != nil {
log.Error(err, "Fetching leader pod")
return false, err
}

if sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash && leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash {
updatedCount++
}
var ready bool
if statefulsetutils.StatefulsetReady(sts) && podutils.PodRunningAndReady(leaderPod) {
ready = true
readyCount++
}
if sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash && leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash {
updatedCount++
if ready {
updatedAndReadyCount++
}
}
}
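The loop above now fetches the leader pod for every group up front and classifies each group on two independent axes, ready and updated, additionally tracking groups that are both. A condensed, self-contained sketch of that counting logic (the group struct and its flags are stand-ins for the StatefulSet and pod checks in the real controller):

package main

import "fmt"

// Hypothetical per-group view; in the controller these flags come from the
// worker StatefulSet, the leader pod, and their template-revision-hash labels.
type group struct {
    ready   bool // worker StatefulSet ready AND leader pod running and ready
    updated bool // StatefulSet and leader pod both carry the current template hash
}

func main() {
    groups := []group{{ready: true, updated: true}, {ready: true, updated: false}, {ready: false, updated: true}}
    desired := 3

    readyCount, updatedCount, updatedAndReadyCount := 0, 0, 0
    for _, g := range groups {
        if g.ready {
            readyCount++
        }
        if g.updated {
            updatedCount++
            if g.ready {
                updatedAndReadyCount++
            }
        }
    }
    // The Available condition is now driven by groups that are both updated
    // and ready, not by updatedCount alone.
    fmt.Println(readyCount, updatedCount, updatedAndReadyCount, updatedAndReadyCount == desired)
}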
@@ -394,7 +392,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
updateStatus = true
}

condition := makeCondition(updatedCount == int(*lws.Spec.Replicas))
condition := makeCondition(updatedAndReadyCount == int(*lws.Spec.Replicas))
updateCondition := setCondition(lws, condition)
// if condition changed, record events
if updateCondition {
@@ -416,7 +414,7 @@ func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leade
}

// retrieve the current number of replicas -- the number of leaders
replicas := int(*sts.Spec.Replicas)
replicas := int(sts.Status.Replicas)
if lws.Status.Replicas != int32(replicas) {
lws.Status.Replicas = int32(replicas)
updateStatus = true
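The switch from sts.Spec.Replicas to sts.Status.Replicas is what gives status.replicas its new meaning: the leader StatefulSet's status.replicas reports how many leader pods have actually been created, while spec.replicas is only the desired count. A hedged sketch of that status-sync step (names and signature are illustrative, not the controller's):

package sketch

import appsv1 "k8s.io/api/apps/v1"

// syncReplicas mirrors the updateStatus change above: the LWS-level replica
// count now reflects how many leader pods the StatefulSet controller has
// actually created (status.replicas), not how many are desired (spec.replicas).
func syncReplicas(leaderSts *appsv1.StatefulSet, statusReplicas *int32) bool {
    created := leaderSts.Status.Replicas // created groups, ready or not
    if *statusReplicas != created {
        *statusReplicas = created
        return true // caller should persist the status update
    }
    return false
}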
64 changes: 22 additions & 42 deletions test/integration/controllers/leaderworkerset_test.go
@@ -22,6 +22,7 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
v1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -91,12 +92,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderworkerset leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
var leaderSts appsv1.StatefulSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, &leaderworkerset, 2, 3)).To(gomega.Succeed())
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 2, 3)).To(gomega.Succeed())
},
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -113,12 +112,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderworkerset leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
var leaderSts appsv1.StatefulSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
testing.DeleteLeaderPods(ctx, k8sClient, lws)
},
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -135,12 +132,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderworkerset leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(0))
testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(0))
var leaderSts appsv1.StatefulSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
testing.DeleteLeaderPods(ctx, k8sClient, lws)
},
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidReplicasCount(ctx, deployment, 0, k8sClient)
@@ -157,12 +152,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderworkerset leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
var leaderSts appsv1.StatefulSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, &leaderworkerset, 0, 3)).To(gomega.Succeed())
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 0, 3)).To(gomega.Succeed())
},
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -298,7 +291,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
var scale v1.Scale
var scale autoscalingv1.Scale
gomega.Expect(k8sClient.SubResource("scale").Get(ctx, lws, &scale)).To(gomega.Succeed())
gomega.Expect(int32(scale.Spec.Replicas)).To(gomega.Equal(*lws.Spec.Replicas))
gomega.Expect(int32(scale.Status.Replicas)).To(gomega.Equal(lws.Status.Replicas))
@@ -307,24 +300,17 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
},
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var scale v1.Scale
gomega.Expect(k8sClient.SubResource("scale").Get(ctx, lws, &scale)).To(gomega.Succeed())
scale.Spec.Replicas = 3
lwsUnstructed, _ := ToUnstructured(lws)
lwsUnstructed.SetAPIVersion("leaderworkerset.x-k8s.io/v1")
lwsUnstructed.SetKind("LeaderWorkerSet")
scaleUnstructed, _ := ToUnstructured(scale.DeepCopy())
scaleUnstructed.SetAPIVersion("autoscaling/v1")
scaleUnstructed.SetKind("Scale")
gomega.Expect(k8sClient.SubResource("scale").Update(ctx, lwsUnstructed, client.WithSubResourceBody(scaleUnstructed))).To(gomega.Succeed())
dep := &leaderworkerset.LeaderWorkerSet{ObjectMeta: metav1.ObjectMeta{Namespace: lws.Namespace, Name: lws.Name}}
scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 3}}
gomega.Expect(k8sClient.SubResource("scale").Update(ctx, dep, client.WithSubResourceBody(scale))).To(gomega.Succeed())
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
gomega.Eventually(func() (int32, error) {
var leaderWorkerSet leaderworkerset.LeaderWorkerSet
if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet); err != nil {
return -1, err
return 0, err
}
return leaderWorkerSet.Status.Replicas, nil
return *leaderWorkerSet.Spec.Replicas, nil
}, testing.Timeout, testing.Interval).Should(gomega.Equal(int32(3)))
},
},
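The rewritten scale test updates the scale subresource directly with a bare autoscalingv1.Scale and client.WithSubResourceBody, instead of round-tripping both objects through unstructured form; the follow-up assertion now reads spec.replicas, presumably because there is no real StatefulSet controller in the integration environment, so status.replicas would not converge on its own. A sketch of the same subresource-update pattern as a standalone helper (the LWS API import path below is an assumption):

package sketch

import (
    "context"

    autoscalingv1 "k8s.io/api/autoscaling/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "sigs.k8s.io/controller-runtime/pkg/client"

    leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" // assumed import path
)

// scaleLWS updates only the scale subresource, leaving the LeaderWorkerSet
// object itself untouched; this mirrors the pattern used by the updated test.
func scaleLWS(ctx context.Context, cl client.Client, namespace, name string, replicas int32) error {
    lws := &leaderworkerset.LeaderWorkerSet{
        ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
    }
    scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: replicas}}
    return cl.SubResource("scale").Update(ctx, lws, client.WithSubResourceBody(scale))
}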
@@ -451,13 +437,6 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
gomega.Eventually(func() (int32, error) {
var leaderWorkerSet leaderworkerset.LeaderWorkerSet
if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet); err != nil {
return -1, err
}
return leaderWorkerSet.Status.Replicas, nil
}, testing.Timeout, testing.Interval).Should(gomega.Equal(int32(2)))
testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
@@ -561,7 +540,8 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 2)
testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 1)
// 3-index status is unready but template already updated.
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 2)
},
},
{
Expand Down Expand Up @@ -884,7 +864,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
return k8sClient.Update(ctx, &leaderworkerset)
}, testing.Timeout, testing.Interval).Should(gomega.Succeed())
// Manually delete leader pods here because we have no statefulset controller.
testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
testing.DeleteLeaderPods(ctx, k8sClient, &leaderworkerset)
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
17 changes: 13 additions & 4 deletions test/testutils/util.go
@@ -76,14 +76,18 @@ func CreateWorkerPodsForLeaderPod(ctx context.Context, leaderPod corev1.Pod, k8s
}).Should(gomega.Succeed())
}

func DeleteLeaderPods(ctx context.Context, k8sClient client.Client, lws leaderworkerset.LeaderWorkerSet) {
func DeleteLeaderPods(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet) {
// delete pods with the highest indexes
var leaders corev1.PodList
gomega.Expect(k8sClient.List(ctx, &leaders, client.InNamespace(lws.Namespace), &client.MatchingLabels{leaderworkerset.WorkerIndexLabelKey: "0"})).To(gomega.Succeed())

var leaderWorkerSet leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet)).To(gomega.Succeed())

// we don't have "slice" package before go1.21, could only manually delete pods with largest index
for i := range leaders.Items {
index, _ := strconv.Atoi(leaders.Items[i].Name[len(leaders.Items[i].Name)-1:])
if index >= int(*lws.Spec.Replicas) {
if index >= int(*leaderWorkerSet.Spec.Replicas) {
gomega.Expect(k8sClient.Delete(ctx, &leaders.Items[i])).To(gomega.Succeed())
// delete worker statefulset on behalf of kube-controller-manager
var sts appsv1.StatefulSet
@@ -360,8 +364,13 @@ func ValidatePodExclusivePlacementTerms(pod corev1.Pod) bool {

func UpdateReplicaCount(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, count int32) {
gomega.Eventually(func() error {
lws.Spec.Replicas = ptr.To[int32](count)
return k8sClient.Update(ctx, lws)
var leaderworkerset leaderworkerset.LeaderWorkerSet
if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset); err != nil {
return err
}

leaderworkerset.Spec.Replicas = ptr.To[int32](count)
return k8sClient.Update(ctx, &leaderworkerset)
}, Timeout, Interval).Should(gomega.Succeed())
}
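UpdateReplicaCount now re-reads the object inside gomega.Eventually before mutating it, so a stale resourceVersion just means another attempt rather than a hard failure. Outside of a test, the same get-mutate-update loop is commonly written with client-go's retry.RetryOnConflict; a sketch of that pattern (the LWS API import path below is an assumption):

package sketch

import (
    "context"

    "k8s.io/client-go/util/retry"
    "k8s.io/utils/ptr"
    "sigs.k8s.io/controller-runtime/pkg/client"

    leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1" // assumed import path
)

// updateReplicas re-fetches the LeaderWorkerSet on every attempt and retries
// only on update conflicts, the production-code analogue of the test helper.
func updateReplicas(ctx context.Context, cl client.Client, key client.ObjectKey, count int32) error {
    return retry.RetryOnConflict(retry.DefaultRetry, func() error {
        var lws leaderworkerset.LeaderWorkerSet
        if err := cl.Get(ctx, key, &lws); err != nil {
            return err
        }
        lws.Spec.Replicas = ptr.To[int32](count)
        return cl.Update(ctx, &lws)
    })
}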
