diff --git a/pkg/controller/admissionchecks/multikueue/indexer_test.go b/pkg/controller/admissionchecks/multikueue/indexer_test.go index 118243285f..cbd5c40bf2 100644 --- a/pkg/controller/admissionchecks/multikueue/indexer_test.go +++ b/pkg/controller/admissionchecks/multikueue/indexer_test.go @@ -22,7 +22,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -30,10 +29,10 @@ import ( clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2" kueuealpha "sigs.k8s.io/kueue/apis/kueue/v1alpha1" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/controller/jobframework" "sigs.k8s.io/kueue/pkg/util/slices" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" ) @@ -47,8 +46,13 @@ func getClientBuilder() (*fake.ClientBuilder, context.Context) { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(kueue.AddToScheme(scheme)) utilruntime.Must(kueuealpha.AddToScheme(scheme)) - utilruntime.Must(jobset.AddToScheme(scheme)) - utilruntime.Must(kftraining.AddToScheme(scheme)) + + utilruntime.Must(jobframework.ForEachIntegration(func(_ string, cb jobframework.IntegrationCallbacks) error { + if cb.MultiKueueAdapter != nil && cb.AddToScheme != nil { + return cb.AddToScheme(scheme) + } + return nil + })) ctx := context.Background() builder := fake.NewClientBuilder().WithScheme(scheme).WithObjects(&corev1.Namespace{ diff --git a/pkg/util/testingjobs/tfjob/wrappers_tfjob.go b/pkg/util/testingjobs/tfjob/wrappers_tfjob.go index 120fc25e22..19c6838215 100644 --- a/pkg/util/testingjobs/tfjob/wrappers_tfjob.go +++ b/pkg/util/testingjobs/tfjob/wrappers_tfjob.go @@ -17,6 +17,7 @@ limitations under the License. package testing import ( + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,8 +25,6 @@ import ( "k8s.io/utils/ptr" "sigs.k8s.io/kueue/pkg/controller/constants" - - kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) // TFJobWrapper wraps a Job. diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index ce512626d1..41e5a9cf78 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/google/go-cmp/cmp/cmpopts" + kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" batchv1 "k8s.io/api/batch/v1" @@ -48,8 +49,6 @@ import ( testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob" "sigs.k8s.io/kueue/pkg/workload" "sigs.k8s.io/kueue/test/util" - - kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" ) var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() { @@ -773,7 +772,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, }) }) - ginkgo.FIt("Should run a TFJob on worker if admitted", func() { + ginkgo.It("Should run a TFJob on worker if admitted", func() { tfJob := testingtfjob.MakeTFJob("tfjob1", managerNs.Name). Queue(managerLq.Name). TFReplicaSpecs( @@ -863,13 +862,36 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, gomega.Eventually(func(g gomega.Gomega) { createdTfJob := kftraining.TFJob{} g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(tfJob), &createdTfJob)).To(gomega.Succeed()) - // createdTfJob.Status.Restarts = 10 + createdTfJob.Status.ReplicaStatuses = map[kftraining.ReplicaType]*kftraining.ReplicaStatus{ + kftraining.TFJobReplicaTypeChief: { + Active: 1, + }, + kftraining.TFJobReplicaTypePS: { + Active: 1, + }, + kftraining.TFJobReplicaTypeWorker: { + Active: 2, + Succeeded: 1, + }, + } g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdTfJob)).To(gomega.Succeed()) }, util.Timeout, util.Interval).Should(gomega.Succeed()) gomega.Eventually(func(g gomega.Gomega) { createdTfJob := kftraining.TFJob{} g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(tfJob), &createdTfJob)).To(gomega.Succeed()) - // g.Expect(createdTfJob.Status.Restarts).To(gomega.Equal(int32(10))) + g.Expect(createdTfJob.Status.ReplicaStatuses).To(gomega.Equal( + map[kftraining.ReplicaType]*kftraining.ReplicaStatus{ + kftraining.TFJobReplicaTypeChief: { + Active: 1, + }, + kftraining.TFJobReplicaTypePS: { + Active: 1, + }, + kftraining.TFJobReplicaTypeWorker: { + Active: 2, + Succeeded: 1, + }, + })) }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) @@ -878,7 +900,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, createdTfJob := kftraining.TFJob{} g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(tfJob), &createdTfJob)).To(gomega.Succeed()) createdTfJob.Status.Conditions = append(createdTfJob.Status.Conditions, kftraining.JobCondition{ - Type: kueue.WorkloadFinished, + Type: kftraining.JobSucceeded, Status: corev1.ConditionTrue, Reason: "ByTest", Message: "TFJob finished successfully", @@ -892,7 +914,7 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{ Type: kueue.WorkloadFinished, Status: metav1.ConditionTrue, - Reason: "ByTest", + Reason: string(kftraining.JobSucceeded), Message: "TFJob finished successfully", }, util.IgnoreConditionTimestampsAndObservedGeneration)) }, util.LongTimeout, util.Interval).Should(gomega.Succeed())