diff --git a/config/crd/bases/batch.volcano.sh_jobs.yaml b/config/crd/bases/batch.volcano.sh_jobs.yaml index 0ac4f5222d..0a0a4e6cbe 100644 --- a/config/crd/bases/batch.volcano.sh_jobs.yaml +++ b/config/crd/bases/batch.volcano.sh_jobs.yaml @@ -81,6 +81,11 @@ spec: to the summary of tasks' replicas format: int32 type: integer + minSuccess: + description: The minimal success pods to run for this Job + format: int32 + minimum: 1 + type: integer plugins: additionalProperties: items: diff --git a/config/crd/v1beta1/batch.volcano.sh_jobs.yaml b/config/crd/v1beta1/batch.volcano.sh_jobs.yaml index 5a9efdbc56..d93e95f2c4 100644 --- a/config/crd/v1beta1/batch.volcano.sh_jobs.yaml +++ b/config/crd/v1beta1/batch.volcano.sh_jobs.yaml @@ -81,6 +81,11 @@ spec: to the summary of tasks' replicas format: int32 type: integer + minSuccess: + description: The minimal success pods to run for this Job + format: int32 + minimum: 1 + type: integer plugins: additionalProperties: items: diff --git a/installer/helm/chart/volcano/crd/bases/batch.volcano.sh_jobs.yaml b/installer/helm/chart/volcano/crd/bases/batch.volcano.sh_jobs.yaml index b9d6ed6a4f..70610b0183 100644 --- a/installer/helm/chart/volcano/crd/bases/batch.volcano.sh_jobs.yaml +++ b/installer/helm/chart/volcano/crd/bases/batch.volcano.sh_jobs.yaml @@ -79,6 +79,11 @@ spec: to the summary of tasks' replicas format: int32 type: integer + minSuccess: + description: The minimal success pods to run for this Job + format: int32 + minimum: 1 + type: integer plugins: additionalProperties: items: diff --git a/installer/helm/chart/volcano/crd/v1beta1/batch.volcano.sh_jobs.yaml b/installer/helm/chart/volcano/crd/v1beta1/batch.volcano.sh_jobs.yaml index e0f1dcff76..b931746b48 100644 --- a/installer/helm/chart/volcano/crd/v1beta1/batch.volcano.sh_jobs.yaml +++ b/installer/helm/chart/volcano/crd/v1beta1/batch.volcano.sh_jobs.yaml @@ -79,6 +79,11 @@ spec: to the summary of tasks' replicas format: int32 type: integer + minSuccess: + description: The minimal success pods to run for this Job + format: int32 + minimum: 1 + type: integer plugins: additionalProperties: items: diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index 9818686a0c..acb7f9381b 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -224,6 +224,11 @@ spec: to the summary of tasks' replicas format: int32 type: integer + minSuccess: + description: The minimal success pods to run for this Job + format: int32 + minimum: 1 + type: integer plugins: additionalProperties: items: diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index 022b041827..3e269c7c3c 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -107,6 +107,11 @@ type JobSpec struct { // If specified, indicates the job's priority. // +optional PriorityClassName string `json:"priorityClassName,omitempty" protobuf:"bytes,10,opt,name=priorityClassName"` + + // The minimal success pods to run for this Job + // +kubebuilder:validation:Minimum=1 + // +optional + MinSuccess *int32 `json:"minSuccess,omitempty" protobuf:"varint,11,opt,name=minSuccess"` } // VolumeSpec defines the specification of Volume, e.g. PVC. diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index a608d3f390..d5b3c133dd 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -56,8 +56,17 @@ func (ps *runningState) Execute(action v1alpha1.Action) error { // when scale down to zero, keep the current job phase return false } + + minSuccess := ps.job.Job.Spec.MinSuccess + if minSuccess != nil && status.Succeeded >= *minSuccess { + status.State.Phase = vcbatch.Completed + return true + } + if status.Succeeded+status.Failed == jobReplicas { - if status.Succeeded >= ps.job.Job.Spec.MinAvailable { + if minSuccess != nil && status.Succeeded < *minSuccess { + status.State.Phase = vcbatch.Failed + } else if status.Succeeded >= ps.job.Job.Spec.MinAvailable { status.State.Phase = vcbatch.Completed } else { status.State.Phase = vcbatch.Failed diff --git a/test/e2e/jobp/min_success.go b/test/e2e/jobp/min_success.go new file mode 100644 index 0000000000..d5213296fe --- /dev/null +++ b/test/e2e/jobp/min_success.go @@ -0,0 +1,69 @@ +/* + Copyright 2021 The Volcano Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package jobp + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + vcbatch "volcano.sh/volcano/pkg/apis/batch/v1alpha1" + + e2eutil "volcano.sh/volcano/test/e2e/util" +) + +var _ = Describe("Check min success", func() { + It("Min Success", func() { + By("init test ctx") + ctx := e2eutil.InitTestContext(e2eutil.Options{}) + defer e2eutil.CleanupTestContext(ctx) + + jobName := "min-success-job" + By("create job") + var minSuccess int32 = 2 + job := e2eutil.CreateJob(ctx, &e2eutil.JobSpec{ + Name: jobName, + MinSuccess: &minSuccess, + Tasks: []e2eutil.TaskSpec{ + { + Name: "short", + Img: e2eutil.DefaultBusyBoxImage, + Min: 1, + Rep: 3, + Command: "sleep 3", + }, + { + Name: "log", + Img: e2eutil.DefaultBusyBoxImage, + Min: 1, + Rep: 2, + Command: "sleep 100000", + }, + }, + }) + + // job phase: pending -> running + err := e2eutil.WaitJobReady(ctx, job) + Expect(err).NotTo(HaveOccurred()) + + // wait short tasks completed + err = e2eutil.WaitTasksCompleted(ctx, job, minSuccess) + Expect(err).NotTo(HaveOccurred()) + + // wait job completed + err = e2eutil.WaitJobStates(ctx, job, []vcbatch.JobPhase{vcbatch.Completed}, e2eutil.OneMinute) + Expect(err).NotTo(HaveOccurred()) + }) +}) diff --git a/test/e2e/util/job.go b/test/e2e/util/job.go index 0e9dc63603..ed127d9046 100644 --- a/test/e2e/util/job.go +++ b/test/e2e/util/job.go @@ -66,7 +66,8 @@ type JobSpec struct { Volumes []batchv1alpha1.VolumeSpec NodeName string // ttl seconds after job finished - Ttl *int32 + Ttl *int32 + MinSuccess *int32 } func Namespace(context *TestContext, job *JobSpec) string { @@ -194,6 +195,7 @@ func CreateJobInner(ctx *TestContext, jobSpec *JobSpec) (*batchv1alpha1.Job, err Queue: jobSpec.Queue, Plugins: jobSpec.Plugins, TTLSecondsAfterFinished: jobSpec.Ttl, + MinSuccess: jobSpec.MinSuccess, }, } @@ -739,3 +741,34 @@ func IsPodScheduled(pod *v1.Pod) bool { } return false } + +func WaitTasksCompleted(ctx *TestContext, job *batchv1alpha1.Job, successNum int32) error { + var additionalError error + err := wait.Poll(100*time.Millisecond, TwoMinute, func() (bool, error) { + pods, err := ctx.Kubeclient.CoreV1().Pods(job.Namespace).List(context.TODO(), metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + + var succeeded int32 = 0 + for _, pod := range pods.Items { + if !metav1.IsControlledBy(&pod, job) { + continue + } + + if pod.Status.Phase == "Succeeded" { + succeeded++ + } + } + + ready := succeeded >= successNum + if !ready { + additionalError = fmt.Errorf("expected job '%s' to have %d succeeded pods, actual got %d", job.Name, + successNum, + succeeded) + } + return ready, nil + }) + if err != nil && strings.Contains(err.Error(), TimeOutMessage) { + return fmt.Errorf("[Wait time out]: %s", additionalError) + } + return err +}