From bc5171987ce355bfa56815d1417558879983e145 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Sun, 9 Jul 2023 23:47:05 +0900 Subject: [PATCH] Implement suspend semantics to PaddleJob --- .../paddlepaddle/paddlepaddle_controller.go | 3 +- .../paddlepaddle_controller_test.go | 281 +++++++++++++++++- 2 files changed, 267 insertions(+), 17 deletions(-) diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go index ea635cb1a2..b5c89b136e 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go @@ -17,6 +17,7 @@ package paddle import ( "context" "fmt" + trainutil "github.com/kubeflow/training-operator/pkg/util/train" "strings" "time" @@ -364,7 +365,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{}, logger := commonutil.LoggerForJob(paddlejob) // Set StartTime. - if jobStatus.StartTime == nil { + if !trainutil.IsJobSuspended(&paddlejob.Spec.RunPolicy) && jobStatus.StartTime == nil { now := metav1.Now() jobStatus.StartTime = &now // enqueue a sync to check if job past ActiveDeadlineSeconds diff --git a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go index 3cd82d3424..11b6bb2670 100644 --- a/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go +++ b/pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go @@ -17,6 +17,8 @@ package paddle import ( "context" "fmt" + commonutil "github.com/kubeflow/training-operator/pkg/util" + "k8s.io/apimachinery/pkg/api/errors" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -24,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1" "github.com/kubeflow/training-operator/pkg/util/testutil" @@ -34,16 +37,37 @@ var _ = Describe("PaddleJob controller", func() { const ( expectedPort = int32(8080) ) - + var ( + cleanPodPolicyAll = kubeflowv1.CleanPodPolicyAll + ) Context("When creating the PaddleJob", func() { - It("Should get the corresponding resources successfully", func() { - const ( - namespace = "default" - name = "test-job" - ) - By("By creating a new PaddleJob") - ctx := context.Background() - job := newPaddleJobForTest(name, namespace) + const name = "test-job" + var ( + ctx = context.Background() + ns *corev1.Namespace + job *kubeflowv1.PaddleJob + jobKey types.NamespacedName + masterKey types.NamespacedName + worker0Key types.NamespacedName + ) + BeforeEach(func() { + ns = &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "paddle-test-", + }, + } + Expect(testK8sClient.Create(ctx, ns)).Should(Succeed()) + + job = newPaddleJobForTest(name, ns.Name) + jobKey = client.ObjectKeyFromObject(job) + masterKey = types.NamespacedName{ + Name: fmt.Sprintf("%s-master-0", name), + Namespace: ns.Name, + } + worker0Key = types.NamespacedName{ + Name: fmt.Sprintf("%s-worker-0", name), + Namespace: ns.Name, + } job.Spec.PaddleReplicaSpecs = map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{ kubeflowv1.PaddleJobReplicaTypeMaster: { Replicas: pointer.Int32(1), @@ -86,19 +110,23 @@ var _ = Describe("PaddleJob controller", func() { }, }, } - + }) + AfterEach(func() { + Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed()) + }) + It("Should get the corresponding resources successfully", func() { + By("By creating a new PaddleJob") Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) - key := types.NamespacedName{Name: name, Namespace: namespace} created := &kubeflowv1.PaddleJob{} // We'll need to retry getting this newly created PaddleJob, given that creation may not immediately happen. Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) return err == nil }, testutil.Timeout, testutil.Interval).Should(BeTrue()) - masterKey := types.NamespacedName{Name: fmt.Sprintf("%s-master-0", name), Namespace: namespace} masterPod := &corev1.Pod{} Eventually(func() bool { err := testK8sClient.Get(ctx, masterKey, masterPod) @@ -147,7 +175,7 @@ var _ = Describe("PaddleJob controller", func() { masterPod.ResourceVersion = "" Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed()) Eventually(func() bool { - err := testK8sClient.Get(ctx, key, created) + err := testK8sClient.Get(ctx, jobKey, created) if err != nil { return false } @@ -157,8 +185,229 @@ var _ = Describe("PaddleJob controller", func() { // Check if the job is succeeded. cond := getCondition(created.Status, kubeflowv1.JobSucceeded) Expect(cond.Status).To(Equal(corev1.ConditionTrue)) - By("Deleting the PaddleJob") - Expect(testK8sClient.Delete(ctx, job)).Should(Succeed()) + }) + It("Shouldn't create resources when PaddleJob is suspended; Should create resources once PaddleJob is unsuspended", func() { + By("By creating a new PaddleJob with suspend=true") + job.Spec.RunPolicy.Suspend = pointer.Bool(true) + job.Spec.RunPolicy.CleanPodPolicy = &cleanPodPolicyAll + job.Spec.PaddleReplicaSpecs[kubeflowv1.PaddleJobReplicaTypeWorker].Replicas = pointer.Int32(1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.PaddleJob{} + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + + By("Checking created PaddleJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Checking if the pods and services aren't created") + Consistently(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the PaddleJob has suspended condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.ConsistentDuration, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: "PaddleJobCreated", + Message: fmt.Sprintf("PaddleJob %s is created.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Status: corev1.ConditionTrue, + Reason: commonutil.JobSuspendedReason, + Message: fmt.Sprintf("PaddleJob %s is suspended.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Unsuspending the PaddleJob") + created.Spec.RunPolicy.Suspend = pointer.Bool(false) + Expect(testK8sClient.Update(ctx, created)).Should(Succeed()) + + By("Check if the pods and services are created") + Eventually(func() error { + return testK8sClient.Get(ctx, masterKey, masterPod) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, worker0Key, workerPod) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, masterKey, masterSvc) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + Eventually(func() error { + return testK8sClient.Get(ctx, worker0Key, workerSvc) + }, testutil.Timeout, testutil.Interval).Should(BeNil()) + + By("Updating Pod's condition with running") + masterPod.Status.Phase = corev1.PodRunning + masterPod.ResourceVersion = "" + Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed()) + workerPod.Status.Phase = corev1.PodRunning + workerPod.ResourceVersion = "" + Expect(testK8sClient.Status().Update(ctx, workerPod)).Should(Succeed()) + + By("Checking the PaddleJob has resumed conditions") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: "PaddleJobCreated", + Message: fmt.Sprintf("PaddleJob %s is created.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Status: corev1.ConditionFalse, + Reason: commonutil.JobResumedReason, + Message: fmt.Sprintf("PaddleJob %s is resumed.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.JobRunningReason, + Message: fmt.Sprintf("PaddleJob %s is running.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + }) + It("Should delete resources after PaddleJob is suspended", func() { + By("By creating a new PaddleJob") + job.Spec.RunPolicy.CleanPodPolicy = &cleanPodPolicyAll + job.Spec.PaddleReplicaSpecs[kubeflowv1.PaddleJobReplicaTypeWorker].Replicas = pointer.Int32(1) + Expect(testK8sClient.Create(ctx, job)).Should(Succeed()) + + created := &kubeflowv1.PaddleJob{} + masterPod := &corev1.Pod{} + workerPod := &corev1.Pod{} + masterSvc := &corev1.Service{} + workerSvc := &corev1.Service{} + + // We'll need to retry getting this newly created PaddleJob, given that creation may not immediately happen. + By("Checking created PaddleJob") + Eventually(func() bool { + err := testK8sClient.Get(ctx, jobKey, created) + return err == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Eventually(func() *metav1.Time { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.StartTime + }, testutil.Timeout, testutil.Interval).ShouldNot(BeNil()) + + By("Checking the created pods and services") + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errMaster == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errMaster == nil && errWorker == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + + By("Updating the pod's phase with Running") + masterPod.Status.Phase = corev1.PodRunning + workerPod.Status.Phase = corev1.PodRunning + masterPod.ResourceVersion = "" + workerPod.ResourceVersion = "" + Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed()) + Expect(testK8sClient.Status().Update(ctx, workerPod)).Should(Succeed()) + + By("Checking the PaddleJob's condition") + Eventually(func() []kubeflowv1.JobCondition { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.Conditions + }, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: "PaddleJobCreated", + Message: fmt.Sprintf("PaddleJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionTrue, + Reason: commonutil.JobRunningReason, + Message: fmt.Sprintf("PaddleJob %s is running.", name), + }, + }, testutil.IgnoreJobConditionsTimes)) + + By("Updating the PaddleJob with suspend=true") + created.Spec.RunPolicy.Suspend = pointer.Bool(true) + Expect(testK8sClient.Update(ctx, created)).Should(Succeed()) + + By("Checking if the pods and services are removed") + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterPod) + errWorker := testK8sClient.Get(ctx, worker0Key, workerPod) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + Eventually(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + errMaster := testK8sClient.Get(ctx, masterKey, masterSvc) + errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc) + return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker) + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + + By("Checking if the PaddleJob has a suspended condition") + Eventually(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.PaddleJobReplicaTypeMaster].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.PaddleJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime == nil + }, testutil.Timeout, testutil.Interval).Should(BeTrue()) + Consistently(func() bool { + Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed()) + return created.Status.ReplicaStatuses[kubeflowv1.PaddleJobReplicaTypeMaster].Active == 0 && + created.Status.ReplicaStatuses[kubeflowv1.PaddleJobReplicaTypeWorker].Active == 0 && + created.Status.StartTime == nil + }, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue()) + Expect(created.Status.Conditions).Should(BeComparableTo([]kubeflowv1.JobCondition{ + { + Type: kubeflowv1.JobCreated, + Status: corev1.ConditionTrue, + Reason: "PaddleJobCreated", + Message: fmt.Sprintf("PaddleJob %s is created.", name), + }, + { + Type: kubeflowv1.JobRunning, + Status: corev1.ConditionFalse, + Reason: commonutil.JobSuspendedReason, + Message: fmt.Sprintf("PaddleJob %s is suspended.", name), + }, + { + Type: kubeflowv1.JobSuspended, + Reason: commonutil.JobSuspendedReason, + Message: fmt.Sprintf("PaddleJob %s is suspended.", name), + Status: corev1.ConditionTrue, + }, + }, testutil.IgnoreJobConditionsTimes)) }) }) })