From 97e1415a45daf8fa589f8375a1a1761403f6e45c Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Mon, 22 Jul 2024 05:36:17 -0400 Subject: [PATCH 1/2] feat: add lock update mechanism for jobs, closes #762 - Add `jobOutUpdateLockRequest` channel in the executor - Implement lock update requests in the job execution process - Add `Lock()` method to the `Job` interface - Update the scheduler to handle lock update requests - Add a test case to verify the new locking mechanism --- executor.go | 20 ++++++++++++++++++-- job.go | 8 ++++++++ scheduler.go | 26 +++++++++++++++++++++----- scheduler_test.go | 41 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/executor.go b/executor.go index 1b13285d..a8e2b046 100644 --- a/executor.go +++ b/executor.go @@ -30,6 +30,8 @@ type executor struct { jobsOutCompleted chan uuid.UUID // used to request jobs from the scheduler jobOutRequest chan jobOutRequest + // used to request jobs from the scheduler + jobOutUpdateLockRequest chan jobOutUpdateLockRequest // used by the executor to receive a stop signal from the scheduler stopCh chan struct{} @@ -376,7 +378,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { e.incrementJobCounter(j, Skip) return } - defer func() { _ = lock.Unlock(j.ctx) }() + e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{ + id: j.id, + lock: lock, + } + + defer func() { + _ = lock.Unlock(j.ctx) + }() } else if e.locker != nil { lock, err := e.locker.Lock(j.ctx, j.name) if err != nil { @@ -385,7 +394,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { e.incrementJobCounter(j, Skip) return } - defer func() { _ = lock.Unlock(j.ctx) }() + e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{ + id: j.id, + lock: lock, + } + + defer func() { + _ = lock.Unlock(j.ctx) + }() } _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) diff --git a/job.go b/job.go index 6f09f3d8..d43b7d1b 100644 --- a/job.go +++ b/job.go @@ -30,6 +30,7 @@ type internalJob struct { nextScheduled []time.Time lastRun time.Time + lastLock Lock function any parameters []any timer clockwork.Timer @@ -993,6 +994,7 @@ type Job interface { RunNow() error // Tags returns the job's string tags. Tags() []string + Lock() Lock } var _ Job = (*job)(nil) @@ -1091,3 +1093,9 @@ func (j job) RunNow() error { } return err } + +func (j job) Lock() Lock { + ij := requestJob(j.id, j.jobOutRequest) + + return ij.lastLock +} diff --git a/scheduler.go b/scheduler.go index 4131747d..0b78c1e4 100644 --- a/scheduler.go +++ b/scheduler.go @@ -107,6 +107,11 @@ type jobOutRequest struct { outChan chan internalJob } +type jobOutUpdateLockRequest struct { + id uuid.UUID + lock Lock +} + type runJobRequest struct { id uuid.UUID outChan chan error @@ -131,11 +136,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { logger: &noOpLogger{}, clock: clockwork.NewRealClock(), - jobsIn: make(chan jobIn), - jobsOutForRescheduling: make(chan uuid.UUID), - jobsOutCompleted: make(chan uuid.UUID), - jobOutRequest: make(chan jobOutRequest, 1000), - done: make(chan error), + jobsIn: make(chan jobIn), + jobsOutForRescheduling: make(chan uuid.UUID), + jobsOutCompleted: make(chan uuid.UUID), + jobOutRequest: make(chan jobOutRequest, 1000), + jobOutUpdateLockRequest: make(chan jobOutUpdateLockRequest), + done: make(chan error), } s := &scheduler{ @@ -190,6 +196,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { case out := <-s.jobOutRequestCh: s.selectJobOutRequest(out) + case out := <-s.exec.jobOutUpdateLockRequest: + s.jobOutUpdateLockRequest(out) + case out := <-s.allJobsOutRequest: s.selectAllJobsOutRequest(out) @@ -425,6 +434,13 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) { close(out.outChan) } +func (s *scheduler) jobOutUpdateLockRequest(out jobOutUpdateLockRequest) { + if j, ok := s.jobs[out.id]; ok { + j.lastLock = out.lock + s.jobs[out.id] = j + } +} + func (s *scheduler) selectNewJob(in newJobIn) { j := in.job if s.started { diff --git a/scheduler_test.go b/scheduler_test.go index 212eb70d..4266a7f9 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2536,3 +2536,44 @@ func TestScheduler_WithMonitor(t *testing.T) { }) } } + +func TestJob_Lock(t *testing.T) { + locker := &testLocker{ + notLocked: make(chan struct{}, 1), + } + + s := newTestScheduler(t, + WithDistributedLocker(locker), + ) + + jobRan := make(chan struct{}) + j, err := s.NewJob( + DurationJob(time.Millisecond*100), + NewTask(func() { + time.Sleep(50 * time.Millisecond) + jobRan <- struct{}{} + }), + ) + require.NoError(t, err) + + s.Start() + defer s.Shutdown() + + select { + case <-jobRan: + // Job has run + case <-time.After(200 * time.Millisecond): + t.Fatal("Job did not run in time") + } + + require.Eventually(t, func() bool { + if locker.jobLocked { + return true + } + + return false + }, 200*time.Millisecond, 100*time.Millisecond, "Job should be locked") + + lock := j.Lock() + assert.NotNil(t, lock, "Job Lock() should return a non-nil Locker") +} From febe994b1e531a7d5c25eda722e0dccff216e1a7 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Mon, 12 Aug 2024 18:51:46 -0400 Subject: [PATCH 2/2] feat: add Context() method and update job retrieval, closes #762 - Add Context() method to Job interface and implement it in job struct - Update requestJob function to include a timeout flag - Modify existing methods to use the new requestJob signature --- job.go | 15 +++++++++++---- util.go | 12 +++++++++--- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/job.go b/job.go index d43b7d1b..1bed1997 100644 --- a/job.go +++ b/job.go @@ -995,6 +995,7 @@ type Job interface { // Tags returns the job's string tags. Tags() []string Lock() Lock + Context() context.Context } var _ Job = (*job)(nil) @@ -1016,7 +1017,7 @@ func (j job) ID() uuid.UUID { } func (j job) LastRun() (time.Time, error) { - ij := requestJob(j.id, j.jobOutRequest) + ij := requestJob(j.id, j.jobOutRequest, true) if ij == nil || ij.id == uuid.Nil { return time.Time{}, ErrJobNotFound } @@ -1028,7 +1029,7 @@ func (j job) Name() string { } func (j job) NextRun() (time.Time, error) { - ij := requestJob(j.id, j.jobOutRequest) + ij := requestJob(j.id, j.jobOutRequest, true) if ij == nil || ij.id == uuid.Nil { return time.Time{}, ErrJobNotFound } @@ -1041,7 +1042,7 @@ func (j job) NextRun() (time.Time, error) { } func (j job) NextRuns(count int) ([]time.Time, error) { - ij := requestJob(j.id, j.jobOutRequest) + ij := requestJob(j.id, j.jobOutRequest, true) if ij == nil || ij.id == uuid.Nil { return nil, ErrJobNotFound } @@ -1095,7 +1096,13 @@ func (j job) RunNow() error { } func (j job) Lock() Lock { - ij := requestJob(j.id, j.jobOutRequest) + ij := requestJob(j.id, j.jobOutRequest, true) return ij.lastLock } + +func (j job) Context() context.Context { + ij := requestJob(j.id, j.jobOutRequest, false) + + return ij.ctx +} diff --git a/util.go b/util.go index a4e5b6fd..65511e75 100644 --- a/util.go +++ b/util.go @@ -36,9 +36,15 @@ func callJobFuncWithParams(jobFunc any, params ...any) error { return nil } -func requestJob(id uuid.UUID, ch chan jobOutRequest) *internalJob { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() +func requestJob(id uuid.UUID, ch chan jobOutRequest, timeout bool) *internalJob { + var cancel context.CancelFunc + ctx := context.Background() + + if timeout { + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + defer cancel() + } + return requestJobCtx(ctx, id, ch) }