Skip to content

Commit

Permalink
feat(concurrency): Support variable MaxConcurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
irvinlim committed Jun 7, 2022
1 parent bdc131a commit 7cbf420
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 40 deletions.
16 changes: 16 additions & 0 deletions apis/execution/v1alpha1/jobconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ type JobConfigSpec struct {
// ConcurrencySpec holds the concurrency settings of a JobConfig: the policy
// applied to concurrent executions, plus an optional cap on how many Jobs may
// run at the same time.
type ConcurrencySpec struct {
// Policy describes how to treat concurrent executions of the same JobConfig.
Policy ConcurrencyPolicy `json:"policy"`

// MaxConcurrency is the maximum number of Jobs that can be running
// concurrently for the same JobConfig. Cannot be specified if Policy is set
// to Allow.
//
// The field is a pointer so that "unspecified" (nil) can be told apart from
// an explicit value; use GetMaxConcurrency to read the effective limit.
//
// Defaults to 1.
//
// +optional
MaxConcurrency *int64 `json:"maxConcurrency,omitempty"`
}

// GetMaxConcurrency reports the effective concurrency limit for the spec.
// It yields the explicitly configured MaxConcurrency when the field is set,
// and falls back to 1 when the field is nil.
func (c ConcurrencySpec) GetMaxConcurrency() int64 {
	// Start from the documented default and override it only when a value
	// was explicitly provided.
	limit := int64(1)
	if configured := c.MaxConcurrency; configured != nil {
		limit = *configured
	}
	return limit
}

// ScheduleSpec defines how a JobConfig should be automatically scheduled.
Expand Down
7 changes: 6 additions & 1 deletion apis/execution/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/execution/controllers/croncontroller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ func (w *Reconciler) processCronForConfig(ctx context.Context, namespace, name s

// Check concurrency policy.
concurrencyPolicy := jobConfig.Spec.Concurrency.Policy
maxConcurrency := jobConfig.Spec.Concurrency.GetMaxConcurrency()

// Handle Forbid concurrency policy.
if concurrencyPolicy == execution.ConcurrencyPolicyForbid && activeJobCount > 0 {
if concurrencyPolicy == execution.ConcurrencyPolicyForbid && activeJobCount+1 > maxConcurrency {
w.recorder.SkippedJobSchedule(ctx, jobConfig, scheduleTime,
"Skipped creating job due to concurrency policy Forbid")
return nil
}

// Cannot enqueue beyond max queue length.
// TODO(irvinlim): We use the status here, which may not be fully up-to-date.
// NOTE(irvinlim): We use the status here, which may not be fully up-to-date.
if max := cfg.MaxEnqueuedJobs; max != nil && jobConfig.Status.Queued >= *max {
w.recorder.SkippedJobSchedule(ctx, jobConfig, scheduleTime,
fmt.Sprintf("Skipped creating job, cannot exceed maximum queue length of %v", *max))
Expand Down
36 changes: 36 additions & 0 deletions pkg/execution/controllers/croncontroller/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ var (
},
})

jobConfigForbidMax2 = makeJobConfig("job-config-forbid-max-2", execution.JobConfigSpec{
Schedule: scheduleSpecEvery5Min,
Concurrency: execution.ConcurrencySpec{
Policy: execution.ConcurrencyPolicyForbid,
MaxConcurrency: pointer.Int64(2),
},
})

jobConfigAllow = makeJobConfig("job-config-allow", execution.JobConfigSpec{
Schedule: scheduleSpecEvery5Min,
Concurrency: execution.ConcurrencySpec{
Expand Down Expand Up @@ -189,6 +197,34 @@ func TestReconciler(t *testing.T) {
},
wantNumCreated: 1,
},
{
name: "can create job for Forbid with 1 active and MaxConcurrency set to 2",
initialJobConfigs: []*execution.JobConfig{
jobConfigForbidMax2,
},
initialCounts: map[*execution.JobConfig]int64{
jobConfigForbidMax2: 1,
},
syncTarget: syncTarget{
namespace: jobConfigForbidMax2.Namespace,
name: croncontroller.JoinJobConfigKeyName(jobConfigForbidMax2.Name, testutils.Mktime(scheduleTime)),
},
wantNumCreated: 1,
},
{
name: "skip create job for Forbid with 2 active and MaxConcurrency set to 2",
initialJobConfigs: []*execution.JobConfig{
jobConfigForbidMax2,
},
initialCounts: map[*execution.JobConfig]int64{
jobConfigForbidMax2: 2,
},
syncTarget: syncTarget{
namespace: jobConfigForbidMax2.Namespace,
name: croncontroller.JoinJobConfigKeyName(jobConfigForbidMax2.Name, testutils.Mktime(scheduleTime)),
},
wantSkipped: 1,
},
{
name: "cannot create more than maxEnqueuedJobs",
cfgs: controllercontext.ConfigsMap{
Expand Down
11 changes: 3 additions & 8 deletions pkg/execution/controllers/jobqueuecontroller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@ type JobControl struct {

var _ JobControlInterface = (*JobControl)(nil)

func NewJobControl(
client executionv1alpha1.ExecutionV1alpha1Interface,
recorder record.EventRecorder,
name string,
) *JobControl {
func NewJobControl(client executionv1alpha1.ExecutionV1alpha1Interface, recorder record.EventRecorder) *JobControl {
return &JobControl{
client: client,
recorder: recorder,
name: name,
}
}

Expand All @@ -69,7 +64,7 @@ func (c *JobControl) StartJob(ctx context.Context, rj *execution.Job) error {
}

klog.V(3).InfoS("jobqueuecontroller: started job", logvalues.
Values("worker", c.name, "namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()).
Values("namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()).
Level(4, "job", updatedRj).
Build()...,
)
Expand All @@ -88,7 +83,7 @@ func (c *JobControl) RejectJob(ctx context.Context, rj *execution.Job, msg strin
}

klog.V(3).InfoS("jobqueuecontroller: rejected job", logvalues.
Values("worker", c.name, "namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()).
Values("namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()).
Level(4, "job", updatedRj).
Build()...,
)
Expand Down
6 changes: 4 additions & 2 deletions pkg/execution/controllers/jobqueuecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ func NewController(
terminate: cancel,
}

jobControl := NewJobControl(ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(), ctrl.Context.recorder)

// Create multiple reconcilers. For Jobs that are not owned by a JobConfig, we
// will simply start them as soon as they are created. Otherwise, we will
// sequentially process Jobs on a per-JobConfig basis in each Reconciler
// goroutine.
perConfigReconciler := NewPerConfigReconciler(ctrl.Context, concurrency)
independentReconciler := NewIndependentReconciler(ctrl.Context, concurrency)
perConfigReconciler := NewPerConfigReconciler(ctrl.Context, concurrency, jobControl)
independentReconciler := NewIndependentReconciler(ctrl.Context, concurrency, jobControl)

ctrl.informerWorker = NewInformerWorker(ctrl.Context)
ctrl.perConfigReconciler = reconciler.NewController(perConfigReconciler, ctrl.jobConfigQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,16 @@ type IndependentReconciler struct {
client JobControlInterface
}

func NewIndependentReconciler(ctrlContext *Context, concurrency *configv1alpha1.Concurrency) *IndependentReconciler {
reconciler := &IndependentReconciler{
func NewIndependentReconciler(
ctrlContext *Context,
concurrency *configv1alpha1.Concurrency,
client JobControlInterface,
) *IndependentReconciler {
return &IndependentReconciler{
Context: ctrlContext,
concurrency: concurrency,
client: client,
}
reconciler.client = NewJobControl(
ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(),
ctrlContext.recorder,
reconciler.Name(),
)
return reconciler
}

func (r *IndependentReconciler) Name() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestIndependentReconciler(t *testing.T) {
return jobqueuecontroller.NewIndependentReconciler(
c.(*jobqueuecontroller.Context),
runtimetesting.ReconcilerDefaultConcurrency,
newMockJobControl(c.Clientsets().Furiko().ExecutionV1alpha1()),
)
},
Now: testutils.Mktime(now),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,16 @@ type PerConfigReconciler struct {
client JobControlInterface
}

func NewPerConfigReconciler(ctrlContext *Context, concurrency *configv1alpha1.Concurrency) *PerConfigReconciler {
reconciler := &PerConfigReconciler{
func NewPerConfigReconciler(
ctrlContext *Context,
concurrency *configv1alpha1.Concurrency,
client JobControlInterface,
) *PerConfigReconciler {
return &PerConfigReconciler{
Context: ctrlContext,
concurrency: concurrency,
client: client,
}
reconciler.client = NewJobControl(
ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(),
ctrlContext.recorder,
reconciler.Name(),
)
return reconciler
}

func (w *PerConfigReconciler) Name() string {
Expand Down Expand Up @@ -175,8 +174,10 @@ func (w *PerConfigReconciler) canStartJob(
return false, nil
}

// There are concurrent jobs and we should immediately reject the job.
if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyForbid && activeCount > 0 {
maxConcurrency := rjc.Spec.Concurrency.GetMaxConcurrency()

// If it exceeds the max concurrency, Forbid will immediately reject the job.
if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyForbid && activeCount+1 > maxConcurrency {
msg := fmt.Sprintf("Cannot start new Job, %v has %v active Jobs but concurrency policy is %v",
rjc.Name, activeCount, spec.ConcurrencyPolicy)
if err := w.client.RejectJob(ctx, rj, msg); err != nil {
Expand All @@ -193,8 +194,8 @@ func (w *PerConfigReconciler) canStartJob(
return false, nil
}

// There are concurrent jobs and we should wait.
if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyEnqueue && activeCount > 0 {
// If it exceeds the max concurrency, Enqueue will wait.
if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyEnqueue && activeCount+1 > maxConcurrency {
return false, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"

execution "github.com/furiko-io/furiko/apis/execution/v1alpha1"
"github.com/furiko-io/furiko/pkg/execution/controllers/jobqueuecontroller"
"github.com/furiko-io/furiko/pkg/execution/stores/activejobstore"
"github.com/furiko-io/furiko/pkg/runtime/controllercontext"
Expand All @@ -40,6 +41,7 @@ func TestPerJobConfigReconciler(t *testing.T) {
return jobqueuecontroller.NewPerConfigReconciler(
c.(*jobqueuecontroller.Context),
runtimetesting.ReconcilerDefaultConcurrency,
newMockJobControl(c.Clientsets().Furiko().ExecutionV1alpha1()),
)
},
Now: testutils.Mktime(now),
Expand Down Expand Up @@ -112,5 +114,87 @@ func TestPerJobConfigReconciler(t *testing.T) {
},
},
},
{
Name: "reject second job with Forbid",
Fixtures: []runtime.Object{
jobForForbid1,
jobForForbid2,
},
Target: jobConfigForbid,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)),
runtimetesting.NewUpdateJobAction(jobNamespace, rejectJob(jobForForbid2)),
},
},
},
},
{
Name: "wait for second job with Enqueue",
Fixtures: []runtime.Object{
jobForForbid1,
makeJob("job2", 1, jobConfigForbid, execution.ConcurrencyPolicyEnqueue, nil),
},
Target: jobConfigForbid,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)),
},
},
},
},
{
Name: "will skip first job with StartAfter",
Fixtures: []runtime.Object{
makeJob("job1", 0, jobConfigForbid, execution.ConcurrencyPolicyForbid, testutils.Mkmtimep(startAfter)),
jobForForbid2,
},
Target: jobConfigForbid,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid2, timeNow)),
},
},
},
},
{
Name: "can start both jobs with MaxConcurrency set to 2",
Fixtures: []runtime.Object{
jobForForbid1,
jobForForbid2,
},
Target: jobConfigForbidMax2,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)),
runtimetesting.NewUpdateJobAction(jobNamespace, startJob(jobForForbid2, timeNow)),
},
},
},
},
{
Name: "reject 3rd job with MaxConcurrency set to 2",
Fixtures: []runtime.Object{
jobForForbid1,
jobForForbid2,
makeJob("job3", 2, jobConfigForbid, execution.ConcurrencyPolicyForbid, nil),
},
Target: jobConfigForbidMax2,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)),
runtimetesting.NewUpdateJobAction(jobNamespace, startJob(jobForForbid2, timeNow)),
runtimetesting.NewUpdateJobAction(jobNamespace, rejectJob(
makeJob("job3", 2, jobConfigForbid, execution.ConcurrencyPolicyForbid, nil),
)),
},
},
},
},
})
}
Loading

0 comments on commit 7cbf420

Please sign in to comment.