Skip to content

Commit

Permalink
fix case where OneTimeJob with concurrent limit and limited runs fail…
Browse files Browse the repository at this point in the history
…s to run (#703)
  • Loading branch information
JohnRoesler authored Mar 26, 2024
1 parent 9ae7545 commit dcd4eda
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 17 deletions.
35 changes: 18 additions & 17 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,23 +300,6 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
}
j.lastScheduledRun = j.nextScheduled

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
// job if it has reached the limit.
if j.limitRunsTo != nil {
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
go func() {
select {
case <-s.shutdownCtx.Done():
return
case s.removeJobCh <- id:
}
}()
return
}
}

next := j.next(j.lastScheduledRun)
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
Expand Down Expand Up @@ -356,6 +339,24 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
if !ok {
return
}

// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
// job if it has reached the limit.
if j.limitRunsTo != nil {
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
go func() {
select {
case <-s.shutdownCtx.Done():
return
case s.removeJobCh <- id:
}
}()
return
}
}

j.lastRun = s.now()
s.jobs[id] = j
}
Expand Down
75 changes: 75 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,81 @@ func TestScheduler_OneTimeJob(t *testing.T) {
}
}

func TestScheduler_WithLimitedRuns(t *testing.T) {
goleak.VerifyNone(t)

tests := []struct {
name string
schedulerOpts []SchedulerOption
job JobDefinition
jobOpts []JobOption
runLimit uint
expectedRuns int
}{
{
"simple",
nil,
DurationJob(time.Millisecond * 100),
nil,
1,
1,
},
{
"OneTimeJob, WithLimitConcurrentJobs",
[]SchedulerOption{
WithLimitConcurrentJobs(1, LimitModeWait),
},
OneTimeJob(OneTimeJobStartImmediately()),
nil,
1,
1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, tt.schedulerOpts...)

jobRan := make(chan struct{}, 10)

jobOpts := []JobOption{
WithLimitedRuns(tt.runLimit),
}
jobOpts = append(jobOpts, tt.jobOpts...)

_, err := s.NewJob(
tt.job,
NewTask(func() {
jobRan <- struct{}{}
}),
jobOpts...,
)
require.NoError(t, err)

s.Start()
time.Sleep(time.Millisecond * 150)

assert.NoError(t, s.Shutdown())

var runCount int
for runCount < tt.expectedRuns {
select {
case <-jobRan:
runCount++
case <-time.After(time.Second):
t.Fatal("timed out waiting for job to run")
}
}
select {
case <-jobRan:
t.Fatal("job ran more than expected")
default:
}
assert.Equal(t, tt.expectedRuns, runCount)
})
}
}

func TestScheduler_Jobs(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit dcd4eda

Please sign in to comment.