Skip to content

Commit 2769f09

Browse files
manuelartemanuelarte
and
manuelarte
authored
removes nextRuns in the past when job skipped by locker (#829)
Co-authored-by: manuelarte <manuel.doncel.martos@gmail.com>
1 parent 08b53d7 commit 2769f09

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

executor.go

+13
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type executor struct {
3131
// used to request jobs from the scheduler
3232
jobOutRequest chan jobOutRequest
3333

34+
// sends out job needs to update the next runs
35+
jobUpdateNextRuns chan uuid.UUID
36+
3437
// used by the executor to receive a stop signal from the scheduler
3538
stopCh chan struct{}
3639
// the timeout value when stopping
@@ -247,6 +250,14 @@ func (e *executor) sendOutForRescheduling(jIn *jobIn) {
247250
jIn.shouldSendOut = false
248251
}
249252

253+
func (e *executor) sendOutForNextRunUpdate(jIn *jobIn) {
254+
select {
255+
case e.jobUpdateNextRuns <- jIn.id:
256+
case <-e.ctx.Done():
257+
return
258+
}
259+
}
260+
250261
func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
251262
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
252263
for {
@@ -376,6 +387,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
376387
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
377388
e.sendOutForRescheduling(&jIn)
378389
e.incrementJobCounter(j, Skip)
390+
e.sendOutForNextRunUpdate(&jIn)
379391
return
380392
}
381393
defer func() { _ = lock.Unlock(j.ctx) }()
@@ -385,6 +397,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
385397
_ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
386398
e.sendOutForRescheduling(&jIn)
387399
e.incrementJobCounter(j, Skip)
400+
e.sendOutForNextRunUpdate(&jIn)
388401
return
389402
}
390403
defer func() { _ = lock.Unlock(j.ctx) }()

scheduler.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
138138

139139
jobsIn: make(chan jobIn),
140140
jobsOutForRescheduling: make(chan uuid.UUID),
141+
jobUpdateNextRuns: make(chan uuid.UUID),
141142
jobsOutCompleted: make(chan uuid.UUID),
142143
jobOutRequest: make(chan jobOutRequest, 1000),
143144
done: make(chan error),
@@ -176,7 +177,8 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
176177
select {
177178
case id := <-s.exec.jobsOutForRescheduling:
178179
s.selectExecJobsOutForRescheduling(id)
179-
180+
case id := <-s.exec.jobUpdateNextRuns:
181+
s.updateNextScheduled(id)
180182
case id := <-s.exec.jobsOutCompleted:
181183
s.selectExecJobsOutCompleted(id)
182184

@@ -405,6 +407,22 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
405407
s.jobs[id] = j
406408
}
407409

410+
func (s *scheduler) updateNextScheduled(id uuid.UUID) {
411+
j, ok := s.jobs[id]
412+
if !ok {
413+
return
414+
}
415+
var newNextScheduled []time.Time
416+
for _, t := range j.nextScheduled {
417+
if t.Before(s.now()) {
418+
continue
419+
}
420+
newNextScheduled = append(newNextScheduled, t)
421+
}
422+
j.nextScheduled = newNextScheduled
423+
s.jobs[id] = j
424+
}
425+
408426
func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
409427
j, ok := s.jobs[id]
410428
if !ok {

0 commit comments

Comments
 (0)