Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(croncontroller): Implement heap-based Schedule #95

Merged
merged 1 commit into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,23 @@ import (
configv1alpha1 "github.com/furiko-io/furiko/apis/config/v1alpha1"
)

const (
	// DefaultCronTimezone is the default timezone value that will be used if there
	// is no timezone configuration for the JobConfig or a default value set for the
	// controller. Must be a name accepted by the timezone parser (IANA tz name).
	DefaultCronTimezone = "UTC"

	// DefaultCronMaxDowntimeThresholdSeconds is the default maximum downtime
	// threshold (in seconds) that the cron controller can tolerate to automatically
	// back-schedule missed schedules, if a value is not explicitly set in the
	// dynamic cron configuration.
	DefaultCronMaxDowntimeThresholdSeconds = 300

	// DefaultCronMaxMissedSchedules is the default maximum number of schedules that
	// the controller should attempt to back-schedule, if a value is not explicitly
	// set in the dynamic cron configuration.
	DefaultCronMaxMissedSchedules = 5
)

var (
DefaultJobExecutionConfig = &configv1alpha1.JobExecutionConfig{
DefaultTTLSecondsAfterFinished: pointer.Int64(3600),
Expand All @@ -38,8 +55,8 @@ var (
CronHashNames: pointer.Bool(true),
CronHashSecondsByDefault: pointer.Bool(false),
CronHashFields: pointer.Bool(true),
MaxMissedSchedules: pointer.Int64(5),
MaxDowntimeThresholdSeconds: 300,
DefaultTimezone: pointer.String("UTC"),
MaxMissedSchedules: pointer.Int64(DefaultCronMaxMissedSchedules),
MaxDowntimeThresholdSeconds: DefaultCronMaxDowntimeThresholdSeconds,
DefaultTimezone: pointer.String(DefaultCronTimezone),
}
)
4 changes: 4 additions & 0 deletions pkg/execution/controllers/croncontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func (c *Controller) Run(ctx context.Context) error {
return controllerutil.ErrWaitForCacheSyncTimeout
}

if err := c.cronWorker.Init(); err != nil {
klog.ErrorS(err, "croncontroller: cannot initialize cron worker")
}

c.reconciler.Start(c.ctx)
c.cronWorker.Start(c.ctx)

Expand Down
231 changes: 133 additions & 98 deletions pkg/execution/controllers/croncontroller/cron_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,34 @@ import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

utiltrace "k8s.io/utils/trace"

configv1alpha1 "github.com/furiko-io/furiko/apis/config/v1alpha1"
execution "github.com/furiko-io/furiko/apis/execution/v1alpha1"
"github.com/furiko-io/furiko/pkg/core/tzutils"
"github.com/furiko-io/furiko/pkg/execution/util/cronparser"
"github.com/furiko-io/furiko/pkg/config"
"github.com/furiko-io/furiko/pkg/execution/util/schedule"
)

const (
// CronWorkerInterval is the interval between checking if JobConfigs should be
// cronWorkerInterval is the interval between checking if JobConfigs should be
// enqueued. We only really need this as small as once per second.
CronWorkerInterval = time.Second
cronWorkerInterval = time.Second
)

var (
	// Clock is the package-level clock used to read the current time when
	// scheduling. It is a mutable package variable, presumably so tests can
	// substitute a fake clock — NOTE(review): confirm it is only replaced in
	// tests, since mutable package-level state is otherwise discouraged.
	Clock clock.Clock = &clock.RealClock{}
)

