From e98217bf21ee4741eab3ff8ff81107f75e97eeb3 Mon Sep 17 00:00:00 2001 From: Mykhailo Bobrovskyi Date: Thu, 31 Oct 2024 18:17:26 +0200 Subject: [PATCH] TAS: Support RayCluster. (#3386) --- .../jobs/raycluster/raycluster_controller.go | 9 +- .../raycluster/raycluster_controller_test.go | 136 ++++++++++++++++++ .../jobs/rayjob/rayjob_controller.go | 9 +- .../jobs/rayjob/rayjob_controller_test.go | 47 ++++++ pkg/util/testingjobs/raycluster/wrappers.go | 9 ++ 5 files changed, 204 insertions(+), 6 deletions(-) diff --git a/pkg/controller/jobs/raycluster/raycluster_controller.go b/pkg/controller/jobs/raycluster/raycluster_controller.go index e2cccba899..0cba18baf0 100644 --- a/pkg/controller/jobs/raycluster/raycluster_controller.go +++ b/pkg/controller/jobs/raycluster/raycluster_controller.go @@ -113,14 +113,17 @@ func (j *RayCluster) PodSets() []kueue.PodSet { // workers for index := range j.Spec.WorkerGroupSpecs { wgs := &j.Spec.WorkerGroupSpecs[index] - replicas := int32(1) + count := int32(1) if wgs.Replicas != nil { - replicas = *wgs.Replicas + count = *wgs.Replicas + } + if wgs.NumOfHosts > 1 { + count *= wgs.NumOfHosts } podSets[index+1] = kueue.PodSet{ Name: strings.ToLower(wgs.GroupName), Template: *wgs.Template.DeepCopy(), - Count: replicas, + Count: count, TopologyRequest: jobframework.PodSetTopologyRequest(&wgs.Template.ObjectMeta), } } diff --git a/pkg/controller/jobs/raycluster/raycluster_controller_test.go b/pkg/controller/jobs/raycluster/raycluster_controller_test.go index f22eca857b..91832f8baa 100644 --- a/pkg/controller/jobs/raycluster/raycluster_controller_test.go +++ b/pkg/controller/jobs/raycluster/raycluster_controller_test.go @@ -524,6 +524,142 @@ func TestReconciler(t *testing.T) { Obj(), }, }, + "RayCluster with NumOfHosts > 1": { + initObjects: []client.Object{ + utiltesting.MakeResourceFlavor("unit-test-flavor").NodeLabel("kubernetes.io/arch", "arm64").Obj(), + }, + job: *baseJobWrapper.Clone(). + WithNumOfHosts("workers-group-0", 2). + Obj(), + wantJob: *baseJobWrapper.Clone(). + Suspend(false). + NodeSelectorHeadGroup("kubernetes.io/arch", "arm64"). + WithNumOfHosts("workers-group-0", 2). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("test", "ns"). + Finalizers(kueue.ResourceInUseFinalizerName). + PodSets( + kueue.PodSet{ + Name: "head", + Count: int32(1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "head-container", + Resources: corev1.ResourceRequirements{ + Requests: make(corev1.ResourceList), + }, + }, + }, + }, + }, + }, + kueue.PodSet{ + Name: "workers-group-0", + Count: int32(2), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + + Containers: []corev1.Container{ + { + Name: "worker-container", + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + }, + }, + }, + }, + }, + }). + Request(corev1.ResourceCPU, "10"). + ReserveQuota( + utiltesting.MakeAdmission("cq", "head", "workers-group-0"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(2). + Obj(), + ). + Admitted(true). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "head", + }, + { + Name: "workers-group-0", + }, + }, + }). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns"). + Finalizers(kueue.ResourceInUseFinalizerName). + PodSets( + kueue.PodSet{ + Name: "head", + Count: int32(1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "head-container", + Resources: corev1.ResourceRequirements{ + Requests: make(corev1.ResourceList), + }, + }, + }, + }, + }, + }, + kueue.PodSet{ + Name: "workers-group-0", + Count: int32(2), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Containers: []corev1.Container{ + { + Name: "worker-container", + Resources: corev1.ResourceRequirements{ + Requests: make(corev1.ResourceList), + }, + }, + }, + }, + }, + }). + ReserveQuota( + utiltesting.MakeAdmission("cq", "head", "workers-group-0"). + Assignment(corev1.ResourceCPU, "unit-test-flavor", "1"). + AssignmentPodCount(2). + Obj(), + ). + Admitted(true). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "head", + }, + { + Name: "workers-group-0", + }, + }, + }). + Obj(), + }, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { diff --git a/pkg/controller/jobs/rayjob/rayjob_controller.go b/pkg/controller/jobs/rayjob/rayjob_controller.go index 2210253711..dd1cdbdc65 100644 --- a/pkg/controller/jobs/rayjob/rayjob_controller.go +++ b/pkg/controller/jobs/rayjob/rayjob_controller.go @@ -117,14 +117,17 @@ func (j *RayJob) PodSets() []kueue.PodSet { // workers for index := range j.Spec.RayClusterSpec.WorkerGroupSpecs { wgs := &j.Spec.RayClusterSpec.WorkerGroupSpecs[index] - replicas := int32(1) + count := int32(1) if wgs.Replicas != nil { - replicas = *wgs.Replicas + count = *wgs.Replicas + } + if wgs.NumOfHosts > 1 { + count *= wgs.NumOfHosts } podSets[index+1] = kueue.PodSet{ Name: strings.ToLower(wgs.GroupName), Template: *wgs.Template.DeepCopy(), - Count: replicas, + Count: count, TopologyRequest: jobframework.PodSetTopologyRequest(&wgs.Template.ObjectMeta), } } diff --git a/pkg/controller/jobs/rayjob/rayjob_controller_test.go b/pkg/controller/jobs/rayjob/rayjob_controller_test.go index 15988d4341..7622e7792b 100644 --- a/pkg/controller/jobs/rayjob/rayjob_controller_test.go +++ b/pkg/controller/jobs/rayjob/rayjob_controller_test.go @@ -196,6 +196,53 @@ func TestPodSets(t *testing.T) { } }, }, + "with NumOfHosts > 1": { + rayJob: (*RayJob)(testingrayutil.MakeJob("rayjob", "ns"). + WithHeadGroupSpec( + rayv1.HeadGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "head_c"}}}, + }, + }, + ). + WithWorkerGroups( + rayv1.WorkerGroupSpec{ + GroupName: "group1", + NumOfHosts: 4, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group1_c"}}}, + }, + }, + rayv1.WorkerGroupSpec{ + GroupName: "group2", + Replicas: ptr.To[int32](3), + NumOfHosts: 4, + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "group2_c"}}}, + }, + }, + ). + Obj()), + wantPodSets: func(rayJob *RayJob) []kueue.PodSet { + return []kueue.PodSet{ + { + Name: headGroupPodSetName, + Count: 1, + Template: *rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.DeepCopy(), + }, + { + Name: "group1", + Count: 4, + Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.DeepCopy(), + }, + { + Name: "group2", + Count: 12, + Template: *rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[1].Template.DeepCopy(), + }, + } + }, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { diff --git a/pkg/util/testingjobs/raycluster/wrappers.go b/pkg/util/testingjobs/raycluster/wrappers.go index 08aa3820d7..db05f1e1b1 100644 --- a/pkg/util/testingjobs/raycluster/wrappers.go +++ b/pkg/util/testingjobs/raycluster/wrappers.go @@ -150,6 +150,15 @@ func (j *ClusterWrapper) WithWorkerPriorityClassName(value string) *ClusterWrapp return j } +func (j *ClusterWrapper) WithNumOfHosts(groupName string, value int32) *ClusterWrapper { + for index, group := range j.Spec.WorkerGroupSpecs { + if group.GroupName == groupName { + j.Spec.WorkerGroupSpecs[index].NumOfHosts = value + } + } + return j +} + // WorkloadPriorityClass updates job workloadpriorityclass. func (j *ClusterWrapper) WorkloadPriorityClass(wpc string) *ClusterWrapper { if j.Labels == nil {