Skip to content

Commit

Permalink
fix nextRun with singleton mode reporting incorrect time (#705)
Browse files Browse the repository at this point in the history
* fix nextRun with singleton mode reporting incorrect time

* only remove past if >1, sort next scheduled

* update test, remove no longer needed lastScheduledRun
  • Loading branch information
JohnRoesler authored Apr 6, 2024
1 parent f021cc4 commit 3b653b9
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 10 deletions.
14 changes: 11 additions & 3 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ type internalJob struct {
name string
tags []string
jobSchedule
lastScheduledRun time.Time
nextScheduled time.Time

// as some jobs may queue up, it's possible to
// have multiple nextScheduled times
nextScheduled []time.Time

lastRun time.Time
function any
parameters []any
Expand Down Expand Up @@ -894,7 +897,12 @@ func (j job) NextRun() (time.Time, error) {
if ij == nil || ij.id == uuid.Nil {
return time.Time{}, ErrJobNotFound
}
return ij.nextScheduled, nil
if len(ij.nextScheduled) == 0 {
return time.Time{}, nil
}
// the first element is the next scheduled run with subsequent
// runs following after in the slice
return ij.nextScheduled[0], nil
}

func (j job) Tags() []string {
Expand Down
56 changes: 56 additions & 0 deletions job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,3 +492,59 @@ func TestWithEventListeners(t *testing.T) {
})
}
}

func TestJob_NextRun(t *testing.T) {
tests := []struct {
name string
f func()
}{
{
"simple",
func() {},
},
{
"sleep 3 seconds",
func() {
time.Sleep(300 * time.Millisecond)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testTime := time.Now()

s := newTestScheduler(t)

// run a job every 10 milliseconds that starts 10 milliseconds after the current time
j, err := s.NewJob(
DurationJob(
100*time.Millisecond,
),
NewTask(
func() {},
),
WithStartAt(WithStartDateTime(testTime.Add(100*time.Millisecond))),
WithSingletonMode(LimitModeReschedule),
)
require.NoError(t, err)

s.Start()
nextRun, err := j.NextRun()
require.NoError(t, err)

assert.Equal(t, testTime.Add(100*time.Millisecond), nextRun)

time.Sleep(150 * time.Millisecond)

nextRun, err = j.NextRun()
assert.NoError(t, err)

assert.Equal(t, testTime.Add(200*time.Millisecond), nextRun)
assert.Equal(t, 200*time.Millisecond, nextRun.Sub(testTime))

err = s.Shutdown()
require.NoError(t, err)
})
}
}
34 changes: 28 additions & 6 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,18 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
// so we don't need to reschedule it.
return
}
j.lastScheduledRun = j.nextScheduled

next := j.next(j.lastScheduledRun)
var scheduleFrom time.Time
if len(j.nextScheduled) > 0 {
// always grab the last element in the slice as that is the furthest
// out in the future and the time from which we want to calculate
// the subsequent next run time.
slices.SortStableFunc(j.nextScheduled, func(a, b time.Time) int {
return a.Compare(b)
})
scheduleFrom = j.nextScheduled[len(j.nextScheduled)-1]
}

next := j.next(scheduleFrom)
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
// since they are one time only, they do not need rescheduling.
Expand All @@ -316,7 +325,7 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
next = j.next(next)
}
}
j.nextScheduled = next
j.nextScheduled = append(j.nextScheduled, next)
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
// set the actual timer on the job here and listen for
// shut down events so that the job doesn't attempt to
Expand All @@ -340,6 +349,19 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
return
}

// if the job has more than one nextScheduled time,
// we need to remove any that are in the past.
if len(j.nextScheduled) > 1 {
var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
newNextScheduled = append(newNextScheduled, t)
}
j.nextScheduled = newNextScheduled
}

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
// job if it has reached the limit.
Expand Down Expand Up @@ -400,7 +422,7 @@ func (s *scheduler) selectNewJob(in newJobIn) {
}
})
}
j.nextScheduled = next
j.nextScheduled = append(j.nextScheduled, next)
}

s.jobs[j.id] = j
Expand Down Expand Up @@ -451,7 +473,7 @@ func (s *scheduler) selectStart() {
}
})
}
j.nextScheduled = next
j.nextScheduled = append(j.nextScheduled, next)
s.jobs[id] = j
}
select {
Expand Down
2 changes: 1 addition & 1 deletion scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
WithSingletonMode(LimitModeReschedule),
},
func() time.Duration {
return 20 * time.Second
return 10 * time.Second
},
1,
},
Expand Down

0 comments on commit 3b653b9

Please sign in to comment.