No worker sts when size=1
Signed-off-by: kerthcet <kerthcet@gmail.com>
kerthcet committed Aug 8, 2024
1 parent e87e62a · commit d07e6f3
Showing 8 changed files with 85 additions and 32 deletions.
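Before the per-file diffs, a quick orientation: with LeaderWorkerTemplate.Size set to 1, a replica's group consists of the leader pod alone, so the controllers now skip creating and checking the per-group worker StatefulSet and judge readiness from the leader pod only. A minimal, self-contained Go sketch of that rule (groupReady and its boolean inputs are hypothetical simplifications for illustration, not the project's actual API):

package main

import "fmt"

// groupReady mirrors the noWorkerSts guard this commit threads through the
// controllers: when size == 1 there is no worker StatefulSet, so a group is
// ready as soon as its leader pod is running and ready.
func groupReady(size int32, leaderPodReady, workerStsReady bool) bool {
	noWorkerSts := size == 1 // stand-in for *lws.Spec.LeaderWorkerTemplate.Size == 1
	if noWorkerSts {
		return leaderPodReady // the leader pod is the whole group
	}
	return leaderPodReady && workerStsReady
}

func main() {
	fmt.Println(groupReady(1, true, false)) // true: worker sts state is irrelevant at size=1
	fmt.Println(groupReady(2, true, false)) // false: the worker sts must also be ready
}

In the real controllers the same decision hangs off *lws.Spec.LeaderWorkerTemplate.Size == 1 (the noWorkerSts flag introduced below).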
1 change: 0 additions & 1 deletion Makefile
@@ -295,4 +295,3 @@ toc-verify:
 .PHONY: generate-apiref
 generate-apiref: genref
 	cd $(PROJECT_DIR)/hack/genref/ && $(GENREF) -o $(PROJECT_DIR)/docs/reference
-
49 changes: 26 additions & 23 deletions pkg/controllers/leaderworkerset_controller.go
@@ -337,47 +337,45 @@ func (r *LeaderWorkerSetReconciler) SSAWithStatefulset(ctx context.Context, lws
 // updates the condition of the leaderworkerset to either Progressing or Available.
 func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) (bool, error) {
 	log := ctrl.LoggerFrom(ctx)
-	stsSelector := client.MatchingLabels(map[string]string{
-		leaderworkerset.SetNameLabelKey: lws.Name,
+	podSelector := client.MatchingLabels(map[string]string{
+		leaderworkerset.SetNameLabelKey:     lws.Name,
+		leaderworkerset.WorkerIndexLabelKey: "0",
 	})
 
-	// update the condition based on the status of all statefulsets owned by the lws.
-	var lwssts appsv1.StatefulSetList
-	if err := r.List(ctx, &lwssts, stsSelector, client.InNamespace(lws.Namespace)); err != nil {
-		log.Error(err, "Fetching statefulsets managed by leaderworkerset instance")
+	leaderPodList := &corev1.PodList{}
+	if err := r.List(ctx, leaderPodList, podSelector, client.InNamespace(lws.Namespace)); err != nil {
+		log.Error(err, "Fetching leaderPods")
 		return false, err
 	}
 
 	updateStatus := false
 	readyCount, updatedCount, updatedNonBurstWorkerCount, currentNonBurstWorkerCount, updatedAndReadyCount := 0, 0, 0, 0, 0
 	templateHash := utils.LeaderWorkerTemplateHash(lws)
+	noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
 
-	// Iterate through all statefulsets.
-	for _, sts := range lwssts.Items {
-		if sts.Name == lws.Name {
-			continue
-		}
-
-		index, err := strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
+	// Iterate through all leaderPods.
+	for _, pod := range leaderPodList.Items {
+		index, err := strconv.Atoi(pod.Labels[leaderworkerset.GroupIndexLabelKey])
 		if err != nil {
 			return false, err
 		}
 		if index < int(*lws.Spec.Replicas) {
 			currentNonBurstWorkerCount++
 		}
 
-		var leaderPod corev1.Pod
-		if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: sts.Name}, &leaderPod); err != nil {
-			log.Error(err, "Fetching leader pod")
-			return false, err
+		var sts appsv1.StatefulSet
+		if !noWorkerSts {
+			if err := r.Get(ctx, client.ObjectKey{Namespace: lws.Namespace, Name: pod.Name}, &sts); err != nil {
+				log.Error(err, "Fetching worker statefulSet")
+				return false, err
+			}
 		}
 
 		var ready, updated bool
-		if statefulsetutils.StatefulsetReady(sts) && podutils.PodRunningAndReady(leaderPod) {
+		if (noWorkerSts || statefulsetutils.StatefulsetReady(sts)) && podutils.PodRunningAndReady(pod) {
 			ready = true
 			readyCount++
 		}
-		if sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash && leaderPod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash {
+		if (noWorkerSts || sts.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash) && pod.Labels[leaderworkerset.TemplateRevisionHashKey] == templateHash {
 			updated = true
 			updatedCount++
 			if index < int(*lws.Spec.Replicas) {
@@ -424,7 +422,7 @@ func (r *LeaderWorkerSetReconciler) updateConditions(ctx context.Context, lws *l
 	return updateStatus || updateCondition, nil
 }
 
-// Updates status and condition of LeaderWorkerSet and returns whether or not an upate actually occurred.
+// Updates status and condition of LeaderWorkerSet and returns whether or not an update actually occurred.
 func (r *LeaderWorkerSetReconciler) updateStatus(ctx context.Context, lws *leaderworkerset.LeaderWorkerSet) error {
 	updateStatus := false
 	log := ctrl.LoggerFrom(ctx)
@@ -502,17 +500,18 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le
 	if err := r.List(ctx, &stsList, stsSelector, client.InNamespace(lws.Namespace)); err != nil {
 		return 0, 0, err
 	}
 
 	sortedSts := utils.SortByIndex(func(sts appsv1.StatefulSet) (int, error) {
 		return strconv.Atoi(sts.Labels[leaderworkerset.GroupIndexLabelKey])
 	}, stsList.Items, int(stsReplicas))
 
 	templateHash := utils.LeaderWorkerTemplateHash(lws)
+	// Once size==1, no worker statefulSets will be created.
+	noWorkerSts := *lws.Spec.LeaderWorkerTemplate.Size == 1
 	processReplica := func(index int32) (ready bool) {
 		nominatedName := fmt.Sprintf("%s-%d", lws.Name, index)
 		// It can happen that the leader pod or the worker statefulset hasn't created yet
 		// or under rebuilding, which also indicates not ready.
-		if nominatedName != sortedPods[index].Name || nominatedName != sortedSts[index].Name {
+		if nominatedName != sortedPods[index].Name || (!noWorkerSts && nominatedName != sortedSts[index].Name) {
 			return false
 		}
 
@@ -521,6 +520,10 @@ func (r *LeaderWorkerSetReconciler) iterateReplicas(ctx context.Context, lws *le
 			return false
 		}
 
+		if noWorkerSts {
+			return true
+		}
+
 		stsTemplateHash := sortedSts[index].Labels[leaderworkerset.TemplateRevisionHashKey]
 		return stsTemplateHash == templateHash && statefulsetutils.StatefulsetReady(sortedSts[index])
 	}
5 changes: 5 additions & 0 deletions pkg/controllers/pod_controller.go
@@ -102,6 +102,11 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		return ctrl.Result{}, nil
 	}
 
+	// Once size = 1, no need to create worker statefulSets.
+	if *leaderWorkerSet.Spec.LeaderWorkerTemplate.Size == 1 {
+		return ctrl.Result{}, nil
+	}
+
 	// logic for handling leader pod
 	if leaderWorkerSet.Spec.StartupPolicy == leaderworkerset.LeaderReadyStartupPolicy && !k8spodutils.IsPodReady(&pod) {
 		log.V(2).Info("defer the creation of the worker statefulset because leader pod is not ready.")
17 changes: 17 additions & 0 deletions test/e2e/e2e_test.go
@@ -91,6 +91,23 @@ var _ = ginkgo.Describe("leaderWorkerSet e2e tests", func() {
 		}
 	})
 
+	ginkgo.It("Can create/update a lws with size=1", func() {
+		lws = testing.BuildLeaderWorkerSet(ns.Name).Replica(4).MaxSurge(1).Size(1).RestartPolicy(v1.RecreateGroupOnPodRestart).Obj()
+		testing.MustCreateLws(ctx, k8sClient, lws)
+
+		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
+		testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
+		testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
+
+		testing.UpdateWorkerTemplate(ctx, k8sClient, lws)
+		// Happen during rolling update.
+		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 5)
+
+		testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 4)
+		testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
+		testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
+	})
+
 	ginkgo.It("Can perform a rolling update", func() {
 		lws := testing.BuildLeaderWorkerSet(ns.Name).Obj()
 		testing.MustCreateLws(ctx, k8sClient, lws)
20 changes: 14 additions & 6 deletions test/integration/controllers/leaderworkerset_test.go
@@ -162,13 +162,21 @@ var _ = ginkgo.Describe("LeaderWorkerSet controller", func() {
 		}),
 		ginkgo.Entry("group size is 1", &testCase{
 			makeLeaderWorkerSet: func(nsName string) *testing.LeaderWorkerSetWrapper {
-				return testing.BuildLeaderWorkerSet(nsName).Size(1)
+				return testing.BuildLeaderWorkerSet(nsName).Name("iiiiiiiiiiiiiiiii").Size(1)
 			},
 			updates: []*update{
 				{
-					checkLWSState: func(deployment *leaderworkerset.LeaderWorkerSet) {
-						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, deployment, 2)
-						testing.ExpectValidWorkerStatefulSets(ctx, deployment, k8sClient, true)
+					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
+						testing.ExpectValidLeaderStatefulSet(ctx, k8sClient, lws, 2)
+						testing.ExpectValidWorkerStatefulSets(ctx, lws, k8sClient, true)
 					},
 				},
+				{
+					lwsUpdateFn: func(lws *leaderworkerset.LeaderWorkerSet) {
+						testing.SetLeaderPodsToReady(ctx, k8sClient, lws, 0, int(*lws.Spec.Replicas))
+					},
+					checkLWSState: func(lws *leaderworkerset.LeaderWorkerSet) {
+						testing.ExpectLeaderWorkerSetAvailable(ctx, k8sClient, lws, "All replicas are ready")
+					},
+				},
 			},
@@ -1511,7 +1519,7 @@
 				},
 			},
 		}),
-		ginkgo.Entry("create a leaderworkerset with spec.startupPolicyy=LeaderReady", &testCase{
+		ginkgo.Entry("create a leaderworkerset with spec.startupPolicy=LeaderReady", &testCase{
 			makeLeaderWorkerSet: func(nsName string) *testing.LeaderWorkerSetWrapper {
 				return testing.BuildLeaderWorkerSet(nsName).Replica(4).StartupPolicy(leaderworkerset.LeaderReadyStartupPolicy)
 			},
@@ -1540,7 +1548,7 @@
 				},
 			},
 		}),
-		ginkgo.Entry("create a leaderworkerset with spec.startupPolicyy=LeaderCreated", &testCase{
+		ginkgo.Entry("create a leaderworkerset with spec.startupPolicy=LeaderCreated", &testCase{
 			makeLeaderWorkerSet: func(nsName string) *testing.LeaderWorkerSetWrapper {
 				return testing.BuildLeaderWorkerSet(nsName).Replica(4).StartupPolicy(leaderworkerset.LeaderCreatedStartupPolicy)
 			},
13 changes: 13 additions & 0 deletions test/testutils/util.go
@@ -510,4 +510,17 @@ func SetLeaderPodsToReady(ctx context.Context, k8sClient client.Client, lws *lea
 	for i := start; i < end; i++ {
 		SetLeaderPodToReady(ctx, k8sClient, fmt.Sprintf("%s-%d", leaderSts.Name, i), lws)
 	}
+
+	// If size=1, we should trigger the leader sts update or the controller will not run reconciliation.
+	gomega.Eventually(func() error {
+		var sts appsv1.StatefulSet
+		if err := k8sClient.Get(ctx, types.NamespacedName{Name: lws.Name, Namespace: lws.Namespace}, &sts); err != nil {
+			return err
+		}
+		sts.Status.ReadyReplicas = *sts.Spec.Replicas
+		sts.Status.Replicas = *sts.Spec.Replicas
+		sts.Status.CurrentRevision = ""
+		sts.Status.UpdateRevision = ""
+		return k8sClient.Status().Update(ctx, &sts)
+	}, Timeout, Interval).Should(gomega.Succeed())
 }
8 changes: 6 additions & 2 deletions test/testutils/validators.go
@@ -182,8 +182,12 @@ func ExpectValidWorkerStatefulSets(ctx context.Context, leaderWorkerSet *leaderw
 		if err := k8sClient.List(ctx, &statefulSetList, client.InNamespace(lws.Namespace), &client.MatchingLabels{leaderworkerset.SetNameLabelKey: lws.Name}); err != nil {
 			return err
 		}
-		if leaderPodScheduled && int(*leaderSts.Spec.Replicas) != len(statefulSetList.Items)-1 {
-			return fmt.Errorf("running worker statefulsets replicas not right, want %d, got %d", *leaderSts.Spec.Replicas, len(statefulSetList.Items)-1)
+		stsNumber := *leaderSts.Spec.Replicas
+		if *lws.Spec.LeaderWorkerTemplate.Size == 1 {
+			stsNumber = 0
+		}
+		if leaderPodScheduled && len(statefulSetList.Items)-1 != int(stsNumber) {
+			return fmt.Errorf("running worker statefulsets replicas not right, want %d, got %d", len(statefulSetList.Items)-1, stsNumber)
 		}
 		if lws.Annotations[leaderworkerset.ExclusiveKeyAnnotationKey] != "" && !leaderPodScheduled && len(statefulSetList.Items) != 1 {
 			return fmt.Errorf("when exclusive placement is enabled, only expect sts count to be 1")
4 changes: 4 additions & 0 deletions test/testutils/wrappers.go
@@ -34,6 +34,10 @@ func (lwsWrapper *LeaderWorkerSetWrapper) Obj() *leaderworkerset.LeaderWorkerSet
 	return &lwsWrapper.LeaderWorkerSet
 }
 
+func (lwsWrapper *LeaderWorkerSetWrapper) Name(name string) *LeaderWorkerSetWrapper {
+	lwsWrapper.ObjectMeta.Name = name
+	return lwsWrapper
+}
 func (lwsWrapper *LeaderWorkerSetWrapper) Replica(count int) *LeaderWorkerSetWrapper {
 	lwsWrapper.Spec.Replicas = ptr.To[int32](int32(count))
 	return lwsWrapper
