Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Context() method and update job retrieval, closes #762 #787

Open
wants to merge 2 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type executor struct {
jobsOutCompleted chan uuid.UUID
// used to request jobs from the scheduler
jobOutRequest chan jobOutRequest
// used to request jobs from the scheduler
jobOutUpdateLockRequest chan jobOutUpdateLockRequest

// used by the executor to receive a stop signal from the scheduler
stopCh chan struct{}
Expand Down Expand Up @@ -376,7 +378,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Skip)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
id: j.id,
lock: lock,
}

defer func() {
_ = lock.Unlock(j.ctx)
}()
} else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil {
Expand All @@ -385,7 +394,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
e.incrementJobCounter(j, Skip)
return
}
defer func() { _ = lock.Unlock(j.ctx) }()
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
id: j.id,
lock: lock,
}

defer func() {
_ = lock.Unlock(j.ctx)
}()
}
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

Expand Down
21 changes: 18 additions & 3 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type internalJob struct {
nextScheduled []time.Time

lastRun time.Time
lastLock Lock
function any
parameters []any
timer clockwork.Timer
Expand Down Expand Up @@ -993,6 +994,8 @@ type Job interface {
RunNow() error
// Tags returns the job's string tags.
Tags() []string
Lock() Lock
Context() context.Context
}

var _ Job = (*job)(nil)
Expand All @@ -1014,7 +1017,7 @@ func (j job) ID() uuid.UUID {
}

func (j job) LastRun() (time.Time, error) {
ij := requestJob(j.id, j.jobOutRequest)
ij := requestJob(j.id, j.jobOutRequest, true)
if ij == nil || ij.id == uuid.Nil {
return time.Time{}, ErrJobNotFound
}
Expand All @@ -1026,7 +1029,7 @@ func (j job) Name() string {
}

func (j job) NextRun() (time.Time, error) {
ij := requestJob(j.id, j.jobOutRequest)
ij := requestJob(j.id, j.jobOutRequest, true)
if ij == nil || ij.id == uuid.Nil {
return time.Time{}, ErrJobNotFound
}
Expand All @@ -1039,7 +1042,7 @@ func (j job) NextRun() (time.Time, error) {
}

func (j job) NextRuns(count int) ([]time.Time, error) {
ij := requestJob(j.id, j.jobOutRequest)
ij := requestJob(j.id, j.jobOutRequest, true)
if ij == nil || ij.id == uuid.Nil {
return nil, ErrJobNotFound
}
Expand Down Expand Up @@ -1091,3 +1094,15 @@ func (j job) RunNow() error {
}
return err
}

func (j job) Lock() Lock {
ij := requestJob(j.id, j.jobOutRequest, true)

return ij.lastLock
}

func (j job) Context() context.Context {
ij := requestJob(j.id, j.jobOutRequest, false)

return ij.ctx
}
26 changes: 21 additions & 5 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ type jobOutRequest struct {
outChan chan internalJob
}

type jobOutUpdateLockRequest struct {
id uuid.UUID
lock Lock
}

type runJobRequest struct {
id uuid.UUID
outChan chan error
Expand All @@ -131,11 +136,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
logger: &noOpLogger{},
clock: clockwork.NewRealClock(),

jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
jobsIn: make(chan jobIn),
jobsOutForRescheduling: make(chan uuid.UUID),
jobsOutCompleted: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000),
jobOutUpdateLockRequest: make(chan jobOutUpdateLockRequest),
done: make(chan error),
}

s := &scheduler{
Expand Down Expand Up @@ -190,6 +196,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
case out := <-s.jobOutRequestCh:
s.selectJobOutRequest(out)

case out := <-s.exec.jobOutUpdateLockRequest:
s.jobOutUpdateLockRequest(out)

case out := <-s.allJobsOutRequest:
s.selectAllJobsOutRequest(out)

Expand Down Expand Up @@ -425,6 +434,13 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
close(out.outChan)
}

func (s *scheduler) jobOutUpdateLockRequest(out jobOutUpdateLockRequest) {
if j, ok := s.jobs[out.id]; ok {
j.lastLock = out.lock
s.jobs[out.id] = j
}
}

func (s *scheduler) selectNewJob(in newJobIn) {
j := in.job
if s.started {
Expand Down
41 changes: 41 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2536,3 +2536,44 @@
})
}
}

func TestJob_Lock(t *testing.T) {
locker := &testLocker{
notLocked: make(chan struct{}, 1),
}

s := newTestScheduler(t,
WithDistributedLocker(locker),
)

jobRan := make(chan struct{})
j, err := s.NewJob(
DurationJob(time.Millisecond*100),
NewTask(func() {
time.Sleep(50 * time.Millisecond)
jobRan <- struct{}{}
}),
)
require.NoError(t, err)

s.Start()
defer s.Shutdown()

select {
case <-jobRan:
// Job has run
case <-time.After(200 * time.Millisecond):
t.Fatal("Job did not run in time")
}

require.Eventually(t, func() bool {
if locker.jobLocked {
return true
}

return false
}, 200*time.Millisecond, 100*time.Millisecond, "Job should be locked")

lock := j.Lock()
assert.NotNil(t, lock, "Job Lock() should return a non-nil Locker")
}
12 changes: 9 additions & 3 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ func callJobFuncWithParams(jobFunc any, params ...any) error {
return nil
}

func requestJob(id uuid.UUID, ch chan jobOutRequest) *internalJob {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
func requestJob(id uuid.UUID, ch chan jobOutRequest, timeout bool) *internalJob {
var cancel context.CancelFunc
ctx := context.Background()

if timeout {
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
}

return requestJobCtx(ctx, id, ch)
}

Expand Down
Loading