Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-742: bug in NextRun #743

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,18 +357,16 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
return
}

// if the job has more than one nextScheduled time,
// if the job has nextScheduled time in the past,
// we need to remove any that are in the past.
if len(j.nextScheduled) > 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
newNextScheduled = append(newNextScheduled, t)
var newNextScheduled []time.Time
for _, t := range j.nextScheduled {
if t.Before(s.now()) {
continue
}
j.nextScheduled = newNextScheduled
newNextScheduled = append(newNextScheduled, t)
}
j.nextScheduled = newNextScheduled

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
Expand Down
96 changes: 89 additions & 7 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
Expand Down Expand Up @@ -1185,6 +1187,92 @@ func TestScheduler_LimitModeAndSingleton(t *testing.T) {
}
}

func TestScheduler_OneTimeJob_DoesNotCleanupNext(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

schedulerStartTime := time.Date(2024, time.April, 3, 4, 5, 0, 0, time.UTC)

tests := []struct {
name string
runAt time.Time
fakeClock clockwork.FakeClock
assertErr require.ErrorAssertionFunc
// asserts things about schedules, advance time and perform new assertions
advanceAndAsserts []func(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like how you set up this test!

t *testing.T,
j Job,
clock clockwork.FakeClock,
runs *atomic.Uint32,
)
}{
{
name: "exhausted run do does not cleanup next item",
runAt: time.Date(2024, time.April, 22, 4, 5, 0, 0, time.UTC),
fakeClock: clockwork.NewFakeClockAt(schedulerStartTime),
advanceAndAsserts: []func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32){
func(t *testing.T, j Job, clock clockwork.FakeClock, runs *atomic.Uint32) {
require.Equal(t, uint32(0), runs.Load())

// last not initialized
lastRunAt, err := j.LastRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, lastRunAt)

// next is now
expected := time.Date(2024, time.April, 22, 4, 5, 0, 0, time.UTC)
nextRunAt, err := j.NextRun()
require.NoError(t, err)
require.Equal(t, expected, nextRunAt.UTC())

// advance and eventually run
oneSecondAfterNextRun := expected.Add(1 * time.Second)

clock.Advance(oneSecondAfterNextRun.Sub(schedulerStartTime))
require.Eventually(t, func() bool {
return assert.Equal(t, uint32(1), runs.Load())
}, 3*time.Second, 100*time.Millisecond)

// last was run
lastRunAt, err = j.LastRun()
require.NoError(t, err)
require.WithinDuration(t, expected, lastRunAt, 1*time.Second)

nextRunAt, err = j.NextRun()
require.NoError(t, err)
require.Equal(t, time.Time{}, nextRunAt)
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, WithClock(tt.fakeClock), WithLocation(time.UTC))
t.Cleanup(func() {
require.NoError(t, s.Shutdown())
})

runs := atomic.Uint32{}
j, err := s.NewJob(
OneTimeJob(OneTimeJobStartDateTime(tt.runAt)),
NewTask(func() {
runs.Add(1)
}),
)
if tt.assertErr != nil {
tt.assertErr(t, err)
} else {
require.NoError(t, err)
s.Start()

for _, advanceAndAssert := range tt.advanceAndAsserts {
advanceAndAssert(t, j, tt.fakeClock, &runs)
}
}
})
}
}

var _ Elector = (*testElector)(nil)

type testElector struct {
Expand Down Expand Up @@ -1980,7 +2068,7 @@ func TestScheduler_OneTimeJob(t *testing.T) {

s := newTestScheduler(t)

j, err := s.NewJob(
_, err := s.NewJob(
OneTimeJob(tt.startAt()),
NewTask(func() {
jobRan <- struct{}{}
Expand All @@ -1996,12 +2084,6 @@ func TestScheduler_OneTimeJob(t *testing.T) {
t.Fatal("timed out waiting for job to run")
}

var nextRun time.Time
for ; nextRun.IsZero(); nextRun, err = j.NextRun() { //nolint:revive
}
assert.NoError(t, err)
assert.True(t, nextRun.Before(time.Now()))

assert.NoError(t, s.Shutdown())
})
}
Expand Down