Add e2e Multikueue tests for MPIJob
mszadkow committed Aug 23, 2024
1 parent 625531a · commit 1182083
Showing 3 changed files with 48 additions and 32 deletions.
test/e2e/multikueue/e2e_test.go: 30 additions & 29 deletions

@@ -33,6 +33,7 @@ import (
 	versionutil "k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/utils/ptr"
 
+	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
@@ -41,9 +42,11 @@ import (
 	workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
 	workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
 	workloadpytorchjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/pytorchjob"
+	workloadmpijob "sigs.k8s.io/kueue/pkg/controller/jobs/mpijob"
 	utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
 	testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
 	testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
+	testingmpijob "sigs.k8s.io/kueue/pkg/util/testingjobs/mpijob"
 	testingpytorchjob "sigs.k8s.io/kueue/pkg/util/testingjobs/pytorchjob"
 	"sigs.k8s.io/kueue/pkg/workload"
 	"sigs.k8s.io/kueue/test/util"
@@ -432,65 +435,63 @@ var _ = ginkgo.Describe("MultiKueue", func() {
 		})
 	})
 
-	ginkgo.It("Should run a kubeflow XGBoostJob on worker if admitted", func() {
-		// Skipped due to known bug - https://github.com/kubeflow/training-operator/issues/1711
-		ginkgo.Skip("Skipped due to state transitioning bug in training-operator")
+	ginkgo.It("Should run a MPIJob on worker if admitted", func() {
 		// Since it requires 1.5 CPU, this job can only be admitted in worker 1.
-		xgboostJob := testingxgboostjob.MakeXGBoostJob("xgboostjob1", managerNs.Name).
+		mpijob := testingmpijob.MakeMPIJob("mpijob1", managerNs.Name).
 			Queue(managerLq.Name).
-			XGBReplicaSpecs(
-				testingxgboostjob.XGBReplicaSpecRequirement{
-					ReplicaType:   kftraining.XGBoostJobReplicaTypeMaster,
+			MPIJobReplicaSpecs(
+				testingmpijob.MPIJobReplicaSpecRequirement{
+					ReplicaType:   kubeflow.MPIReplicaTypeLauncher,
 					ReplicaCount:  1,
 					RestartPolicy: "OnFailure",
 				},
-				testingxgboostjob.XGBReplicaSpecRequirement{
-					ReplicaType:   kftraining.XGBoostJobReplicaTypeWorker,
-					ReplicaCount:  1,
+				testingmpijob.MPIJobReplicaSpecRequirement{
+					ReplicaType:   kubeflow.MPIReplicaTypeWorker,
+					ReplicaCount:  2,
 					RestartPolicy: "OnFailure",
 				},
 			).
-			Request(kftraining.XGBoostJobReplicaTypeMaster, corev1.ResourceCPU, "1").
-			Request(kftraining.XGBoostJobReplicaTypeMaster, corev1.ResourceMemory, "200M").
-			Request(kftraining.XGBoostJobReplicaTypeWorker, corev1.ResourceCPU, "0.5").
-			Request(kftraining.XGBoostJobReplicaTypeWorker, corev1.ResourceMemory, "100M").
-			Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0").
-			Args([]string{"1ms"}).
+			Request(kubeflow.MPIReplicaTypeLauncher, corev1.ResourceCPU, "1").
+			Request(kubeflow.MPIReplicaTypeLauncher, corev1.ResourceMemory, "200M").
+			Request(kubeflow.MPIReplicaTypeWorker, corev1.ResourceCPU, "0.5").
+			Request(kubeflow.MPIReplicaTypeWorker, corev1.ResourceMemory, "100M").
+			Image(kubeflow.MPIReplicaTypeLauncher, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
+			Image(kubeflow.MPIReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
 			Obj()
 
-		ginkgo.By("Creating the XGBoostJob", func() {
-			gomega.Expect(k8sManagerClient.Create(ctx, xgboostJob)).Should(gomega.Succeed())
+		ginkgo.By("Creating the MPIJob", func() {
+			gomega.Expect(k8sManagerClient.Create(ctx, mpijob)).Should(gomega.Succeed())
 		})
 
-		wlLookupKey := types.NamespacedName{Name: workloadxgboostjob.GetWorkloadNameForXGBoostJob(xgboostJob.Name, xgboostJob.UID), Namespace: managerNs.Name}
+		wlLookupKey := types.NamespacedName{Name: workloadmpijob.GetWorkloadNameForMPIJob(mpijob.Name, mpijob.UID), Namespace: managerNs.Name}
 
 		// the execution should be given to the worker1
 		waitForJobAdmitted(wlLookupKey, multiKueueAc.Name, "worker1")
 
-		ginkgo.By("Waiting for the XGBoostJob to finish", func() {
+		ginkgo.By("Waiting for the MPIJob to finish", func() {
 			gomega.Eventually(func(g gomega.Gomega) {
-				createdXGBoostJob := &kftraining.XGBoostJob{}
-				g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(xgboostJob), createdXGBoostJob)).To(gomega.Succeed())
-				g.Expect(createdXGBoostJob.Status.ReplicaStatuses[kftraining.XGBoostJobReplicaTypeMaster]).To(gomega.BeComparableTo(
-					&kftraining.ReplicaStatus{
+				createdMPIJob := &kubeflow.MPIJob{}
+				g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(mpijob), createdMPIJob)).To(gomega.Succeed())
+				g.Expect(createdMPIJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeLauncher]).To(gomega.BeComparableTo(
+					&kubeflow.ReplicaStatus{
 						Active:    0,
 						Succeeded: 1,
 					},
 					util.IgnoreConditionTimestampsAndObservedGeneration))
 
-				finishReasonMessage := fmt.Sprintf("XGBoostJob %s is successfully completed.", xgboostJob.Name)
+				finishReasonMessage := fmt.Sprintf("MPIJob %s/%s successfully completed.", mpijob.Namespace, mpijob.Name)
 				checkFinishStatusCondition(g, wlLookupKey, finishReasonMessage)
 			}, util.Timeout, util.Interval).Should(gomega.Succeed())
 		})
 
-		ginkgo.By("Checking no objects are left in the worker clusters and the XGBoostJob is completed", func() {
+		ginkgo.By("Checking no objects are left in the worker clusters and the MPIJob is completed", func() {
 			gomega.Eventually(func(g gomega.Gomega) {
 				workerWl := &kueue.Workload{}
 				g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
 				g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
-				workerXGBoostJob := &kftraining.XGBoostJob{}
-				g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(xgboostJob), workerXGBoostJob)).To(utiltesting.BeNotFoundError())
-				g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(xgboostJob), workerXGBoostJob)).To(utiltesting.BeNotFoundError())
+				workerMPIJob := &kubeflow.MPIJob{}
+				g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(mpijob), workerMPIJob)).To(utiltesting.BeNotFoundError())
+				g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(mpijob), workerMPIJob)).To(utiltesting.BeNotFoundError())
			}, util.Timeout, util.Interval).Should(gomega.Succeed())
 		})
 	})
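Two editorial notes on the hunk above. First, the retained comment still says the job "requires 1.5 CPU", but the new MPIJob requests 1 CPU for the launcher plus 2 x 0.5 CPU for the workers, 2 CPU in total; the conclusion that only worker1 can admit it still holds. Second, for readers without the full file open, here is a minimal sketch of what checkFinishStatusCondition is expected to assert, inferred from its call site above (an assumption, not the verbatim helper, which is defined earlier in e2e_test.go):

// Sketch only. Assumes apimeta "k8s.io/apimachinery/pkg/api/meta" and
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" are imported.
func checkFinishStatusCondition(g gomega.Gomega, wlLookupKey types.NamespacedName, finishReasonMessage string) {
	createdWorkload := &kueue.Workload{}
	g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
	// The manager-side Workload must carry a Finished condition whose message
	// matches what the MPI operator reports on completion.
	finishedCond := apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)
	g.Expect(finishedCond).NotTo(gomega.BeNil())
	g.Expect(finishedCond.Status).To(gomega.Equal(metav1.ConditionTrue))
	g.Expect(finishedCond.Message).To(gomega.Equal(finishReasonMessage))
}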
test/e2e/multikueue/suite_test.go: 8 additions & 2 deletions

@@ -23,6 +23,7 @@ import (
 	"testing"
 	"time"
 
+	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
 	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
@@ -95,6 +96,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig
 			policyRule(kftraining.SchemeGroupVersion.Group, "pytorchjobs/status", "get"),
 			policyRule(kftraining.SchemeGroupVersion.Group, "xgboostjobs", resourceVerbs...),
 			policyRule(kftraining.SchemeGroupVersion.Group, "xgboostjobs/status", "get"),
+			policyRule(kubeflow.SchemeGroupVersion.Group, "mpijobs", resourceVerbs...),
+			policyRule(kubeflow.SchemeGroupVersion.Group, "mpijobs/status", "get"),
 		},
 	}
 	err := c.Create(ctx, cr)
@@ -233,8 +236,11 @@ var _ = ginkgo.BeforeSuite(func() {
 	util.WaitForJobSetAvailability(ctx, k8sWorker2Client)
 
 	// there should not be a kubeflow operator in manager cluster
-	util.WaitForKubeFlowAvailability(ctx, k8sWorker1Client)
-	util.WaitForKubeFlowAvailability(ctx, k8sWorker2Client)
+	util.WaitForKubeFlowTrainingOperatorAvailability(ctx, k8sWorker1Client)
+	util.WaitForKubeFlowTrainingOperatorAvailability(ctx, k8sWorker2Client)
+
+	util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker1Client)
+	util.WaitForKubeFlowMPIOperatorAvailability(ctx, k8sWorker2Client)
 
 	ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart))
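The two new policyRule entries mirror the pattern already used for the training-operator CRDs: the MultiKueue ServiceAccount gets the full set of resourceVerbs on mpijobs, plus read-only access to mpijobs/status so the manager can sync status back. A minimal sketch of the helper, inferred from its call sites in this file (an assumption; the real definition lives elsewhere in suite_test.go):

// Sketch only. Assumes rbacv1 "k8s.io/api/rbac/v1" is imported.
// policyRule assembles one RBAC rule for the ClusterRole granted to the
// MultiKueue ServiceAccount on the worker clusters.
func policyRule(group, resource string, verbs ...string) rbacv1.PolicyRule {
	return rbacv1.PolicyRule{
		APIGroups: []string{group},
		Resources: []string{resource},
		Verbs:     verbs,
	}
}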
test/util/e2e.go: 10 additions & 1 deletion

@@ -6,6 +6,7 @@ import (
 	"os"
 
 	"github.com/google/go-cmp/cmp/cmpopts"
+	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
 	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 	"github.com/onsi/gomega"
 	appsv1 "k8s.io/api/apps/v1"
@@ -47,6 +48,9 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config) {
 	err = kftraining.AddToScheme(scheme.Scheme)
 	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
 
+	err = kubeflow.AddToScheme(scheme.Scheme)
+	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
+
 	client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme})
 	gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
 	return client, cfg
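Registering the v2beta1 types with the client scheme is what lets the typed client shared by these tests encode and decode MPIJob objects; without it, every Get or Create of an MPIJob would fail with a "no kind is registered" error. An illustrative standalone fragment (hypothetical setup, not code from this commit):

// Sketch only. Assumes runtime "k8s.io/apimachinery/pkg/runtime" and
// utilruntime "k8s.io/apimachinery/pkg/util/runtime" are imported.
sch := runtime.NewScheme()
utilruntime.Must(kubeflow.AddToScheme(sch)) // kubeflow = mpi-operator v2beta1 API
c, err := client.New(cfg, client.Options{Scheme: sch})
if err != nil {
	panic(err)
}
_ = c // this client can now round-trip &kubeflow.MPIJob{} objects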
@@ -108,7 +112,12 @@ func WaitForJobSetAvailability(ctx context.Context, k8sClient client.Client) {
 	waitForOperatorAvailability(ctx, k8sClient, jcmKey)
 }
 
-func WaitForKubeFlowAvailability(ctx context.Context, k8sClient client.Client) {
+func WaitForKubeFlowTrainingOperatorAvailability(ctx context.Context, k8sClient client.Client) {
 	kftoKey := types.NamespacedName{Namespace: "kubeflow", Name: "training-operator"}
 	waitForOperatorAvailability(ctx, k8sClient, kftoKey)
 }
+
+func WaitForKubeFlowMPIOperatorAvailability(ctx context.Context, k8sClient client.Client) {
+	kftoKey := types.NamespacedName{Namespace: "mpi-operator", Name: "mpi-operator"}
+	waitForOperatorAvailability(ctx, k8sClient, kftoKey)
+}
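WaitForKubeFlowMPIOperatorAvailability assumes the MPI operator runs as the mpi-operator Deployment in the mpi-operator namespace, matching the operator's default manifests. For context, a minimal sketch of what the shared waitForOperatorAvailability helper presumably does, inferred from how it is called here (an assumption; the actual helper lives elsewhere in test/util):

// Sketch only. Assumes corev1 "k8s.io/api/core/v1" is imported alongside appsv1.
func waitForOperatorAvailability(ctx context.Context, k8sClient client.Client, key types.NamespacedName) {
	deployment := &appsv1.Deployment{}
	gomega.EventuallyWithOffset(2, func(g gomega.Gomega) {
		g.Expect(k8sClient.Get(ctx, key, deployment)).To(gomega.Succeed())
		// The operator counts as ready once its Deployment reports the
		// Available condition with status True.
		available := false
		for _, cond := range deployment.Status.Conditions {
			if cond.Type == appsv1.DeploymentAvailable && cond.Status == corev1.ConditionTrue {
				available = true
			}
		}
		g.Expect(available).To(gomega.BeTrue(), "deployment %s is not available yet", key.String())
	}, Timeout, Interval).Should(gomega.Succeed())
}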
