Skip to content

Commit

Permalink
Stateful set integration
Browse files Browse the repository at this point in the history
* kueue.x-k8s.io/pod-group-fast-admission annotation
  • Loading branch information
vladikkuzn committed Sep 26, 2024
1 parent 4a4dece commit f5601e3
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 11 deletions.
32 changes: 31 additions & 1 deletion pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,9 +666,38 @@ func (p *Pod) Load(ctx context.Context, c client.Client, key *types.NamespacedNa
}

// constructGroupPodSets builds the pod sets for the pod group.
//
// When the pod carries the GroupFastAdmissionAnnotation (only presence is
// checked, the value is ignored), the pod sets are derived from the group
// total count via constructGroupPodSetsFast. Otherwise they are built from
// the pods currently listed for the group.
func (p *Pod) constructGroupPodSets() ([]kueue.PodSet, error) {
	_, fast := p.pod.GetAnnotations()[GroupFastAdmissionAnnotation]
	if !fast {
		return constructGroupPodSets(p.list.Items)
	}

	total, err := p.groupTotalCount()
	if err != nil {
		return nil, err
	}
	return constructGroupPodSetsFast(p, total)
}

// constructGroupPodSetsFast builds a single pod set for the whole group from
// the first pod in p.list.Items that is runnable or succeeded. The pod set is
// named after that pod's role hash and its Count is set to groupTotalCount.
//
// It returns an error when the role hash cannot be computed or when no pod in
// the list is runnable or succeeded.
func constructGroupPodSetsFast(p *Pod, groupTotalCount int) ([]kueue.PodSet, error) {
	for _, podInGroup := range p.list.Items {
		if !isPodRunnableOrSucceeded(&podInGroup) {
			continue
		}

		roleHash, err := getRoleHash(podInGroup)
		if err != nil {
			return nil, fmt.Errorf("failed to calculate pod role hash: %w", err)
		}

		podSet := FromObject(&podInGroup).PodSets()
		podSet[0].Name = roleHash
		podSet[0].Count = int32(groupTotalCount)

		// Return a fresh one-element slice. The previous code wrote into a
		// length-1 slice at the pod's list index, which panicked with an
		// out-of-range index whenever the first runnable pod was not the
		// first item in the list.
		return []kueue.PodSet{podSet[0]}, nil
	}

	return nil, fmt.Errorf("failed to find a runnable pod in the group")
}

