Skip to content

Commit

Permalink
add multikueue tests for tfjob
Browse files Browse the repository at this point in the history
  • Loading branch information
mszadkow committed Jul 16, 2024
1 parent a57b642 commit 7845aa5
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 53 deletions.
112 changes: 61 additions & 51 deletions pkg/util/testingjobs/tfjob/wrappers_tfjob.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package testing

import (
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

"sigs.k8s.io/kueue/pkg/controller/constants"

kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// TFJobWrapper wraps a Job.
Expand All @@ -42,61 +43,44 @@ func MakeTFJob(name, ns string) *TFJobWrapper {
RunPolicy: kftraining.RunPolicy{
Suspend: ptr.To(true),
},
TFReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{
kftraining.TFJobReplicaTypeChief: {
Replicas: ptr.To[int32](1),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: "c",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
},
},
NodeSelector: map[string]string{},
},
},
},
kftraining.TFJobReplicaTypePS: {
Replicas: ptr.To[int32](1),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: "c",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
},
},
NodeSelector: map[string]string{},
},
},
TFReplicaSpecs: map[kftraining.ReplicaType]*kftraining.ReplicaSpec{},
},
}}
}

type TFReplicaSpecRequirement struct {
ReplicaType kftraining.ReplicaType
ReplicaCount int32
Annotations map[string]string
RestartPolicy kftraining.RestartPolicy
}

func (j *TFJobWrapper) TFReplicaSpecs(replicaSpecs ...TFReplicaSpecRequirement) *TFJobWrapper {
j.Spec.TFReplicaSpecs = make(map[kftraining.ReplicaType]*kftraining.ReplicaSpec)

for _, rs := range replicaSpecs {
j.Spec.TFReplicaSpecs[rs.ReplicaType] = &kftraining.ReplicaSpec{
Replicas: ptr.To[int32](rs.ReplicaCount),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: rs.Annotations,
},
kftraining.TFJobReplicaTypeWorker: {
Replicas: ptr.To[int32](1),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: "c",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
},
},
NodeSelector: map[string]string{},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicy(rs.RestartPolicy),
Containers: []corev1.Container{
{
Name: "tensorflow", // each tfjob container must have the name "tensorflow"
Command: []string{},
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
},
},
NodeSelector: map[string]string{},
},
},
},
}}
}
}

return j
}

// PriorityClass updates job priorityclass.
Expand All @@ -122,6 +106,20 @@ func (j *TFJobWrapper) Obj() *kftraining.TFJob {
return &j.TFJob
}

// Clone returns deep copy of the Job.
func (j *TFJobWrapper) Clone() *TFJobWrapper {
return &TFJobWrapper{TFJob: *j.DeepCopy()}
}

// Label sets the label key and value
func (j *TFJobWrapper) Label(key, value string) *TFJobWrapper {
if j.Labels == nil {
j.Labels = make(map[string]string)
}
j.Labels[key] = value
return j
}

