diff --git a/cmd/execution-controller/main.go b/cmd/execution-controller/main.go index a46be8f..09e6e1e 100644 --- a/cmd/execution-controller/main.go +++ b/cmd/execution-controller/main.go @@ -107,6 +107,17 @@ func main() { klog.Fatalf("cannot initialize controller manager: %v", err) } + // Set up stores. + for _, factory := range GetStoreFactories() { + klog.Infof("setting up store %v", factory.Name()) + store, err := factory.New(ctrlContext) + if err != nil { + klog.Fatalf("cannot initialize store %v: %v", factory.Name(), err) + } + mgr.AddStore(store) + ctrlContext.Stores().Register(store) + } + // Set up controllers. for _, factory := range GetControllerFactories() { concurrencySpec := options.ControllerConcurrency @@ -121,17 +132,6 @@ func main() { mgr.Add(controller) } - // Set up stores. - for _, factory := range GetStoreFactories() { - klog.Infof("setting up store %v", factory.Name()) - store, err := factory.New(ctrlContext) - if err != nil { - klog.Fatalf("cannot initialize store %v: %v", factory.Name(), err) - } - mgr.AddStore(store) - ctrlContext.Stores().Register(store) - } - ctx := ctrl.SetupSignalHandler() // Start HTTP server in background. diff --git a/pkg/execution/controllers/croncontroller/control.go b/pkg/execution/controllers/croncontroller/control.go new file mode 100644 index 0000000..5f3541f --- /dev/null +++ b/pkg/execution/controllers/croncontroller/control.go @@ -0,0 +1,103 @@ +/* + * Copyright 2022 The Furiko Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package croncontroller + +import ( + "context" + "fmt" + "strings" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + + execution "github.com/furiko-io/furiko/apis/execution/v1alpha1" + executionv1alpha1 "github.com/furiko-io/furiko/pkg/generated/clientset/versioned/typed/execution/v1alpha1" + "github.com/furiko-io/furiko/pkg/utils/logvalues" +) + +type ExecutionControlInterface interface { + CreateJob(ctx context.Context, rjc *execution.JobConfig, rj *execution.Job) error +} + +// ExecutionControl is a wrapper around the Execution clientset. +type ExecutionControl struct { + name string + client executionv1alpha1.ExecutionV1alpha1Interface + recorder record.EventRecorder +} + +var _ ExecutionControlInterface = (*ExecutionControl)(nil) + +func NewExecutionControl( + name string, + client executionv1alpha1.ExecutionV1alpha1Interface, + recorder record.EventRecorder, +) *ExecutionControl { + return &ExecutionControl{ + name: name, + client: client, + recorder: recorder, + } +} + +func (c *ExecutionControl) CreateJob(ctx context.Context, rjc *execution.JobConfig, rj *execution.Job) error { + createdRj, err := c.client.Jobs(rj.GetNamespace()).Create(ctx, rj, metav1.CreateOptions{}) + + // If we fail to create the Job due to an invalid error (e.g. from webhook), do + // not retry and instead store as an event. + if kerrors.IsInvalid(err) { + // Get detailed error information if possible. 
+ errMessage := err.Error() + if err, ok := err.(kerrors.APIStatus); ok { + if details := err.Status().Details; details != nil && len(details.Causes) > 0 { + causes := make([]string, 0, len(details.Causes)) + for _, cause := range details.Causes { + causes = append(causes, fmt.Sprintf("%s: %s", cause.Field, cause.Message)) + } + errMessage = strings.Join(causes, ", ") + } + } + + klog.ErrorS(err, "croncontroller: failed to create job", logvalues. + Values("worker", c.name, "namespace", rj.GetNamespace(), "name", rj.GetName()). + Level(4, "job", rj). + Build()..., + ) + c.recorder.Eventf(rjc, corev1.EventTypeWarning, "CronScheduleFailed", + "Failed to create Job %v from cron schedule: %v", rj.GetName(), errMessage) + return nil + } + + if err != nil { + return errors.Wrapf(err, "cannot create job") + } + + klog.InfoS("croncontroller: created job", logvalues. + Values("worker", c.name, "namespace", createdRj.GetNamespace(), "name", createdRj.GetName()). + Level(4, "job", createdRj). + Build()..., + ) + + c.recorder.Eventf(rjc, corev1.EventTypeNormal, "CronSchedule", + "Created Job %v from cron schedule", createdRj.GetName()) + + return nil +} diff --git a/pkg/execution/controllers/croncontroller/control_test.go b/pkg/execution/controllers/croncontroller/control_test.go new file mode 100644 index 0000000..832cab6 --- /dev/null +++ b/pkg/execution/controllers/croncontroller/control_test.go @@ -0,0 +1,124 @@ +/* + * Copyright 2022 The Furiko Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package croncontroller_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/validation/field" + ktesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/record" + + execution "github.com/furiko-io/furiko/apis/execution/v1alpha1" + "github.com/furiko-io/furiko/pkg/execution/controllers/croncontroller" + "github.com/furiko-io/furiko/pkg/runtime/controllercontext/mock" + "github.com/furiko-io/furiko/pkg/utils/ktime" +) + +const ( + jobNamespace = "test" + jobName = "my-sample-job" +) + +var ( + fakeJob = &execution.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + Namespace: jobNamespace, + }, + Spec: execution.JobSpec{ + Type: execution.JobTypeAdhoc, + Template: &execution.JobTemplateSpec{ + Task: execution.JobTaskSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "container", + Image: "hello-world", + Args: []string{ + "echo", + "Hello world!", + }, + }, + }, + }, + }, + }, + }, + }, + } +) + +func TestExecutionControl(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + ktime.Clock = clock.NewFakeClock(time.Now()) + c := mock.NewContext() + fakeClient := c.MockClientsets().FurikoMock() + client := fakeClient.ExecutionV1alpha1() + recorder := record.NewFakeRecorder(1024) + control := croncontroller.NewExecutionControl("test", client, recorder) + err := c.Start(ctx) + assert.NoError(t, err) + + // Create job with client with recorded event + assert.Len(t, recorder.Events, 0) + err = control.CreateJob(ctx, jobConfigAllow, fakeJob) + assert.NoError(t, err) + 
assert.Len(t, recorder.Events, 1) + + // Should be created + _, err = client.Jobs(fakeJob.Namespace).Get(ctx, fakeJob.Name, metav1.GetOptions{}) + assert.NoError(t, err) + + // Creating duplicate Job should throw error + err = control.CreateJob(ctx, jobConfigAllow, fakeJob) + assert.True(t, kerrors.IsAlreadyExists(err)) + + // Delete the Job now + err = client.Jobs(fakeJob.Namespace).Delete(ctx, fakeJob.Name, metav1.DeleteOptions{}) + assert.NoError(t, err) + + // Add Reactor to simulate InvalidError + fakeClient.PrependReactor("create", "jobs", + func(action ktesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, kerrors.NewInvalid(execution.GVKJob.GroupKind(), fakeJob.Name, field.ErrorList{ + field.Invalid(field.NewPath("metadata", "name"), fakeJob.Name, "invalid name"), + }) + }) + + // Create Job with InvalidError, should not throw error but should record an event + assert.Len(t, recorder.Events, 1) + err = control.CreateJob(ctx, jobConfigAllow, fakeJob) + assert.NoError(t, err) + assert.Len(t, recorder.Events, 2) + + // Should not be created + _, err = client.Jobs(fakeJob.Namespace).Get(ctx, fakeJob.Name, metav1.GetOptions{}) + assert.Error(t, err) + assert.True(t, kerrors.IsNotFound(err)) +} diff --git a/pkg/execution/controllers/croncontroller/controller.go b/pkg/execution/controllers/croncontroller/controller.go index 5b59210..49b9b8a 100644 --- a/pkg/execution/controllers/croncontroller/controller.go +++ b/pkg/execution/controllers/croncontroller/controller.go @@ -21,6 +21,7 @@ import ( "sync/atomic" "time" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" v1core "k8s.io/client-go/kubernetes/typed/core/v1" @@ -67,22 +68,15 @@ type Context struct { controllercontext.ContextInterface jobInformer executioninformers.JobInformer jobconfigInformer executioninformers.JobConfigInformer - hasSynced []cache.InformerSynced + HasSynced []cache.InformerSynced queue 
workqueue.RateLimitingInterface updatedConfigs chan *execution.JobConfig - recorder record.EventRecorder } +// NewContext returns a new Context. func NewContext(context controllercontext.ContextInterface) *Context { c := &Context{ContextInterface: context} - // Create recorder. - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ - Interface: c.Clientsets().Kubernetes().CoreV1().Events(""), - }) - c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) - // Create workqueue. ratelimiter := workqueue.DefaultControllerRateLimiter() c.queue = workqueue.NewNamedRateLimitingQueue(ratelimiter, controllerName) @@ -90,7 +84,7 @@ func NewContext(context controllercontext.ContextInterface) *Context { // Bind informers. c.jobInformer = c.Informers().Furiko().Execution().V1alpha1().Jobs() c.jobconfigInformer = c.Informers().Furiko().Execution().V1alpha1().JobConfigs() - c.hasSynced = []cache.InformerSynced{ + c.HasSynced = []cache.InformerSynced{ c.jobInformer.Informer().HasSynced, c.jobconfigInformer.Informer().HasSynced, } @@ -100,6 +94,15 @@ func NewContext(context controllercontext.ContextInterface) *Context { return c } +// NewRecorder returns a new Recorder for the controller. 
+func NewRecorder(context controllercontext.ContextInterface) record.EventRecorder { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ + Interface: context.Clientsets().Kubernetes().CoreV1().Events(""), + }) + return eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) +} + func NewController( ctrlContext controllercontext.ContextInterface, concurrency *configv1.Concurrency, @@ -110,7 +113,18 @@ func NewController( ctrl.cronWorker = NewCronWorker(ctrl.Context) ctrl.informerWorker = NewInformerWorker(ctrl.Context) - ctrl.reconciler = reconciler.NewController(NewReconciler(ctrl.Context, concurrency), ctrl.queue) + + client := NewExecutionControl( + (&Reconciler{}).Name(), + ctrlContext.Clientsets().Furiko().ExecutionV1alpha1(), + NewRecorder(ctrl.Context), + ) + store, err := ctrlContext.Stores().ActiveJobStore() + if err != nil { + return nil, errors.Wrapf(err, "cannot load ActiveJobStore") + } + recon := NewReconciler(ctrl.Context, client, store, concurrency) + ctrl.reconciler = reconciler.NewController(recon, ctrl.queue) return ctrl, nil } @@ -125,7 +139,7 @@ func (c *Controller) Run(ctx context.Context) error { ctx, controllerName, waitForCacheSyncTimeout, - c.hasSynced..., + c.HasSynced..., ); err != nil { klog.ErrorS(err, "croncontroller: cache sync timeout") return err diff --git a/pkg/execution/controllers/croncontroller/reconciler.go b/pkg/execution/controllers/croncontroller/reconciler.go index 27d43b3..5fb14d9 100644 --- a/pkg/execution/controllers/croncontroller/reconciler.go +++ b/pkg/execution/controllers/croncontroller/reconciler.go @@ -19,32 +19,42 @@ package croncontroller import ( "context" "fmt" - "strings" "time" "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" utiltrace "k8s.io/utils/trace" configv1 
"github.com/furiko-io/furiko/apis/config/v1" execution "github.com/furiko-io/furiko/apis/execution/v1alpha1" + "github.com/furiko-io/furiko/pkg/runtime/controllercontext" "github.com/furiko-io/furiko/pkg/runtime/controllerutil" "github.com/furiko-io/furiko/pkg/utils/execution/jobconfig" - "github.com/furiko-io/furiko/pkg/utils/logvalues" ) +// Reconciler creates Jobs that are queued to be started from the workqueue, +// with a schedule time and JobConfig, unless forbidden by the +// ConcurrencyPolicy. It may be possible for multiple workers to process the +// same JobConfig concurrently. type Reconciler struct { *Context concurrency *configv1.Concurrency + client ExecutionControlInterface + store controllercontext.ActiveJobStore } -func NewReconciler(ctrlContext *Context, concurrency *configv1.Concurrency) *Reconciler { +func NewReconciler( + ctrlContext *Context, + client ExecutionControlInterface, + store controllercontext.ActiveJobStore, + concurrency *configv1.Concurrency, +) *Reconciler { return &Reconciler{ Context: ctrlContext, concurrency: concurrency, + client: client, + store: store, } } @@ -68,10 +78,6 @@ func (w *Reconciler) SyncOne(ctx context.Context, namespace, name string, _ int) return w.processCronForConfig(ctx, namespace, configName, ts) } -// processCronForConfig creates a single Job in response to cron scheduling. We -// will create a new Job unless forbidden by the ConcurrencyPolicy. It may be -// possible for multiple worker instances to process the same JobConfig -// concurrently. func (w *Reconciler) processCronForConfig(ctx context.Context, namespace, name string, scheduleTime time.Time) error { trace := utiltrace.New( "cron_reconcile", @@ -91,11 +97,7 @@ func (w *Reconciler) processCronForConfig(ctx context.Context, namespace, name s trace.Step("Lookup jobconfig in cache done") // Count active jobs for the JobConfig. 
- store, err := w.Stores().ActiveJobStore() - if err != nil { - return errors.Wrapf(err, "cannot get activejobstore") - } - activeJobCount := store.CountActiveJobsForConfig(jobConfig) + activeJobCount := w.store.CountActiveJobsForConfig(jobConfig) trace.Step("Count active jobs for config done") // Check concurrency policy. @@ -135,7 +137,7 @@ func (w *Reconciler) processCronForConfig(ctx context.Context, namespace, name s // If existing Job does not exist, create the resource. if kerrors.IsNotFound(err) { - if err := w.createNewJob(ctx, jobConfig, newJob); err != nil { + if err := w.client.CreateJob(ctx, jobConfig, newJob); err != nil { return errors.Wrapf(err, "could not create new job") } trace.Step("Create job done") @@ -143,52 +145,3 @@ func (w *Reconciler) processCronForConfig(ctx context.Context, namespace, name s return nil } - -func (w *Reconciler) createNewJob( - ctx context.Context, - rjc *execution.JobConfig, - rj *execution.Job, -) (err error) { - createdRj, err := w.Clientsets().Furiko().ExecutionV1alpha1().Jobs(rj.GetNamespace()). - Create(ctx, rj, metav1.CreateOptions{}) - - // If we fail to create the Job due to an invalid error (e.g. from webhook), do - // not retry and instead store as an event. - if kerrors.IsInvalid(err) { - // Get detailed error information if possible. - errMessage := err.Error() - if err, ok := err.(kerrors.APIStatus); ok { - if details := err.Status().Details; details != nil && len(details.Causes) > 0 { - causes := make([]string, 0, len(details.Causes)) - for _, cause := range details.Causes { - causes = append(causes, fmt.Sprintf("%s: %s", cause.Field, cause.Message)) - } - errMessage = strings.Join(causes, ", ") - } - } - - klog.ErrorS(err, "croncontroller: failed to create job", logvalues. - Values("worker", w.Name(), "namespace", rj.GetNamespace(), "name", rj.GetName()). - Level(4, "job", rj). 
- Build()..., - ) - w.recorder.Eventf(rjc, corev1.EventTypeWarning, "CronScheduleFailed", - "Failed to create Job %v from cron schedule: %v", rj.GetName(), errMessage) - return nil - } - - if err != nil { - return errors.Wrapf(err, "cannot create job") - } - - klog.InfoS("croncontroller: created job", logvalues. - Values("worker", w.Name(), "namespace", createdRj.GetNamespace(), "name", createdRj.GetName()). - Level(4, "job", createdRj). - Build()..., - ) - - w.recorder.Eventf(rjc, corev1.EventTypeNormal, "CronSchedule", - "Created Job %v from cron schedule", createdRj.GetName()) - - return nil -} diff --git a/pkg/execution/controllers/croncontroller/reconciler_test.go b/pkg/execution/controllers/croncontroller/reconciler_test.go new file mode 100644 index 0000000..ebbf0fe --- /dev/null +++ b/pkg/execution/controllers/croncontroller/reconciler_test.go @@ -0,0 +1,300 @@ +/* + * Copyright 2022 The Furiko Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package croncontroller_test + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + + execution "github.com/furiko-io/furiko/apis/execution/v1alpha1" + "github.com/furiko-io/furiko/pkg/execution/controllers/croncontroller" + "github.com/furiko-io/furiko/pkg/runtime/controllercontext" + "github.com/furiko-io/furiko/pkg/runtime/controllercontext/mock" + runtimetesting "github.com/furiko-io/furiko/pkg/runtime/testing" + utilatomic "github.com/furiko-io/furiko/pkg/utils/atomic" + "github.com/furiko-io/furiko/pkg/utils/testutils" +) + +var ( + scheduleSpecEvery5Min = &execution.ScheduleSpec{ + Cron: &execution.CronSchedule{ + Expression: "0/5 * * * *", + }, + } + + jobConfigForbid = makeJobConfig("job-config-forbid", execution.JobConfigSpec{ + Schedule: scheduleSpecEvery5Min, + Concurrency: execution.ConcurrencySpec{ + Policy: execution.ConcurrencyPolicyForbid, + }, + }) + + jobConfigAllow = makeJobConfig("job-config-allow", execution.JobConfigSpec{ + Schedule: scheduleSpecEvery5Min, + Concurrency: execution.ConcurrencySpec{ + Policy: execution.ConcurrencyPolicyAllow, + }, + }) +) + +func TestReconciler(t *testing.T) { + type syncTarget struct { + namespace string + name string + } + tests := []struct { + name string + syncTarget syncTarget + initialJobConfigs []*execution.JobConfig + initialCounts map[*execution.JobConfig]int64 + control MockControl + wantNumCreated int + wantErr bool + }{ + { + name: "cannot split name", + syncTarget: syncTarget{ + namespace: testNamespace, + name: "", + }, + wantErr: true, + }, + { + name: "no such job config, don't throw error", + syncTarget: syncTarget{ + namespace: testNamespace, + name: croncontroller.JoinJobConfigKeyName(testName, testutils.Mktime(scheduleTime)), + }, + }, + { + name: "create job successfully", + initialJobConfigs: []*execution.JobConfig{ + jobConfigForbid, + 
jobConfigAllow, + }, + syncTarget: syncTarget{ + namespace: jobConfigAllow.Namespace, + name: croncontroller.JoinJobConfigKeyName(jobConfigAllow.Name, testutils.Mktime(scheduleTime)), + }, + wantNumCreated: 1, + }, + { + name: "failed to create job", + initialJobConfigs: []*execution.JobConfig{ + jobConfigForbid, + jobConfigAllow, + }, + control: newMockControlWithError(errors.New("error")), + syncTarget: syncTarget{ + namespace: jobConfigAllow.Namespace, + name: croncontroller.JoinJobConfigKeyName(jobConfigAllow.Name, testutils.Mktime(scheduleTime)), + }, + wantErr: true, + }, + { + name: "create job for Forbid", + initialJobConfigs: []*execution.JobConfig{ + jobConfigForbid, + jobConfigAllow, + }, + syncTarget: syncTarget{ + namespace: jobConfigForbid.Namespace, + name: croncontroller.JoinJobConfigKeyName(jobConfigForbid.Name, testutils.Mktime(scheduleTime)), + }, + wantNumCreated: 1, + }, + { + name: "create job for Allow with active", + initialJobConfigs: []*execution.JobConfig{ + jobConfigForbid, + jobConfigAllow, + }, + initialCounts: map[*execution.JobConfig]int64{ + jobConfigAllow: 1, + jobConfigForbid: 1, + }, + syncTarget: syncTarget{ + namespace: jobConfigAllow.Namespace, + name: croncontroller.JoinJobConfigKeyName(jobConfigAllow.Name, testutils.Mktime(scheduleTime)), + }, + wantNumCreated: 1, + }, + { + name: "do not create job for Forbid with active", + initialJobConfigs: []*execution.JobConfig{ + jobConfigForbid, + jobConfigAllow, + }, + initialCounts: map[*execution.JobConfig]int64{ + jobConfigAllow: 1, + jobConfigForbid: 1, + }, + syncTarget: syncTarget{ + namespace: jobConfigForbid.Namespace, + name: croncontroller.JoinJobConfigKeyName(jobConfigForbid.Name, testutils.Mktime(scheduleTime)), + }, + }, + { + name: "can create job for Forbid with other job active", + initialJobConfigs: []*execution.JobConfig{ + jobConfigForbid, + jobConfigAllow, + }, + initialCounts: map[*execution.JobConfig]int64{ + jobConfigAllow: 1, + }, + syncTarget: syncTarget{ + 
namespace: jobConfigForbid.Namespace, + name: croncontroller.JoinJobConfigKeyName(jobConfigForbid.Name, testutils.Mktime(scheduleTime)), + }, + wantNumCreated: 1, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + c := mock.NewContext() + ctrlCtx := croncontroller.NewContext(c) + control := tt.control + if control == nil { + control = &mockControl{} + } + store := newMockStore(tt.initialCounts) + reconciler := croncontroller.NewReconciler(ctrlCtx, control, store, runtimetesting.ReconcilerDefaultConcurrency) + + err := c.Start(ctx) + assert.NoError(t, err) + client := c.MockClientsets() + + // Create initial objects + for _, rjc := range tt.initialJobConfigs { + rjc := rjc.DeepCopy() + _, err := client.Furiko().ExecutionV1alpha1().JobConfigs(rjc.Namespace).Create(ctx, rjc, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("cannot create JobConfig: %v", err) + } + } + + // NOTE(irvinlim): Add a short delay otherwise cache may not sync consistently + time.Sleep(time.Millisecond * 10) + + // Wait for cache sync + if !cache.WaitForCacheSync(ctx.Done(), ctrlCtx.HasSynced...) { + assert.FailNow(t, "caches not synced") + } + + // Trigger sync + if err := reconciler.SyncOne(ctx, tt.syncTarget.namespace, tt.syncTarget.name, 0); (err != nil) != tt.wantErr { + t.Errorf("SyncOne() error = %v, wantErr %v", err, tt.wantErr) + } + + // Assert call count. 
+ assert.Equal(t, control.CountCreatedJobs(), tt.wantNumCreated) + }) + } +} + +type MockControl interface { + croncontroller.ExecutionControlInterface + CountCreatedJobs() int +} + +type mockControl struct { + createdJobs []*execution.Job +} + +func newMockControl() *mockControl { + return &mockControl{} +} + +var _ MockControl = (*mockControl)(nil) + +func (m *mockControl) CountCreatedJobs() int { + return len(m.createdJobs) +} + +func (m *mockControl) CreateJob(_ context.Context, _ *execution.JobConfig, rj *execution.Job) error { + m.createdJobs = append(m.createdJobs, rj) + return nil +} + +type mockControlWithError struct { + *mockControl + error error +} + +func newMockControlWithError(error error) *mockControlWithError { + return &mockControlWithError{ + mockControl: newMockControl(), + error: error, + } +} + +func (m *mockControlWithError) CreateJob(_ context.Context, _ *execution.JobConfig, rj *execution.Job) error { + return m.error +} + +type mockStore struct { + *utilatomic.Counter +} + +func newMockStore(initial map[*execution.JobConfig]int64) *mockStore { + counter := utilatomic.NewCounter() + for rjc, count := range initial { + for i := int64(0); i < count; i++ { + counter.Add(string(rjc.UID)) + } + } + return &mockStore{ + Counter: counter, + } +} + +var _ controllercontext.ActiveJobStore = (*mockStore)(nil) + +func (m *mockStore) CountActiveJobsForConfig(rjc *execution.JobConfig) int64 { + return m.Counter.Get(string(rjc.UID)) +} + +func (m *mockStore) CheckAndAdd(rjc *execution.JobConfig, oldCount int64) (success bool) { + return m.Counter.CheckAndAdd(string(rjc.UID), oldCount) +} + +func (m *mockStore) Delete(rjc *execution.JobConfig) { + m.Counter.Remove(string(rjc.UID)) +} + +// makeJobConfig returns a new JobConfig with the given Spec. 
+func makeJobConfig(name string, spec execution.JobConfigSpec) *execution.JobConfig { + return &execution.JobConfig{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNamespace, + Name: name, + UID: testutils.MakeUID(name), + }, + Spec: spec, + } +} diff --git a/pkg/execution/controllers/croncontroller/schedule_test.go b/pkg/execution/controllers/croncontroller/schedule_test.go index 5c5bb1d..c11f8ba 100644 --- a/pkg/execution/controllers/croncontroller/schedule_test.go +++ b/pkg/execution/controllers/croncontroller/schedule_test.go @@ -30,13 +30,16 @@ import ( "github.com/furiko-io/furiko/pkg/execution/controllers/croncontroller" "github.com/furiko-io/furiko/pkg/execution/util/cronparser" "github.com/furiko-io/furiko/pkg/runtime/controllercontext/mock" + "github.com/furiko-io/furiko/pkg/utils/testutils" +) + +const ( + now = "2021-02-09T04:06:13.234Z" ) var ( - now = mktime("2021-02-09T04:06:13.234Z") - metaNow = metav1.NewTime(now) - lastScheduleTime = metav1.NewTime(mktime("2021-02-09T04:02:00Z")) - longLastScheduleTime = metav1.NewTime(mktime("2021-02-09T02:46:00Z")) + lastScheduleTime = metav1.NewTime(testutils.Mktime("2021-02-09T04:02:00Z")) + longLastScheduleTime = metav1.NewTime(testutils.Mktime("2021-02-09T02:46:00Z")) jobConfigEveryMinute = &execution.JobConfig{ ObjectMeta: metav1.ObjectMeta{ @@ -76,7 +79,7 @@ var ( }, }, Status: execution.JobConfigStatus{ - LastScheduleTime: mkmtime("2021-10-27T22:15:00Z"), + LastScheduleTime: testutils.Mkmtimep("2021-10-27T22:15:00Z"), }, } @@ -139,7 +142,7 @@ var ( }, }, Status: execution.JobConfigStatus{ - LastScheduleTime: mkmtime("2021-06-24T09:00:00Z"), + LastScheduleTime: testutils.Mkmtimep("2021-06-24T09:00:00Z"), }, } @@ -153,7 +156,7 @@ var ( Expression: "* * * * *", }, Constraints: &execution.ScheduleContraints{ - NotBefore: &metaNow, + NotBefore: testutils.Mkmtimep(now), }, }, }, @@ -163,19 +166,6 @@ var ( } ) -func mktime(value string) time.Time { - ts, err := time.Parse(time.RFC3339, value) - if err != nil 
{ - panic(err) // panic ok for tests - } - return ts -} - -func mkmtime(value string) *metav1.Time { - mt := metav1.NewTime(mktime(value)) - return &mt -} - func TestSchedule(t *testing.T) { tests := []struct { name string @@ -187,23 +177,23 @@ func TestSchedule(t *testing.T) { name: "Schedule every minute", jobConfig: jobConfigEveryMinute, cases: []time.Time{ - mktime("2021-02-09T04:07:00Z"), - mktime("2021-02-09T04:08:00Z"), - mktime("2021-02-09T04:09:00Z"), + testutils.Mktime("2021-02-09T04:07:00Z"), + testutils.Mktime("2021-02-09T04:08:00Z"), + testutils.Mktime("2021-02-09T04:09:00Z"), }, }, { name: "No future schedules", jobConfig: jobConfigPointInTime, cases: []time.Time{ - mktime("2021-02-09T12:00:00Z"), + testutils.Mktime("2021-02-09T12:00:00Z"), {}, // should return zero time }, }, { name: "No future schedules with LastScheduleTime", jobConfig: jobConfigPointInTimeWithAlreadySetLastScheduleTime, - fromTime: mktime("2021-10-28T06:00:00+08:00"), + fromTime: testutils.Mktime("2021-10-28T06:00:00+08:00"), cases: []time.Time{ {}, // should return zero time }, @@ -212,48 +202,48 @@ func TestSchedule(t *testing.T) { name: "Start from LastScheduleTime", jobConfig: jobConfigWithLastScheduleTime, cases: []time.Time{ - mktime("2021-02-09T04:03:00Z"), - mktime("2021-02-09T04:04:00Z"), - mktime("2021-02-09T04:05:00Z"), - mktime("2021-02-09T04:06:00Z"), + testutils.Mktime("2021-02-09T04:03:00Z"), + testutils.Mktime("2021-02-09T04:04:00Z"), + testutils.Mktime("2021-02-09T04:05:00Z"), + testutils.Mktime("2021-02-09T04:06:00Z"), }, }, { name: "Enforce maximum LastScheduleTime ago", jobConfig: jobConfigWithLastScheduleTimeLongAgo, cases: []time.Time{ - mktime("2021-02-09T04:02:00Z"), - mktime("2021-02-09T04:03:00Z"), - mktime("2021-02-09T04:04:00Z"), - mktime("2021-02-09T04:05:00Z"), - mktime("2021-02-09T04:06:00Z"), + testutils.Mktime("2021-02-09T04:02:00Z"), + testutils.Mktime("2021-02-09T04:03:00Z"), + testutils.Mktime("2021-02-09T04:04:00Z"), + 
testutils.Mktime("2021-02-09T04:05:00Z"), + testutils.Mktime("2021-02-09T04:06:00Z"), }, }, { name: "Using custom timezone", jobConfig: jobConfigWithTimezone, - fromTime: mktime("2021-02-09T04:07:00+08:00"), + fromTime: testutils.Mktime("2021-02-09T04:07:00+08:00"), cases: []time.Time{ - mktime("2021-02-09T06:00:00+08:00"), - mktime("2021-02-09T08:00:00+08:00"), - mktime("2021-02-09T10:00:00+08:00"), + testutils.Mktime("2021-02-09T06:00:00+08:00"), + testutils.Mktime("2021-02-09T08:00:00+08:00"), + testutils.Mktime("2021-02-09T10:00:00+08:00"), }, }, { name: "Using custom timezone and lastScheduleTime", jobConfig: jobConfigWithTimezoneAndLastScheduleTime, - fromTime: mktime("2021-06-24T17:19:40+08:00"), + fromTime: testutils.Mktime("2021-06-24T17:19:40+08:00"), cases: []time.Time{ - mktime("2021-06-24T17:00:00+07:00"), + testutils.Mktime("2021-06-24T17:00:00+07:00"), }, }, { name: "Respect ScheduleNotBeforeTimestamp", jobConfig: jobConfigWithLastScheduleTimeAndNotBefore, cases: []time.Time{ - mktime("2021-02-09T04:07:00Z"), - mktime("2021-02-09T04:08:00Z"), - mktime("2021-02-09T04:09:00Z"), + testutils.Mktime("2021-02-09T04:07:00Z"), + testutils.Mktime("2021-02-09T04:08:00Z"), + testutils.Mktime("2021-02-09T04:09:00Z"), }, }, } @@ -261,7 +251,7 @@ func TestSchedule(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { // Change fromTime if specified. 
- now := now + now := testutils.Mktime(now) if !tt.fromTime.IsZero() { now = tt.fromTime } @@ -308,6 +298,7 @@ func TestSchedule(t *testing.T) { } func TestSchedule_FlushNextScheduleTime(t *testing.T) { + now := testutils.Mktime(now) croncontroller.Clock = clock.NewFakeClock(now) jobConfig := &execution.JobConfig{ ObjectMeta: metav1.ObjectMeta{ @@ -341,7 +332,7 @@ func TestSchedule_FlushNextScheduleTime(t *testing.T) { // Bump next schedule time next := schedule.GetNextScheduleTime(jobConfig, now, expr) - want := mktime("2021-02-09T04:10:00Z") + want := testutils.Mktime("2021-02-09T04:10:00Z") if !next.Equal(want) { t.Errorf("expected next to be %v, got %v", want, next) return @@ -357,7 +348,7 @@ func TestSchedule_FlushNextScheduleTime(t *testing.T) { // Should still use old schedule because it's cached next2 := schedule.GetNextScheduleTime(jobConfig, next, expr) - want2 := mktime("2021-02-09T04:15:00Z") + want2 := testutils.Mktime("2021-02-09T04:15:00Z") if !next2.Equal(want2) { t.Errorf("expected next to be %v, got %v", want2, next2) return @@ -376,11 +367,11 @@ func TestSchedule_FlushNextScheduleTime(t *testing.T) { schedule.BumpNextScheduleTime(jobConfig, next, expr) return nil } - if err := flushAndCheck(mktime("2021-02-09T04:11:00Z")); err != nil { + if err := flushAndCheck(testutils.Mktime("2021-02-09T04:11:00Z")); err != nil { t.Error(err) return } - if err := flushAndCheck(mktime("2021-02-09T04:12:00Z")); err != nil { // idempotent + if err := flushAndCheck(testutils.Mktime("2021-02-09T04:12:00Z")); err != nil { // idempotent t.Error(err) return } diff --git a/pkg/execution/controllers/croncontroller/util.go b/pkg/execution/controllers/croncontroller/util.go index a675d92..7bbf2b3 100644 --- a/pkg/execution/controllers/croncontroller/util.go +++ b/pkg/execution/controllers/croncontroller/util.go @@ -51,8 +51,14 @@ func JobConfigKeyFunc(config *execution.JobConfig, scheduleTime time.Time) (stri } // Add Unix timestamp (seconds) in key. 
- wrappedKey := fmt.Sprintf("%v.%v", key, scheduleTime.Unix()) - return wrappedKey, nil + return JoinJobConfigKeyName(key, scheduleTime), nil +} + +// JoinJobConfigKeyName joins a key with a scheduled timestamp for a JobConfig. +// Performs the reverse of SplitJobConfigKeyName. +func JoinJobConfigKeyName(key string, ts time.Time) string { + wrappedKey := fmt.Sprintf("%v.%v", key, ts.Unix()) + return wrappedKey } // SplitJobConfigKeyName splits a key into name and scheduled timestamp for a JobConfig. diff --git a/pkg/execution/controllers/croncontroller/util_test.go b/pkg/execution/controllers/croncontroller/util_test.go index a780c4b..9b80969 100644 --- a/pkg/execution/controllers/croncontroller/util_test.go +++ b/pkg/execution/controllers/croncontroller/util_test.go @@ -18,7 +18,6 @@ package croncontroller_test import ( "fmt" - "reflect" "testing" "time" @@ -26,9 +25,12 @@ import ( execution "github.com/furiko-io/furiko/apis/execution/v1alpha1" "github.com/furiko-io/furiko/pkg/execution/controllers/croncontroller" + "github.com/furiko-io/furiko/pkg/utils/testutils" ) const ( + scheduleUnix = 1604188800 + scheduleTime = "2020-11-01T00:00:00Z" testNamespace = "test" testName = "sample-job.1605774500690" ) @@ -42,8 +44,8 @@ func TestJobConfigKeyFunc(t *testing.T) { }{ { name: "basic test", - scheduleTime: time.Unix(1604188800, 0), - want: fmt.Sprintf("%v/%v.%v", testNamespace, testName, 1604188800), + scheduleTime: testutils.Mktime(scheduleTime), + want: fmt.Sprintf("%v/%v.%v", testNamespace, testName, scheduleUnix), }, } for _, tt := range tests { @@ -77,9 +79,9 @@ func TestSplitJobConfigKey(t *testing.T) { }{ { name: "basic test", - key: fmt.Sprintf("%v.%v", testName, 1604188800), + key: fmt.Sprintf("%v.%v", testName, scheduleUnix), wantName: testName, - wantTS: time.Unix(1604188800, 0), + wantTS: testutils.Mktime(scheduleTime), }, { name: "empty key", @@ -114,7 +116,7 @@ func TestSplitJobConfigKey(t *testing.T) { if gotName != tt.wantName { 
t.Errorf("SplitJobConfigKeyName() gotName = %v, want %v", gotName, tt.wantName) } - if !reflect.DeepEqual(gotTS, tt.wantTS) { + if !gotTS.Equal(tt.wantTS) { t.Errorf("SplitJobConfigKeyName() gotTS = %v, want %v", gotTS, tt.wantTS) } }) diff --git a/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig.go b/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig.go index 08e4bf4..11182a9 100644 --- a/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig.go +++ b/pkg/execution/controllers/jobqueuecontroller/reconciler_perjobconfig.go @@ -127,7 +127,7 @@ func (w *PerConfigReconciler) SyncOne(ctx context.Context, namespace, name strin if !ok { continue } - if err := w.startJob(ctx, rj, store, activeCount); err != nil { + if err := w.startJob(ctx, rjc, rj, store, activeCount); err != nil { return errors.Wrapf(err, "cannot start job") } @@ -204,17 +204,18 @@ func (w *PerConfigReconciler) canStartJob( func (w *PerConfigReconciler) startJob( ctx context.Context, + rjc *execution.JobConfig, rj *execution.Job, store controllercontext.ActiveJobStore, oldCount int64, ) (err error) { // Atomically update the count, otherwise return an error to retry creation via workqueue. - if !store.CheckAndAdd(rj, oldCount) { + if !store.CheckAndAdd(rjc, oldCount) { return fmt.Errorf("concurrent add, try again") } defer func() { if err != nil { - store.Delete(rj) + store.Delete(rjc) } }() diff --git a/pkg/execution/stores/activejobstore/store.go b/pkg/execution/stores/activejobstore/store.go index 3cb5df1..0375d73 100644 --- a/pkg/execution/stores/activejobstore/store.go +++ b/pkg/execution/stores/activejobstore/store.go @@ -68,12 +68,9 @@ func (s *Store) CountActiveJobsForConfig(rjc *execution.JobConfig) int64 { return s.counter.Get(string(rjc.UID)) } -// CheckAndAdd atomically increments the active count for the given Job. 
-func (s *Store) CheckAndAdd(rj *execution.Job, oldCount int64) bool { - key, ok := s.getKey(rj) - if !ok { - return true - } +// CheckAndAdd atomically increments the active count for the given JobConfig. +func (s *Store) CheckAndAdd(rjc *execution.JobConfig, oldCount int64) bool { + key := string(rjc.UID) res := s.counter.CheckAndAdd(key, oldCount) if res { klog.V(5).InfoS("activejobstore: incremented job counter", "key", key, "count", oldCount+1) @@ -82,12 +79,8 @@ func (s *Store) CheckAndAdd(rj *execution.Job, oldCount int64) bool { } // Delete the Job from the store. -func (s *Store) Delete(rj *execution.Job) { - key, ok := s.getKey(rj) - if !ok { - return - } - +func (s *Store) Delete(rjc *execution.JobConfig) { + key := string(rjc.UID) count := s.decrement(key) klog.V(5).InfoS("activejobstore: decremented job counter", "key", key, "count", count) } diff --git a/pkg/execution/stores/activejobstore/store_test.go b/pkg/execution/stores/activejobstore/store_test.go index 6fea234..188e575 100644 --- a/pkg/execution/stores/activejobstore/store_test.go +++ b/pkg/execution/stores/activejobstore/store_test.go @@ -155,7 +155,7 @@ func TestStore(t *testing.T) { StartTime: &startTime, }, } - for !store.CheckAndAdd(rj, store.CountActiveJobsForConfig(rjc2)) { + for !store.CheckAndAdd(rjc2, store.CountActiveJobsForConfig(rjc2)) { } createdRj, err := createJob(ctx, client, rj) assert.NoError(t, err) @@ -241,7 +241,7 @@ func TestStore_CheckAndAdd_NotStarted(t *testing.T) { // This procedure is done in the JobQueueController. for { count := store.CountActiveJobsForConfig(rjc1) - if !store.CheckAndAdd(rj, count) { + if !store.CheckAndAdd(rjc1, count) { continue } newRj := rj.DeepCopy() @@ -256,7 +256,7 @@ func TestStore_CheckAndAdd_NotStarted(t *testing.T) { assert.Equal(t, int64(1), store.CountActiveJobsForConfig(rjc1)) // Delete the key, ensure that it drops back to 0. 
- store.Delete(rj) + store.Delete(rjc1) assert.Equal(t, int64(0), store.CountActiveJobsForConfig(rjc1)) } diff --git a/pkg/runtime/controllercontext/context_stores.go b/pkg/runtime/controllercontext/context_stores.go index e73a2dc..b58d293 100644 --- a/pkg/runtime/controllercontext/context_stores.go +++ b/pkg/runtime/controllercontext/context_stores.go @@ -55,8 +55,8 @@ func (c *ContextStores) Register(store Store) { type ActiveJobStore interface { CountActiveJobsForConfig(rjc *execution.JobConfig) int64 - CheckAndAdd(rj *execution.Job, oldCount int64) (success bool) - Delete(rj *execution.Job) + CheckAndAdd(rjc *execution.JobConfig, oldCount int64) (success bool) + Delete(rjc *execution.JobConfig) } // ActiveJobStore returns the active job store. diff --git a/pkg/runtime/testing/concurrency.go b/pkg/runtime/testing/concurrency.go new file mode 100644 index 0000000..8ce80a0 --- /dev/null +++ b/pkg/runtime/testing/concurrency.go @@ -0,0 +1,29 @@ +/* + * Copyright 2022 The Furiko Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package testing + +import ( + configv1 "github.com/furiko-io/furiko/apis/config/v1" +) + +var ( + // ReconcilerDefaultConcurrency is the default concurrency to use for tests. + // For simplicity, reconcilers should not have concurrent workers. 
+ ReconcilerDefaultConcurrency = &configv1.Concurrency{ + Workers: 1, + } +) diff --git a/pkg/utils/testutils/uid.go b/pkg/utils/testutils/uid.go new file mode 100644 index 0000000..b814e6b --- /dev/null +++ b/pkg/utils/testutils/uid.go @@ -0,0 +1,40 @@ +/* + * Copyright 2022 The Furiko Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package testutils + +import ( + "hash/fnv" + "math/rand" + + "github.com/google/uuid" + "k8s.io/apimachinery/pkg/types" +) + +// MakeUID returns a deterministically generated UID given an input string. +// nolint:gosec +func MakeUID(name string) types.UID { + h := fnv.New64a() + if _, err := h.Write([]byte(name)); err != nil { + panic(err) + } + rnd := rand.New(rand.NewSource(int64(h.Sum64()))) + uid, err := uuid.NewRandomFromReader(rnd) + if err != nil { + panic(err) + } + return types.UID(uid.String()) +} diff --git a/pkg/utils/testutils/uid_test.go b/pkg/utils/testutils/uid_test.go new file mode 100644 index 0000000..fb92347 --- /dev/null +++ b/pkg/utils/testutils/uid_test.go @@ -0,0 +1,30 @@ +/* + * Copyright 2022 The Furiko Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package testutils_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/furiko-io/furiko/pkg/utils/testutils" +) + +func TestMakeUID(t *testing.T) { + assert.Equal(t, testutils.MakeUID("a"), testutils.MakeUID("a")) + assert.NotEqual(t, testutils.MakeUID("a"), testutils.MakeUID("b")) +}