From 3e314a87773775cdde698a7cd2e33c329d4baa3f Mon Sep 17 00:00:00 2001
From: Michal Szadkowski
Date: Tue, 16 Jul 2024 15:16:50 +0200
Subject: [PATCH] add multikueue tests for tfjob

---
 pkg/util/testingjobs/tfjob/wrappers_tfjob.go |  29 ++++-
 test/e2e/multikueue/e2e_test.go              | 115 +++++++++++++++++++
 test/e2e/multikueue/suite_test.go            |   7 ++
 test/util/e2e.go                             |  13 ++-
 4 files changed, 161 insertions(+), 3 deletions(-)

diff --git a/pkg/util/testingjobs/tfjob/wrappers_tfjob.go b/pkg/util/testingjobs/tfjob/wrappers_tfjob.go
index c827cb5382..d4a0ea2fbd 100644
--- a/pkg/util/testingjobs/tfjob/wrappers_tfjob.go
+++ b/pkg/util/testingjobs/tfjob/wrappers_tfjob.go
@@ -17,7 +17,6 @@ limitations under the License.
 package testing

 import (
-  kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
   corev1 "k8s.io/api/core/v1"
   "k8s.io/apimachinery/pkg/api/resource"
   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -25,6 +24,8 @@ import (
   "k8s.io/utils/ptr"

   "sigs.k8s.io/kueue/pkg/controller/constants"
+
+  kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
 )

 // TFJobWrapper wraps a Job.
@@ -150,6 +151,20 @@ func (j *TFJobWrapper) Obj() *kftraining.TFJob {
   return &j.TFJob
 }

+// Clone returns a deep copy of the TFJobWrapper.
+func (j *TFJobWrapper) Clone() *TFJobWrapper {
+  return &TFJobWrapper{TFJob: *j.DeepCopy()}
+}
+
+// Label sets the label key and value.
+func (j *TFJobWrapper) Label(key, value string) *TFJobWrapper {
+  if j.Labels == nil {
+    j.Labels = make(map[string]string)
+  }
+  j.Labels[key] = value
+  return j
+}
+
 // Queue updates the queue name of the job.
 func (j *TFJobWrapper) Queue(queue string) *TFJobWrapper {
   if j.Labels == nil {
@@ -183,3 +198,15 @@ func (j *TFJobWrapper) UID(uid string) *TFJobWrapper {
   j.ObjectMeta.UID = types.UID(uid)
   return j
 }
+
+// StatusConditions appends the given condition to the TFJob's status conditions.
+func (j *TFJobWrapper) StatusConditions(c kftraining.JobCondition) *TFJobWrapper {
+  j.Status.Conditions = append(j.Status.Conditions, c)
+  return j
+}
+
+func (j *TFJobWrapper) Image(replicaType kftraining.ReplicaType, image string, args []string) *TFJobWrapper {
+  j.Spec.TFReplicaSpecs[replicaType].Template.Spec.Containers[0].Image = image
+  j.Spec.TFReplicaSpecs[replicaType].Template.Spec.Containers[0].Args = args
+  return j
+}
diff --git a/test/e2e/multikueue/e2e_test.go b/test/e2e/multikueue/e2e_test.go
index 9ab7c487f1..521d1679c5 100644
--- a/test/e2e/multikueue/e2e_test.go
+++ b/test/e2e/multikueue/e2e_test.go
@@ -21,6 +21,7 @@ import (
   "os/exec"

   "github.com/google/go-cmp/cmp/cmpopts"
+  kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
   "github.com/onsi/ginkgo/v2"
   "github.com/onsi/gomega"
   batchv1 "k8s.io/api/batch/v1"
@@ -37,9 +38,11 @@ import (
   kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
   workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
   workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
+  workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
   utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
   testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
   testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
+  testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
   "sigs.k8s.io/kueue/pkg/workload"
   "sigs.k8s.io/kueue/test/util"
 )
@@ -364,6 +367,118 @@ var _ = ginkgo.Describe("MultiKueue", func() {
           util.IgnoreConditionTimestampsAndObservedGeneration)))
       })
     })
+    ginkgo.It("Should run a kubeflow TFJob on worker if admitted", func() {
+      tfJob := testingtfjob.MakeTFJob("tfjob1", managerNs.Name).
+        Queue(managerLq.Name).
+        TFReplicaSpecs(
+          testingtfjob.TFReplicaSpecRequirement{
+            ReplicaType:   kftraining.TFJobReplicaTypeChief,
+            ReplicaCount:  1,
+            Annotations: map[string]string{
+              "sidecar.istio.io/inject": "false",
+            },
+            RestartPolicy: "OnFailure",
+          },
+          testingtfjob.TFReplicaSpecRequirement{
+            ReplicaType:   kftraining.TFJobReplicaTypePS,
+            ReplicaCount:  1,
+            Annotations: map[string]string{
+              "sidecar.istio.io/inject": "false",
+            },
+            RestartPolicy: "Never",
+          },
+          testingtfjob.TFReplicaSpecRequirement{
+            ReplicaType:   kftraining.TFJobReplicaTypeWorker,
+            ReplicaCount:  2,
+            Annotations: map[string]string{
+              "sidecar.istio.io/inject": "false",
+            },
+            RestartPolicy: "OnFailure",
+          },
+        ).
+        Request(kftraining.TFJobReplicaTypeChief, corev1.ResourceCPU, "0.5").
+        Request(kftraining.TFJobReplicaTypeChief, corev1.ResourceMemory, "200M").
+        Request(kftraining.TFJobReplicaTypePS, corev1.ResourceCPU, "0.5").
+        Request(kftraining.TFJobReplicaTypePS, corev1.ResourceMemory, "200M").
+        Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "0.5").
+        Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceMemory, "100M").
+        Image(kftraining.TFJobReplicaTypeChief, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
+        Image(kftraining.TFJobReplicaTypePS, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
+        Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
+        Obj()
+
+      ginkgo.By("Creating the TfJob", func() {
+        gomega.Expect(k8sManagerClient.Create(ctx, tfJob)).Should(gomega.Succeed())
+      })
+
+      createdLeaderWorkload := &kueue.Workload{}
+      wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name}
+
+      // the execution should be given to the worker
+      ginkgo.By("Waiting to be admitted in worker1 and manager", func() {
+        gomega.Eventually(func(g gomega.Gomega) {
+          g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
+          g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
+            Type:    kueue.WorkloadAdmitted,
+            Status:  metav1.ConditionTrue,
+            Reason:  "Admitted",
+            Message: "The workload is admitted",
+          }, util.IgnoreConditionTimestampsAndObservedGeneration))
+          g.Expect(workload.FindAdmissionCheck(createdLeaderWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
+            Name:    multiKueueAc.Name,
+            State:   kueue.CheckStateReady,
+            Message: `The workload got reservation on "worker1"`,
+          }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
+        }, util.Timeout, util.Interval).Should(gomega.Succeed())
+      })
+
+      ginkgo.By("Waiting for the TfJob to get status updates", func() {
+        gomega.Eventually(func(g gomega.Gomega) {
+          createdTfJob := &kftraining.TFJob{}
+          g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(tfJob), createdTfJob)).To(gomega.Succeed())
+          g.Expect(createdTfJob.Status.ReplicaStatuses).To(gomega.BeComparableTo(
+            map[kftraining.ReplicaType]*kftraining.ReplicaStatus{
+              kftraining.TFJobReplicaTypeChief: {
+                Active:    0,
+                Succeeded: 1,
+              },
+              kftraining.TFJobReplicaTypePS: {
+                Active:    0,
+                Succeeded: 1,
+              },
+              kftraining.TFJobReplicaTypeWorker: {
+                Active:    0,
+                Succeeded: 2,
+              },
+            },
+            util.IgnoreConditionTimestampsAndObservedGeneration))
+        }, util.LongTimeout, util.Interval).Should(gomega.Succeed())
+      })
+
+      ginkgo.By("Waiting for the TfJob to finish", func() {
finish", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed()) + + g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadFinished, + Status: metav1.ConditionTrue, + Reason: kueue.WorkloadFinishedReasonSucceeded, + Message: fmt.Sprintf("TFJob %s/%s successfully completed.", createdLeaderWorkload.Namespace, tfJob.Name), + }, util.IgnoreConditionTimestampsAndObservedGeneration)) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("Checking no objects are left in the worker clusters and the TfJob is completed", func() { + gomega.Eventually(func(g gomega.Gomega) { + workerWl := &kueue.Workload{} + g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError()) + workerTfJob := &kftraining.TFJob{} + g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError()) + g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + }) }) ginkgo.When("The connection to a worker cluster is unreliable", func() { ginkgo.It("Should update the cluster status to reflect the connection state", func() { diff --git a/test/e2e/multikueue/suite_test.go b/test/e2e/multikueue/suite_test.go index fe206f0f99..0975545d49 100644 --- a/test/e2e/multikueue/suite_test.go +++ b/test/e2e/multikueue/suite_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" authenticationv1 "k8s.io/api/authentication/v1" @@ -86,6 +87,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig policyRule(jobset.SchemeGroupVersion.Group, "jobsets/status", "get"), policyRule(kueue.SchemeGroupVersion.Group, "workloads", resourceVerbs...), policyRule(kueue.SchemeGroupVersion.Group, "workloads/status", "get", "patch", "update"), + policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs", resourceVerbs...), + policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs/status", "get"), }, } err := c.Create(ctx, cr) @@ -223,6 +226,10 @@ var _ = ginkgo.BeforeSuite(func() { util.WaitForJobSetAvailability(ctx, k8sWorker1Client) util.WaitForJobSetAvailability(ctx, k8sWorker2Client) + // there should not be a kubeflow operator in manager cluster + util.WaitForKubeFlowAvailability(ctx, k8sWorker1Client) + util.WaitForKubeFlowAvailability(ctx, k8sWorker2Client) + ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart)) discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerCfg) diff --git a/test/util/e2e.go b/test/util/e2e.go index b81853cd20..f4dc7a81dc 100644 --- a/test/util/e2e.go +++ b/test/util/e2e.go @@ -6,6 +6,7 @@ import ( "os" "github.com/google/go-cmp/cmp/cmpopts" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -43,6 +44,9 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config) err = jobset.AddToScheme(scheme.Scheme) 
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) + err = kftraining.AddToScheme(scheme.Scheme) + gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) + client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme}) gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred()) return client, cfg @@ -100,6 +104,11 @@ func WaitForKueueAvailability(ctx context.Context, k8sClient client.Client) { } func WaitForJobSetAvailability(ctx context.Context, k8sClient client.Client) { - kcmKey := types.NamespacedName{Namespace: "jobset-system", Name: "jobset-controller-manager"} - waitForOperatorAvailability(ctx, k8sClient, kcmKey) + jcmKey := types.NamespacedName{Namespace: "jobset-system", Name: "jobset-controller-manager"} + waitForOperatorAvailability(ctx, k8sClient, jcmKey) +} + +func WaitForKubeFlowAvailability(ctx context.Context, k8sClient client.Client) { + kftoKey := types.NamespacedName{Namespace: "kubeflow", Name: "training-operator"} + waitForOperatorAvailability(ctx, k8sClient, kftoKey) }
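
Review note (not part of the patch): the sketch below shows how the new TFJobWrapper helpers introduced above (Clone, Label, StatusConditions, Image) are expected to compose with the existing MakeTFJob, Queue, TFReplicaSpecs, and Obj builders. It is a minimal illustration, not code from the patch: the package name, namespace, queue name, label key/value, and the condition values are assumptions, and the JobSucceeded constant name is taken from the kubeflow common condition types.

package example

import (
	kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
	corev1 "k8s.io/api/core/v1"

	testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
)

// buildTFJobs sketches how the wrapper helpers added by this patch chain together.
func buildTFJobs() (*kftraining.TFJob, *kftraining.TFJob) {
	// Base wrapper: name, namespace, and queue are illustrative values.
	base := testingtfjob.MakeTFJob("tfjob1", "example-ns").
		Queue("example-queue").
		TFReplicaSpecs(
			testingtfjob.TFReplicaSpecRequirement{
				ReplicaType:   kftraining.TFJobReplicaTypeWorker,
				ReplicaCount:  1,
				RestartPolicy: "OnFailure",
			},
		).
		Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"})

	// Clone returns an independent deep copy, so the label and status condition
	// set on the copy do not leak back into the base wrapper.
	remote := base.Clone().
		Label("example.com/origin", "multikueue-test"). // hypothetical label key/value
		StatusConditions(kftraining.JobCondition{
			Type:   kftraining.JobSucceeded,
			Status: corev1.ConditionTrue,
			Reason: "TFJobSucceeded", // illustrative reason string
		})

	return base.Obj(), remote.Obj()
}

Because Clone deep-copies the wrapped TFJob, the same base spec can be mirrored and mutated per cluster in a test without the variants interfering with each other, which is the pattern the MultiKueue tests rely on.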