diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index 6cf0fa34b6..a608d3f390 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -51,7 +51,12 @@ func (ps *runningState) Execute(action v1alpha1.Action) error { }) default: return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool { - if status.Succeeded+status.Failed == TotalTasks(ps.job.Job) { + jobReplicas := TotalTasks(ps.job.Job) + if jobReplicas == 0 { + // when scale down to zero, keep the current job phase + return false + } + if status.Succeeded+status.Failed == jobReplicas { if status.Succeeded >= ps.job.Job.Spec.MinAvailable { status.State.Phase = vcbatch.Completed } else { diff --git a/pkg/webhooks/admission/jobs/validate/admit_job.go b/pkg/webhooks/admission/jobs/validate/admit_job.go index 2b2c0469dc..9a70a53f7a 100644 --- a/pkg/webhooks/admission/jobs/validate/admit_job.go +++ b/pkg/webhooks/admission/jobs/validate/admit_job.go @@ -112,9 +112,9 @@ func validateJobCreate(job *v1alpha1.Job, reviewResponse *v1beta1.AdmissionRespo taskNames := map[string]string{} var totalReplicas int32 - if job.Spec.MinAvailable <= 0 { + if job.Spec.MinAvailable < 0 { reviewResponse.Allowed = false - return fmt.Sprintf("'minAvailable' must be > 0.") + return fmt.Sprintf("'minAvailable' must be >= 0.") } if job.Spec.MaxRetry < 0 { @@ -212,8 +212,8 @@ func validateJobUpdate(old, new *v1alpha1.Job) error { if new.Spec.MinAvailable > totalReplicas { return fmt.Errorf("'minAvailable' must not be greater than total replicas") } - if new.Spec.MinAvailable <= 0 { - return fmt.Errorf("'minAvailable' must be > 0") + if new.Spec.MinAvailable < 0 { + return fmt.Errorf("'minAvailable' must be >= 0") } if len(old.Spec.Tasks) != len(new.Spec.Tasks) { diff --git a/pkg/webhooks/admission/jobs/validate/admit_job_test.go b/pkg/webhooks/admission/jobs/validate/admit_job_test.go index 2de357009a..ae384a5d44 100644 --- 
a/pkg/webhooks/admission/jobs/validate/admit_job_test.go +++ b/pkg/webhooks/admission/jobs/validate/admit_job_test.go @@ -304,7 +304,7 @@ func TestValidateJobCreate(t *testing.T) { Namespace: namespace, }, Spec: v1alpha1.JobSpec{ - MinAvailable: 0, + MinAvailable: -1, Queue: "default", Tasks: []v1alpha1.TaskSpec{ { @@ -328,7 +328,7 @@ func TestValidateJobCreate(t *testing.T) { }, }, reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, - ret: "'minAvailable' must be > 0", + ret: "'minAvailable' must be >= 0", ExpectErr: true, }, // maxretry less than zero @@ -1120,7 +1120,7 @@ func TestValidateJobUpdate(t *testing.T) { { name: "invalid minAvailable", replicas: 4, - minAvailable: 0, + minAvailable: -1, addTask: false, mutateTaskName: false, mutateSpec: false, diff --git a/test/e2e/job_scale_up_down.go b/test/e2e/job_scale_up_down.go index 796dcb0bba..92d1f5d8da 100644 --- a/test/e2e/job_scale_up_down.go +++ b/test/e2e/job_scale_up_down.go @@ -112,7 +112,7 @@ var _ = Describe("Dynamic Job scale up and down", func() { err := waitJobReady(context, job) Expect(err).NotTo(HaveOccurred()) - // scale up + // scale down job.Spec.MinAvailable = 1 job.Spec.Tasks[0].Replicas = 1 err = updateJob(context, job) @@ -142,4 +142,75 @@ var _ = Describe("Dynamic Job scale up and down", func() { }) + It("Scale down to zero and scale up", func() { + By("init test ctx") + ctx := initTestContext(options{}) + defer cleanupTestContext(ctx) + + jobName := "scale-down-job" + By("create job") + job := createJob(ctx, &jobSpec{ + name: jobName, + plugins: map[string][]string{ + "svc": {}, + }, + tasks: []taskSpec{ + { + name: "default", + img: defaultNginxImage, + min: 2, + rep: 2, + req: halfCPU, + }, + }, + }) + + // job phase: pending -> running + err := waitJobReady(ctx, job) + Expect(err).NotTo(HaveOccurred()) + + // scale down + job.Spec.MinAvailable = 0 + job.Spec.Tasks[0].Replicas = 0 + err = updateJob(ctx, job) + Expect(err).NotTo(HaveOccurred()) + + // wait for tasks scaled down + 
err = waitJobReady(ctx, job) + Expect(err).NotTo(HaveOccurred()) + + // check configmap updated + pluginName := fmt.Sprintf("%s-svc", jobName) + cm, err := ctx.kubeclient.CoreV1().ConfigMaps(ctx.namespace).Get(pluginName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + hosts := svc.GenerateHosts(job) + Expect(hosts).To(Equal(cm.Data)) + + // scale up + job.Spec.MinAvailable = 2 + job.Spec.Tasks[0].Replicas = 2 + err = updateJob(ctx, job) + Expect(err).NotTo(HaveOccurred()) + + // wait for tasks scaled up + err = waitJobReady(ctx, job) + Expect(err).NotTo(HaveOccurred()) + + // check configmap updated + cm, err = ctx.kubeclient.CoreV1().ConfigMaps(ctx.namespace).Get(pluginName, metav1.GetOptions{}) + Expect(err).NotTo(HaveOccurred()) + + hosts = svc.GenerateHosts(job) + Expect(hosts).To(Equal(cm.Data)) + + // TODO: check others + + By("delete job") + err = ctx.vcclient.BatchV1alpha1().Jobs(job.Namespace).Delete(job.Name, nil) + Expect(err).NotTo(HaveOccurred()) + + err = waitJobCleanedUp(ctx, job) + Expect(err).NotTo(HaveOccurred()) + }) }) diff --git a/test/e2e/util.go b/test/e2e/util.go index f56f1b8708..1a0f9d0f1e 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -394,8 +394,9 @@ func updateJob(context *context, job *batchv1alpha1.Job) error { if err != nil { return err } - patchBytes := []byte(fmt.Sprintf(`{"spec":%s}`, spec)) - _, err = context.vcclient.BatchV1alpha1().Jobs(job.Namespace).Patch(job.Name, types.MergePatchType, patchBytes) + patch := fmt.Sprintf(`[{"op": "replace", "path": "/spec", "value":%s}]`, spec) + patchBytes := []byte(patch) + _, err = context.vcclient.BatchV1alpha1().Jobs(job.Namespace).Patch(job.Name, types.JSONPatchType, patchBytes) return err }