From 4dd41297ba7ed72b5058cda4cea65119a9e81b38 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Thu, 28 Mar 2024 23:43:10 -0500 Subject: [PATCH 1/3] fix nextRun with singleton mode reporting incorrect time --- job.go | 15 ++++++++++++--- job_test.go | 39 +++++++++++++++++++++++++++++++++++++++ scheduler.go | 22 ++++++++++++++++++---- 3 files changed, 69 insertions(+), 7 deletions(-) diff --git a/job.go b/job.go index 7641dbe8..4d3e2f99 100644 --- a/job.go +++ b/job.go @@ -24,8 +24,12 @@ type internalJob struct { name string tags []string jobSchedule - lastScheduledRun time.Time - nextScheduled time.Time + lastScheduledRun time.Time + + // as some jobs may queue up, it's possible to + // have multiple nextScheduled times + nextScheduled []time.Time + lastRun time.Time function any parameters []any @@ -894,7 +898,12 @@ func (j job) NextRun() (time.Time, error) { if ij == nil || ij.id == uuid.Nil { return time.Time{}, ErrJobNotFound } - return ij.nextScheduled, nil + if len(ij.nextScheduled) == 0 { + return time.Time{}, nil + } + // the first element is the next scheduled run with subsequent + // runs following after in the slice + return ij.nextScheduled[0], nil } func (j job) Tags() []string { diff --git a/job_test.go b/job_test.go index 84c428ab..ee6f0439 100644 --- a/job_test.go +++ b/job_test.go @@ -492,3 +492,42 @@ func TestWithEventListeners(t *testing.T) { }) } } + +func TestJob_NextRun(t *testing.T) { + testTime := time.Now() + + s := newTestScheduler(t) + + // run a job every 10 milliseconds that starts 10 milliseconds after the current time + j, err := s.NewJob( + DurationJob( + 10*time.Millisecond, + ), + NewTask( + func() {}, + ), + WithStartAt(WithStartDateTime(testTime.Add(10*time.Millisecond))), + WithSingletonMode(LimitModeReschedule), + ) + require.NoError(t, err) + + s.Start() + nextRun, err := j.NextRun() + require.NoError(t, err) + + // `NextRun` should report `testTime.Add(10*time.Millisecond)` + assert.Equal(t, testTime.Add(10*time.Millisecond), nextRun) + + // sleep for 11ms to wait for the next job + time.Sleep(11 * time.Millisecond) + + nextRun, err = j.NextRun() + assert.NoError(t, err) + + // `NextRun` should report a time 20 milliseconds after `testTime`, but instead reports a value that is `30ms` after + assert.Equal(t, testTime.Add(20*time.Millisecond), nextRun) + assert.Equal(t, 20*time.Millisecond, nextRun.Sub(testTime)) + + err = s.Shutdown() + require.NoError(t, err) +} diff --git a/scheduler.go b/scheduler.go index dd1323c7..cf093a79 100644 --- a/scheduler.go +++ b/scheduler.go @@ -298,7 +298,12 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { // so we don't need to reschedule it. return } - j.lastScheduledRun = j.nextScheduled + if len(j.nextScheduled) > 0 { + // always grab the last element in the slice as that is the furthest + // out in the future and the time from which we want to calculate + // the subsequent next run time. + j.lastScheduledRun = j.nextScheduled[len(j.nextScheduled)-1] + } next := j.next(j.lastScheduledRun) if next.IsZero() { @@ -316,7 +321,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { next = j.next(next) } } - j.nextScheduled = next + j.nextScheduled = append(j.nextScheduled, next) j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { // set the actual timer on the job here and listen for // shut down events so that the job doesn't attempt to @@ -340,6 +345,15 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { return } + var newNextScheduled []time.Time + for _, t := range j.nextScheduled { + if t.Before(s.now()) { + continue + } + newNextScheduled = append(newNextScheduled, t) + } + j.nextScheduled = newNextScheduled + // if the job has a limited number of runs set, we need to // check how many runs have occurred and stop running this // job if it has reached the limit. @@ -400,7 +414,7 @@ func (s *scheduler) selectNewJob(in newJobIn) { } }) } - j.nextScheduled = next + j.nextScheduled = append(j.nextScheduled, next) } s.jobs[j.id] = j @@ -451,7 +465,7 @@ func (s *scheduler) selectStart() { } }) } - j.nextScheduled = next + j.nextScheduled = append(j.nextScheduled, next) s.jobs[id] = j } select { From 700e016227506a2de6de2916dc3d8a2bb22b1428 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 29 Mar 2024 10:07:13 -0500 Subject: [PATCH 2/3] only remove past if >1, sort next scheduled --- job_test.go | 15 ++++++--------- scheduler.go | 19 +++++++++++++------ scheduler_test.go | 2 +- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/job_test.go b/job_test.go index ee6f0439..ee92938f 100644 --- a/job_test.go +++ b/job_test.go @@ -501,12 +501,12 @@ func TestJob_NextRun(t *testing.T) { // run a job every 10 milliseconds that starts 10 milliseconds after the current time j, err := s.NewJob( DurationJob( - 10*time.Millisecond, + 100*time.Millisecond, ), NewTask( func() {}, ), - WithStartAt(WithStartDateTime(testTime.Add(10*time.Millisecond))), + WithStartAt(WithStartDateTime(testTime.Add(100*time.Millisecond))), WithSingletonMode(LimitModeReschedule), ) require.NoError(t, err) @@ -515,18 +515,15 @@ func TestJob_NextRun(t *testing.T) { nextRun, err := j.NextRun() require.NoError(t, err) - // `NextRun` should report `testTime.Add(10*time.Millisecond)` - assert.Equal(t, testTime.Add(10*time.Millisecond), nextRun) + assert.Equal(t, testTime.Add(100*time.Millisecond), nextRun) - // sleep for 11ms to wait for the next job - time.Sleep(11 * time.Millisecond) + time.Sleep(150 * time.Millisecond) nextRun, err = j.NextRun() assert.NoError(t, err) - // `NextRun` should report a time 20 milliseconds after `testTime`, but instead reports a value that is `30ms` after - assert.Equal(t, testTime.Add(20*time.Millisecond), nextRun) - assert.Equal(t, 20*time.Millisecond, nextRun.Sub(testTime)) + assert.Equal(t, testTime.Add(200*time.Millisecond), nextRun) + assert.Equal(t, 200*time.Millisecond, nextRun.Sub(testTime)) err = s.Shutdown() require.NoError(t, err) diff --git a/scheduler.go b/scheduler.go index cf093a79..eee61c2f 100644 --- a/scheduler.go +++ b/scheduler.go @@ -302,6 +302,9 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { // always grab the last element in the slice as that is the furthest // out in the future and the time from which we want to calculate // the subsequent next run time. + slices.SortStableFunc(j.nextScheduled, func(a, b time.Time) int { + return a.Compare(b) + }) j.lastScheduledRun = j.nextScheduled[len(j.nextScheduled)-1] } @@ -345,14 +348,18 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { return } - var newNextScheduled []time.Time - for _, t := range j.nextScheduled { - if t.Before(s.now()) { - continue + // if the job has more than one nextScheduled time, + // we need to remove any that are in the past. + if len(j.nextScheduled) > 1 { + var newNextScheduled []time.Time + for _, t := range j.nextScheduled { + if t.Before(s.now()) { + continue + } + newNextScheduled = append(newNextScheduled, t) } - newNextScheduled = append(newNextScheduled, t) + j.nextScheduled = newNextScheduled } - j.nextScheduled = newNextScheduled // if the job has a limited number of runs set, we need to // check how many runs have occurred and stop running this diff --git a/scheduler_test.go b/scheduler_test.go index ec69faec..1f521f53 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -1622,7 +1622,7 @@ func TestScheduler_RunJobNow(t *testing.T) { WithSingletonMode(LimitModeReschedule), }, func() time.Duration { - return 20 * time.Second + return 10 * time.Second }, 1, }, From 862dafebe9d4d61e6aaca441be134d6341f29b21 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Fri, 5 Apr 2024 20:47:32 -0500 Subject: [PATCH 3/3] update test, remove no longer needed lastScheduledRun --- job.go | 1 - job_test.go | 70 +++++++++++++++++++++++++++++++++------------------- scheduler.go | 5 ++-- 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/job.go b/job.go index 4d3e2f99..1be7c807 100644 --- a/job.go +++ b/job.go @@ -24,7 +24,6 @@ type internalJob struct { name string tags []string jobSchedule - lastScheduledRun time.Time // as some jobs may queue up, it's possible to // have multiple nextScheduled times diff --git a/job_test.go b/job_test.go index ee92938f..c089f6e7 100644 --- a/job_test.go +++ b/job_test.go @@ -494,37 +494,57 @@ func TestWithEventListeners(t *testing.T) { } func TestJob_NextRun(t *testing.T) { - testTime := time.Now() + tests := []struct { + name string + f func() + }{ + { + "simple", + func() {}, + }, + { + "sleep 3 seconds", + func() { + time.Sleep(300 * time.Millisecond) + }, + }, + } - s := newTestScheduler(t) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testTime := time.Now() - // run a job every 10 milliseconds that starts 10 milliseconds after the current time - j, err := s.NewJob( - DurationJob( - 100*time.Millisecond, - ), - NewTask( - func() {}, - ), - WithStartAt(WithStartDateTime(testTime.Add(100*time.Millisecond))), - WithSingletonMode(LimitModeReschedule), - ) - require.NoError(t, err) + s := newTestScheduler(t) - s.Start() - nextRun, err := j.NextRun() - require.NoError(t, err) + // run a job every 10 milliseconds that starts 10 milliseconds after the current time + j, err := s.NewJob( + DurationJob( + 100*time.Millisecond, + ), + NewTask( + func() {}, + ), + WithStartAt(WithStartDateTime(testTime.Add(100*time.Millisecond))), + WithSingletonMode(LimitModeReschedule), + ) + require.NoError(t, err) - assert.Equal(t, testTime.Add(100*time.Millisecond), nextRun) + s.Start() + nextRun, err := j.NextRun() + require.NoError(t, err) - time.Sleep(150 * time.Millisecond) + assert.Equal(t, testTime.Add(100*time.Millisecond), nextRun) - nextRun, err = j.NextRun() - assert.NoError(t, err) + time.Sleep(150 * time.Millisecond) - assert.Equal(t, testTime.Add(200*time.Millisecond), nextRun) - assert.Equal(t, 200*time.Millisecond, nextRun.Sub(testTime)) + nextRun, err = j.NextRun() + assert.NoError(t, err) - err = s.Shutdown() - require.NoError(t, err) + assert.Equal(t, testTime.Add(200*time.Millisecond), nextRun) + assert.Equal(t, 200*time.Millisecond, nextRun.Sub(testTime)) + + err = s.Shutdown() + require.NoError(t, err) + }) + } } diff --git a/scheduler.go b/scheduler.go index eee61c2f..7980349a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -298,6 +298,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { // so we don't need to reschedule it. return } + var scheduleFrom time.Time if len(j.nextScheduled) > 0 { // always grab the last element in the slice as that is the furthest // out in the future and the time from which we want to calculate @@ -305,10 +306,10 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { slices.SortStableFunc(j.nextScheduled, func(a, b time.Time) int { return a.Compare(b) }) - j.lastScheduledRun = j.nextScheduled[len(j.nextScheduled)-1] + scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1] } - next := j.next(j.lastScheduledRun) + next := j.next(scheduleFrom) if next.IsZero() { // the job's next function will return zero for OneTime jobs. // since they are one time only, they do not need rescheduling.