Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix panic in PeriodicJobEnqueuer #73

Merged
merged 2 commits into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fixed a panic in the periodic job enqueuer caused by sometimes trying to reset a `time.Ticker` with a negative or zero duration. Fixed in PR #73.

## [0.0.9] - 2023-11-23

### Fixed
Expand Down
25 changes: 17 additions & 8 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,15 +132,18 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {

s.TestSignals.EnteredLoop.Signal(struct{}{})

tickerUntilNextRun := time.NewTicker(1 * time.Hour) // duration is Reset immediately below
defer tickerUntilNextRun.Stop()
timerUntilNextRun := time.NewTimer(0) // duration is Reset immediately below
<-timerUntilNextRun.C

loop:
for {
tickerUntilNextRun.Reset(s.timeUntilNextRun())
// We know it is safe to directly call Reset because the only way we can
// get to this line is if the timer has already fired and been received
// from:
timerUntilNextRun.Reset(s.timeUntilNextRun())

select {
case <-tickerUntilNextRun.C:
case <-timerUntilNextRun.C:
var insertParamsMany []*dbadapter.JobInsertParams

now := s.TimeNowUTC()
Expand All @@ -165,6 +168,12 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
s.insertBatch(ctx, insertParamsMany)

case <-ctx.Done():
// Clean up timer resources. We know it has _not_ received from the
// timer since its last reset because that would have led us to the case
// above instead of here.
if !timerUntilNextRun.Stop() {
<-timerUntilNextRun.C
}
Comment on lines +171 to +176
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this here to ensure we're stopping and draining the timer before returning from this loop.

break loop
}
}
Expand Down Expand Up @@ -213,9 +222,9 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
)

for _, periodicJob := range s.periodicJobs {
// In case we detect a job that should've run before now, immediately
// short circuit with a zero. In addition to being marginally faster, it
// prevents us accidentally returning a negative number below.
// In case we detect a job that should've run before now, immediately short
// circuit with a 0 duration. This avoids needlessly iterating through the
// rest of the loop when we already know we're overdue for the next job.
if periodicJob.nextRunAt.Before(now) {
return 0
}
Expand All @@ -225,5 +234,5 @@ func (s *PeriodicJobEnqueuer) timeUntilNextRun() time.Duration {
}
}

return firstNextRunAt.Sub(s.TimeNowUTC())
return firstNextRunAt.Sub(now)
}
32 changes: 32 additions & 0 deletions internal/maintenance/periodic_job_enqueuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,38 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
require.Equal(t, 7*24*time.Hour, svc.timeUntilNextRun())
})

// To ensure we are protected against runs that are supposed to have already happened,
// this test uses a totally-not-safe schedule to enqueue every 0.5ms.
t.Run("RapidScheduling", func(t *testing.T) {
t.Parallel()

svc, _ := setup(t)

svc.periodicJobs = []*PeriodicJob{
{ScheduleFunc: periodicIntervalSchedule(time.Microsecond), ConstructorFunc: jobConstructorFunc("periodic_job_1us")},
}
// make a longer list of jobs so the loop has to run for longer
for i := 1; i < 100; i++ {
svc.periodicJobs = append(svc.periodicJobs,
&PeriodicJob{
ScheduleFunc: periodicIntervalSchedule(time.Duration(i) * time.Hour),
ConstructorFunc: jobConstructorFunc(fmt.Sprintf("periodic_job_%dh", i)),
},
)
}

require.NoError(t, svc.Start(ctx))

svc.TestSignals.EnteredLoop.WaitOrTimeout()

periodicJobs := make([]*PeriodicJob, len(svc.periodicJobs))
copy(periodicJobs, svc.periodicJobs)

for i := 0; i < 100; i++ {
svc.TestSignals.InsertedJobs.WaitOrTimeout()
}
})

t.Run("NoJobsConfigured", func(t *testing.T) {
t.Parallel()

Expand Down
Loading