diff --git a/executor.go b/executor.go index 784e58eb..d815ad0a 100644 --- a/executor.go +++ b/executor.go @@ -8,12 +8,12 @@ import ( ) const ( - // default is that if a limit on maximum concurrent jobs is set - // and the limit is reached, a job will skip it's run and try - // again on the next occurrence in the schedule + // RescheduleMode - the default is that if a limit on maximum + // concurrent jobs is set and the limit is reached, a job will + // skip it's run and try again on the next occurrence in the schedule RescheduleMode limitMode = iota - // in wait mode if a limit on maximum concurrent jobs is set + // WaitMode - if a limit on maximum concurrent jobs is set // and the limit is reached, a job will wait to try and run // until a spot in the limit is freed up. // diff --git a/scheduler.go b/scheduler.go index e217610c..a158ec01 100644 --- a/scheduler.go +++ b/scheduler.go @@ -146,6 +146,11 @@ func (s *Scheduler) Location() *time.Location { return s.location } +type nextRun struct { + duration time.Duration + dateTime time.Time +} + // scheduleNextRun Compute the instant when this Job should run next func (s *Scheduler) scheduleNextRun(job *Job) { now := s.now() @@ -163,11 +168,11 @@ func (s *Scheduler) scheduleNextRun(job *Job) { if job.neverRan() { // Increment startAtTime to the future if !job.startAtTime.IsZero() && job.startAtTime.Before(now) { - duration := s.durationToNextRun(job.startAtTime, job) + duration := s.durationToNextRun(job.startAtTime, job).duration job.startAtTime = job.startAtTime.Add(duration) if job.startAtTime.Before(now) { diff := now.Sub(job.startAtTime) - duration := s.durationToNextRun(job.startAtTime, job) + duration := s.durationToNextRun(job.startAtTime, job).duration count := diff / duration if diff%duration != 0 { count++ @@ -183,79 +188,91 @@ func (s *Scheduler) scheduleNextRun(job *Job) { return } - durationToNextRun := s.durationToNextRun(lastRun, job) + next := s.durationToNextRun(lastRun, job) - job.setNextRun(lastRun.Add(durationToNextRun)) - job.setTimer(time.AfterFunc(durationToNextRun, func() { + if next.dateTime.IsZero() { + job.setNextRun(lastRun.Add(next.duration)) + } else { + job.setNextRun(next.dateTime) + } + job.setTimer(time.AfterFunc(next.duration, func() { + if !next.dateTime.IsZero() { + for { + if time.Now().Unix() >= next.dateTime.Unix() { + break + } + } + } s.run(job) s.scheduleNextRun(job) })) } // durationToNextRun calculate how much time to the next run, depending on unit -func (s *Scheduler) durationToNextRun(lastRun time.Time, job *Job) time.Duration { +func (s *Scheduler) durationToNextRun(lastRun time.Time, job *Job) nextRun { // job can be scheduled with .StartAt() if job.getStartAtTime().After(lastRun) { - return job.getStartAtTime().Sub(s.now()) + return nextRun{duration: job.getStartAtTime().Sub(s.now()), dateTime: job.getStartAtTime()} } - var d time.Duration + var next nextRun switch job.getUnit() { case milliseconds, seconds, minutes, hours: - d = s.calculateDuration(job) + next.duration = s.calculateDuration(job) case days: - d = s.calculateDays(job, lastRun) + next = s.calculateDays(job, lastRun) case weeks: if len(job.scheduledWeekday) != 0 { // weekday selected, Every().Monday(), for example - d = s.calculateWeekday(job, lastRun) + next = s.calculateWeekday(job, lastRun) } else { - d = s.calculateWeeks(job, lastRun) + next = s.calculateWeeks(job, lastRun) } case months: - d = s.calculateMonths(job, lastRun) + next = s.calculateMonths(job, lastRun) case duration: - d = job.getDuration() + next.duration = job.getDuration() case crontab: - d = job.cronSchedule.Next(lastRun).Sub(lastRun) + next.dateTime = job.cronSchedule.Next(lastRun) + next.duration = next.dateTime.Sub(lastRun) } - return d + return next } -func (s *Scheduler) calculateMonths(job *Job, lastRun time.Time) time.Duration { +func (s *Scheduler) calculateMonths(job *Job, lastRun time.Time) nextRun { lastRunRoundedMidnight := s.roundToMidnight(lastRun) if job.dayOfTheMonth > 0 { // calculate days to job.dayOfTheMonth jobDay := time.Date(lastRun.Year(), lastRun.Month(), job.dayOfTheMonth, 0, 0, 0, 0, s.Location()).Add(job.getAtTime()) difference := absDuration(lastRun.Sub(jobDay)) - nextRun := lastRun + next := lastRun if jobDay.Before(lastRun) { // shouldn't run this month; schedule for next interval minus day difference - nextRun = nextRun.AddDate(0, job.interval, -0) - nextRun = nextRun.Add(-difference) + next = next.AddDate(0, job.interval, -0) + next = next.Add(-difference) } else { if job.interval == 1 { // every month counts current month - nextRun = nextRun.AddDate(0, job.interval-1, 0) + next = next.AddDate(0, job.interval-1, 0) } else { // should run next month interval - nextRun = nextRun.AddDate(0, job.interval, 0) + next = next.AddDate(0, job.interval, 0) } - nextRun = nextRun.Add(difference) + next = next.Add(difference) } - return until(lastRun, nextRun) + return nextRun{duration: until(lastRun, next), dateTime: next} } - nextRun := lastRunRoundedMidnight.Add(job.getAtTime()).AddDate(0, job.interval, 0) - return until(lastRunRoundedMidnight, nextRun) + next := lastRunRoundedMidnight.Add(job.getAtTime()).AddDate(0, job.interval, 0) + return nextRun{duration: until(lastRunRoundedMidnight, next), dateTime: next} } -func (s *Scheduler) calculateWeekday(job *Job, lastRun time.Time) time.Duration { +func (s *Scheduler) calculateWeekday(job *Job, lastRun time.Time) nextRun { daysToWeekday := remainingDaysToWeekday(lastRun.Weekday(), job.Weekdays()) totalDaysDifference := s.calculateTotalDaysDifference(lastRun, daysToWeekday, job) - nextRun := s.roundToMidnight(lastRun).Add(job.getAtTime()).AddDate(0, 0, totalDaysDifference) - return until(lastRun, nextRun) + next := s.roundToMidnight(lastRun).Add(job.getAtTime()).AddDate(0, 0, totalDaysDifference) + return nextRun{duration: until(lastRun, next), dateTime: next} } -func (s *Scheduler) calculateWeeks(job *Job, lastRun time.Time) time.Duration { +func (s *Scheduler) calculateWeeks(job *Job, lastRun time.Time) nextRun { totalDaysDifference := int(job.interval) * 7 - nextRun := s.roundToMidnight(lastRun).Add(job.getAtTime()).AddDate(0, 0, totalDaysDifference) - return until(lastRun, nextRun) + next := s.roundToMidnight(lastRun).Add(job.getAtTime()).AddDate(0, 0, totalDaysDifference) + return nextRun{duration: until(lastRun, next), dateTime: next} } func (s *Scheduler) calculateTotalDaysDifference(lastRun time.Time, daysToWeekday int, job *Job) int { @@ -278,7 +295,7 @@ func (s *Scheduler) calculateTotalDaysDifference(lastRun time.Time, daysToWeekda return daysToWeekday } -func (s *Scheduler) calculateDays(job *Job, lastRun time.Time) time.Duration { +func (s *Scheduler) calculateDays(job *Job, lastRun time.Time) nextRun { if job.interval == 1 { lastRunDayPlusJobAtTime := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), 0, 0, 0, 0, s.Location()).Add(job.getAtTime()) @@ -290,12 +307,12 @@ func (s *Scheduler) calculateDays(job *Job, lastRun time.Time) time.Duration { } if shouldRunToday(lastRun, lastRunDayPlusJobAtTime) { - return until(lastRun, s.roundToMidnight(lastRun).Add(job.getAtTime())) + return nextRun{duration: until(lastRun, s.roundToMidnight(lastRun).Add(job.getAtTime())), dateTime: s.roundToMidnight(lastRun).Add(job.getAtTime())} } } nextRunAtTime := s.roundToMidnight(lastRun).Add(job.getAtTime()).AddDate(0, 0, job.interval).In(s.Location()) - return until(lastRun, nextRunAtTime) + return nextRun{duration: until(lastRun, nextRunAtTime), dateTime: nextRunAtTime} } func until(from time.Time, until time.Time) time.Duration { diff --git a/scheduler_test.go b/scheduler_test.go index 31c56f49..46b0d948 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -749,7 +749,7 @@ func TestScheduler_CalculateNextRun(t *testing.T) { t.Run(tc.name, func(t *testing.T) { s := NewScheduler(time.UTC) s.time = ft - got := s.durationToNextRun(tc.job.LastRun(), tc.job) + got := s.durationToNextRun(tc.job.LastRun(), tc.job).duration assert.Equalf(t, tc.wantTimeUntilNextRun, got, fmt.Sprintf("expected %s / got %s", tc.wantTimeUntilNextRun.String(), got.String())) }) } @@ -959,7 +959,7 @@ func TestCalculateMonths2(t *testing.T) { t.Run(tc.description, func(t *testing.T) { s := NewScheduler(time.UTC) s.time = maySixth2021At0500 - got := s.durationToNextRun(tc.job.LastRun(), tc.job) + got := s.durationToNextRun(tc.job.LastRun(), tc.job).duration assert.Equalf(t, tc.wantTimeUntilNextRun, got, fmt.Sprintf("expected %s / got %s", tc.wantTimeUntilNextRun.String(), got.String())) }) } @@ -1493,10 +1493,10 @@ func TestScheduler_WaitForSchedules(t *testing.T) { var counterMutex sync.RWMutex counter := 0 - _, err := s.Every("1s").Do(func() { counterMutex.Lock(); defer counterMutex.Unlock(); counter++ }) + _, err := s.Every("1s").Do(func() { counterMutex.Lock(); defer counterMutex.Unlock(); counter++; log.Println("job 1") }) require.NoError(t, err) - _, err = s.CronWithSeconds("*/1 * * * * *").Do(func() { counterMutex.Lock(); defer counterMutex.Unlock(); counter++ }) + _, err = s.CronWithSeconds("*/1 * * * * *").Do(func() { counterMutex.Lock(); defer counterMutex.Unlock(); counter++; log.Println("job 2") }) require.NoError(t, err) s.StartAsync() @@ -1565,7 +1565,7 @@ func TestScheduler_CallNextWeekDay(t *testing.T) { require.NoError(t, err) job.lastRun = lastRun - got := s.durationToNextRun(lastRun, job) + got := s.durationToNextRun(lastRun, job).duration assert.Equal(t, wantTimeUntilNextRun, got) }) @@ -1595,11 +1595,11 @@ func TestScheduler_CheckNextWeekDay(t *testing.T) { require.NoError(t, err) job.lastRun = lastRun - gotFirst := s.durationToNextRun(lastRun, job) + gotFirst := s.durationToNextRun(lastRun, job).duration assert.Equal(t, wantTimeUntilNextFirstRun, gotFirst) job.lastRun = secondLastRun - gotSecond := s.durationToNextRun(secondLastRun, job) + gotSecond := s.durationToNextRun(secondLastRun, job).duration assert.Equal(t, wantTimeUntilNextSecondRun, gotSecond) }) @@ -1646,7 +1646,7 @@ func TestScheduler_CheckEveryWeekHigherThanOne(t *testing.T) { lastRun := januaryDay2020At(day) job.lastRun = lastRun - got := s.durationToNextRun(lastRun, job) + got := s.durationToNextRun(lastRun, job).duration if numJob < len(tc.weekDays) { assert.Equal(t, wantTimeUntilNextRunOneDay, got)