func constructGroupPodSets(pods []corev1.Pod) ([]kueue.PodSet, error) {
var resultPodSets []kueue.PodSet

Expand Down Expand Up @@ -712,8 +741,9 @@ func (p *Pod) validatePodGroupMetadata(r record.EventRecorder, activePods []core
return err
}
originalQueue := jobframework.QueueName(p)
_, useFastAdmission := p.pod.GetAnnotations()[GroupFastAdmissionAnnotation]

if len(activePods) < groupTotalCount {
if !useFastAdmission && len(activePods) < groupTotalCount {
errMsg := fmt.Sprintf("'%s' group has fewer runnable pods than expected", podGroupName(p.pod))
r.Eventf(p.Object(), corev1.EventTypeWarning, jobframework.ReasonErrWorkloadCompose, errMsg)
return jobframework.UnretryableError(errMsg)
Expand Down
15 changes: 8 additions & 7 deletions pkg/controller/jobs/pod/pod_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ import (
)

const (
ManagedLabelKey = constants.ManagedByKueueLabel
ManagedLabelValue = "true"
PodFinalizer = ManagedLabelKey
GroupNameLabel = "kueue.x-k8s.io/pod-group-name"
GroupTotalCountAnnotation = "kueue.x-k8s.io/pod-group-total-count"
RoleHashAnnotation = "kueue.x-k8s.io/role-hash"
RetriableInGroupAnnotation = "kueue.x-k8s.io/retriable-in-group"
ManagedLabelKey = constants.ManagedByKueueLabel
ManagedLabelValue = "true"
PodFinalizer = ManagedLabelKey
GroupNameLabel = "kueue.x-k8s.io/pod-group-name"
GroupTotalCountAnnotation = "kueue.x-k8s.io/pod-group-total-count"
// GroupFastAdmissionAnnotation, when present on a pod (its value is not
// inspected), makes the pod controller build the group's pod sets from the
// group total count and skip the "fewer runnable pods than expected" check.
// The StatefulSet webhook sets it to "true" on the pod template.
GroupFastAdmissionAnnotation = "kueue.x-k8s.io/pod-group-fast-admission"
RoleHashAnnotation = "kueue.x-k8s.io/role-hash"
RetriableInGroupAnnotation = "kueue.x-k8s.io/retriable-in-group"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/jobs/statefulset/statefulset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ func (wh *Webhook) Default(ctx context.Context, obj runtime.Object) error {
}
ss.Spec.Template.Labels[pod.GroupNameLabel] = podGroupName
if ss.Spec.Template.Annotations == nil {
ss.Spec.Template.Annotations = make(map[string]string, 1)
ss.Spec.Template.Annotations = make(map[string]string, 2)
}
ss.Spec.Template.Annotations[pod.GroupTotalCountAnnotation] = fmt.Sprint(podGroupReplicas)
ss.Spec.Template.Annotations[pod.GroupFastAdmissionAnnotation] = "true"

return nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/statefulset/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,9 @@ func (ss *StatefulSetWrapper) PodTemplateSpecPodGroupNameLabel(
// PodTemplateSpecPodGroupTotalCountAnnotation records replicas, formatted as a
// string, under the pod-group total-count annotation of the pod template.
func (ss *StatefulSetWrapper) PodTemplateSpecPodGroupTotalCountAnnotation(replicas int32) *StatefulSetWrapper {
	total := fmt.Sprint(replicas)
	return ss.PodTemplateSpecAnnotation(pod.GroupTotalCountAnnotation, total)
}

// Image sets the image and args of the first container in the pod template
// and returns the wrapper for chaining. It indexes Containers[0] directly, so
// the template must already hold at least one container.
func (ss *StatefulSetWrapper) Image(image string, args []string) *StatefulSetWrapper {
	first := &ss.Spec.Template.Spec.Containers[0]
	first.Image, first.Args = image, args
	return ss
}
1 change: 1 addition & 0 deletions test/e2e/config/controller_manager_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ integrations:
- "kubeflow.org/tfjob"
- "kubeflow.org/xgboostjob"
- "pod"
- "statefulset"
85 changes: 85 additions & 0 deletions test/e2e/singlecluster/statefulset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package e2e

import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/testing"
statefulsettesting "sigs.k8s.io/kueue/pkg/util/testingjobs/statefulset"
"sigs.k8s.io/kueue/test/util"
)

// End-to-end coverage for the StatefulSet integration: a StatefulSet submitted
// to a LocalQueue whose ClusterQueue has enough quota must end up with all of
// its replicas ready.
var _ = ginkgo.Describe("Stateful set integration", func() {
	var (
		ns         *corev1.Namespace
		onDemandRF *kueue.ResourceFlavor
	)

	ginkgo.BeforeEach(func() {
		if kubeVersion().LessThan(kubeversion.KubeVersion1_27) {
			ginkgo.Skip("Unsupported in versions older than 1.27")
		}
		ns = &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: "pod-e2e-",
			},
		}
		gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed())
		onDemandRF = testing.MakeResourceFlavor("statefulset-resource-flavour").
			NodeLabel("instance-type", "on-demand").
			Obj()
		gomega.Expect(k8sClient.Create(ctx, onDemandRF)).To(gomega.Succeed())
	})
	ginkgo.AfterEach(func() {
		gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
		util.ExpectObjectToBeDeleted(ctx, k8sClient, onDemandRF, true)
	})

	ginkgo.When("Single CQ", func() {
		var (
			cq *kueue.ClusterQueue
			lq *kueue.LocalQueue
		)

		ginkgo.BeforeEach(func() {
			cq = testing.MakeClusterQueue("statefulset-cluster-queue").
				ResourceGroup(
					*testing.MakeFlavorQuotas("statefulset-resource-flavour").
						Resource(corev1.ResourceCPU, "5").
						Obj(),
				).
				Preemption(kueue.ClusterQueuePreemption{
					WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
				}).
				Obj()
			gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed())
			lq = testing.MakeLocalQueue("statefulset-local-queue", ns.Name).ClusterQueue(cq.Name).Obj()
			gomega.Expect(k8sClient.Create(ctx, lq)).To(gomega.Succeed())
		})
		ginkgo.AfterEach(func() {
			gomega.Expect(util.DeleteAllPodsInNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
			util.ExpectObjectToBeDeleted(ctx, k8sClient, cq, true)
		})

		ginkgo.It("should admit group that fits", func() {
			statefulSet := statefulsettesting.MakeStatefulSet("sf", ns.Name).
				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1m"}).
				Replicas(3).
				Queue(lq.Name).
				Obj()
			gomega.Expect(k8sClient.Create(ctx, statefulSet)).To(gomega.Succeed())
			gomega.Eventually(func(g gomega.Gomega) {
				createdStatefulSet := &appsv1.StatefulSet{}
				// Assertions inside the polled function must go through g so
				// that a failure is retried on the next poll instead of
				// failing the spec immediately.
				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(statefulSet), createdStatefulSet)).
					To(gomega.Succeed())
				g.Expect(createdStatefulSet.Status.ReadyReplicas).To(gomega.Equal(int32(3)))
				// Explicit timeout: Gomega's default Eventually timeout is too
				// short for three replicas to be scheduled and become ready.
			}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
		})
	})
})
2 changes: 1 addition & 1 deletion test/e2e/singlecluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,5 @@ var _ = ginkgo.BeforeSuite(func() {
waitForAvailableStart := time.Now()
util.WaitForKueueAvailability(ctx, k8sClient)
util.WaitForJobSetAvailability(ctx, k8sClient)
ginkgo.GinkgoLogr.Info("Kueue and JobSet oprators are available in the cluster", "waitingTime", time.Since(waitForAvailableStart))
ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in the cluster", "waitingTime", time.Since(waitForAvailableStart))
})
1 change: 0 additions & 1 deletion test/integration/controller/jobs/statefulset/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/webhooks"
"sigs.k8s.io/kueue/test/integration/framework"
// +kubebuilder:scaffold:imports
)

var (
Expand Down

0 comments on commit f5601e3

Please sign in to comment.