Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(concurrency): Support variable MaxConcurrency #92

Merged
merged 2 commits into from
Jun 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions apis/execution/v1alpha1/jobconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ type JobConfigSpec struct {
type ConcurrencySpec struct {
// Policy describes how to treat concurrent executions of the same JobConfig.
Policy ConcurrencyPolicy `json:"policy"`

// Maximum number of Jobs that can be running concurrently for the same
// JobConfig. Cannot be specified if Policy is set to Allow.
//
// Defaults to 1.
//
// +optional
MaxConcurrency *int64 `json:"maxConcurrency,omitempty"`
}

// GetMaxConcurrency returns the MaxConcurrency value if specified, otherwise defaults to 1.
func (c ConcurrencySpec) GetMaxConcurrency() int64 {
if c.MaxConcurrency != nil {
return *c.MaxConcurrency
}
return 1
}

// ScheduleSpec defines how a JobConfig should be automatically scheduled.
Expand Down
7 changes: 6 additions & 1 deletion apis/execution/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/crd/bases/execution.furiko.io_jobconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ spec:
concurrency:
description: Concurrency defines the behaviour of multiple concurrent Jobs.
properties:
maxConcurrency:
description: "Maximum number of Jobs that can be running concurrently for the same JobConfig. Cannot be specified if Policy is set to Allow. \n Defaults to 1."
format: int64
type: integer
policy:
description: Policy describes how to treat concurrent executions of the same JobConfig.
type: string
Expand Down
5 changes: 3 additions & 2 deletions pkg/execution/controllers/croncontroller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ func (w *Reconciler) processCronForConfig(ctx context.Context, namespace, name s

// Check concurrency policy.
concurrencyPolicy := jobConfig.Spec.Concurrency.Policy
maxConcurrency := jobConfig.Spec.Concurrency.GetMaxConcurrency()

// Handle Forbid concurrency policy.
if concurrencyPolicy == execution.ConcurrencyPolicyForbid && activeJobCount > 0 {
if concurrencyPolicy == execution.ConcurrencyPolicyForbid && activeJobCount+1 > maxConcurrency {
w.recorder.SkippedJobSchedule(ctx, jobConfig, scheduleTime,
"Skipped creating job due to concurrency policy Forbid")
return nil
}

// Cannot enqueue beyond max queue length.
// TODO(irvinlim): We use the status here, which may not be fully up-to-date.
// NOTE(irvinlim): We use the status here, which may not be fully up-to-date.
if max := cfg.MaxEnqueuedJobs; max != nil && jobConfig.Status.Queued >= *max {
w.recorder.SkippedJobSchedule(ctx, jobConfig, scheduleTime,
fmt.Sprintf("Skipped creating job, cannot exceed maximum queue length of %v", *max))
Expand Down
36 changes: 36 additions & 0 deletions pkg/execution/controllers/croncontroller/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ var (
},
})

jobConfigForbidMax2 = makeJobConfig("job-config-forbid-max-2", execution.JobConfigSpec{
Schedule: scheduleSpecEvery5Min,
Concurrency: execution.ConcurrencySpec{
Policy: execution.ConcurrencyPolicyForbid,
MaxConcurrency: pointer.Int64(2),
},
})

jobConfigAllow = makeJobConfig("job-config-allow", execution.JobConfigSpec{
Schedule: scheduleSpecEvery5Min,
Concurrency: execution.ConcurrencySpec{
Expand Down Expand Up @@ -189,6 +197,34 @@ func TestReconciler(t *testing.T) {
},
wantNumCreated: 1,
},
{
name: "can create job for Forbid with 1 active and MaxConcurrency set to 2",
initialJobConfigs: []*execution.JobConfig{
jobConfigForbidMax2,
},
initialCounts: map[*execution.JobConfig]int64{
jobConfigForbidMax2: 1,
},
syncTarget: syncTarget{
namespace: jobConfigForbidMax2.Namespace,
name: croncontroller.JoinJobConfigKeyName(jobConfigForbidMax2.Name, testutils.Mktime(scheduleTime)),
},
wantNumCreated: 1,
},
{
name: "skip create job for Forbid with 2 active and MaxConcurrency set to 2",
initialJobConfigs: []*execution.JobConfig{
jobConfigForbidMax2,
},
initialCounts: map[*execution.JobConfig]int64{
jobConfigForbidMax2: 2,
},
syncTarget: syncTarget{
namespace: jobConfigForbidMax2.Namespace,
name: croncontroller.JoinJobConfigKeyName(jobConfigForbidMax2.Name, testutils.Mktime(scheduleTime)),
},
wantSkipped: 1,
},
{
name: "cannot create more than maxEnqueuedJobs",
cfgs: controllercontext.ConfigsMap{
Expand Down
11 changes: 3 additions & 8 deletions pkg/execution/controllers/jobqueuecontroller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@ type JobControl struct {

var _ JobControlInterface = (*JobControl)(nil)

func NewJobControl(
client executionv1alpha1.ExecutionV1alpha1Interface,
recorder record.EventRecorder,
name string,
) *JobControl {
func NewJobControl(client executionv1alpha1.ExecutionV1alpha1Interface, recorder record.EventRecorder) *JobControl {
return &JobControl{
client: client,
recorder: recorder,
name: name,
}
}

Expand All @@ -69,7 +64,7 @@ func (c *JobControl) StartJob(ctx context.Context, rj *execution.Job) error {
}

klog.V(3).InfoS("jobqueuecontroller: started job", logvalues.
Values("worker", c.name, "namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()).
Values("namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()).
Level(4, "job", updatedRj).
Build()...,
)
Expand All @@ -88,7 +83,7 @@ func (c *JobControl) RejectJob(ctx context.Context, rj *execution.Job, msg strin
}

klog.V(3).InfoS("jobqueuecontroller: rejected job", logvalues.
Values("worker", c.name, "namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()).
Values("namespace", updatedRj.GetNamespace(), "name", updatedRj.GetName()).
Level(4, "job", updatedRj).
Build()...,
)
Expand Down
6 changes: 4 additions & 2 deletions pkg/execution/controllers/jobqueuecontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ func NewController(
terminate: cancel,
}

jobControl := NewJobControl(ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(), ctrl.Context.recorder)

// Create multiple reconcilers. For Jobs that are not owned by a JobConfig, we
// will simply start them as soon as they are created. Otherwise, we will
// sequentially process Jobs on a per-JobConfig basis in each Reconciler
// goroutine.
perConfigReconciler := NewPerConfigReconciler(ctrl.Context, concurrency)
independentReconciler := NewIndependentReconciler(ctrl.Context, concurrency)
perConfigReconciler := NewPerConfigReconciler(ctrl.Context, concurrency, jobControl)
independentReconciler := NewIndependentReconciler(ctrl.Context, concurrency, jobControl)

ctrl.informerWorker = NewInformerWorker(ctrl.Context)
ctrl.perConfigReconciler = reconciler.NewController(perConfigReconciler, ctrl.jobConfigQueue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,16 @@ type IndependentReconciler struct {
client JobControlInterface
}

func NewIndependentReconciler(ctrlContext *Context, concurrency *configv1alpha1.Concurrency) *IndependentReconciler {
reconciler := &IndependentReconciler{
func NewIndependentReconciler(
ctrlContext *Context,
concurrency *configv1alpha1.Concurrency,
client JobControlInterface,
) *IndependentReconciler {
return &IndependentReconciler{
Context: ctrlContext,
concurrency: concurrency,
client: client,
}
reconciler.client = NewJobControl(
ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(),
ctrlContext.recorder,
reconciler.Name(),
)
return reconciler
}

func (r *IndependentReconciler) Name() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestIndependentReconciler(t *testing.T) {
return jobqueuecontroller.NewIndependentReconciler(
c.(*jobqueuecontroller.Context),
runtimetesting.ReconcilerDefaultConcurrency,
newMockJobControl(c.Clientsets().Furiko().ExecutionV1alpha1()),
)
},
Now: testutils.Mktime(now),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,16 @@ type PerConfigReconciler struct {
client JobControlInterface
}

func NewPerConfigReconciler(ctrlContext *Context, concurrency *configv1alpha1.Concurrency) *PerConfigReconciler {
reconciler := &PerConfigReconciler{
func NewPerConfigReconciler(
ctrlContext *Context,
concurrency *configv1alpha1.Concurrency,
client JobControlInterface,
) *PerConfigReconciler {
return &PerConfigReconciler{
Context: ctrlContext,
concurrency: concurrency,
client: client,
}
reconciler.client = NewJobControl(
ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(),
ctrlContext.recorder,
reconciler.Name(),
)
return reconciler
}

func (w *PerConfigReconciler) Name() string {
Expand Down Expand Up @@ -175,8 +174,10 @@ func (w *PerConfigReconciler) canStartJob(
return false, nil
}

// There are concurrent jobs and we should immediately reject the job.
if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyForbid && activeCount > 0 {
maxConcurrency := rjc.Spec.Concurrency.GetMaxConcurrency()

// If it exceeds the max concurrency, Forbid will immediately reject the job.
if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyForbid && activeCount+1 > maxConcurrency {
msg := fmt.Sprintf("Cannot start new Job, %v has %v active Jobs but concurrency policy is %v",
rjc.Name, activeCount, spec.ConcurrencyPolicy)
if err := w.client.RejectJob(ctx, rj, msg); err != nil {
Expand All @@ -193,8 +194,8 @@ func (w *PerConfigReconciler) canStartJob(
return false, nil
}

// There are concurrent jobs and we should wait.
if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyEnqueue && activeCount > 0 {
// If it exceeds the max concurrency, Enqueue will wait.
if spec.ConcurrencyPolicy == execution.ConcurrencyPolicyEnqueue && activeCount+1 > maxConcurrency {
return false, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"

execution "github.com/furiko-io/furiko/apis/execution/v1alpha1"
"github.com/furiko-io/furiko/pkg/execution/controllers/jobqueuecontroller"
"github.com/furiko-io/furiko/pkg/execution/stores/activejobstore"
"github.com/furiko-io/furiko/pkg/runtime/controllercontext"
Expand All @@ -40,6 +41,7 @@ func TestPerJobConfigReconciler(t *testing.T) {
return jobqueuecontroller.NewPerConfigReconciler(
c.(*jobqueuecontroller.Context),
runtimetesting.ReconcilerDefaultConcurrency,
newMockJobControl(c.Clientsets().Furiko().ExecutionV1alpha1()),
)
},
Now: testutils.Mktime(now),
Expand Down Expand Up @@ -112,5 +114,87 @@ func TestPerJobConfigReconciler(t *testing.T) {
},
},
},
{
Name: "reject second job with Forbid",
Fixtures: []runtime.Object{
jobForForbid1,
jobForForbid2,
},
Target: jobConfigForbid,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)),
runtimetesting.NewUpdateJobAction(jobNamespace, rejectJob(jobForForbid2)),
},
},
},
},
{
Name: "wait for second job with Enqueue",
Fixtures: []runtime.Object{
jobForForbid1,
makeJob("job2", 1, jobConfigForbid, execution.ConcurrencyPolicyEnqueue, nil),
},
Target: jobConfigForbid,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)),
},
},
},
},
{
Name: "will skip first job with StartAfter",
Fixtures: []runtime.Object{
makeJob("job1", 0, jobConfigForbid, execution.ConcurrencyPolicyForbid, testutils.Mkmtimep(startAfter)),
jobForForbid2,
},
Target: jobConfigForbid,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid2, timeNow)),
},
},
},
},
{
Name: "can start both jobs with MaxConcurrency set to 2",
Fixtures: []runtime.Object{
jobForForbid1,
jobForForbid2,
},
Target: jobConfigForbidMax2,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)),
runtimetesting.NewUpdateJobAction(jobNamespace, startJob(jobForForbid2, timeNow)),
},
},
},
},
{
Name: "reject 3rd job with MaxConcurrency set to 2",
Fixtures: []runtime.Object{
jobForForbid1,
jobForForbid2,
makeJob("job3", 2, jobConfigForbid, execution.ConcurrencyPolicyForbid, nil),
},
Target: jobConfigForbidMax2,
WantActions: runtimetesting.CombinedActions{
Furiko: runtimetesting.ActionTest{
Actions: []runtimetesting.Action{
runtimetesting.NewUpdateJobStatusAction(jobNamespace, startJob(jobForForbid1, timeNow)),
runtimetesting.NewUpdateJobAction(jobNamespace, startJob(jobForForbid2, timeNow)),
runtimetesting.NewUpdateJobAction(jobNamespace, rejectJob(
makeJob("job3", 2, jobConfigForbid, execution.ConcurrencyPolicyForbid, nil),
)),
},
},
},
},
})
}
Loading