Skip to content

Commit

Permalink
[test/integration/jobs] Check selectors restoration on workload deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Mar 28, 2023
1 parent 3fc8fbc commit 56be3aa
Show file tree
Hide file tree
Showing 2 changed files with 277 additions and 0 deletions.
127 changes: 127 additions & 0 deletions test/integration/controller/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,4 +831,131 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
return createdProdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
})

ginkgo.When("The workload is deleted while it's admitted", func() {
ginkgo.It("Should restore the original node selectors", func() {
localQueue := testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
job := testingjob.MakeJob(jobName, ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "2").Obj()
lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
createdJob := &batchv1.Job{}

ginkgo.By("create a job", func() {
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
})

ginkgo.By("job should be suspend", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(true)))
})

// backup the the podSet's node selector
originalNodeSelector := createdJob.Spec.Template.Spec.NodeSelector

ginkgo.By("create a localQueue", func() {
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
})

ginkgo.By("job should be unsuspended", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
})

ginkgo.By("the node selector should be updated", func() {
gomega.Eventually(func() map[string]string {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Template.Spec.NodeSelector
}, util.Timeout, util.Interval).ShouldNot(gomega.Equal(originalNodeSelector))
})

ginkgo.By("delete the workload", func() {
wl := &kueue.Workload{}
wlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name), Namespace: job.Namespace}
gomega.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
gomega.Expect(util.DeleteWorkload(ctx, k8sClient, wl)).Should(gomega.Succeed())
})

ginkgo.By("delete the localQueue, prevent readmission", func() {
gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).Should(gomega.Succeed())
})

ginkgo.By("the node selector should be restored", func() {
gomega.Eventually(func() map[string]string {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Template.Spec.NodeSelector
}, util.Timeout, util.Interval).Should(gomega.Equal(originalNodeSelector))
})
})

ginkgo.It("Should continue to use the admission node selectors if the annotation is corrupted", func() {
localQueue := testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
job := testingjob.MakeJob(jobName, ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "2").Obj()
lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
createdJob := &batchv1.Job{}

ginkgo.By("create a job", func() {
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
})

ginkgo.By("job should be suspend", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(true)))
})

// backup the the podSet's node selector
originalNodeSelector := createdJob.Spec.Template.Spec.NodeSelector

ginkgo.By("create a localQueue", func() {
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
})

ginkgo.By("job should be unsuspended", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
})

ginkgo.By("the node selector should be updated", func() {
gomega.Eventually(func() map[string]string {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Template.Spec.NodeSelector
}, util.Timeout, util.Interval).ShouldNot(gomega.Equal(originalNodeSelector))
})

runningNodeSelector := createdJob.Spec.Template.Spec.NodeSelector

ginkgo.By("change the annotation", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
createdJob.Annotations[jobframework.OriginalNodeSelectorsAnnotation] = "invalid json string }"
return k8sClient.Update(ctx, createdJob)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("delete the workload", func() {
wl := &kueue.Workload{}
wlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name), Namespace: job.Namespace}
gomega.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
gomega.Expect(util.DeleteWorkload(ctx, k8sClient, wl)).Should(gomega.Succeed())
})

ginkgo.By("delete the localQueue, prevent readmission", func() {
gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).Should(gomega.Succeed())
})

ginkgo.By("the node selector should not be restored", func() {
gomega.Consistently(func() map[string]string {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Template.Spec.NodeSelector
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(runningNodeSelector))
})

})
})
})
150 changes: 150 additions & 0 deletions test/integration/controller/mpijob/mpijob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,4 +561,154 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {

})

ginkgo.When("The workload is deleted while it's admitted", func() {
ginkgo.It("Should restore the original node selectors", func() {

localQueue := testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
job := testingmpijob.MakeMPIJob(jobName, ns.Name).Queue(localQueue.Name).
Request(kubeflow.MPIReplicaTypeLauncher, corev1.ResourceCPU, "3").
Request(kubeflow.MPIReplicaTypeWorker, corev1.ResourceCPU, "4").
Obj()
lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
createdJob := &kubeflow.MPIJob{}

nodeSelectors := func(j *kubeflow.MPIJob) map[kubeflow.MPIReplicaType]map[string]string {
ret := map[kubeflow.MPIReplicaType]map[string]string{}
for k := range j.Spec.MPIReplicaSpecs {
ret[k] = j.Spec.MPIReplicaSpecs[k].Template.Spec.NodeSelector
}
return ret
}

ginkgo.By("create a job", func() {
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
})

ginkgo.By("job should be suspend", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.RunPolicy.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(true)))
})

// backup the the node selectors
originalNodeSelectors := nodeSelectors(createdJob)

ginkgo.By("create a localQueue", func() {
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
})

ginkgo.By("job should be unsuspended", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.RunPolicy.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
})

