Skip to content

Commit

Permalink
fix volcano podgroup update issue (#2079)
Browse files Browse the repository at this point in the history
* fix volcano podgroup update issue

Signed-off-by: Weiyu Yen <ckyuto@gmail.com>

* queue value shouldn't be reset once it has been set

Signed-off-by: Weiyu Yen <ckyuto@gmail.com>

* make queue immutable

Signed-off-by: Weiyu Yen <ckyuto@gmail.com>

* add unit test

Signed-off-by: Weiyu Yen <ckyuto@gmail.com>

* add retry for update operation

Signed-off-by: Weiyu Yen <ckyuto@gmail.com>

---------

Signed-off-by: Weiyu Yen <ckyuto@gmail.com>
  • Loading branch information
ckyuto authored May 30, 2024
1 parent 7b9c73e commit 00f4d52
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 1 deletion.
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_mpijobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7330,6 +7330,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_mxjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7333,6 +7333,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_paddlejobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7812,6 +7812,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7849,6 +7849,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_tfjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 3 additions & 0 deletions manifests/base/crds/kubeflow.org_xgboostjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ spec:
type: string
queue:
type: string
x-kubernetes-validations:
- message: spec.runPolicy.schedulingPolicy.queue is immutable
rule: self == oldSelf
scheduleTimeoutSeconds:
format: int32
type: integer
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/kubeflow.org/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ type RunPolicy struct {
// SchedulingPolicy encapsulates various scheduling policies of the distributed training
// job, for example `minAvailable` for gang-scheduling.
type SchedulingPolicy struct {
MinAvailable *int32 `json:"minAvailable,omitempty"`
MinAvailable *int32 `json:"minAvailable,omitempty"`
// +kubebuilder:validation:XValidation:rule="self == oldSelf", message="spec.runPolicy.schedulingPolicy.queue is immutable"
Queue string `json:"queue,omitempty"`
MinResources *map[v1.ResourceName]resource.Quantity `json:"minResources,omitempty"`
PriorityClass string `json:"priorityClass,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ func (jc *JobController) ReconcileJobs(
if !match {
return fmt.Errorf("unable to recognize PodGroup: %v", klog.KObj(pg))
}

if q := volcanoPodGroup.Spec.Queue; len(q) > 0 {
queue = q
}

volcanoPodGroup.Spec = volcanov1beta1.PodGroupSpec{
MinMember: minMember,
Queue: queue,
Expand Down
28 changes: 28 additions & 0 deletions pkg/controller.v1/pytorch/pytorchjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,34 @@ var _ = Describe("PyTorchJob controller", func() {
cond := getCondition(created.Status, kubeflowv1.JobSucceeded)
Expect(cond.Status).To(Equal(corev1.ConditionTrue))
})
It("Shouldn't be updated resources if spec.runPolicy.schedulingPolicy.queue is changed after the job is created", func() {
By("Creating a PyTorchJob with a specific queue")
job.Spec.RunPolicy.SchedulingPolicy = &kubeflowv1.SchedulingPolicy{}
job.Spec.RunPolicy.SchedulingPolicy.Queue = "initial-queue"
Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

By("Attempting to update the PyTorchJob with a different queue value")
updatedJob := &kubeflowv1.PyTorchJob{}
Eventually(func() bool {
err := testK8sClient.Get(ctx, jobKey, updatedJob)
return err == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue(), "Failed to get PyTorchJob")

Eventually(func() bool {
updatedJob.Spec.RunPolicy.SchedulingPolicy.Queue = "test"
err := testK8sClient.Update(ctx, updatedJob)
By("Checking that the queue update fails")
Expect(err).To(HaveOccurred(), "Expected an error when updating the queue, but update succeeded")
Expect(err).To(MatchError(ContainSubstring("spec.runPolicy.schedulingPolicy.queue is immutable"), "The error message did not contain the expected message"))
return err != nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())

By("Validating the queue was not updated")
freshJob := &kubeflowv1.PyTorchJob{}
Expect(testK8sClient.Get(ctx, client.ObjectKeyFromObject(job), freshJob)).Should(Succeed(), "Failed to get PyTorchJob after update attempt")
Expect(freshJob.Spec.RunPolicy.SchedulingPolicy.Queue).To(Equal("initial-queue"), "The queue should remain as the initial value since it should be immutable")

})

It("Shouldn't create resources if PyTorchJob is suspended", func() {
By("By creating a new PyTorchJob with suspend=true")
Expand Down

0 comments on commit 00f4d52

Please sign in to comment.