Change the semantics of leaderWorkerSet Replicas #111

Merged 3 commits on Apr 19, 2024
6 changes: 3 additions & 3 deletions api/leaderworkerset/v1/leaderworkerset_types.go
@@ -175,13 +175,13 @@ type LeaderWorkerSetStatus struct {
// Conditions track the condition of the leaderworkerset.
Conditions []metav1.Condition `json:"conditions,omitempty"`

// ReadyReplicas track the number of groups that are in ready state.
// ReadyReplicas track the number of groups that are in ready state (updated or not).
ReadyReplicas int32 `json:"readyReplicas,omitempty"`

// UpdatedReplicas track the number of groups that have been updated.
// UpdatedReplicas track the number of groups that have been updated (ready or not).
UpdatedReplicas int32 `json:"updatedReplicas,omitempty"`

// Replicas track the active total number of groups.
// Replicas track the total number of groups that have been created (updated or not, ready or not)
Replicas int32 `json:"replicas,omitempty"`

// HPAPodSelector for pods that belong to the LeaderWorkerSet object, this is
@@ -15392,16 +15392,17 @@ spec:
type: string
readyReplicas:
description: ReadyReplicas track the number of groups that are in
ready state.
ready state (updated or not).
format: int32
type: integer
replicas:
description: Replicas track the active total number of groups.
description: Replicas track the total number of groups that have been
created (updated or not, ready or not)
format: int32
type: integer
updatedReplicas:
description: UpdatedReplicas track the number of groups that have
been updated.
been updated (ready or not).
format: int32
type: integer
type: object
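Read together, the Go comments and the regenerated CRD descriptions above pin down the new semantics: the three counters are independent tallies over the same set of groups, not a nested hierarchy. A mid-rollout example of what that looks like (a stand-in struct for illustration, not the real API type):

package sketch

// status mirrors the three LeaderWorkerSetStatus counters for illustration only.
type status struct {
	Replicas        int32 // groups the leader StatefulSet has created, in any state
	ReadyReplicas   int32 // ready groups, whether or not they run the current template
	UpdatedReplicas int32 // groups on the current template, whether or not they are ready
}

// With spec.replicas = 4: two ready groups still on the old template, one ready
// group on the new template, and one new-template group that is still starting.
var midRollout = status{
	Replicas:        4,
	ReadyReplicas:   3, // 2 old-template ready + 1 new-template ready
	UpdatedReplicas: 2, // 1 new-template ready + 1 new-template starting
}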
36 changes: 17 additions & 19 deletions pkg/controllers/leaderworkerset_controller.go
@@ -354,8 +354,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
}

updateStatus := false
readyCount := 0
updatedCount := 0
readyCount, updatedCount, updatedAndReadyCount := 0, 0, 0
templateHash := utils.LeaderWorkerTemplateHash(lws)

// Iterate through all statefulsets.
@@ -364,22 +363,21 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
continue
}

// this is the worker statefulset.
if statefulsetutils.StatefulsetReady(sts) {

// the worker pods are OK.
// need to check leader pod for this group.
var leaderPod corev1.Pod
if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: sts.Name}, &leaderPod); err != nil {
log.Error(err, "Fetching leader pod")
return false, err
}
if podutils.PodRunningAndReady(leaderPod) {
readyCount++
var leaderPod corev1.Pod
if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: sts.Name}, &leaderPod); err != nil {
log.Error(err, "Fetching leader pod")
return false, err
}

if sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash && leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash {
updatedCount++
}
var ready bool
if statefulsetutils.StatefulsetReady(sts) && podutils.PodRunningAndReady(leaderPod) {
ready = true
readyCount++
}
if sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash && leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash {
updatedCount++
if ready {
updatedAndReadyCount++
}
}
}
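With this rewrite the leader pod is fetched for every group, not only for those whose worker StatefulSet is ready, and readiness and updatedness are judged independently: a group is ready when both its worker StatefulSet and its leader pod are ready, updated when both carry the current template hash, and only groups that are both feed the condition computed below. A condensed sketch of that per-group bookkeeping (hypothetical types and names, not the controller's):

package sketch

// group carries the two signals the controller now checks for every replica group.
type group struct {
	stsReady, leaderPodReady   bool // worker StatefulSet / leader pod readiness
	stsOnHash, leaderPodOnHash bool // template revision hash matches on the StatefulSet / leader pod
}

// tally mirrors the loop above: ready and updated are counted independently,
// and updatedAndReady (their intersection) is what drives the condition.
func tally(groups []group) (ready, updated, updatedAndReady int) {
	for _, g := range groups {
		isReady := g.stsReady && g.leaderPodReady
		if isReady {
			ready++
		}
		if g.stsOnHash && g.leaderPodOnHash {
			updated++
			if isReady {
				updatedAndReady++
			}
		}
	}
	return ready, updated, updatedAndReady
}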
@@ -394,7 +392,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
updateStatus = true
}

condition := makeCondition(updatedCount == int(*lws.Spec.Replicas))
condition := makeCondition(updatedAndReadyCount == int(*lws.Spec.Replicas))
updateCondition := setCondition(lws, condition)
// if condition changed, record events
if updateCondition {
@@ -416,7 +414,7 @@ func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leade
}

// retrieve the current number of replicas -- the number of leaders
replicas := int(*sts.Spec.Replicas)
replicas := int(sts.Status.Replicas)
if lws.Status.Replicas != int32(replicas) {
lws.Status.Replicas = int32(replicas)
updateStatus = true
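The other change in updateStatus is that status.Replicas is now copied from the leader StatefulSet's status.replicas rather than its spec.replicas, so the published count follows the groups that actually exist and trails the desired count during scale-up until the StatefulSet controller catches up. A minimal sketch of the distinction (hypothetical helper around the real apps/v1 type):

package sketch

import appsv1 "k8s.io/api/apps/v1"

// observedGroups is what lws.Status.Replicas is now derived from: the number of
// leader pods the StatefulSet controller has actually created, not the number
// it has been asked to create (which would be *leaderSts.Spec.Replicas).
func observedGroups(leaderSts *appsv1.StatefulSet) int32 {
	return leaderSts.Status.Replicas
}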
64 changes: 22 additions & 42 deletions test/integration/controllers/leaderworkerset_test.go
@@ -22,6 +22,7 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
v1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -91,12 +92,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderworkerset leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
var leaderSts appsv1.StatefulSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, &leaderworkerset, 2, 3)).To(gomega.Succeed())
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 2, 3)).To(gomega.Succeed())
},
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -113,12 +112,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderworkerset leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
var leaderSts appsv1.StatefulSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
testing.DeleteLeaderPods(ctx, k8sClient, lws)
},
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -135,12 +132,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderworkerset leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(0))
testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(0))
var leaderSts appsv1.StatefulSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
testing.DeleteLeaderPods(ctx, k8sClient, lws)
},
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidReplicasCount(ctx, deployment, 0, k8sClient)
@@ -157,12 +152,10 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var leaderworkerset leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset)).To(gomega.Succeed())
testing.UpdateReplicaCount(ctx, k8sClient, &leaderworkerset, int32(3))
testing.UpdateReplicaCount(ctx, k8sClient, lws, int32(3))
var leaderSts appsv1.StatefulSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: leaderworkerset.Name, Namespace: leaderworkerset.Namespace}, &leaderSts)).To(gomega.Succeed())
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, &leaderworkerset, 0, 3)).To(gomega.Succeed())
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderSts)).To(gomega.Succeed())
gomega.Expect(testing.CreateLeaderPods(ctx, leaderSts, k8sClient, lws, 0, 3)).To(gomega.Succeed())
},
checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidReplicasCount(ctx, deployment, 3, k8sClient)
@@ -298,7 +291,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
var scale v1.Scale
var scale autoscalingv1.Scale
gomega.Expect(k8sClient.SubResource("scale").Get(ctx, lws, &scale)).To(gomega.Succeed())
gomega.Expect(int32(scale.Spec.Replicas)).To(gomega.Equal(*lws.Spec.Replicas))
gomega.Expect(int32(scale.Status.Replicas)).To(gomega.Equal(lws.Status.Replicas))
@@ -307,24 +300,17 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
},
{
lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
var scale v1.Scale
gomega.Expect(k8sClient.SubResource("scale").Get(ctx, lws, &scale)).To(gomega.Succeed())
scale.Spec.Replicas = 3
lwsUnstructed, _ := ToUnstructured(lws)
lwsUnstructed.SetAPIVersion("leaderworkerset.x-k8s.io/v1")
lwsUnstructed.SetKind("LeaderWorkerSet")
scaleUnstructed, _ := ToUnstructured(scale.DeepCopy())
scaleUnstructed.SetAPIVersion("autoscaling/v1")
scaleUnstructed.SetKind("Scale")
gomega.Expect(k8sClient.SubResource("scale").Update(ctx, lwsUnstructed, client.WithSubResourceBody(scaleUnstructed))).To(gomega.Succeed())
dep := &leaderworkerset.LeaderWorkerSet{ObjectMeta: metav1.ObjectMeta{Namespace: lws.Namespace, Name: lws.Name}}
scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 3}}
gomega.Expect(k8sClient.SubResource("scale").Update(ctx, dep, client.WithSubResourceBody(scale))).To(gomega.Succeed())
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
gomega.Eventually(func() (int32, error) {
var leaderWorkerSet leaderworkerset.LeaderWorkerSet
if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet); err != nil {
return -1, err
return 0, err
}
return leaderWorkerSet.Status.Replicas, nil
return *leaderWorkerSet.Spec.Replicas, nil
}, testing.Timeout, testing.Interval).Should(gomega.Equal(int32(3)))
},
},
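The rewritten scale test also shows the leaner controller-runtime pattern for exercising the scale subresource: a skeleton parent object plus an autoscaling/v1 Scale body passed to SubResource("scale").Update, with no unstructured round-tripping. A standalone sketch of that call (hypothetical names, and assuming the sigs.k8s.io/lws module path for the API package):

package sketch

import (
	"context"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
)

// scaleTo updates the scale subresource of an existing LeaderWorkerSet.
// Only namespace/name are needed on the parent object; the Scale body carries the change.
func scaleTo(ctx context.Context, c client.Client, namespace, name string, replicas int32) error {
	lws := &leaderworkerset.LeaderWorkerSet{ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name}}
	scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: replicas}}
	return c.SubResource("scale").Update(ctx, lws, client.WithSubResourceBody(scale))
}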
@@ -451,13 +437,6 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
updates: []*update{
{
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
gomega.Eventually(func() (int32, error) {
var leaderWorkerSet leaderworkerset.LeaderWorkerSet
if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet); err != nil {
return -1, err
}
return leaderWorkerSet.Status.Replicas, nil
}, testing.Timeout, testing.Interval).Should(gomega.Equal(int32(2)))
testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
testing.ExpectLeaderWorkerSetProgressing(ctx, k8sClient, lws, "Replicas are progressing")
@@ -561,7 +540,8 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
testing.ExpectLeaderWorkerSetUnavailable(ctx, k8sClient, lws, "All replicas are ready")
testing.ExpectStatefulsetPartitionEqualTo(ctx, k8sClient, lws, 2)
testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 1)
// The group at index 3 is not ready yet, but its template is already updated.
testing.ExpectLeaderWorkerSetStatusReplicas(ctx, k8sClient, lws, 3, 2)
},
},
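The adjusted expectation follows directly from the relaxed UpdatedReplicas definition: the group that already carries the new template but is not yet ready previously did not count as updated, so the asserted pair moves from (3, 1) to (3, 2) while the ready count is unchanged.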
{
Expand Down Expand Up @@ -884,7 +864,7 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
return k8sClient.Update(ctx, &leaderworkerset)
}, testing.Timeout, testing.Interval).Should(gomega.Succeed())
// Manually delete leader pods here because we have no statefulset controller.
testing.DeleteLeaderPods(ctx, k8sClient, leaderworkerset)
testing.DeleteLeaderPods(ctx, k8sClient, &leaderworkerset)
},
checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
testing.ExpectValidLeaderStatefulSet(ctx, lws, k8sClient)
17 changes: 13 additions & 4 deletions test/testutils/util.go
@@ -76,14 +76,18 @@ func CreateWorkerPodsForLeaderPod(ctx context.Context, leaderPod corev1.Pod, k8s
}).Should(gomega.Succeed())
}

func DeleteLeaderPods(ctx context.Context, k8sClient client.Client, lws leaderworkerset.LeaderWorkerSet) {
func DeleteLeaderPods(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet) {
// delete pods with the highest indexes
var leaders corev1.PodList
gomega.Expect(k8sClient.List(ctx, &leaders, client.InNamespace(lws.Namespace), &client.MatchingLabels{leaderworkerset.WorkerIndexLabelKey: "0"})).To(gomega.Succeed())

var leaderWorkerSet leaderworkerset.LeaderWorkerSet
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderWorkerSet)).To(gomega.Succeed())

// we don't have "slice" package before go1.21, could only manually delete pods with largest index
for i := range leaders.Items {
index, _ := strconv.Atoi(leaders.Items[i].Name[len(leaders.Items[i].Name)-1:])
if index >= int(*lws.Spec.Replicas) {
if index >= int(*leaderWorkerSet.Spec.Replicas) {
gomega.Expect(k8sClient.Delete(ctx, &leaders.Items[i])).To(gomega.Succeed())
// delete worker statefulset on behalf of kube-controller-manager
var sts appsv1.StatefulSet
@@ -360,8 +364,13 @@ func ValidatePodExclusivePlacementTerms(pod corev1.Pod) bool {

func UpdateReplicaCount(ctx context.Context, k8sClient client.Client, lws *leaderworkerset.LeaderWorkerSet, count int32) {
gomega.Eventually(func() error {
lws.Spec.Replicas = ptr.To[int32](count)
return k8sClient.Update(ctx, lws)
var leaderworkerset leaderworkerset.LeaderWorkerSet
if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &leaderworkerset); err != nil {
return err
}

leaderworkerset.Spec.Replicas = ptr.To[int32](count)
return k8sClient.Update(ctx, &leaderworkerset)
}, Timeout, Interval).Should(gomega.Succeed())
}

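Moving the Get inside the Eventually block makes UpdateReplicaCount robust against stale objects: every retry re-reads the latest resourceVersion before writing, so an update conflict is retried with fresh state instead of failing the test. The same get-mutate-update shape as a standalone helper (hypothetical, and assuming the sigs.k8s.io/lws module path; the test wraps the call in gomega.Eventually):

package sketch

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	leaderworkerset "sigs.k8s.io/lws/api/leaderworkerset/v1"
)

// setReplicas re-reads the object on every attempt so the Update carries the
// latest resourceVersion; the caller retries on conflict (the test wraps this
// in gomega.Eventually).
func setReplicas(ctx context.Context, c client.Client, key types.NamespacedName, count int32) error {
	var lws leaderworkerset.LeaderWorkerSet
	if err := c.Get(ctx, key, &lws); err != nil {
		return err
	}
	lws.Spec.Replicas = ptr.To[int32](count)
	return c.Update(ctx, &lws)
}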