diff --git a/job.go b/job.go index 7641dbe8..1be7c807 100644 --- a/job.go +++ b/job.go @@ -24,8 +24,11 @@ type internalJob struct { name string tags []string jobSchedule - lastScheduledRun time.Time - nextScheduled time.Time + + // as some jobs may queue up, it's possible to + // have multiple nextScheduled times + nextScheduled []time.Time + lastRun time.Time function any parameters []any @@ -894,7 +897,12 @@ func (j job) NextRun() (time.Time, error) { if ij == nil || ij.id == uuid.Nil { return time.Time{}, ErrJobNotFound } - return ij.nextScheduled, nil + if len(ij.nextScheduled) == 0 { + return time.Time{}, nil + } + // the first element is the next scheduled run with subsequent + // runs following after in the slice + return ij.nextScheduled[0], nil } func (j job) Tags() []string { diff --git a/job_test.go b/job_test.go index 84c428ab..c089f6e7 100644 --- a/job_test.go +++ b/job_test.go @@ -492,3 +492,59 @@ func TestWithEventListeners(t *testing.T) { }) } } + +func TestJob_NextRun(t *testing.T) { + tests := []struct { + name string + f func() + }{ + { + "simple", + func() {}, + }, + { + "sleep 3 seconds", + func() { + time.Sleep(300 * time.Millisecond) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testTime := time.Now() + + s := newTestScheduler(t) + + // run a job every 10 milliseconds that starts 10 milliseconds after the current time + j, err := s.NewJob( + DurationJob( + 100*time.Millisecond, + ), + NewTask( + func() {}, + ), + WithStartAt(WithStartDateTime(testTime.Add(100*time.Millisecond))), + WithSingletonMode(LimitModeReschedule), + ) + require.NoError(t, err) + + s.Start() + nextRun, err := j.NextRun() + require.NoError(t, err) + + assert.Equal(t, testTime.Add(100*time.Millisecond), nextRun) + + time.Sleep(150 * time.Millisecond) + + nextRun, err = j.NextRun() + assert.NoError(t, err) + + assert.Equal(t, testTime.Add(200*time.Millisecond), nextRun) + assert.Equal(t, 200*time.Millisecond, nextRun.Sub(testTime)) + + err = s.Shutdown() + require.NoError(t, err) + }) + } +} diff --git a/scheduler.go b/scheduler.go index dd1323c7..7980349a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -298,9 +298,18 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { // so we don't need to reschedule it. return } - j.lastScheduledRun = j.nextScheduled - - next := j.next(j.lastScheduledRun) + var scheduleFrom time.Time + if len(j.nextScheduled) > 0 { + // always grab the last element in the slice as that is the furthest + // out in the future and the time from which we want to calculate + // the subsequent next run time. + slices.SortStableFunc(j.nextScheduled, func(a, b time.Time) int { + return a.Compare(b) + }) + scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1] + } + + next := j.next(scheduleFrom) if next.IsZero() { // the job's next function will return zero for OneTime jobs. // since they are one time only, they do not need rescheduling. @@ -316,7 +325,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { next = j.next(next) } } - j.nextScheduled = next + j.nextScheduled = append(j.nextScheduled, next) j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { // set the actual timer on the job here and listen for // shut down events so that the job doesn't attempt to @@ -340,6 +349,19 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { return } + // if the job has more than one nextScheduled time, + // we need to remove any that are in the past. + if len(j.nextScheduled) > 1 { + var newNextScheduled []time.Time + for _, t := range j.nextScheduled { + if t.Before(s.now()) { + continue + } + newNextScheduled = append(newNextScheduled, t) + } + j.nextScheduled = newNextScheduled + } + // if the job has a limited number of runs set, we need to // check how many runs have occurred and stop running this // job if it has reached the limit. @@ -400,7 +422,7 @@ func (s *scheduler) selectNewJob(in newJobIn) { } }) } - j.nextScheduled = next + j.nextScheduled = append(j.nextScheduled, next) } s.jobs[j.id] = j @@ -451,7 +473,7 @@ func (s *scheduler) selectStart() { } }) } - j.nextScheduled = next + j.nextScheduled = append(j.nextScheduled, next) s.jobs[id] = j } select { diff --git a/scheduler_test.go b/scheduler_test.go index ec69faec..1f521f53 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -1622,7 +1622,7 @@ func TestScheduler_RunJobNow(t *testing.T) { WithSingletonMode(LimitModeReschedule), }, func() time.Duration { - return 20 * time.Second + return 10 * time.Second }, 1, },