Skip to content

Commit

Permalink
Implement suspend semantics to PaddleJob
Browse files Browse the repository at this point in the history
  • Loading branch information
tenzen-y committed Jul 9, 2023
1 parent 992ea4e commit bc51719
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 17 deletions.
3 changes: 2 additions & 1 deletion pkg/controller.v1/paddlepaddle/paddlepaddle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package paddle
import (
"context"
"fmt"
trainutil "github.com/kubeflow/training-operator/pkg/util/train"
"strings"
"time"

Expand Down Expand Up @@ -364,7 +365,7 @@ func (r *PaddleJobReconciler) UpdateJobStatus(job interface{},
logger := commonutil.LoggerForJob(paddlejob)

// Set StartTime.
if jobStatus.StartTime == nil {
if !trainutil.IsJobSuspended(&paddlejob.Spec.RunPolicy) && jobStatus.StartTime == nil {
now := metav1.Now()
jobStatus.StartTime = &now
// enqueue a sync to check if job past ActiveDeadlineSeconds
Expand Down
281 changes: 265 additions & 16 deletions pkg/controller.v1/paddlepaddle/paddlepaddle_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ package paddle
import (
"context"
"fmt"
commonutil "github.com/kubeflow/training-operator/pkg/util"
"k8s.io/apimachinery/pkg/api/errors"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
"github.com/kubeflow/training-operator/pkg/util/testutil"
Expand All @@ -34,16 +37,37 @@ var _ = Describe("PaddleJob controller", func() {
const (
expectedPort = int32(8080)
)

var (
cleanPodPolicyAll = kubeflowv1.CleanPodPolicyAll
)
Context("When creating the PaddleJob", func() {
It("Should get the corresponding resources successfully", func() {
const (
namespace = "default"
name = "test-job"
)
By("By creating a new PaddleJob")
ctx := context.Background()
job := newPaddleJobForTest(name, namespace)
const name = "test-job"
var (
ctx = context.Background()
ns *corev1.Namespace
job *kubeflowv1.PaddleJob
jobKey types.NamespacedName
masterKey types.NamespacedName
worker0Key types.NamespacedName
)
BeforeEach(func() {
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "paddle-test-",
},
}
Expect(testK8sClient.Create(ctx, ns)).Should(Succeed())

job = newPaddleJobForTest(name, ns.Name)
jobKey = client.ObjectKeyFromObject(job)
masterKey = types.NamespacedName{
Name: fmt.Sprintf("%s-master-0", name),
Namespace: ns.Name,
}
worker0Key = types.NamespacedName{
Name: fmt.Sprintf("%s-worker-0", name),
Namespace: ns.Name,
}
job.Spec.PaddleReplicaSpecs = map[kubeflowv1.ReplicaType]*kubeflowv1.ReplicaSpec{
kubeflowv1.PaddleJobReplicaTypeMaster: {
Replicas: pointer.Int32(1),
Expand Down Expand Up @@ -86,19 +110,23 @@ var _ = Describe("PaddleJob controller", func() {
},
},
}

})
AfterEach(func() {
Expect(testK8sClient.Delete(ctx, job)).Should(Succeed())
Expect(testK8sClient.Delete(ctx, ns)).Should(Succeed())
})
It("Should get the corresponding resources successfully", func() {
By("By creating a new PaddleJob")
Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

key := types.NamespacedName{Name: name, Namespace: namespace}
created := &kubeflowv1.PaddleJob{}

// We'll need to retry getting this newly created PaddleJob, given that creation may not immediately happen.
Eventually(func() bool {
err := testK8sClient.Get(ctx, key, created)
err := testK8sClient.Get(ctx, jobKey, created)
return err == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())

masterKey := types.NamespacedName{Name: fmt.Sprintf("%s-master-0", name), Namespace: namespace}
masterPod := &corev1.Pod{}
Eventually(func() bool {
err := testK8sClient.Get(ctx, masterKey, masterPod)
Expand Down Expand Up @@ -147,7 +175,7 @@ var _ = Describe("PaddleJob controller", func() {
masterPod.ResourceVersion = ""
Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed())
Eventually(func() bool {
err := testK8sClient.Get(ctx, key, created)
err := testK8sClient.Get(ctx, jobKey, created)
if err != nil {
return false
}
Expand All @@ -157,8 +185,229 @@ var _ = Describe("PaddleJob controller", func() {
// Check if the job is succeeded.
cond := getCondition(created.Status, kubeflowv1.JobSucceeded)
Expect(cond.Status).To(Equal(corev1.ConditionTrue))
By("Deleting the PaddleJob")
Expect(testK8sClient.Delete(ctx, job)).Should(Succeed())
})
It("Shouldn't create resources when PaddleJob is suspended; Should create resources once PaddleJob is unsuspended", func() {
By("By creating a new PaddleJob with suspend=true")
job.Spec.RunPolicy.Suspend = pointer.Bool(true)
job.Spec.RunPolicy.CleanPodPolicy = &cleanPodPolicyAll
job.Spec.PaddleReplicaSpecs[kubeflowv1.PaddleJobReplicaTypeWorker].Replicas = pointer.Int32(1)
Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

created := &kubeflowv1.PaddleJob{}
masterPod := &corev1.Pod{}
workerPod := &corev1.Pod{}
masterSvc := &corev1.Service{}
workerSvc := &corev1.Service{}

By("Checking created PaddleJob")
Eventually(func() bool {
err := testK8sClient.Get(ctx, jobKey, created)
return err == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())

By("Checking if the pods and services aren't created")
Consistently(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterPod)
errWorker := testK8sClient.Get(ctx, worker0Key, workerPod)
return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker)
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())
Consistently(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterSvc)
errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc)
return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker)
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())

By("Checking if the PaddleJob has suspended condition")
Eventually(func() []kubeflowv1.JobCondition {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.Conditions
}, testutil.ConsistentDuration, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{
{
Type: kubeflowv1.JobCreated,
Status: corev1.ConditionTrue,
Reason: "PaddleJobCreated",
Message: fmt.Sprintf("PaddleJob %s is created.", name),
},
{
Type: kubeflowv1.JobSuspended,
Status: corev1.ConditionTrue,
Reason: commonutil.JobSuspendedReason,
Message: fmt.Sprintf("PaddleJob %s is suspended.", name),
},
}, testutil.IgnoreJobConditionsTimes))

By("Unsuspending the PaddleJob")
created.Spec.RunPolicy.Suspend = pointer.Bool(false)
Expect(testK8sClient.Update(ctx, created)).Should(Succeed())

By("Check if the pods and services are created")
Eventually(func() error {
return testK8sClient.Get(ctx, masterKey, masterPod)
}, testutil.Timeout, testutil.Interval).Should(BeNil())
Eventually(func() error {
return testK8sClient.Get(ctx, worker0Key, workerPod)
}, testutil.Timeout, testutil.Interval).Should(BeNil())
Eventually(func() error {
return testK8sClient.Get(ctx, masterKey, masterSvc)
}, testutil.Timeout, testutil.Interval).Should(BeNil())
Eventually(func() error {
return testK8sClient.Get(ctx, worker0Key, workerSvc)
}, testutil.Timeout, testutil.Interval).Should(BeNil())

By("Updating Pod's condition with running")
masterPod.Status.Phase = corev1.PodRunning
masterPod.ResourceVersion = ""
Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed())
workerPod.Status.Phase = corev1.PodRunning
workerPod.ResourceVersion = ""
Expect(testK8sClient.Status().Update(ctx, workerPod)).Should(Succeed())

By("Checking the PaddleJob has resumed conditions")
Eventually(func() []kubeflowv1.JobCondition {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.Conditions
}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{
{
Type: kubeflowv1.JobCreated,
Status: corev1.ConditionTrue,
Reason: "PaddleJobCreated",
Message: fmt.Sprintf("PaddleJob %s is created.", name),
},
{
Type: kubeflowv1.JobSuspended,
Status: corev1.ConditionFalse,
Reason: commonutil.JobResumedReason,
Message: fmt.Sprintf("PaddleJob %s is resumed.", name),
},
{
Type: kubeflowv1.JobRunning,
Status: corev1.ConditionTrue,
Reason: commonutil.JobRunningReason,
Message: fmt.Sprintf("PaddleJob %s is running.", name),
},
}, testutil.IgnoreJobConditionsTimes))
})
It("Should delete resources after PaddleJob is suspended", func() {
By("By creating a new PaddleJob")
job.Spec.RunPolicy.CleanPodPolicy = &cleanPodPolicyAll
job.Spec.PaddleReplicaSpecs[kubeflowv1.PaddleJobReplicaTypeWorker].Replicas = pointer.Int32(1)
Expect(testK8sClient.Create(ctx, job)).Should(Succeed())

created := &kubeflowv1.PaddleJob{}
masterPod := &corev1.Pod{}
workerPod := &corev1.Pod{}
masterSvc := &corev1.Service{}
workerSvc := &corev1.Service{}

// We'll need to retry getting this newly created PaddleJob, given that creation may not immediately happen.
By("Checking created PaddleJob")
Eventually(func() bool {
err := testK8sClient.Get(ctx, jobKey, created)
return err == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Eventually(func() *metav1.Time {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.StartTime
}, testutil.Timeout, testutil.Interval).ShouldNot(BeNil())

By("Checking the created pods and services")
Eventually(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterPod)
errWorker := testK8sClient.Get(ctx, worker0Key, workerPod)
return errMaster == nil && errWorker == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Eventually(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterSvc)
errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc)
return errMaster == nil && errWorker == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())

By("Updating the pod's phase with Running")
masterPod.Status.Phase = corev1.PodRunning
workerPod.Status.Phase = corev1.PodRunning
masterPod.ResourceVersion = ""
workerPod.ResourceVersion = ""
Expect(testK8sClient.Status().Update(ctx, masterPod)).Should(Succeed())
Expect(testK8sClient.Status().Update(ctx, workerPod)).Should(Succeed())

By("Checking the PaddleJob's condition")
Eventually(func() []kubeflowv1.JobCondition {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.Conditions
}, testutil.Timeout, testutil.Interval).Should(BeComparableTo([]kubeflowv1.JobCondition{
{
Type: kubeflowv1.JobCreated,
Status: corev1.ConditionTrue,
Reason: "PaddleJobCreated",
Message: fmt.Sprintf("PaddleJob %s is created.", name),
},
{
Type: kubeflowv1.JobRunning,
Status: corev1.ConditionTrue,
Reason: commonutil.JobRunningReason,
Message: fmt.Sprintf("PaddleJob %s is running.", name),
},
}, testutil.IgnoreJobConditionsTimes))

By("Updating the PaddleJob with suspend=true")
created.Spec.RunPolicy.Suspend = pointer.Bool(true)
Expect(testK8sClient.Update(ctx, created)).Should(Succeed())

By("Checking if the pods and services are removed")
Eventually(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterPod)
errWorker := testK8sClient.Get(ctx, worker0Key, workerPod)
return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker)
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Consistently(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterPod)
errWorker := testK8sClient.Get(ctx, worker0Key, workerPod)
return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker)
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())
Eventually(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterSvc)
errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc)
return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker)
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Consistently(func() bool {
errMaster := testK8sClient.Get(ctx, masterKey, masterSvc)
errWorker := testK8sClient.Get(ctx, worker0Key, workerSvc)
return errors.IsNotFound(errMaster) && errors.IsNotFound(errWorker)
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())

By("Checking if the PaddleJob has a suspended condition")
Eventually(func() bool {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.ReplicaStatuses[kubeflowv1.PaddleJobReplicaTypeMaster].Active == 0 &&
created.Status.ReplicaStatuses[kubeflowv1.PaddleJobReplicaTypeWorker].Active == 0 &&
created.Status.StartTime == nil
}, testutil.Timeout, testutil.Interval).Should(BeTrue())
Consistently(func() bool {
Expect(testK8sClient.Get(ctx, jobKey, created)).Should(Succeed())
return created.Status.ReplicaStatuses[kubeflowv1.PaddleJobReplicaTypeMaster].Active == 0 &&
created.Status.ReplicaStatuses[kubeflowv1.PaddleJobReplicaTypeWorker].Active == 0 &&
created.Status.StartTime == nil
}, testutil.ConsistentDuration, testutil.Interval).Should(BeTrue())
Expect(created.Status.Conditions).Should(BeComparableTo([]kubeflowv1.JobCondition{
{
Type: kubeflowv1.JobCreated,
Status: corev1.ConditionTrue,
Reason: "PaddleJobCreated",
Message: fmt.Sprintf("PaddleJob %s is created.", name),
},
{
Type: kubeflowv1.JobRunning,
Status: corev1.ConditionFalse,
Reason: commonutil.JobSuspendedReason,
Message: fmt.Sprintf("PaddleJob %s is suspended.", name),
},
{
Type: kubeflowv1.JobSuspended,
Reason: commonutil.JobSuspendedReason,
Message: fmt.Sprintf("PaddleJob %s is suspended.", name),
Status: corev1.ConditionTrue,
},
}, testutil.IgnoreJobConditionsTimes))
})
})
})
Expand Down

0 comments on commit bc51719

Please sign in to comment.