Reduce the number of KFJob e2e multikueue tests
The per-framework specs are removed due to the consolidation of the MultiKueue adapters for the KFJobs; an illustrative (hypothetical) sketch of a consolidated, table-driven spec follows the change summary below.
mszadkow committed Aug 23, 2024
1 parent d46dc44 commit 625531a
Showing 1 changed file, test/e2e/multikueue/e2e_test.go, with 0 additions and 135 deletions.
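With the adapters consolidated, the per-framework Kubeflow specs removed below can be covered by a single table-driven spec. The following is an illustrative sketch only, not the test introduced by the consolidation: it reuses the builder and workload-name helper that the removed TFJob spec used (testingtfjob.MakeTFJob, workloadtfjob.GetWorkloadNameForTFJob), the file's waitForJobAdmitted helper, and Ginkgo v2's DescribeTable/Entry; replica specs, resource requests, and the completion and cleanup assertions are omitted for brevity.

// Hypothetical consolidated spec; it would live inside the same "MultiKueue"
// Describe container so that managerNs, managerLq, multiKueueAc, ctx and the
// cluster clients are in scope.
ginkgo.DescribeTable("Should run a Kubeflow job on a worker if admitted",
	func(makeJob func() client.Object, wlNameFor func(client.Object) string, expectedWorker string) {
		job := makeJob()
		gomega.Expect(k8sManagerClient.Create(ctx, job)).Should(gomega.Succeed())

		wlLookupKey := types.NamespacedName{Name: wlNameFor(job), Namespace: managerNs.Name}
		// The execution should be assigned to the expected worker cluster.
		waitForJobAdmitted(wlLookupKey, multiKueueAc.Name, expectedWorker)

		// Completion and worker-cleanup assertions would follow here,
		// mirroring the per-framework specs removed in this commit.
	},
	ginkgo.Entry("TFJob",
		func() client.Object {
			// Replica specs, requests and images omitted; see the removed spec below.
			return testingtfjob.MakeTFJob("tfjob1", managerNs.Name).Queue(managerLq.Name).Obj()
		},
		func(o client.Object) string {
			return workloadtfjob.GetWorkloadNameForTFJob(o.GetName(), o.GetUID())
		},
		"worker1",
	),
)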
@@ -40,17 +40,11 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
workloadpaddlejob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/paddlejob"
workloadpytorchjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob"
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
workloadxgboostjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/xgboostjob"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
testingpaddlejob "sigs.k8s.io/kueue/pkg/util/testingjobs/paddlejob"
testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
testingxgboostjob "sigs.k8s.io/kueue/pkg/util/testingjobs/xgboostjob"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/test/util"
)
@@ -375,135 +369,6 @@ var _ = ginkgo.Describe("MultiKueue", func() {
util.IgnoreConditionTimestampsAndObservedGeneration)))
})
})
ginkgo.It("Should run a kubeflow TFJob on worker if admitted", func() {
// Since it requires 1.5 CPU, this job can only be admitted in worker 1.
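// (Chief, PS and Worker each request 0.5 CPU, so the job needs 1.5 CPU in total.)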
tfJob := testingtfjob.MakeTFJob("tfjob1", managerNs.Name).
Queue(managerLq.Name).
TFReplicaSpecs(
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypeChief,
ReplicaCount: 1,
RestartPolicy: "OnFailure",
},
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypePS,
ReplicaCount: 1,
RestartPolicy: "Never",
},
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypeWorker,
ReplicaCount: 1,
RestartPolicy: "OnFailure",
},
).
Request(kftraining.TFJobReplicaTypeChief, corev1.ResourceCPU, "0.5").
Request(kftraining.TFJobReplicaTypeChief, corev1.ResourceMemory, "200M").
Request(kftraining.TFJobReplicaTypePS, corev1.ResourceCPU, "0.5").
Request(kftraining.TFJobReplicaTypePS, corev1.ResourceMemory, "200M").
Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "0.5").
Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceMemory, "100M").
Image(kftraining.TFJobReplicaTypeChief, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Image(kftraining.TFJobReplicaTypePS, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Obj()

ginkgo.By("Creating the TfJob", func() {
gomega.Expect(k8sManagerClient.Create(ctx, tfJob)).Should(gomega.Succeed())
})
wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name}

// The execution should be assigned to worker1.
waitForJobAdmitted(wlLookupKey, multiKueueAc.Name, "worker1")

ginkgo.By("Waiting for the TfJob to finish", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdTfJob := &kftraining.TFJob{}
g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(tfJob), createdTfJob)).To(gomega.Succeed())
g.Expect(createdTfJob.Status.ReplicaStatuses[kftraining.TFJobReplicaTypeChief]).To(gomega.BeComparableTo(
&kftraining.ReplicaStatus{
Active: 0,
Succeeded: 1,
},
util.IgnoreConditionTimestampsAndObservedGeneration))

finishReasonMessage := fmt.Sprintf("TFJob %s/%s successfully completed.", tfJob.Namespace, tfJob.Name)
checkFinishStatusCondition(g, wlLookupKey, finishReasonMessage)
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Checking no objects are left in the worker clusters and the TfJob is completed", func() {
gomega.Eventually(func(g gomega.Gomega) {
workerWl := &kueue.Workload{}
g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
workerTfJob := &kftraining.TFJob{}
g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError())
g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should run a kubeflow PaddleJob on worker if admitted", func() {
// Since it requires 1600M memory, this job can only be admitted in worker 2.
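// (Master and Worker each request 800M of memory, so the job needs 1600M in total.)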
paddleJob := testingpaddlejob.MakePaddleJob("paddlejob1", managerNs.Name).
Queue(managerLq.Name).
PaddleReplicaSpecs(
testingpaddlejob.PaddleReplicaSpecRequirement{
ReplicaType: kftraining.PaddleJobReplicaTypeMaster,
ReplicaCount: 1,
RestartPolicy: "OnFailure",
},
testingpaddlejob.PaddleReplicaSpecRequirement{
ReplicaType: kftraining.PaddleJobReplicaTypeWorker,
ReplicaCount: 1,
RestartPolicy: "OnFailure",
},
).
Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceCPU, "0.2").
Request(kftraining.PaddleJobReplicaTypeMaster, corev1.ResourceMemory, "800M").
Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceCPU, "0.2").
Request(kftraining.PaddleJobReplicaTypeWorker, corev1.ResourceMemory, "800M").
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0").
Args([]string{"1ms"}).
Obj()

ginkgo.By("Creating the PaddleJob", func() {
gomega.Expect(k8sManagerClient.Create(ctx, paddleJob)).Should(gomega.Succeed())
})

wlLookupKey := types.NamespacedName{Name: workloadpaddlejob.GetWorkloadNameForPaddleJob(paddleJob.Name, paddleJob.UID), Namespace: managerNs.Name}

// The execution should be assigned to worker2.
waitForJobAdmitted(wlLookupKey, multiKueueAc.Name, "worker2")

ginkgo.By("Waiting for the PaddleJob to finish", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdPaddleJob := &kftraining.PaddleJob{}
g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(paddleJob), createdPaddleJob)).To(gomega.Succeed())
g.Expect(createdPaddleJob.Status.ReplicaStatuses[kftraining.PaddleJobReplicaTypeMaster]).To(gomega.BeComparableTo(
&kftraining.ReplicaStatus{
Active: 0,
Succeeded: 1,
Selector: fmt.Sprintf("training.kubeflow.org/job-name=%s,training.kubeflow.org/operator-name=paddlejob-controller,training.kubeflow.org/replica-type=master", createdPaddleJob.Name),
},
util.IgnoreConditionTimestampsAndObservedGeneration))

finishReasonMessage := fmt.Sprintf("PaddleJob %s is successfully completed.", paddleJob.Name)
checkFinishStatusCondition(g, wlLookupKey, finishReasonMessage)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Checking no objects are left in the worker clusters and the PaddleJob is completed", func() {
gomega.Eventually(func(g gomega.Gomega) {
workerWl := &kueue.Workload{}
g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
workerPaddleJob := &kftraining.PaddleJob{}
g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(paddleJob), workerPaddleJob)).To(utiltesting.BeNotFoundError())
g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(paddleJob), workerPaddleJob)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should run a kubeflow PyTorchJob on worker if admitted", func() {
// Since it requires 1600M of memory, this job can only be admitted in worker 2.