Commit
add kubeflow tfjobs integration tests
mszadkow committed Jul 30, 2024
1 parent 921b12b commit 3b563fc
Showing 3 changed files with 151 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pkg/util/testingjobs/tfjob/wrappers_tfjob.go
@@ -50,6 +50,7 @@ func MakeTFJob(name, ns string) *TFJobWrapper {

type TFReplicaSpecRequirement struct {
ReplicaType kftraining.ReplicaType
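// Name is used as the pod template name of the replica (see TFReplicaSpecs below).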
Name string
ReplicaCount int32
Annotations map[string]string
RestartPolicy kftraining.RestartPolicy
@@ -59,6 +60,7 @@ func (j *TFJobWrapper) TFReplicaSpecs(replicaSpecs ...TFReplicaSpecRequirement)
j = j.TFReplicaSpecsDefault()
for _, rs := range replicaSpecs {
j.Spec.TFReplicaSpecs[rs.ReplicaType].Replicas = ptr.To[int32](rs.ReplicaCount)
j.Spec.TFReplicaSpecs[rs.ReplicaType].Template.Name = rs.Name
j.Spec.TFReplicaSpecs[rs.ReplicaType].Template.Spec.RestartPolicy = corev1.RestartPolicy(rs.RestartPolicy)
j.Spec.TFReplicaSpecs[rs.ReplicaType].Template.Spec.Containers[0].Name = "tensorflow"

135 changes: 135 additions & 0 deletions test/integration/multikueue/multikueue_test.go
@@ -40,12 +40,16 @@ import (
"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
"sigs.k8s.io/kueue/pkg/features"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
testingtfjob "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/test/util"

kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
@@ -769,6 +773,137 @@ var _ = ginkgo.Describe("Multikueue", ginkgo.Ordered, ginkgo.ContinueOnFailure,
})
})

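// End-to-end MultiKueue flow for a TFJob: create it on the manager, admit it on worker2, sync status back to the manager, then finish it on the worker.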
ginkgo.It("Should run a TFJob on worker if admitted", func() {
tfJob := testingtfjob.MakeTFJob("tfjob1", managerNs.Name).
Queue(managerLq.Name).
TFReplicaSpecs(
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypeChief,
ReplicaCount: 1,
Name: "tfjob-chief",
RestartPolicy: "OnFailure",
},
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypePS,
ReplicaCount: 1,
Name: "tfjob-ps",
RestartPolicy: "Never",
},
testingtfjob.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypeWorker,
ReplicaCount: 3,
Name: "tfjob-worker",
RestartPolicy: "OnFailure",
},
).
Obj()
gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, tfJob)).Should(gomega.Succeed())

wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name}

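// The PodSet names of a TFJob workload match the lowercased replica types: chief, ps and worker.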
admission := utiltesting.MakeAdmission(managerCq.Name).PodSets(
kueue.PodSetAssignment{
Name: "chief",
}, kueue.PodSetAssignment{
Name: "ps",
}, kueue.PodSetAssignment{
Name: "worker",
},
).Obj()

ginkgo.By("setting workload reservation in the management cluster", func() {
createdWorkload := &kueue.Workload{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("checking the workload creation in the worker clusters", func() {
managerWl := &kueue.Workload{}
createdWorkload := &kueue.Workload{}
gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("setting workload reservation in worker2, the workload is admitted in manager and worker1 wl is removed", func() {
createdWorkload := &kueue.Workload{}
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(util.SetQuotaReservation(worker2TestCluster.ctx, worker2TestCluster.client, createdWorkload, admission)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
acs := workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multikueueAC.Name)
g.Expect(acs).NotTo(gomega.BeNil())
g.Expect(acs.State).To(gomega.Equal(kueue.CheckStateReady))
g.Expect(acs.Message).To(gomega.Equal(`The workload got reservation on "worker2"`))

g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
}, util.IgnoreConditionTimestampsAndObservedGeneration))
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("changing the status of the TFJob in the worker, updates the manager's TFJob status", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdTfJob := kftraining.TFJob{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(tfJob), &createdTfJob)).To(gomega.Succeed())
// createdTfJob.Status.Restarts = 10
g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdTfJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
createdTfJob := kftraining.TFJob{}
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(tfJob), &createdTfJob)).To(gomega.Succeed())
// g.Expect(createdTfJob.Status.Restarts).To(gomega.Equal(int32(10)))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("finishing the worker TFJob, the manager's wl is marked as finished and the worker2 wl removed", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdTfJob := kftraining.TFJob{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, client.ObjectKeyFromObject(tfJob), &createdTfJob)).To(gomega.Succeed())
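// A Succeeded job condition is what kueue's TFJob integration recognizes as the job being finished.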
createdTfJob.Status.Conditions = append(createdTfJob.Status.Conditions, kftraining.JobCondition{
Type: kftraining.JobSucceeded,
Status: corev1.ConditionTrue,
Reason: "ByTest",
Message: "TFJob finished successfully",
})
g.Expect(worker2TestCluster.client.Status().Update(worker2TestCluster.ctx, &createdTfJob)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
createdWorkload := &kueue.Workload{}
g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed())
g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: "ByTest",
Message: "TFJob finished successfully",
}, util.IgnoreConditionTimestampsAndObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
createdWorkload := &kueue.Workload{}
g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError())
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("Should remove the worker's workload and job after reconnect when the managers job and workload are deleted", func() {
job := testingjob.MakeJob("job", managerNs.Name).
Queue(managerLq.Name).
15 changes: 14 additions & 1 deletion test/integration/multikueue/suite_test.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
@@ -88,7 +89,7 @@ func createCluster(setupFnc framework.ManagerSetup, apiFeatureGates ...string) c
c.fwk = &framework.Framework{
CRDPath: filepath.Join("..", "..", "..", "config", "components", "crd", "bases"),
WebhookPath: filepath.Join("..", "..", "..", "config", "components", "webhook"),
DepCRDPaths: []string{filepath.Join("..", "..", "..", "dep-crds", "jobset-operator")},
DepCRDPaths: []string{filepath.Join("..", "..", "..", "dep-crds", "jobset-operator"), filepath.Join("..", "..", "..", "dep-crds", "training-operator")},
APIServerFeatureGates: apiFeatureGates,
}
c.cfg = c.fwk.Init()
@@ -135,6 +136,18 @@ func managerSetup(ctx context.Context, mgr manager.Manager) {

err = workloadjobset.SetupJobSetWebhook(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

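// Register the TFJob integration (indexes, reconciler and webhook), mirroring the Job and JobSet setup above.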
err = workloadtfjob.SetupIndexes(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())

tfjobReconciler := workloadtfjob.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(constants.JobControllerName))
err = tfjobReconciler.SetupWithManager(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())

err = workloadtfjob.SetupTFJobWebhook(mgr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}

func managerAndMultiKueueSetup(ctx context.Context, mgr manager.Manager, gcInterval time.Duration) {
