diff --git a/apis/execution/v1alpha1/jobconfig_types.go b/apis/execution/v1alpha1/jobconfig_types.go index 2e4db01..efd7fc7 100644 --- a/apis/execution/v1alpha1/jobconfig_types.go +++ b/apis/execution/v1alpha1/jobconfig_types.go @@ -49,6 +49,22 @@ type JobConfigSpec struct { type ConcurrencySpec struct { // Policy describes how to treat concurrent executions of the same JobConfig. Policy ConcurrencyPolicy `json:"policy"` + + // Maximum number of Jobs that can be running concurrently for the same + // JobConfig. Cannot be specified if Policy is set to Allow. + // + // Defaults to 1. + // + // +optional + MaxConcurrency *int64 `json:"maxConcurrency,omitempty"` +} + +// GetMaxConcurrency returns the MaxConcurrency value if specified, otherwise defaults to 1. +func (c ConcurrencySpec) GetMaxConcurrency() int64 { + if c.MaxConcurrency != nil { + return *c.MaxConcurrency + } + return 1 } // ScheduleSpec defines how a JobConfig should be automatically scheduled. diff --git a/apis/execution/v1alpha1/zz_generated.deepcopy.go b/apis/execution/v1alpha1/zz_generated.deepcopy.go index 4c85d8a..de7220e 100644 --- a/apis/execution/v1alpha1/zz_generated.deepcopy.go +++ b/apis/execution/v1alpha1/zz_generated.deepcopy.go @@ -43,6 +43,11 @@ func (in *BoolOptionConfig) DeepCopy() *BoolOptionConfig { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ConcurrencySpec) DeepCopyInto(out *ConcurrencySpec) { *out = *in + if in.MaxConcurrency != nil { + in, out := &in.MaxConcurrency, &out.MaxConcurrency + *out = new(int64) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConcurrencySpec. 
@@ -281,7 +286,7 @@ func (in *JobConfigList) DeepCopyObject() runtime.Object { func (in *JobConfigSpec) DeepCopyInto(out *JobConfigSpec) { *out = *in in.Template.DeepCopyInto(&out.Template) - out.Concurrency = in.Concurrency + in.Concurrency.DeepCopyInto(&out.Concurrency) if in.Schedule != nil { in, out := &in.Schedule, &out.Schedule *out = new(ScheduleSpec) diff --git a/config/crd/bases/execution.furiko.io_jobconfigs.yaml b/config/crd/bases/execution.furiko.io_jobconfigs.yaml index fd5fb8f..12d25d4 100644 --- a/config/crd/bases/execution.furiko.io_jobconfigs.yaml +++ b/config/crd/bases/execution.furiko.io_jobconfigs.yaml @@ -59,6 +59,10 @@ spec: concurrency: description: Concurrency defines the behaviour of multiple concurrent Jobs. properties: + maxConcurrency: + description: "Maximum number of Jobs that can be running concurrently for the same JobConfig. Cannot be specified if Policy is set to Allow. \n Defaults to 1." + format: int64 + type: integer policy: description: Policy describes how to treat concurrent executions of the same JobConfig. type: string diff --git a/pkg/execution/controllers/croncontroller/reconciler.go b/pkg/execution/controllers/croncontroller/reconciler.go index 6f77ad7..89176eb 100644 --- a/pkg/execution/controllers/croncontroller/reconciler.go +++ b/pkg/execution/controllers/croncontroller/reconciler.go @@ -109,16 +109,17 @@ func (w *Reconciler) processCronForConfig(ctx context.Context, namespace, name s // Check concurrency policy. concurrencyPolicy := jobConfig.Spec.Concurrency.Policy + maxConcurrency := jobConfig.Spec.Concurrency.GetMaxConcurrency() // Handle Forbid concurrency policy. 
- if concurrencyPolicy == execution.ConcurrencyPolicyForbid && activeJobCount > 0 { + if concurrencyPolicy == execution.ConcurrencyPolicyForbid && activeJobCount+1 > maxConcurrency { w.recorder.SkippedJobSchedule(ctx, jobConfig, scheduleTime, "Skipped creating job due to concurrency policy Forbid") return nil } // Cannot enqueue beyond max queue length. - // TODO(irvinlim): We use the status here, which may not be fully up-to-date. + // NOTE(irvinlim): We use the status here, which may not be fully up-to-date. if max := cfg.MaxEnqueuedJobs; max != nil && jobConfig.Status.Queued >= *max { w.recorder.SkippedJobSchedule(ctx, jobConfig, scheduleTime, fmt.Sprintf("Skipped creating job, cannot exceed maximum queue length of %v", *max)) diff --git a/pkg/execution/controllers/croncontroller/reconciler_test.go b/pkg/execution/controllers/croncontroller/reconciler_test.go index 8136bb9..cf005d8 100644 --- a/pkg/execution/controllers/croncontroller/reconciler_test.go +++ b/pkg/execution/controllers/croncontroller/reconciler_test.go @@ -52,6 +52,14 @@ var ( }, }) + jobConfigForbidMax2 = makeJobConfig("job-config-forbid-max-2", execution.JobConfigSpec{ + Schedule: scheduleSpecEvery5Min, + Concurrency: execution.ConcurrencySpec{ + Policy: execution.ConcurrencyPolicyForbid, + MaxConcurrency: pointer.Int64(2), + }, + }) + jobConfigAllow = makeJobConfig("job-config-allow", execution.JobConfigSpec{ Schedule: scheduleSpecEvery5Min, Concurrency: execution.ConcurrencySpec{ @@ -189,6 +197,34 @@ func TestReconciler(t *testing.T) { }, wantNumCreated: 1, }, + { + name: "can create job for Forbid with 1 active and MaxConcurrency set to 2", + initialJobConfigs: []*execution.JobConfig{ + jobConfigForbidMax2, + }, + initialCounts: map[*execution.JobConfig]int64{ + jobConfigForbidMax2: 1, + }, + syncTarget: syncTarget{ + namespace: jobConfigForbidMax2.Namespace, + name: croncontroller.JoinJobConfigKeyName(jobConfigForbidMax2.Name, testutils.Mktime(scheduleTime)), + }, + wantNumCreated: 1, + }, 
+ { + name: "skip create job for Forbid with 2 active and MaxConcurrency set to 2", + initialJobConfigs: []*execution.JobConfig{ + jobConfigForbidMax2, + }, + initialCounts: map[*execution.JobConfig]int64{ + jobConfigForbidMax2: 2, + }, + syncTarget: syncTarget{ + namespace: jobConfigForbidMax2.Namespace, + name: croncontroller.JoinJobConfigKeyName(jobConfigForbidMax2.Name, testutils.Mktime(scheduleTime)), + }, + wantSkipped: 1, + }, { name: "cannot create more than maxEnqueuedJobs", cfgs: controllercontext.ConfigsMap{ diff --git a/pkg/execution/controllers/jobqueuecontroller/control.go b/pkg/execution/controllers/jobqueuecontroller/control.go index df728b8..5791b0d 100644 --- a/pkg/execution/controllers/jobqueuecontroller/control.go +++ b/pkg/execution/controllers/jobqueuecontroller/control.go @@ -47,15 +47,10 @@ type JobControl struct { var _ JobControlInterface = (*JobControl)(nil) -func NewJobControl( - client executionv1alpha1.ExecutionV1alpha1Interface, - recorder record.EventRecorder, - name string, -) *JobControl { +func NewJobControl(client executionv1alpha1.ExecutionV1alpha1Interface, recorder record.EventRecorder) *JobControl { return &JobControl{ client: client, recorder: recorder, - name: name, } } @@ -69,7 +64,7 @@ func (c *JobControl) StartJob(ctx context.Context, rj *execution.Job) error { } klog.V(3).InfoS("jobqueuecontroller: started job", logvalues. - Values("worker", c.name, "namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()). + Values("namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()). Level(4, "job", updatedRj). Build()..., ) @@ -88,7 +83,7 @@ func (c *JobControl) RejectJob(ctx context.Context, rj *execution.Job, msg strin } klog.V(3).InfoS("jobqueuecontroller: rejected job", logvalues. - Values("worker", c.name, "namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()). + Values("namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()). Level(4, "job", updatedRj). 
Build()..., ) diff --git a/pkg/execution/controllers/jobqueuecontroller/controller.go b/pkg/execution/controllers/jobqueuecontroller/controller.go index faaf50b..e0cc3be 100644 --- a/pkg/execution/controllers/jobqueuecontroller/controller.go +++ b/pkg/execution/controllers/jobqueuecontroller/controller.go @@ -111,12 +111,14 @@ func NewController( terminate: cancel, } + jobControl := NewJobControl(ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(), ctrl.Context.recorder) + // Create multiple reconcilers. For Jobs that are not owned by a JobConfig, we // will simply start them as soon as they are created. Otherwise, we will // sequentially process Jobs on a per-JobConfig basis in each Reconciler // goroutine. - perConfigReconciler := NewPerConfigReconciler(ctrl.Context, concurrency) - independentReconciler := NewIndependentReconciler(ctrl.Context, concurrency) + perConfigReconciler := NewPerConfigReconciler(ctrl.Context, concurrency, jobControl) + independentReconciler := NewIndependentReconciler(ctrl.Context, concurrency, jobControl) ctrl.informerWorker = NewInformerWorker(ctrl.Context) ctrl.perConfigReconciler = reconciler.NewController(perConfigReconciler, ctrl.jobConfigQueue) diff --git a/pkg/execution/controllers/jobqueuecontroller/reconciler_independent.go b/pkg/execution/controllers/jobqueuecontroller/reconciler_independent.go index ed813be..e25864a 100644 --- a/pkg/execution/controllers/jobqueuecontroller/reconciler_independent.go +++ b/pkg/execution/controllers/jobqueuecontroller/reconciler_independent.go @@ -42,17 +42,16 @@ type IndependentReconciler struct { client JobControlInterface } -func NewIndependentReconciler(ctrlContext *Context, concurrency *configv1alpha1.Concurrency) *IndependentReconciler { - reconciler := &IndependentReconciler{ +func NewIndependentReconciler( + ctrlContext *Context, + concurrency *configv1alpha1.Concurrency, + client JobControlInterface, +) *IndependentReconciler { + return &IndependentReconciler{ Context: ctrlContext, 
concurrency: concurrency, + client: client, } - reconciler.client = NewJobControl( - ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(), - ctrlContext.recorder, - reconciler.Name(), - ) - return reconciler } func (r *IndependentReconciler) Name() string { diff --git a/pkg/execution/controllers/jobqueuecontroller/reconciler_independent_test.go b/pkg/execution/controllers/jobqueuecontroller/reconciler_independent_test.go index ebadec7..7ba60bb 100644 --- a/pkg/execution/controllers/jobqueuecontroller/reconciler_independent_test.go +++ b/pkg/execution/controllers/jobqueuecontroller/reconciler_independent_test.go @@ -37,6 +37,7 @@ func TestIndependentReconciler(t *testing.T) { return jobqueuecontroller.NewIndependentReconciler( c.(*jobqueuecontroller.Context), runtimetesting.ReconcilerDefaultConcurrency, + newMockJobControl(c.Clientsets().Furiko().ExecutionV1alpha1()), ) }, Now: testutils.Mktime(now), diff --git a/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig.go b/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig.go index 62f4bb6..ebf3915 100644 --- a/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig.go +++ b/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig.go @@ -48,17 +48,16 @@ type PerConfigReconciler struct { client JobControlInterface } -func NewPerConfigReconciler(ctrlContext *Context, concurrency *configv1alpha1.Concurrency) *PerConfigReconciler { - reconciler := &PerConfigReconciler{ +func NewPerConfigReconciler( + ctrlContext *Context, + concurrency *configv1alpha1.Concurrency, + client JobControlInterface, +) *PerConfigReconciler { + return &PerConfigReconciler{ Context: ctrlContext, concurrency: concurrency, + client: client, } - reconciler.client = NewJobControl( - ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(), - ctrlContext.recorder, - reconciler.Name(), - ) - return reconciler } func (w *PerConfigReconciler) Name() string { @@ -175,8 +174,10 @@ func (w 
*PerConfigReconciler) canStartJob( return false, nil } - // There are concurrent jobs and we should immediately reject the job. - if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyForbid && activeCount > 0 { + maxConcurrency := rjc.Spec.Concurrency.GetMaxConcurrency() + + // If it exceeds the max concurrency, Forbid will immediately reject the job. + if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyForbid && activeCount+1 > maxConcurrency { msg := fmt.Sprintf("Cannot start new Job, %v has %v active Jobs but concurrency policy is %v", rjc.Name, activeCount, spec.ConcurrencyPolicy) if err := w.client.RejectJob(ctx, rj, msg); err != nil { @@ -193,8 +194,8 @@ func (w *PerConfigReconciler) canStartJob( return false, nil } - // There are concurrent jobs and we should wait. - if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyEnqueue && activeCount > 0 { + // If it exceeds the max concurrency, Enqueue will wait. + if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyEnqueue && activeCount+1 > maxConcurrency { return false, nil } } diff --git a/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig_test.go b/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig_test.go index 0745e52..e03485b 100644 --- a/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig_test.go +++ b/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig_test.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" + execution "github.com/furiko-io/furiko/apis/execution/v1alpha1" "github.com/furiko-io/furiko/pkg/execution/controllers/jobqueuecontroller" "github.com/furiko-io/furiko/pkg/execution/stores/activejobstore" "github.com/furiko-io/furiko/pkg/runtime/controllercontext" @@ -40,6 +41,7 @@ func TestPerJobConfigReconciler(t *testing.T) { return jobqueuecontroller.NewPerConfigReconciler( c.(*jobqueuecontroller.Context), runtimetesting.ReconcilerDefaultConcurrency, + 
newMockJobControl(c.Clientsets().Furiko().ExecutionV1alpha1()), ) }, Now: testutils.Mktime(now), @@ -112,5 +114,87 @@ func TestPerJobConfigReconciler(t *testing.T) { }, }, }, + { + Name: "reject second job with Forbid", + Fixtures: []runtime.Object{ + jobForForbid1, + jobForForbid2, + }, + Target: jobConfigForbid, + WantActions: runtimetesting.CombinedActions{ + Furiko: runtimetesting.ActionTest{ + Actions: []runtimetesting.Action{ + runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)), + runtimetesting.NewUpdateJobAction(jobNamespace, rejectJob(jobForForbid2)), + }, + }, + }, + }, + { + Name: "wait for second job with Enqueue", + Fixtures: []runtime.Object{ + jobForForbid1, + makeJob("job2", 1, jobConfigForbid, execution.ConcurrencyPolicyEnqueue, nil), + }, + Target: jobConfigForbid, + WantActions: runtimetesting.CombinedActions{ + Furiko: runtimetesting.ActionTest{ + Actions: []runtimetesting.Action{ + runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)), + }, + }, + }, + }, + { + Name: "will skip first job with StartAfter", + Fixtures: []runtime.Object{ + makeJob("job1", 0, jobConfigForbid, execution.ConcurrencyPolicyForbid, testutils.Mkmtimep(startAfter)), + jobForForbid2, + }, + Target: jobConfigForbid, + WantActions: runtimetesting.CombinedActions{ + Furiko: runtimetesting.ActionTest{ + Actions: []runtimetesting.Action{ + runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid2, timeNow)), + }, + }, + }, + }, + { + Name: "can start both jobs with MaxConcurrency set to 2", + Fixtures: []runtime.Object{ + jobForForbid1, + jobForForbid2, + }, + Target: jobConfigForbidMax2, + WantActions: runtimetesting.CombinedActions{ + Furiko: runtimetesting.ActionTest{ + Actions: []runtimetesting.Action{ + runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)), + runtimetesting.NewUpdateJobAction(jobNamespace, startJob(jobForForbid2, timeNow)), + }, + }, 
+ }, + }, + { + Name: "reject 3rd job with MaxConcurrency set to 2", + Fixtures: []runtime.Object{ + jobForForbid1, + jobForForbid2, + makeJob("job3", 2, jobConfigForbid, execution.ConcurrencyPolicyForbid, nil), + }, + Target: jobConfigForbidMax2, + WantActions: runtimetesting.CombinedActions{ + Furiko: runtimetesting.ActionTest{ + Actions: []runtimetesting.Action{ + runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)), + runtimetesting.NewUpdateJobAction(jobNamespace, startJob(jobForForbid2, timeNow)), + runtimetesting.NewUpdateJobAction(jobNamespace, rejectJob( + makeJob("job3", 2, jobConfigForbid, execution.ConcurrencyPolicyForbid, nil), + )), + }, + }, + }, + }, }) } diff --git a/pkg/execution/controllers/jobqueuecontroller/reconciler_test.go b/pkg/execution/controllers/jobqueuecontroller/reconciler_test.go index 6a60026..5792164 100644 --- a/pkg/execution/controllers/jobqueuecontroller/reconciler_test.go +++ b/pkg/execution/controllers/jobqueuecontroller/reconciler_test.go @@ -17,11 +17,19 @@ package jobqueuecontroller_test import ( + "context" + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" executiongroup "github.com/furiko-io/furiko/apis/execution" execution "github.com/furiko-io/furiko/apis/execution/v1alpha1" + "github.com/furiko-io/furiko/pkg/execution/controllers/jobqueuecontroller" + jobutil "github.com/furiko-io/furiko/pkg/execution/util/job" "github.com/furiko-io/furiko/pkg/execution/util/jobconfig" + executionv1alpha1 "github.com/furiko-io/furiko/pkg/generated/clientset/versioned/typed/execution/v1alpha1" + "github.com/furiko-io/furiko/pkg/utils/ktime" "github.com/furiko-io/furiko/pkg/utils/testutils" ) @@ -30,15 +38,18 @@ const ( createTime = "2021-02-09T04:06:00Z" now = "2021-02-09T04:06:05Z" startAfter = "2021-02-09T05:00:00Z" - uid1 = "0ed1bc76-07ca-4cf7-9a47-a0cc4aec48b9" - uid2 = "6e08ee33-ccbe-4fc5-9c46-e29c19cc2fcb" + + uidJobConfig = "0ed1bc76-07ca-4cf7-9a47-a0cc4aec48b9" + 
uidForbid = "25b6cc22-a0c0-4f55-97b8-b798769d462b" + uidNonexistent = "6e08ee33-ccbe-4fc5-9c46-e29c19cc2fcb" ) var ( timeNow = testutils.Mkmtimep(now) startPolicy = &execution.StartPolicySpec{ - StartAfter: testutils.Mkmtimep(startAfter), + ConcurrencyPolicy: execution.ConcurrencyPolicyAllow, + StartAfter: testutils.Mkmtimep(startAfter), } jobToBeStarted = &execution.Job{ @@ -68,12 +79,39 @@ var ( jobConfig1 = &execution.JobConfig{ ObjectMeta: metav1.ObjectMeta{ - UID: uid1, + UID: uidJobConfig, Namespace: jobNamespace, Name: "job-config-1", }, } + jobConfigForbid = &execution.JobConfig{ + ObjectMeta: metav1.ObjectMeta{ + UID: uidForbid, + Namespace: jobNamespace, + Name: "job-config-forbid", + }, + Spec: execution.JobConfigSpec{ + Concurrency: execution.ConcurrencySpec{ + Policy: execution.ConcurrencyPolicyForbid, + }, + }, + } + + jobConfigForbidMax2 = &execution.JobConfig{ + ObjectMeta: metav1.ObjectMeta{ + UID: uidForbid, + Namespace: jobNamespace, + Name: "job-config-forbid", + }, + Spec: execution.JobConfigSpec{ + Concurrency: execution.ConcurrencySpec{ + Policy: execution.ConcurrencyPolicyForbid, + MaxConcurrency: pointer.Int64(2), + }, + }, + } + jobForConfig1ToBeStarted = &execution.Job{ ObjectMeta: metav1.ObjectMeta{ Name: "job-for-config-1-to-be-started", @@ -83,7 +121,7 @@ var ( executiongroup.DeleteDependentsFinalizer, }, Labels: map[string]string{ - jobconfig.LabelKeyJobConfigUID: uid1, + jobconfig.LabelKeyJobConfigUID: uidJobConfig, }, }, } @@ -97,7 +135,7 @@ var ( executiongroup.DeleteDependentsFinalizer, }, Labels: map[string]string{ - jobconfig.LabelKeyJobConfigUID: uid2, + jobconfig.LabelKeyJobConfigUID: uidNonexistent, }, }, } @@ -111,17 +149,81 @@ var ( executiongroup.DeleteDependentsFinalizer, }, Labels: map[string]string{ - jobconfig.LabelKeyJobConfigUID: uid1, + jobconfig.LabelKeyJobConfigUID: uidJobConfig, }, }, Spec: execution.JobSpec{ StartPolicy: startPolicy, }, } + + jobForForbid1 = makeJob("job-for-forbid-1", 0, jobConfigForbid, 
execution.ConcurrencyPolicyForbid, nil) + jobForForbid2 = makeJob("job-for-forbid-2", 1, jobConfigForbid, execution.ConcurrencyPolicyForbid, nil) ) +func makeJob( + name string, + index int, + jobConfig *execution.JobConfig, + cp execution.ConcurrencyPolicy, + startAfter *metav1.Time, +) *execution.Job { + return &execution.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: jobNamespace, + // NOTE(irvinlim): We use a staggered CreationTimestamp because the reconciler + // orders Jobs using this field internally, so we can trigger sync with + // deterministic order. + CreationTimestamp: metav1.NewTime(testutils.Mkmtime(createTime).Add(time.Second * time.Duration(index))), + Finalizers: []string{ + executiongroup.DeleteDependentsFinalizer, + }, + Labels: map[string]string{ + jobconfig.LabelKeyJobConfigUID: string(jobConfig.UID), + }, + }, + Spec: execution.JobSpec{ + StartPolicy: &execution.StartPolicySpec{ + ConcurrencyPolicy: cp, + StartAfter: startAfter, + }, + }, + } +} + func startJob(job *execution.Job, now *metav1.Time) *execution.Job { newJob := job.DeepCopy() newJob.Status.StartTime = now return newJob } + +func rejectJob(job *execution.Job) *execution.Job { + newJob := job.DeepCopy() + jobutil.MarkAdmissionError(newJob, "rejected") + return newJob +} + +type mockJobControl struct { + client executionv1alpha1.ExecutionV1alpha1Interface +} + +func newMockJobControl(client executionv1alpha1.ExecutionV1alpha1Interface) *mockJobControl { + return &mockJobControl{ + client: client, + } +} + +var _ jobqueuecontroller.JobControlInterface = (*mockJobControl)(nil) + +func (m *mockJobControl) StartJob(ctx context.Context, rj *execution.Job) error { + newJob := startJob(rj, ktime.Now()) + _, err := m.client.Jobs(rj.Namespace).Update(ctx, newJob, metav1.UpdateOptions{}) + return err +} + +func (m *mockJobControl) RejectJob(ctx context.Context, rj *execution.Job, _ string) error { + newJob := rejectJob(rj) + _, err := m.client.Jobs(rj.Namespace).Update(ctx, 
newJob, metav1.UpdateOptions{}) + return err +} diff --git a/pkg/execution/validation/validation.go b/pkg/execution/validation/validation.go index 33a4e1e..6117937 100644 --- a/pkg/execution/validation/validation.go +++ b/pkg/execution/validation/validation.go @@ -135,15 +135,22 @@ func (v *Validator) validateJobCreateWithJobConfig( ) field.ErrorList { allErrs := field.ErrorList{} - // Reject Job if ConcurrencyPolicyForbid and JobConfig has active Jobs. + // rjc may be nil (the Forbid check below guards against that), so only read + // its concurrency settings when it is present; otherwise keep the default of 1. + maxConcurrency := int64(1) + if rjc != nil { + maxConcurrency = rjc.Spec.Concurrency.GetMaxConcurrency() + } + + // Reject Job if ConcurrencyPolicyForbid and starting it would exceed the JobConfig's maximum concurrency. if spec := rj.Spec.StartPolicy; spec != nil && rjc != nil && spec.ConcurrencyPolicy == v1alpha1.ConcurrencyPolicyForbid && - rjc.Status.Active > 0 { + rjc.Status.Active+1 > maxConcurrency { allErrs = append(allErrs, field.Forbidden( field.NewPath("spec.startPolicy.concurrencyPolicy"), fmt.Sprintf( - "cannot create new Job for JobConfig %v, concurrencyPolicy is Forbid but there are %v active jobs", - rjc.Name, rjc.Status.Active, + "%v currently has %v active job(s), but concurrency policy forbids exceeding maximum concurrency of %v", + rjc.Name, rjc.Status.Active, maxConcurrency, ), )) } @@ -209,6 +216,13 @@ func (v *Validator) ValidateJobTemplate(spec *v1alpha1.JobTemplateSpec, fldPath func (v *Validator) ValidateConcurrencySpec(spec v1alpha1.ConcurrencySpec, fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} allErrs = append(allErrs, v.ValidateConcurrencyPolicy(spec.Policy, fldPath.Child("policy"))...) + if spec.MaxConcurrency != nil { + fldPath := fldPath.Child("maxConcurrency") + allErrs = append(allErrs, validation.ValidateGT(*spec.MaxConcurrency, 0, fldPath)...) 
+ if spec.Policy == v1alpha1.ConcurrencyPolicyAllow { + allErrs = append(allErrs, field.Forbidden(fldPath, "cannot specify maxConcurrency with Allow")) + } + } return allErrs } diff --git a/pkg/execution/validation/validation_test.go b/pkg/execution/validation/validation_test.go index f68d793..f057724 100644 --- a/pkg/execution/validation/validation_test.go +++ b/pkg/execution/validation/validation_test.go @@ -307,6 +307,32 @@ func TestValidateJobConfig(t *testing.T) { }, wantErr: "spec.concurrency.policy: Unsupported value: \"invalid\"", }, + { + name: "invalid concurrency.maxConcurrency", + rjc: &v1alpha1.JobConfig{ + Spec: v1alpha1.JobConfigSpec{ + Template: jobTemplateSpecBasic, + Concurrency: v1alpha1.ConcurrencySpec{ + Policy: v1alpha1.ConcurrencyPolicyForbid, + MaxConcurrency: pointer.Int64(0), + }, + }, + }, + wantErr: "spec.concurrency.maxConcurrency: Invalid value: 0", + }, + { + name: "cannot use concurrency.maxConcurrency with Allow", + rjc: &v1alpha1.JobConfig{ + Spec: v1alpha1.JobConfigSpec{ + Template: jobTemplateSpecBasic, + Concurrency: v1alpha1.ConcurrencySpec{ + Policy: v1alpha1.ConcurrencyPolicyAllow, + MaxConcurrency: pointer.Int64(3), + }, + }, + }, + wantErr: "spec.concurrency.maxConcurrency: Forbidden: cannot specify maxConcurrency with Allow", + }, { name: "schedule without any schedule types", rjc: &v1alpha1.JobConfig{ @@ -1205,7 +1231,7 @@ func TestValidateJobCreate(t *testing.T) { }, }, }, - wantErr: "spec.startPolicy.concurrencyPolicy: Forbidden: cannot create new Job for JobConfig jobconfig-sample, concurrencyPolicy is Forbid but there are 5 active jobs", + wantErr: "spec.startPolicy.concurrencyPolicy: Forbidden: jobconfig-sample currently has 5 active job(s), but concurrency policy forbids exceeding maximum concurrency of 1", }, { name: "can create Job with startPolicy.concurrencyPolicy Forbid with no Active", @@ -1255,6 +1281,61 @@ func TestValidateJobCreate(t *testing.T) { }, wantErr: "spec.startPolicy: Forbidden: cannot create new 
Job for JobConfig jobconfig-sample, which would exceed maximum queue length of 5", }, + { + name: "can create Job with concurrencyPolicy Forbid with Active and maxConcurrency", + rj: &v1alpha1.Job{ + ObjectMeta: objectMetaJobWithAllReferences, + Spec: v1alpha1.JobSpec{ + Type: v1alpha1.JobTypeAdhoc, + Template: &jobTemplateSpecBasic.Spec, + StartPolicy: &v1alpha1.StartPolicySpec{ + ConcurrencyPolicy: v1alpha1.ConcurrencyPolicyForbid, + }, + }, + }, + rjcs: []*v1alpha1.JobConfig{ + { + ObjectMeta: objectMetaJobConfig, + Spec: v1alpha1.JobConfigSpec{ + Concurrency: v1alpha1.ConcurrencySpec{ + Policy: v1alpha1.ConcurrencyPolicyForbid, + MaxConcurrency: pointer.Int64(3), + }, + }, + Status: v1alpha1.JobConfigStatus{ + Active: 1, + }, + }, + }, + }, + { + name: "cannot create Job with concurrencyPolicy Forbid with Active and exceed maxConcurrency", + rj: &v1alpha1.Job{ + ObjectMeta: objectMetaJobWithAllReferences, + Spec: v1alpha1.JobSpec{ + Type: v1alpha1.JobTypeAdhoc, + Template: &jobTemplateSpecBasic.Spec, + StartPolicy: &v1alpha1.StartPolicySpec{ + ConcurrencyPolicy: v1alpha1.ConcurrencyPolicyForbid, + }, + }, + }, + rjcs: []*v1alpha1.JobConfig{ + { + ObjectMeta: objectMetaJobConfig, + Spec: v1alpha1.JobConfigSpec{ + Concurrency: v1alpha1.ConcurrencySpec{ + Policy: v1alpha1.ConcurrencyPolicyForbid, + MaxConcurrency: pointer.Int64(3), + }, + }, + Status: v1alpha1.JobConfigStatus{ + Active: 3, + }, + }, + }, + wantErr: "spec.startPolicy.concurrencyPolicy: Forbidden: jobconfig-sample currently has 3 active job(s), but concurrency policy forbids exceeding maximum concurrency of 3", + }, } for _, tt := range tests { tt := tt