diff --git a/README.md b/README.md index 7fc19575..953a1df9 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,7 @@ other instances checking to see if a new leader needs to be elected. (don't see what you need? request on slack to get a repo created to contribute it!) - [**Locker**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedLocker): A locker can be used to lock each run of a job to a single instance of gocron. +Locker can be at job or scheduler, if it is defined both at job and scheduler then locker of job will take precedence. - Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=) (don't see what you need? request on slack to get a repo created to contribute it!) diff --git a/errors.go b/errors.go index e2801244..16438435 100644 --- a/errors.go +++ b/errors.go @@ -36,6 +36,7 @@ var ( ErrWithClockNil = fmt.Errorf("gocron: WithClock: clock must not be nil") ErrWithDistributedElectorNil = fmt.Errorf("gocron: WithDistributedElector: elector must not be nil") ErrWithDistributedLockerNil = fmt.Errorf("gocron: WithDistributedLocker: locker must not be nil") + ErrWithDistributedJobLockerNil = fmt.Errorf("gocron: WithDistributedJobLocker: locker must not be nil") ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0") ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil") ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil") diff --git a/executor.go b/executor.go index 1b51e57b..cf6cc016 100644 --- a/executor.go +++ b/executor.go @@ -335,6 +335,13 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { e.sendOutForRescheduling(&jIn) return } + } else if j.locker != nil { + lock, err := j.locker.Lock(j.ctx, j.name) + if err != nil { + e.sendOutForRescheduling(&jIn) + return + } + defer func() { _ = lock.Unlock(j.ctx) }() } else if e.locker != nil { lock, err := e.locker.Lock(j.ctx, j.name) if err != nil { diff --git a/job.go b/job.go index 1be7c807..e9394998 100644 --- a/job.go +++ b/job.go @@ -42,6 +42,8 @@ type internalJob struct { afterJobRuns func(jobID uuid.UUID, jobName string) beforeJobRuns func(jobID uuid.UUID, jobName string) afterJobRunsWithError func(jobID uuid.UUID, jobName string, err error) + + locker Locker } // stop is used to stop the job's timer and cancel the context @@ -485,6 +487,19 @@ func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition { // JobOption defines the constructor for job options. type JobOption func(*internalJob) error +// WithDistributedJobLocker sets the locker to be used by multiple +// Scheduler instances to ensure that only one instance of each +// job is run. +func WithDistributedJobLocker(locker Locker) JobOption { + return func(j *internalJob) error { + if locker == nil { + return ErrWithDistributedJobLockerNil + } + j.locker = locker + return nil + } +} + // WithEventListeners sets the event listeners that should be // run for the job. func WithEventListeners(eventListeners ...EventListener) JobOption { diff --git a/scheduler_test.go b/scheduler_test.go index 1f521f53..f26a7c64 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -728,6 +728,14 @@ func TestScheduler_NewJobErrors(t *testing.T) { nil, ErrOneTimeJobStartDateTimePast, }, + { + "WithDistributedJobLocker is nil", + DurationJob( + time.Second, + ), + []JobOption{WithDistributedJobLocker(nil)}, + ErrWithDistributedJobLockerNil, + }, } for _, tt := range tests { @@ -1199,17 +1207,19 @@ func TestScheduler_WithDistributed(t *testing.T) { goleak.VerifyNone(t) tests := []struct { - name string - count int - opt SchedulerOption - assertions func(*testing.T) + name string + count int + schedulerOpts []SchedulerOption + jobOpts []JobOption + assertions func(*testing.T) }{ { "3 schedulers with elector", 3, - WithDistributedElector(&testElector{ - notLeader: notLeader, - }), + []SchedulerOption{ + WithDistributedElector(&testElector{notLeader: notLeader}), + }, + nil, func(t *testing.T) { timeout := time.Now().Add(1 * time.Second) var notLeaderCount int @@ -1229,9 +1239,32 @@ func TestScheduler_WithDistributed(t *testing.T) { { "3 schedulers with locker", 3, - WithDistributedLocker(&testLocker{ - notLocked: notLocked, - }), + []SchedulerOption{ + WithDistributedLocker(&testLocker{notLocked: notLocked}), + }, + nil, + func(t *testing.T) { + timeout := time.Now().Add(1 * time.Second) + var notLockedCount int + for { + if time.Now().After(timeout) { + break + } + select { + case <-notLocked: + notLockedCount++ + default: + } + } + }, + }, + { + "3 schedulers and job with Distributed locker", + 3, + nil, + []JobOption{ + WithDistributedJobLocker(&testLocker{notLocked: notLocked}), + }, func(t *testing.T) { timeout := time.Now().Add(1 * time.Second) var notLockedCount int @@ -1257,12 +1290,17 @@ func TestScheduler_WithDistributed(t *testing.T) { for i := tt.count; i > 0; i-- { s := newTestScheduler(t, - tt.opt, + tt.schedulerOpts..., ) + jobOpts := []JobOption{ + WithStartAt( + WithStartImmediately(), + ), + } + jobOpts = append(jobOpts, tt.jobOpts...) go func() { s.Start() - _, err := s.NewJob( DurationJob( time.Second, @@ -1273,9 +1311,7 @@ func TestScheduler_WithDistributed(t *testing.T) { jobsRan <- struct{}{} }, ), - WithStartAt( - WithStartImmediately(), - ), + jobOpts..., ) require.NoError(t, err)