// CronWorker enqueues Job names to be scheduled, based on the cron schedule of the config.
// It will enqueue one item for each schedule interval, which is a 1:1 correspondence with a Job
// to be created.
type CronWorker struct {
*Context
schedule *Schedule
schedule *schedule.Schedule
handler EnqueueHandler
mu sync.Mutex
}
Expand All @@ -59,18 +64,38 @@ type EnqueueHandler interface {

func NewCronWorker(ctrlContext *Context, handler EnqueueHandler) *CronWorker {
return &CronWorker{
Context: ctrlContext,
handler: handler,
schedule: NewSchedule(ctrlContext),
Context: ctrlContext,
handler: handler,
}
}

// WorkerName returns the identifier for this worker, derived from the
// controller name; it is used in log messages and worker metrics.
func (w *CronWorker) WorkerName() string {
	const suffix = ".CronWorker"
	return fmt.Sprintf("%v%s", controllerName, suffix)
}

// Init is called before we start the worker. We will initialize all JobConfigs
// into the Schedule.
//
// It lists every JobConfig from the informer's lister, so informer caches must
// be synced before calling Init (the controller's Run waits for cache sync
// first). Returns an error if the initial list or schedule construction fails.
// NOTE(review): the caller currently logs an Init error but does not abort —
// confirm that starting with an uninitialized schedule is intended.
func (w *CronWorker) Init() error {
	jobConfigs, err := w.jobconfigInformer.Lister().JobConfigs(metav1.NamespaceAll).List(labels.Everything())
	if err != nil {
		return errors.Wrapf(err, "cannot list all jobconfigs")
	}

	// Build a fresh heap-based schedule seeded with all known JobConfigs,
	// using the package clock and the dynamic config loader.
	sched, err := schedule.New(
		jobConfigs,
		schedule.WithClock(Clock),
		schedule.WithConfigLoader(w.Configs()),
	)
	if err != nil {
		return err
	}

	w.schedule = sched
	return nil
}

func (w *CronWorker) Start(ctx context.Context) {
go ClockTickUntil(instrumentWorkerMetrics(w.WorkerName(), w.Work), CronWorkerInterval, ctx.Done())
go ClockTickUntil(instrumentWorkerMetrics(w.WorkerName(), w.Work), cronWorkerInterval, ctx.Done())
}

// Work runs a single iteration of synchronizing all JobConfigs.
Expand All @@ -81,7 +106,10 @@ func (w *CronWorker) Work() {
trace := utiltrace.New(
"cron_schedule_all",
)
defer trace.LogIfLong(CronWorkerInterval / 2)

// NOTE(irvinlim): Log if it takes longer than the worker interval to run the
// entire work routine.
defer trace.LogIfLong(cronWorkerInterval)

// Load dynamic configuration.
cfg, err := w.Configs().Cron()
Expand All @@ -90,42 +118,56 @@ func (w *CronWorker) Work() {
return
}

// Get all keys to be flushed.
w.flushKeys()
now := Clock.Now()
maxMissedSchedules := config.DefaultCronMaxMissedSchedules
if spec := cfg.MaxMissedSchedules; spec != nil {
maxMissedSchedules = int(*spec)
}
scheduledCount := make(map[string]int)

// Flush all JobConfigs that had been updated.
w.refreshUpdatedJobConfigs(now)
trace.Step("Flushing of JobConfig updates done")

// Get all JobConfigs from the cache.
jobConfigList, err := w.jobconfigInformer.Lister().JobConfigs(metav1.NamespaceAll).List(labels.Everything())
if err != nil {
klog.ErrorS(err, "croncontroller: list JobConfig error", "worker", w.WorkerName())
return
}
trace.Step("List all JobConfigs done")
var scheduled uint64
for {
// Get the next job config that is due for scheduling, otherwise return early.
key, ts, ok := w.schedule.Pop(Clock.Now())
if !ok {
break
}

// Create cron parser instance
parser := cronparser.NewParser(cfg)
// Should not happen.
if ts.IsZero() {
continue
}

// Sync each job config.
// TODO(irvinlim): Theoretically it is more computationally efficient to use a
// heap instead of iterating all job configs for scheduling.
for _, jobConfig := range jobConfigList {
if err := w.syncOne(jobConfig, cfg, parser); err != nil {
klog.ErrorS(err, "croncontroller: sync JobConfig error",
// Enqueue the job.
if err := w.syncOne(now, key, ts, scheduledCount, maxMissedSchedules); err != nil {
klog.ErrorS(err, "croncontroller: cannot sync single jobconfig to be scheduled",
"worker", w.WorkerName(),
"namespace", jobConfig.GetNamespace(),
"name", jobConfig.GetName(),
"key", key,
"scheduleTime", ts,
)
continue
}

scheduled++
trace.Step("Sync JobConfig to be scheduled")
}

if scheduled > 0 {
klog.V(4).InfoS("croncontroller: successfully scheduled jobs", "count", scheduled)
}
trace.Step("Sync all JobConfigs done")
}

// flushKeys will read all keys to be flushed, and flush it from the nextScheduleTime precomputed map.
func (w *CronWorker) flushKeys() {
// refreshUpdatedJobConfigs will read all keys to be flushed, and recompute the
// next schedule time for all job configs that were updated.
func (w *CronWorker) refreshUpdatedJobConfigs(now time.Time) {
flushes := 0
defer func() {
if flushes > 0 {
klog.V(4).InfoS("croncontroller: flushed all job configs that need updating",
klog.V(4).InfoS("croncontroller: refreshed all job configs that need updating",
"worker", w.WorkerName(),
"len", flushes,
)
Expand All @@ -136,8 +178,24 @@ func (w *CronWorker) flushKeys() {
for flushes < 1000 {
select {
case jobConfig := <-w.updatedConfigs:
w.schedule.FlushNextScheduleTime(jobConfig)
flushes++

// Delete and add it back to the heap. We use the current time as the reference
// time, assuming that its previous schedule time is in the future.
if err := w.schedule.Delete(jobConfig); err != nil {
klog.ErrorS(err, "croncontroller: cannot remove outdated job config from heap",
"namespace", jobConfig.Namespace,
"name", jobConfig.Name,
)
continue
}
if _, err := w.schedule.Bump(jobConfig, now); err != nil {
klog.ErrorS(err, "croncontroller: cannot bump updated job config in heap",
"namespace", jobConfig.Namespace,
"name", jobConfig.Name,
)
continue
}
default:
// Nothing more to flush.
return
Expand All @@ -146,88 +204,65 @@ func (w *CronWorker) flushKeys() {
}

// syncOne reconciles a single JobConfig and enqueues Jobs to be created.
func (w *CronWorker) syncOne(
jobConfig *execution.JobConfig,
cfg *configv1alpha1.CronExecutionConfig,
parser *cronparser.Parser,
) error {
schedule := jobConfig.Spec.Schedule
if schedule == nil || schedule.Disabled || schedule.Cron == nil || len(schedule.Cron.Expression) == 0 {
return nil
}

namespacedName, err := cache.MetaNamespaceKeyFunc(jobConfig)
if err != nil {
return errors.Wrapf(err, "cannot get namespaced name")
}

expr, err := parser.Parse(schedule.Cron.Expression, namespacedName)
func (w *CronWorker) syncOne(now time.Time, key string, ts time.Time, counts map[string]int, maxCount int) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return errors.Wrapf(err, "cannot parse cron schedule: %v", schedule.Cron.Expression)
return errors.Wrapf(err, "cannot split namespace and name from key %v", key)
}

// Get time in configured timezone.
tzstring := w.getTimezone(schedule.Cron, cfg)
timezone, err := tzutils.ParseTimezone(tzstring)
jobConfig, err := w.jobconfigInformer.Lister().JobConfigs(namespace).Get(name)
if err != nil {
return errors.Wrapf(err, "cannot parse timezone: %v", tzstring)
return errors.Wrapf(err, "cannot get jobconfig %v", key)
}
now := Clock.Now().In(timezone)

var maxMissedSchedules int
if spec := cfg.MaxMissedSchedules; spec != nil {
maxMissedSchedules = int(*spec)
}

// Get next scheduled time repeatedly and enqueue for each scheduled time.
// This helps to prevent missed executions (maybe due to worker stuck).
for i := 0; i < maxMissedSchedules; i++ {
next := w.schedule.GetNextScheduleTime(jobConfig, now, expr)
// Check that it does not exceed maximum backschedule limit.
if counts[key] >= maxCount {
// Bump to next schedule time.
newNext, err := w.schedule.Bump(jobConfig, now)
if err != nil {
return errors.Wrapf(err, "cannot bump to new next schedule time")
}

klog.V(6).InfoS("croncontroller: get next schedule for job config",
"namespace", jobConfig.Namespace,
"name", jobConfig.Name,
"now", now,
"next", next,
klog.ErrorS(
errors.New("missed too many schedules"),
"croncontroller: skipping back-scheduling for JobConfig",
"worker", w.WorkerName(),
"namespace", jobConfig.GetNamespace(),
"name", jobConfig.GetName(),
"maxCount", maxCount,
"newNextSchedule", newNext,
)

// There is no next schedule time.
if next.IsZero() {
return nil
}
return nil
}

// Skip if the next schedule is in the future.
if next.After(now) {
return nil
}
// Update counter.
counts[key]++

// Enqueue the job.
if err := w.handler.EnqueueJobConfig(jobConfig, next); err != nil {
return errors.Wrapf(err, "cannot enqueue job for %v", next)
}
// Enqueue the job.
if err := w.handler.EnqueueJobConfig(jobConfig, ts); err != nil {
return errors.Wrapf(err, "cannot enqueue job for %v", ts)
}

// Bump the next schedule time for the job config.
w.schedule.BumpNextScheduleTime(jobConfig, next, expr)
klog.V(3).InfoS("croncontroller: scheduled job",
"worker", w.WorkerName(),
"namespace", jobConfig.GetNamespace(),
"name", jobConfig.GetName(),
"scheduleTime", ts,
)

klog.V(2).InfoS("croncontroller: scheduled job by cron",
"worker", w.WorkerName(),
"namespace", jobConfig.GetNamespace(),
"name", jobConfig.GetName(),
"cron_schedule", schedule.Cron.Expression,
"schedule_time", next,
"timezone", timezone,
)
// Bump to next schedule time.
newNext, err := w.schedule.Bump(jobConfig, ts)
if err != nil {
return errors.Wrapf(err, "cannot bump to new next schedule time")
}

// Reaching this point means we missed too many schedules;
// just bump the next scheduled time.
err = fmt.Errorf("missed too many schedules, maximum=%v", maxMissedSchedules)
klog.ErrorS(err, "croncontroller: skipping back-scheduling for JobConfig",
klog.V(6).InfoS("croncontroller: bumped job config's next schedule time",
"worker", w.WorkerName(),
"namespace", jobConfig.GetNamespace(),
"name", jobConfig.GetName(),
"previous", ts,
"next", newNext,
)
w.schedule.BumpNextScheduleTime(jobConfig, now, expr)

return nil
}
Expand Down
Loading