ginkgo.By("the node selector should be updated", func() {
gomega.Eventually(func() map[kubeflow.MPIReplicaType]map[string]string {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return nodeSelectors(createdJob)
}, util.Timeout, util.Interval).ShouldNot(gomega.Equal(originalNodeSelectors))
})

ginkgo.By("delete the workload", func() {
wl := &kueue.Workload{}
wlKey := types.NamespacedName{Name: workloadmpijob.GetWorkloadNameForMPIJob(job.Name), Namespace: job.Namespace}
gomega.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
gomega.Expect(util.DeleteWorkload(ctx, k8sClient, wl)).Should(gomega.Succeed())
})

ginkgo.By("delete the localQueue, prevent readmission", func() {
gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).Should(gomega.Succeed())
})

ginkgo.By("the node selectors should be restored", func() {
gomega.Eventually(func() map[kubeflow.MPIReplicaType]map[string]string {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return nodeSelectors(createdJob)
}, util.Timeout, util.Interval).Should(gomega.Equal(originalNodeSelectors))
})
})

ginkgo.It("Should continue to use the admission node selectors if the annotation is corrupted", func() {

localQueue := testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
job := testingmpijob.MakeMPIJob(jobName, ns.Name).Queue(localQueue.Name).
Request(kubeflow.MPIReplicaTypeLauncher, corev1.ResourceCPU, "3").
Request(kubeflow.MPIReplicaTypeWorker, corev1.ResourceCPU, "4").
Obj()
lookupKey := types.NamespacedName{Name: job.Name, Namespace: job.Namespace}
createdJob := &kubeflow.MPIJob{}

nodeSelectors := func(j *kubeflow.MPIJob) map[kubeflow.MPIReplicaType]map[string]string {
ret := map[kubeflow.MPIReplicaType]map[string]string{}
for k := range j.Spec.MPIReplicaSpecs {
ret[k] = j.Spec.MPIReplicaSpecs[k].Template.Spec.NodeSelector
}
return ret
}

ginkgo.By("create a job", func() {
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
})

ginkgo.By("job should be suspend", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.RunPolicy.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(true)))
})

// backup the the node selectors
originalNodeSelectors := nodeSelectors(createdJob)

ginkgo.By("create a localQueue", func() {
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())
})

ginkgo.By("job should be unsuspended", func() {
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.RunPolicy.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
})

ginkgo.By("the node selector should be updated", func() {
gomega.Eventually(func() map[kubeflow.MPIReplicaType]map[string]string {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return nodeSelectors(createdJob)
}, util.Timeout, util.Interval).ShouldNot(gomega.Equal(originalNodeSelectors))
})

// backup the the node selectors
runningNodeSelectors := nodeSelectors(createdJob)

ginkgo.By("change the annotation", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
createdJob.Annotations[jobframework.OriginalNodeSelectorsAnnotation] = "invalid json string }"
return k8sClient.Update(ctx, createdJob)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("delete the workload", func() {
wl := &kueue.Workload{}
wlKey := types.NamespacedName{Name: workloadmpijob.GetWorkloadNameForMPIJob(job.Name), Namespace: job.Namespace}
gomega.Expect(k8sClient.Get(ctx, wlKey, wl)).Should(gomega.Succeed())
gomega.Expect(util.DeleteWorkload(ctx, k8sClient, wl)).Should(gomega.Succeed())
})

ginkgo.By("delete the localQueue, prevent readmission", func() {
gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).Should(gomega.Succeed())
})

ginkgo.By("the node selectors should not be restored", func() {
gomega.Consistently(func() map[kubeflow.MPIReplicaType]map[string]string {
gomega.Expect(k8sClient.Get(ctx, lookupKey, createdJob)).Should(gomega.Succeed())
return nodeSelectors(createdJob)
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(runningNodeSelectors))
})
})
})
})

0 comments on commit 56be3aa

Please sign in to comment.