Skip to content

Commit

Permalink
Definitively fix race in periodic job enqueuer (#438)
Browse files Browse the repository at this point in the history
Related to #409. I think I finally cracked it. For real this time. No
workarounds, no hacks.

If you look at the "after start" tests, they look roughly like this,
start the client, then add some jobs:

    startService(t, svc)

    svc.Add(
        &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
    )
    svc.Add(
        &PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
    )

This would usually work as expected, with the enqueuer starting up, then
jobs added, then those new jobs scheduled.

_However_, what can happen is that the "after start" jobs are actually
added in _before_ the enqueuer has finished entering its run loop. So,
the service is starting up, makes a first call to `timeUntilNextRun`,
and because the newly added periodic jobs have not been assigned a first
run time, the code below would get a `timerUntilNextRun` of zero, which
would cause it to enter the loop and run an insert iteration
immediately, despite the jobs not actually being appropriate for
scheduling as of yet:

    timerUntilNextRun := time.NewTimer(s.timeUntilNextRun())

    for {
            select {
            case <-timerUntilNextRun.C:

The fix is to modify `timeUntilNextRun` so that it ignores periodic jobs
that have not yet been scheduled:

    for _, periodicJob := range s.periodicJobs {
        // Jobs may have been added after service start, but before this
        // function runs for the first time. They're not scheduled properly yet,
        // but they will be soon, at which point this function will run again.
        // Skip them for now.
        if periodicJob.nextRunAt.IsZero() {
                continue
        }

Those jobs will be scheduled soon, then `timeUntilNextRun` called again,
and the expected "time until" values returned.

I'm confident enough in the fix that I've reverted most of #416 so the
5s jobs go back to 500ms.

Fixes #409.
  • Loading branch information
brandur authored Jul 9, 2024
1 parent 79c093f commit b44f59a
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 26 deletions.
23 changes: 20 additions & 3 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {

s.insertBatch(ctx, insertParamsMany, insertParamsUnique)

if len(insertParamsMany) > 0 {
if len(insertParamsMany) > 0 || len(insertParamsUnique) > 0 {
s.Logger.DebugContext(ctx, s.Name+": Inserted RunOnStart jobs", "num_jobs", len(insertParamsMany)+len(insertParamsUnique))
}
}
Expand Down Expand Up @@ -300,7 +300,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
defer s.mu.RUnlock()

for _, periodicJob := range s.periodicJobs {
if !periodicJob.nextRunAt.Before(nowWithMargin) {
if periodicJob.nextRunAt.IsZero() || !periodicJob.nextRunAt.Before(nowWithMargin) {
continue
}

Expand Down Expand Up @@ -405,6 +405,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany
s.Logger.ErrorContext(ctx, s.Name+": Error committing transaction", "error", err.Error())
return
}

s.TestSignals.InsertedJobs.Signal(struct{}{})
}

Expand All @@ -427,14 +428,16 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c
return insertParams, uniqueOpts, true
}

const periodicJobEnqueuerVeryLongDuration = 24 * time.Hour

func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
s.mu.RLock()
defer s.mu.RUnlock()

// With no configured jobs, just return a big duration for the loop to block
// on.
if len(s.periodicJobs) < 1 {
return 24 * time.Hour
return periodicJobEnqueuerVeryLongDuration
}

var (
Expand All @@ -443,6 +446,14 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
)

for _, periodicJob := range s.periodicJobs {
// Jobs may have been added after service start, but before this
// function runs for the first time. They're not scheduled properly yet,
// but they will be soon, at which point this function will run again.
// Skip them for now.
if periodicJob.nextRunAt.IsZero() {
continue
}

// In case we detect a job that should've run before now, immediately short
// circuit with a 0 duration. This avoids needlessly iterating through the
// rest of the loop when we already know we're overdue for the next job.
Expand All @@ -455,5 +466,11 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
}
}

// Only encountered unscheduled jobs (see comment above). Don't schedule
// anything for now.
if firstNextRunAt.IsZero() {
return periodicJobEnqueuerVeryLongDuration
}

return firstNextRunAt.Sub(now)
}
92 changes: 69 additions & 23 deletions internal/maintenance/periodic_job_enqueuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,15 +366,15 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
startService(t, svc)

svc.Add(
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
)
svc.Add(
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
)

svc.TestSignals.InsertedJobs.WaitOrTimeout()
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)
})