// Queue updates the queue name of the job.
func (j *TFJobWrapper) Queue(queue string) *TFJobWrapper {
if j.Labels == nil {
Expand Down Expand Up @@ -155,3 +153,15 @@ func (j *TFJobWrapper) UID(uid string) *TFJobWrapper {
j.ObjectMeta.UID = types.UID(uid)
return j
}

// Condition adds a condition
func (j *TFJobWrapper) StatusConditions(c kftraining.JobCondition) *TFJobWrapper {
j.Status.Conditions = append(j.Status.Conditions, c)
return j
}

func (j *TFJobWrapper) Image(replicaType kftraining.ReplicaType, image string, args []string) *TFJobWrapper {
j.Spec.TFReplicaSpecs[replicaType].Template.Spec.Containers[0].Image = image
j.Spec.TFReplicaSpecs[replicaType].Template.Spec.Containers[0].Args = args
return j
}
100 changes: 100 additions & 0 deletions test/e2e/multikueue/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os/exec"

"github.com/google/go-cmp/cmp/cmpopts"
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -37,9 +38,11 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job"
workloadjobset "sigs.k8s.io/kueue/pkg/controller/jobs/jobset"
workloadtfjob "sigs.k8s.io/kueue/pkg/controller/jobs/kubeflow/jobs/tfjob"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjob "sigs.k8s.io/kueue/pkg/util/testingjobs/job"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
testing "sigs.k8s.io/kueue/pkg/util/testingjobs/tfjob"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/test/util"
)
Expand Down Expand Up @@ -364,6 +367,103 @@ var _ = ginkgo.Describe("MultiKueue", func() {
util.IgnoreConditionTimestampsAndObservedGeneration)))
})
})
ginkgo.It("Should run a kubeflow tfjob on worker if admitted", func() {
tfJob := testing.MakeTFJob("tfjob1", managerNs.Name).
Queue(managerLq.Name).
TFReplicaSpecs(
testing.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypePS,
ReplicaCount: 1,
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
RestartPolicy: "Never",
},
testing.TFReplicaSpecRequirement{
ReplicaType: kftraining.TFJobReplicaTypeWorker,
ReplicaCount: 2,
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
RestartPolicy: "OnFailure",
},
).
Request(kftraining.TFJobReplicaTypePS, corev1.ResourceCPU, "1").
Request(kftraining.TFJobReplicaTypePS, corev1.ResourceMemory, "200M").
Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceCPU, "0.5").
Request(kftraining.TFJobReplicaTypeWorker, corev1.ResourceMemory, "100M").
Image(kftraining.TFJobReplicaTypePS, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
Image(kftraining.TFJobReplicaTypeWorker, "gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"5s"}).
Obj()

ginkgo.By("Creating the tfJob", func() {
gomega.Expect(k8sManagerClient.Create(ctx, tfJob)).Should(gomega.Succeed())
})

createdLeaderWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadtfjob.GetWorkloadNameForTFJob(tfJob.Name, tfJob.UID), Namespace: managerNs.Name}

// the execution should be given to the worker
ginkgo.By("Waiting to be admitted in worker1 and manager", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())
g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The workload is admitted",
}, util.IgnoreConditionTimestampsAndObservedGeneration))
g.Expect(workload.FindAdmissionCheck(createdLeaderWorkload.Status.AdmissionChecks, multiKueueAc.Name)).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{
Name: multiKueueAc.Name,
State: kueue.CheckStateReady,
Message: `The workload got reservation on "worker1"`,
}, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime")))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the tfJob to get status updates", func() {
gomega.Eventually(func(g gomega.Gomega) {
createdTfJob := &kftraining.TFJob{}
g.Expect(k8sManagerClient.Get(ctx, client.ObjectKeyFromObject(tfJob), createdTfJob)).To(gomega.Succeed())
g.Expect(createdTfJob.Status.ReplicaStatuses).To(gomega.BeComparableTo(
map[kftraining.ReplicaType]*kftraining.ReplicaStatus{
kftraining.TFJobReplicaTypePS: {
Active: 1,
Succeeded: 0,
},
kftraining.TFJobReplicaTypeWorker: {
Active: 2,
Succeeded: 0,
},
},
util.IgnoreConditionTimestampsAndObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Waiting for the tfJob to finish", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlLookupKey, createdLeaderWorkload)).To(gomega.Succeed())

g.Expect(apimeta.FindStatusCondition(createdLeaderWorkload.Status.Conditions, kueue.WorkloadFinished)).To(gomega.BeComparableTo(&metav1.Condition{
Type: kueue.WorkloadFinished,
Status: metav1.ConditionTrue,
Reason: kueue.WorkloadFinishedReasonSucceeded,
Message: fmt.Sprintf("TFJob %s/%s successfully completed.", createdLeaderWorkload.Namespace, tfJob.Name),
}, util.IgnoreConditionTimestampsAndObservedGeneration))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Checking no objects are left in the worker clusters and the tfJob is completed", func() {
gomega.Eventually(func(g gomega.Gomega) {
workerWl := &kueue.Workload{}
g.Expect(k8sWorker1Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
g.Expect(k8sWorker2Client.Get(ctx, wlLookupKey, workerWl)).To(utiltesting.BeNotFoundError())
workerTfJob := &kftraining.TFJob{}
g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError())
g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(tfJob), workerTfJob)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})
})
ginkgo.When("The connection to a worker cluster is unreliable", func() {
ginkgo.It("Should update the cluster status to reflect the connection state", func() {
Expand Down
7 changes: 7 additions & 0 deletions test/e2e/multikueue/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
authenticationv1 "k8s.io/api/authentication/v1"
Expand Down Expand Up @@ -86,6 +87,8 @@ func kubeconfigForMultiKueueSA(ctx context.Context, c client.Client, restConfig
policyRule(jobset.SchemeGroupVersion.Group, "jobsets/status", "get"),
policyRule(kueue.SchemeGroupVersion.Group, "workloads", resourceVerbs...),
policyRule(kueue.SchemeGroupVersion.Group, "workloads/status", "get", "patch", "update"),
policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs", resourceVerbs...),
policyRule(kftraining.SchemeGroupVersion.Group, "tfjobs/status", "get"),
},
}
err := c.Create(ctx, cr)
Expand Down Expand Up @@ -223,6 +226,10 @@ var _ = ginkgo.BeforeSuite(func() {
util.WaitForJobSetAvailability(ctx, k8sWorker1Client)
util.WaitForJobSetAvailability(ctx, k8sWorker2Client)

// there should not be a kubeflow operator in manager cluster
util.WaitForKubeFlowAvailability(ctx, k8sWorker1Client)
util.WaitForKubeFlowAvailability(ctx, k8sWorker2Client)

ginkgo.GinkgoLogr.Info("Kueue and JobSet operators are available in all the clusters", "waitingTime", time.Since(waitForAvailableStart))

discoveryClient, err := discovery.NewDiscoveryClientForConfig(managerCfg)
Expand Down
13 changes: 11 additions & 2 deletions test/util/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/google/go-cmp/cmp/cmpopts"
kftraining "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -43,6 +44,9 @@ func CreateClientUsingCluster(kContext string) (client.WithWatch, *rest.Config)
err = jobset.AddToScheme(scheme.Scheme)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

err = kftraining.AddToScheme(scheme.Scheme)
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())

client, err := client.NewWithWatch(cfg, client.Options{Scheme: scheme.Scheme})
gomega.ExpectWithOffset(1, err).NotTo(gomega.HaveOccurred())
return client, cfg
Expand Down Expand Up @@ -100,6 +104,11 @@ func WaitForKueueAvailability(ctx context.Context, k8sClient client.Client) {
}

func WaitForJobSetAvailability(ctx context.Context, k8sClient client.Client) {
kcmKey := types.NamespacedName{Namespace: "jobset-system", Name: "jobset-controller-manager"}
waitForOperatorAvailability(ctx, k8sClient, kcmKey)
jcmKey := types.NamespacedName{Namespace: "jobset-system", Name: "jobset-controller-manager"}
waitForOperatorAvailability(ctx, k8sClient, jcmKey)
}

func WaitForKubeFlowAvailability(ctx context.Context, k8sClient client.Client) {
kftoKey := types.NamespacedName{Namespace: "kubeflow", Name: "training-operator"}
waitForOperatorAvailability(ctx, k8sClient, kftoKey)
}

0 comments on commit 7845aa5

Please sign in to comment.