Skip to content

Commit

Permalink
fix cases where default on send out is resulting in job not going out (
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnRoesler authored Mar 5, 2024
1 parent 387cbe4 commit 27f2cba
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
46 changes: 22 additions & 24 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,7 @@ func (e *executor) start() {
// all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
if jIn.shouldSendOut {
select {
case e.jobIDsOut <- jIn.id:
default:
}
}
e.sendOutToScheduler(&jIn)
}
} else {
// since we're not using LimitModeReschedule, but instead using LimitModeWait
Expand All @@ -136,6 +131,7 @@ func (e *executor) start() {
// at which point this call would block.
// TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- jIn
e.sendOutToScheduler(&jIn)
}
} else {
// no limit mode, so we're either running a regular job or
Expand Down Expand Up @@ -171,20 +167,17 @@ func (e *executor) start() {
select {
case runner.rescheduleLimiter <- struct{}{}:
runner.in <- jIn
e.sendOutToScheduler(&jIn)
default:
// runner is busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
if jIn.shouldSendOut {
select {
case e.jobIDsOut <- jIn.id:
default:
}
}
e.sendOutToScheduler(&jIn)
}
} else {
// wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- jIn
e.sendOutToScheduler(&jIn)
}
} else {
select {
Expand Down Expand Up @@ -213,6 +206,20 @@ func (e *executor) start() {
}
}

func (e *executor) sendOutToScheduler(jIn *jobIn) {
if jIn.shouldSendOut {
select {
case e.jobIDsOut <- jIn.id:
case <-e.ctx.Done():
return
}
}
// we need to set this to false now, because to handle
// non-limit jobs, we send out from the e.runJob function
// and in this case we don't want to send out twice.
jIn.shouldSendOut = false
}

func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for {
Expand Down Expand Up @@ -250,10 +257,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
// was a singleton already running, and we want to
// allow another job to be scheduled
if limitMode == LimitModeReschedule {
select {
case <-rescheduleLimiter:
default:
}
<-rescheduleLimiter
}
continue
}
Expand All @@ -271,10 +275,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith

// remove the limiter block to allow another job to be scheduled
if limitMode == LimitModeReschedule {
select {
case <-rescheduleLimiter:
default:
}
<-rescheduleLimiter
}
case <-e.ctx.Done():
e.logger.Debug("limitModeRunner shutting down", "name", name)
Expand Down Expand Up @@ -306,10 +307,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup

// remove the limiter block to allow another job to be scheduled
if limitMode == LimitModeReschedule {
select {
case <-rescheduleLimiter:
default:
}
<-rescheduleLimiter
}
case <-e.ctx.Done():
e.logger.Debug("singletonModeRunner shutting down", "name", name)
Expand Down
4 changes: 3 additions & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,9 @@ type Job interface {
NextRun() (time.Time, error)
// RunNow runs the job once, now. This does not alter
// the existing run schedule, and will respect all job
// and scheduler limits.
// and scheduler limits. This means that running a job now may
// cause the job's regular interval to be rescheduled due to
// the instance being run by RunNow blocking your run limit.
RunNow() error
// Tags returns the job's string tags.
Tags() []string
Expand Down
3 changes: 1 addition & 2 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/google/uuid"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
Expand Down Expand Up @@ -1508,7 +1507,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
WithSingletonMode(LimitModeReschedule),
},
func() time.Duration {
return 10 * time.Second
return 20 * time.Second
},
1,
},
Expand Down

0 comments on commit 27f2cba

Please sign in to comment.