t.Run("AddManyAfterStart", func(t *testing.T) {
Expand All @@ -385,13 +385,13 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
startService(t, svc)

svc.AddMany([]*PeriodicJob{
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
})

svc.TestSignals.InsertedJobs.WaitOrTimeout()
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)
})

t.Run("ClearAfterStart", func(t *testing.T) {
Expand All @@ -402,20 +402,20 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
startService(t, svc)

handles := svc.AddMany([]*PeriodicJob{
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
})

svc.TestSignals.InsertedJobs.WaitOrTimeout()
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)

svc.Clear()

require.Empty(t, svc.periodicJobs)

handleAfterClear := svc.Add(
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_new", false)},
&PeriodicJob{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_new", false)},
)

// Handles are not reused.
Expand All @@ -431,13 +431,13 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
startService(t, svc)

handles := svc.AddMany([]*PeriodicJob{
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
})

svc.TestSignals.InsertedJobs.WaitOrTimeout()
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)

svc.Remove(handles[1])

Expand All @@ -452,15 +452,15 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
startService(t, svc)

handles := svc.AddMany([]*PeriodicJob{
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s", false)},
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_other", false)},
{ScheduleFunc: periodicIntervalSchedule(5 * time.Second), ConstructorFunc: jobConstructorFunc("periodic_job_5s_start", false), RunOnStart: true},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms", false)},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_other", false)},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorFunc("periodic_job_500ms_start", false), RunOnStart: true},
})

svc.TestSignals.InsertedJobs.WaitOrTimeout()
requireNJobs(t, bundle.exec, "periodic_job_5s", 0)
requireNJobs(t, bundle.exec, "periodic_job_5s_other", 0)
requireNJobs(t, bundle.exec, "periodic_job_5s_start", 1)
requireNJobs(t, bundle.exec, "periodic_job_500ms", 0)
requireNJobs(t, bundle.exec, "periodic_job_500ms_other", 0)
requireNJobs(t, bundle.exec, "periodic_job_500ms_start", 1)

svc.RemoveMany([]rivertype.PeriodicJobHandle{handles[1], handles[2]})

Expand Down Expand Up @@ -597,4 +597,50 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
// Should be no jobs in the DB either:
requireNJobs(t, bundle.exec, "unique_periodic_job_500ms", 0)
})

t.Run("TimeUntilNextRun", func(t *testing.T) {
t.Parallel()

svc, _ := setup(t)

now := svc.Time.StubNowUTC(time.Now())

// no jobs
require.Equal(t, periodicJobEnqueuerVeryLongDuration, svc.timeUntilNextRun())

svc.periodicJobs = map[rivertype.PeriodicJobHandle]*PeriodicJob{
1: {nextRunAt: now.Add(2 * time.Hour)},
2: {nextRunAt: now.Add(1 * time.Hour)},
3: {nextRunAt: now.Add(3 * time.Hour)},
}

// pick job with soonest next run
require.Equal(t, 1*time.Hour, svc.timeUntilNextRun())

svc.periodicJobs = map[rivertype.PeriodicJobHandle]*PeriodicJob{
1: {nextRunAt: now.Add(2 * time.Hour)},
2: {nextRunAt: now.Add(-1 * time.Hour)},
3: {nextRunAt: now.Add(3 * time.Hour)},
}

// job is already behind so time until next run is 0
require.Equal(t, time.Duration(0), svc.timeUntilNextRun())

svc.periodicJobs = map[rivertype.PeriodicJobHandle]*PeriodicJob{
1: {},
2: {},
}

// jobs not scheduled yet
require.Equal(t, periodicJobEnqueuerVeryLongDuration, svc.timeUntilNextRun())

svc.periodicJobs = map[rivertype.PeriodicJobHandle]*PeriodicJob{
1: {},
2: {nextRunAt: now.Add(1 * time.Hour)},
3: {},
}

// pick job with soonest next run amongst some not scheduled yet
require.Equal(t, 1*time.Hour, svc.timeUntilNextRun())
})
}

0 comments on commit b44f59a

Please sign in to comment.