From 8336eea622a2199aa54829830b9170a55c316202 Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Mon, 19 Jun 2017 15:56:20 -0400 Subject: [PATCH 1/9] typo --- redis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis.go b/redis.go index 9da02018..0b8e6428 100644 --- a/redis.go +++ b/redis.go @@ -320,7 +320,7 @@ return requeuedCount ` // KEYS[1] = job queue to push onto -// KEYS[2] = Unique job's key. Test for existance and set if we push. +// KEYS[2] = Unique job's key. Test for existence and set if we push. // ARGV[1] = job var redisLuaEnqueueUnique = ` if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then From e7bdc9a05202f422645e0be51c85d11b29bc7a20 Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Tue, 20 Jun 2017 14:03:04 -0400 Subject: [PATCH 2/9] partially reworked locking to include lock_info --- dead_pool_reaper.go | 4 +-- redis.go | 70 +++++++++++++++++++++++++++++++++++---------- worker.go | 17 +++++++++-- worker_pool_test.go | 40 +++++++++++++++++--------- worker_test.go | 44 ++++++++++++++++++++++++---- 5 files changed, 137 insertions(+), 38 deletions(-) diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index 2d11fa03..a7c14463 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -106,13 +106,13 @@ func (r *deadPoolReaper) reap() error { } func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error { - numArgs := len(jobTypes) * 2 + numArgs := len(jobTypes) * 3 redisRequeueScript := redis.NewScript(numArgs, redisLuaReenqueueJob) var scriptArgs = make([]interface{}, 0, numArgs) for _, jobType := range jobTypes { // pops from in progress, push into job queue and decrement the queue lock - scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType)) + scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType), poolID) } conn := r.pool.Get() diff --git a/redis.go b/redis.go index 0b8e6428..12021b85 100644 --- a/redis.go +++ b/redis.go @@ -66,6 +66,11 @@ func redisKeyJobsLock(namespace, jobName string) string { return redisKeyJobs(namespace, jobName) + ":" + lockKeySuffix } +var lockInfoKeySuffix = "lock_info" +func redisKeyJobsLockInfo(namespace, jobName string) string { + return redisKeyJobs(namespace, jobName) + ":" + lockInfoKeySuffix +} + var concurrencyKeySuffix = "max_concurrency" func redisKeyJobsConcurrency(namespace, jobName string) string { return redisKeyJobs(namespace, jobName) + ":" + concurrencyKeySuffix @@ -93,39 +98,66 @@ func redisKeyLastPeriodicEnqueue(namespace string) string { return redisNamespacePrefix(namespace) + "last_periodic_enqueue" } -// Used by Lua scripts below and needs to follow same naming convention as redisKeyJobs* functions above -// note: all assume the local var jobQueue is present, which is the the val of redisKeyJobs() +// Helpers functions used by Lua scripts below that need to match naming convention as redisKeyJobs* functions above +// note: all assume the local var jobQueue is in scope, which is the the val of redisKeyJobs() +// note: acquire/release lock functions assume getLockKey and getLockKeyInfo are in scope var redisLuaJobsPausedKey = fmt.Sprintf(` local function getPauseKey(jobQueue) return string.format("%%s:%s", jobQueue) end`, pauseKeySuffix) -var redisLuaJobsLockedKey = fmt.Sprintf(` +var redisLuaJobsLockKey = fmt.Sprintf(` local function getLockKey(jobQueue) return string.format("%%s:%s", jobQueue) end`, lockKeySuffix) +var 
redisLuaJobsLockInfoKey = fmt.Sprintf(` +local function getLockInfoKey(jobQueue) + return string.format("%%s:%s", jobQueue) +end`, lockInfoKeySuffix) + var redisLuaJobsConcurrencyKey = fmt.Sprintf(` local function getConcurrencyKey(jobQueue) return string.format("%%s:%s", jobQueue) end`, concurrencyKeySuffix) +var redisLuaAcquireLock = fmt.Sprintf(` +local function acquireLock(jobQueue, lockID) + redis.call('incr', getLockKey(jobQueue)) + redis.call('hincrby', getLockInfoKey(jobQueue), lockID, 1) +end +`) + +var redisLuaReleaseLock = fmt.Sprintf(` +local function releaseLock(jobQueue, workerPoolId) + redis.call('decr', getLockKey(jobQueue)) + redis.call('hincrby', getLockInfoKey(jobQueue), workerPoolId, -1) +end +`) + // Used to fetch the next job to run // // KEYS[1] = the 1st job queue we want to try, eg, "work:jobs:emails" // KEYS[2] = the 1st job queue's in prog queue, eg, "work:jobs:emails:97c84119d13cb54119a38743:inprogress" -// KEYS[3] = the 2nd job queue... -// KEYS[4] = the 2nd job queue's in prog queue... +// KEYS[3] = the 1st job queue's lockId, currently worker pool Id eg, "97c84119d13cb54119a38743" +// KEYS[4] = the 2nd job queue... +// KEYS[5] = the 2nd job queue's in prog queue... +// KEYS[6] = the 2nd job queue's lockId... // ... // KEYS[N] = the last job queue... // KEYS[N+1] = the last job queue's in prog queue... +// KEYS[N+2] = the last job queue's lockId... var redisLuaFetchJob = fmt.Sprintf(` -- getPauseKey will be inserted below %s -- getLockKey will be inserted below %s +-- getLockInfoKey will be inserted below +%s -- getConcurrencyKey will be inserted below %s +-- acquireLock will be inserted below +%s local function haveJobs(jobQueue) return redis.call('llen', jobQueue) > 0 @@ -135,13 +167,12 @@ local function isPaused(pauseKey) return redis.call('get', pauseKey) end -local function canRun(lockKey, maxConcurrency) +local function canRun(lockKey, lockInfoKey, maxConcurrency) local activeJobs = tonumber(redis.call('get', lockKey)) if (not maxConcurrency or maxConcurrency == 0) or (not activeJobs or activeJobs < maxConcurrency) then -- default case: maxConcurrency not defined or set to 0 means no cap on concurrent jobs OR -- maxConcurrency set, but lock does not yet exist OR -- maxConcurrency set, lock is set, but not yet at max concurrency - redis.call('incr', lockKey) return true else -- we are at max capacity for running jobs @@ -149,22 +180,26 @@ local function canRun(lockKey, maxConcurrency) end end -local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency +local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency, lockID local keylen = #KEYS -for i=1,keylen,2 do +for i=1,keylen,3 do jobQueue = KEYS[i] inProgQueue = KEYS[i+1] + lockID = KEYS[i+2] pauseKey = getPauseKey(jobQueue) lockKey = getLockKey(jobQueue) maxConcurrency = tonumber(redis.call('get', getConcurrencyKey(jobQueue))) if haveJobs(jobQueue) and not isPaused(pauseKey) and canRun(lockKey, maxConcurrency) then res = redis.call('rpoplpush', jobQueue, inProgQueue) - return {res, jobQueue, inProgQueue} + if res then + acquireLock(jobQueue, lockID) + return {res, jobQueue, inProgQueue} + end end end -return nil`, redisLuaJobsPausedKey, redisLuaJobsLockedKey, redisLuaJobsConcurrencyKey) +return nil`, redisLuaJobsPausedKey, redisLuaJobsLockKey, redisLuaJobsLockInfoKey, redisLuaJobsConcurrencyKey, redisLuaAcquireLock) // Used by the reaper to re-enqueue jobs that were in progress // @@ -178,19 +213,24 @@ return nil`, redisLuaJobsPausedKey, redisLuaJobsLockedKey, redisLuaJobsConcurren var 
redisLuaReenqueueJob = fmt.Sprintf(` -- getLockKey inserted below %s +-- getLockInfoKey will be inserted below +%s +-- releaseLock will be inserted below +%s local keylen = #KEYS -local res, jobQueue, inProgQueue -for i=1,keylen,2 do +local res, jobQueue, inProgQueue, lockID +for i=1,keylen,3 do inProgQueue = KEYS[i] jobQueue = KEYS[i+1] + lockID = KEYS[i+2] res = redis.call('rpoplpush', inProgQueue, jobQueue) if res then - redis.call('decr', getLockKey(jobQueue)) + releaseLock(jobQueue, lockID) return {res, inProgQueue, jobQueue} end end -return nil`, redisLuaJobsLockedKey) +return nil`, redisLuaJobsLockKey, redisLuaJobsLockInfoKey, redisLuaReleaseLock) // KEYS[1] = zset of jobs (retry or scheduled), eg work:retry // KEYS[2] = zset of dead, eg work:dead. If we don't know the jobName of a job, we'll put it in dead. diff --git a/worker.go b/worker.go index 90da5d3d..b989ded2 100644 --- a/worker.go +++ b/worker.go @@ -63,7 +63,7 @@ func (w *worker) updateMiddlewareAndJobTypes(middleware []*middlewareHandler, jo } w.sampler = sampler w.jobTypes = jobTypes - w.redisFetchScript = redis.NewScript(len(jobTypes)*2, redisLuaFetchJob) + w.redisFetchScript = redis.NewScript(len(jobTypes)*3, redisLuaFetchJob) } func (w *worker) start() { @@ -104,6 +104,7 @@ func (w *worker) loop() { timer.Reset(0) case <-timer.C: gotJob := true + jobsPerLoop := 1 for gotJob { job, err := w.fetchJob() if err != nil { @@ -111,6 +112,12 @@ func (w *worker) loop() { gotJob = false timer.Reset(10 * time.Millisecond) } else if job != nil { + fmt.Printf("worker %v Fetched job num %v -- %v\n", w.workerID, jobsPerLoop, job) + jobsPerLoop++ + if jobsPerLoop > 1 { + + + } w.processJob(job) consequtiveNoJobs = 0 } else { @@ -124,6 +131,7 @@ func (w *worker) loop() { if idx >= int64(len(sleepBackoffsInMilliseconds)) { idx = int64(len(sleepBackoffsInMilliseconds)) - 1 } + // fmt.Printf("Jobs per loop=%v -- worker id=%v\n", jobsPerLoop, w.workerID) timer.Reset(time.Duration(sleepBackoffsInMilliseconds[idx]) * time.Millisecond) } } @@ -135,10 +143,10 @@ func (w *worker) fetchJob() (*Job, error) { // resort queues // NOTE: we could optimize this to only resort every second, or something. 
w.sampler.sample() - var scriptArgs = make([]interface{}, 0, len(w.sampler.samples)*2) + var scriptArgs = make([]interface{}, 0, len(w.sampler.samples)*3) for _, s := range w.sampler.samples { - scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg) + scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, w.poolID) } conn := w.pool.Get() @@ -224,6 +232,7 @@ func (w *worker) removeJobFromInProgress(job *Job) { conn.Send("MULTI") conn.Send("LREM", job.inProgQueue, 1, job.rawJSON) conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name)) + conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1) if _, err := conn.Do("EXEC"); err != nil { logError("worker.remove_job_from_in_progress.lrem", err) } @@ -265,6 +274,7 @@ func (w *worker) addToRetry(job *Job, runErr error) { conn.Send("MULTI") conn.Send("LREM", job.inProgQueue, 1, job.rawJSON) conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name)) + conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1) conn.Send("ZADD", redisKeyRetry(w.namespace), nowEpochSeconds()+backoff(job), rawJSON) if _, err = conn.Do("EXEC"); err != nil { logError("worker.add_to_retry.exec", err) @@ -290,6 +300,7 @@ func (w *worker) addToDead(job *Job, runErr error) { conn.Send("MULTI") conn.Send("LREM", job.inProgQueue, 1, job.rawJSON) conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name)) + conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1) conn.Send("ZADD", redisKeyDead(w.namespace), nowEpochSeconds(), rawJSON) _, err = conn.Do("EXEC") if err != nil { diff --git a/worker_pool_test.go b/worker_pool_test.go index 5b95ef11..cce43b28 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -118,9 +118,8 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { pool := newTestPool(":6379") ns := "work" job1 := "job1" - numJobs, concurrency, sleepTime := 5, 5, 2 + numJobs, concurrency, sleepTime := 5, 5, 1000 wp := setupTestWorkerPool(pool, ns, job1, concurrency, JobOptions{Priority: 1, MaxConcurrency: 1}) - wp.Start() // enqueue some jobs enqueuer := NewEnqueuer(ns, pool) @@ -128,14 +127,22 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime}) assert.Nil(t, err) } - assert.True(t, int64(numJobs) >= listSize(pool, redisKeyJobs(ns, job1))) + jobsQueued := listSize(pool, redisKeyJobs(ns, job1)) + // make sure we've some jobs queued up + assert.True(t, jobsQueued > 1, "should be %v jobs queued up, but only found %v", numJobs, jobsQueued) // now make sure the during the duration of job execution there is never > 1 job in flight + // wp.Start() start := time.Now() totalRuntime := time.Duration(sleepTime * numJobs) * time.Millisecond for time.Since(start) < totalRuntime { jobsInProgress := listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)) - assert.True(t, jobsInProgress <= 1, fmt.Sprintf("jobsInProgress should never exceed 1: actual=%d", jobsInProgress)) + assert.True(t, jobsInProgress <= 1, "jobsInProgress should never exceed 1: actual=%d", jobsInProgress) + // lock count for the job and lock info for the pool should never exceed 1 + jobLockCount := getInt64(pool, redisKeyJobsLock(ns, job1)) + assert.True(t, jobLockCount <= 1, "global lock count for job should never exceed 1, got: %v", jobLockCount) + wpLockCount := hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID) + assert.True(t, wpLockCount <= 1, "lock count for the worker pool should never exceed 1: actual=%v", wpLockCount) 
time.Sleep(time.Duration(sleepTime) * time.Millisecond) } wp.Drain() @@ -144,6 +151,8 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { // At this point it should all be empty. assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1))) + assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1))) + assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID)) } func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { @@ -151,9 +160,6 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { ns, job1 := "work", "job1" numJobs, concurrency, sleepTime := 5, 5, 2 wp := setupTestWorkerPool(pool, ns, job1, concurrency, JobOptions{Priority: 1, MaxConcurrency: 1}) - // reset the backoff times to help with testing - sleepBackoffsInMilliseconds = []int64{10, 10, 10, 10, 10} - wp.Start() // enqueue some jobs enqueuer := NewEnqueuer(ns, pool) @@ -161,11 +167,11 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime}) assert.Nil(t, err) } - assert.True(t, int64(numJobs) >= listSize(pool, redisKeyJobs(ns, job1))) - + // allow time for a job to start + time.Sleep(1 * time.Millisecond) // pause work and allow time for any outstanding jobs to finish pauseJobs(ns, job1, pool) - time.Sleep(time.Duration(sleepTime * 2) * time.Millisecond) + time.Sleep(2 * time.Millisecond) // check that we still have some jobs to process assert.True(t, listSize(pool, redisKeyJobs(ns, job1)) > int64(0)) @@ -174,6 +180,9 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { totalRuntime := time.Duration(sleepTime * numJobs) * time.Millisecond for time.Since(start) < totalRuntime { assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1))) + // lock count for the job and lock info for the pool should both be at 1 while job is running + assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1))) + assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID)) time.Sleep(time.Duration(sleepTime) * time.Millisecond) } @@ -187,6 +196,8 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { // At this point it should all be empty. 
assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1))) + assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1))) + assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID)) } // Test Helpers @@ -197,11 +208,14 @@ func (t *TestContext) SleepyJob(job *Job) error { } func setupTestWorkerPool(pool *redis.Pool, namespace, jobName string, concurrency int, jobOpts JobOptions) *WorkerPool { - deleteQueue(pool, namespace, jobName) - deleteRetryAndDead(pool, namespace) - deletePausedAndLockedKeys(namespace, jobName, pool) + cleanKeyspace(namespace, pool) + //deleteQueue(pool, namespace, jobName) + //deleteRetryAndDead(pool, namespace) + //deletePausedAndLockedKeys(namespace, jobName, pool) wp := NewWorkerPool(TestContext{}, uint(concurrency), namespace, pool) wp.JobWithOptions(jobName, jobOpts, (*TestContext).SleepyJob) + // reset the backoff times to help with testing + sleepBackoffsInMilliseconds = []int64{10, 10, 10, 10, 10} return wp } diff --git a/worker_test.go b/worker_test.go index 9f01c2e7..6ebaf64a 100644 --- a/worker_test.go +++ b/worker_test.go @@ -93,6 +93,7 @@ func TestWorkerInProgress(t *testing.T) { job1 := "job1" deleteQueue(pool, ns, job1) deleteRetryAndDead(pool, ns) + deletePausedAndLockedKeys(ns, job1, pool) jobTypes := make(map[string]*jobType) jobTypes[job1] = &jobType{ @@ -117,6 +118,8 @@ func TestWorkerInProgress(t *testing.T) { time.Sleep(10 * time.Millisecond) assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1))) assert.EqualValues(t, 1, listSize(pool, redisKeyJobsInProgress(ns, "1", job1))) + assert.EqualValues(t, 1, getInt64(pool, redisKeyJobsLock(ns, job1))) + assert.EqualValues(t, 1, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), w.poolID)) // nothing in the worker status w.observer.drain() @@ -143,6 +146,7 @@ func TestWorkerRetry(t *testing.T) { job1 := "job1" deleteQueue(pool, ns, job1) deleteRetryAndDead(pool, ns) + deletePausedAndLockedKeys(ns, job1, pool) jobTypes := make(map[string]*jobType) jobTypes[job1] = &jobType{ @@ -167,6 +171,8 @@ func TestWorkerRetry(t *testing.T) { assert.EqualValues(t, 0, zsetSize(pool, redisKeyDead(ns))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, "1", job1))) + assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1))) + assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), w.poolID)) // Get the job on the retry queue ts, job := jobOnZset(pool, redisKeyRetry(ns)) @@ -239,6 +245,7 @@ func TestWorkerDead(t *testing.T) { deleteQueue(pool, ns, job1) deleteQueue(pool, ns, job2) deleteRetryAndDead(pool, ns) + deletePausedAndLockedKeys(ns, job1, pool) jobTypes := make(map[string]*jobType) jobTypes[job1] = &jobType{ @@ -273,8 +280,10 @@ func TestWorkerDead(t *testing.T) { assert.EqualValues(t, 1, zsetSize(pool, redisKeyDead(ns))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1))) assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, "1", job1))) + assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1))) + assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, "1", job1))) - // Get the job on the retry queue + // Get the job on the dead queue ts, job := jobOnZset(pool, redisKeyDead(ns)) assert.True(t, ts <= nowEpochSeconds()) @@ -431,7 +440,7 @@ func zsetSize(pool *redis.Pool, key string) int64 { v, err := 
redis.Int64(conn.Do("ZCARD", key)) if err != nil { - panic("could not delete retry/dead queue: " + err.Error()) + panic("could not get ZSET size: " + err.Error()) } return v } @@ -442,7 +451,29 @@ func listSize(pool *redis.Pool, key string) int64 { v, err := redis.Int64(conn.Do("LLEN", key)) if err != nil { - panic("could not delete retry/dead queue: " + err.Error()) + panic("could not get list length: " + err.Error()) + } + return v +} + +func getInt64(pool *redis.Pool, key string) int64 { + conn := pool.Get() + defer conn.Close() + + v, err := redis.Int64(conn.Do("GET", key)) + if err != nil { + panic("could not GET int64: " + err.Error()) + } + return v +} + +func hgetInt64(pool *redis.Pool, redisKey, hashKey string) int64 { + conn := pool.Get() + defer conn.Close() + + v, err := redis.Int64(conn.Do("HGET", redisKey, hashKey)) + if err != nil { + panic("could not HGET int64: " + err.Error()) } return v } @@ -453,7 +484,7 @@ func jobOnZset(pool *redis.Pool, key string) (int64, *Job) { v, err := conn.Do("ZRANGE", key, 0, 0, "WITHSCORES") if err != nil { - panic("could not delete retry/dead queue: " + err.Error()) + panic("ZRANGE error: " + err.Error()) } vv := v.([]interface{}) @@ -478,7 +509,7 @@ func jobOnQueue(pool *redis.Pool, key string) *Job { rawJSON, err := redis.Bytes(conn.Do("RPOP", key)) if err != nil { - panic("could not delete retry/dead queue: " + err.Error()) + panic("could RPOP from job queue: " + err.Error()) } job, err := newJob(rawJSON, nil, nil) @@ -545,5 +576,8 @@ func deletePausedAndLockedKeys(namespace, jobName string, pool *redis.Pool) erro if _, err := conn.Do("DEL", redisKeyJobsLock(namespace, jobName)); err != nil { return err } + if _, err := conn.Do("DEL", redisKeyJobsLockInfo(namespace, jobName)); err != nil { + return err + } return nil } From d8613054891d6d25deec6b281d98ebefdb3186be Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Tue, 20 Jun 2017 15:02:42 -0400 Subject: [PATCH 3/9] bugfix --- redis.go | 6 +++--- worker.go | 8 -------- worker_pool_test.go | 31 ++++++++++++++++++------------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/redis.go b/redis.go index 12021b85..ccb8ffa2 100644 --- a/redis.go +++ b/redis.go @@ -129,9 +129,9 @@ end `) var redisLuaReleaseLock = fmt.Sprintf(` -local function releaseLock(jobQueue, workerPoolId) +local function releaseLock(jobQueue, lockID) redis.call('decr', getLockKey(jobQueue)) - redis.call('hincrby', getLockInfoKey(jobQueue), workerPoolId, -1) + redis.call('hincrby', getLockInfoKey(jobQueue), lockID, -1) end `) @@ -167,7 +167,7 @@ local function isPaused(pauseKey) return redis.call('get', pauseKey) end -local function canRun(lockKey, lockInfoKey, maxConcurrency) +local function canRun(lockKey, maxConcurrency) local activeJobs = tonumber(redis.call('get', lockKey)) if (not maxConcurrency or maxConcurrency == 0) or (not activeJobs or activeJobs < maxConcurrency) then -- default case: maxConcurrency not defined or set to 0 means no cap on concurrent jobs OR diff --git a/worker.go b/worker.go index b989ded2..23c79be1 100644 --- a/worker.go +++ b/worker.go @@ -104,7 +104,6 @@ func (w *worker) loop() { timer.Reset(0) case <-timer.C: gotJob := true - jobsPerLoop := 1 for gotJob { job, err := w.fetchJob() if err != nil { @@ -112,12 +111,6 @@ func (w *worker) loop() { gotJob = false timer.Reset(10 * time.Millisecond) } else if job != nil { - fmt.Printf("worker %v Fetched job num %v -- %v\n", w.workerID, jobsPerLoop, job) - jobsPerLoop++ - if jobsPerLoop > 1 { - - - } w.processJob(job) consequtiveNoJobs = 
0 } else { @@ -131,7 +124,6 @@ func (w *worker) loop() { if idx >= int64(len(sleepBackoffsInMilliseconds)) { idx = int64(len(sleepBackoffsInMilliseconds)) - 1 } - // fmt.Printf("Jobs per loop=%v -- worker id=%v\n", jobsPerLoop, w.workerID) timer.Reset(time.Duration(sleepBackoffsInMilliseconds[idx]) * time.Millisecond) } } diff --git a/worker_pool_test.go b/worker_pool_test.go index cce43b28..052ffe1e 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -118,7 +118,7 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { pool := newTestPool(":6379") ns := "work" job1 := "job1" - numJobs, concurrency, sleepTime := 5, 5, 1000 + numJobs, concurrency, sleepTime := 5, 5, 2 wp := setupTestWorkerPool(pool, ns, job1, concurrency, JobOptions{Priority: 1, MaxConcurrency: 1}) wp.Start() // enqueue some jobs @@ -127,18 +127,20 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime}) assert.Nil(t, err) } + + // make sure we've enough jobs queued up to make an interesting test jobsQueued := listSize(pool, redisKeyJobs(ns, job1)) - // make sure we've some jobs queued up - assert.True(t, jobsQueued > 1, "should be %v jobs queued up, but only found %v", numJobs, jobsQueued) + assert.True(t, jobsQueued > 3, "should be at least 3 jobs queued up, but only found %v", jobsQueued) // now make sure the during the duration of job execution there is never > 1 job in flight - // wp.Start() start := time.Now() totalRuntime := time.Duration(sleepTime * numJobs) * time.Millisecond + time.Sleep(10 * time.Millisecond) for time.Since(start) < totalRuntime { + // jobs in progress, lock count for the job and lock info for the pool should never exceed 1 jobsInProgress := listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)) assert.True(t, jobsInProgress <= 1, "jobsInProgress should never exceed 1: actual=%d", jobsInProgress) - // lock count for the job and lock info for the pool should never exceed 1 + jobLockCount := getInt64(pool, redisKeyJobsLock(ns, job1)) assert.True(t, jobLockCount <= 1, "global lock count for job should never exceed 1, got: %v", jobLockCount) wpLockCount := hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID) @@ -167,13 +169,17 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { _, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime}) assert.Nil(t, err) } - // allow time for a job to start - time.Sleep(1 * time.Millisecond) - // pause work and allow time for any outstanding jobs to finish + // provide time for jobs to process + time.Sleep(10 * time.Millisecond) + + // pause work, provide time for outstanding jobs to finish and queue up another job pauseJobs(ns, job1, pool) time.Sleep(2 * time.Millisecond) + _, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime}) + assert.Nil(t, err) + // check that we still have some jobs to process - assert.True(t, listSize(pool, redisKeyJobs(ns, job1)) > int64(0)) + assert.True(t, listSize(pool, redisKeyJobs(ns, job1)) >= 1) // now make sure no jobs get started until we unpause start := time.Now() @@ -208,10 +214,9 @@ func (t *TestContext) SleepyJob(job *Job) error { } func setupTestWorkerPool(pool *redis.Pool, namespace, jobName string, concurrency int, jobOpts JobOptions) *WorkerPool { - cleanKeyspace(namespace, pool) - //deleteQueue(pool, namespace, jobName) - //deleteRetryAndDead(pool, namespace) - //deletePausedAndLockedKeys(namespace, jobName, pool) + deleteQueue(pool, namespace, jobName) + deleteRetryAndDead(pool, namespace) + 
deletePausedAndLockedKeys(namespace, jobName, pool) wp := NewWorkerPool(TestContext{}, uint(concurrency), namespace, pool) wp.JobWithOptions(jobName, jobOpts, (*TestContext).SleepyJob) From 7f698ff4f6e76d9d0f358c7cd138c062dae7c6dd Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Tue, 20 Jun 2017 22:16:26 -0400 Subject: [PATCH 4/9] reap stale locks --- dead_pool_reaper.go | 27 ++++++++++++++++-- dead_pool_reaper_test.go | 61 +++++++++++++++++++++++++++++++++++++--- redis.go | 36 ++++++++++++++++++++++-- 3 files changed, 115 insertions(+), 9 deletions(-) diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index dd7e9e55..c6f140bb 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -93,10 +93,12 @@ func (r *deadPoolReaper) reap() error { return err } } - // Remove dead pool from worker pools set - _, err = conn.Do("SREM", workerPoolsKey, deadPoolID) - if err != nil { + if _, err = conn.Do("SREM", workerPoolsKey, deadPoolID); err != nil { + return err + } + // Cleanup any stale lock info + if err = r.cleanStaleLockInfo(deadPoolID, jobTypes); err != nil { return err } } @@ -104,6 +106,25 @@ func (r *deadPoolReaper) reap() error { return nil } +func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) error { + numKeys := len(jobTypes) * 2 + redisReapLocksScript := redis.NewScript(numKeys, redisLuaReapStaleLocks) + var scriptArgs = make([]interface{}, 0, numKeys+1) // +1 for argv[1] + + for _, jobType := range jobTypes { + scriptArgs = append(scriptArgs, redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType)) + } + scriptArgs = append(scriptArgs, poolID) // ARGV[1] + + conn := r.pool.Get() + defer conn.Close() + if _, err := redisReapLocksScript.Do(conn, scriptArgs...); err != nil { + return err + } + + return nil +} + func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error { numArgs := len(jobTypes) * 3 redisRequeueScript := redis.NewScript(numArgs, redisLuaReenqueueJob) diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index c4bc38e2..b1003fce 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -57,6 +57,8 @@ func TestDeadPoolReaper(t *testing.T) { assert.NoError(t, err) _, err = conn.Do("incr", redisKeyJobsLock(ns, "type1")) assert.NoError(t, err) + _, err = conn.Do("hincrby", redisKeyJobsLockInfo(ns, "type1"), "2", 1) // worker pool 2 has lock + assert.NoError(t, err) // Ensure 0 jobs in jobs queue jobsCount, err := redis.Int(conn.Do("llen", redisKeyJobs(ns, "type1"))) @@ -82,10 +84,10 @@ func TestDeadPoolReaper(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 0, jobsCount) - // Lock count should get decremented - lockCount, err := redis.Int(conn.Do("get", redisKeyJobsLock(ns, "type1"))) - assert.NoError(t, err) - assert.Equal(t, 0, lockCount) + // Locks should get cleaned up + assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, "type1"))) + v, _ := conn.Do("HGET", redisKeyJobsLockInfo(ns, "type1"), "2") + assert.Nil(t, v) } func TestDeadPoolReaperNoHeartbeat(t *testing.T) { @@ -273,3 +275,54 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) { staleHeart.stop() wp.deadPoolReaper.stop() } + +func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { + pool := newTestPool(":6379") + ns := "work" + cleanKeyspace(ns, pool) + + conn := pool.Get() + defer conn.Close() + job1, job2 := "type1", "type2" + workerPoolID1, workerPoolID2 := "1", "2" + lock1 := redisKeyJobsLock(ns, job1) + lock2 := redisKeyJobsLock(ns, job2) + lockInfo1 := 
redisKeyJobsLockInfo(ns, job1) + lockInfo2 := redisKeyJobsLockInfo(ns, job2) + + // Create redis data + var err error + err = conn.Send("SET", lock1, 3) + assert.NoError(t, err) + err = conn.Send("SET", lock2, 1) + assert.NoError(t, err) + err = conn.Send("HSET", lockInfo1, workerPoolID1, 1) // workerPoolID1 holds 1 lock on job1 + assert.NoError(t, err) + err = conn.Send("HSET", lockInfo1, workerPoolID2, 2) // workerPoolID2 holds 2 locks on job1 + assert.NoError(t, err) + err = conn.Send("HSET", lockInfo2, workerPoolID2, 2) // test that we don't go below 0 on job2 lock + assert.NoError(t, err) + err = conn.Flush() + assert.NoError(t, err) + + reaper := newDeadPoolReaper(ns, pool) + // clean lock info for workerPoolID1 + reaper.cleanStaleLockInfo(workerPoolID1, []string{job1, job2}) + assert.NoError(t, err) + assert.EqualValues(t, 2, getInt64(pool, lock1)) // job1 lock should be decr by 1 + assert.EqualValues(t, 1, getInt64(pool, lock2)) // job2 lock is unchanged + v, _ := conn.Do("HGET", lockInfo1, workerPoolID1) // workerPoolID1 removed from job1's lock info + assert.Nil(t, v) + + // now clean lock info for workerPoolID2 + reaper.cleanStaleLockInfo(workerPoolID2, []string{job1, job2}) + assert.NoError(t, err) + // both locks should be at 0 + assert.EqualValues(t, 0, getInt64(pool, lock1)) + assert.EqualValues(t, 0, getInt64(pool, lock2)) + // worker pool ID 2 removed from both lock info hashes + v, err = conn.Do("HGET", lockInfo1, workerPoolID2) + assert.Nil(t, v) + v, err = conn.Do("HGET", lockInfo2, workerPoolID2) + assert.Nil(t, v) +} diff --git a/redis.go b/redis.go index ccb8ffa2..efe9e92d 100644 --- a/redis.go +++ b/redis.go @@ -232,6 +232,38 @@ for i=1,keylen,3 do end return nil`, redisLuaJobsLockKey, redisLuaJobsLockInfoKey, redisLuaReleaseLock) +// Used by the reaper to clean up stale locks +// +// KEYS[1] = the 1st job's lock +// KEYS[2] = the 1st job's lock info hash +// KEYS[3] = the 2nd job's lock +// KEYS[4] = the 2nd job's lock info hash +// ... +// KEYS[N] = the last job's lock +// KEYS[N+1] = the last job's lock info haash +// ARGV[1] = the dead worker pool id +var redisLuaReapStaleLocks = ` +local keylen = #KEYS +local lock, lockInfo, deadLockCount +local deadPoolID = ARGV[1] + +for i=1,keylen,2 do + lock = KEYS[i] + lockInfo = KEYS[i+1] + deadLockCount = tonumber(redis.call('hget', lockInfo, deadPoolID)) + + if deadLockCount then + redis.call('decrby', lock, deadLockCount) + redis.call('hdel', lockInfo, deadPoolID) + + if tonumber(redis.call('get', lock)) < 0 then + redis.call('set', lock, 0) + end + end +end +return nil +` + // KEYS[1] = zset of jobs (retry or scheduled), eg work:retry // KEYS[2] = zset of dead, eg work:dead. If we don't know the jobName of a job, we'll put it in dead. // KEYS[3...] = known job queues, eg ["work:jobs:create_watch", "work:jobs:send_email", ...] 
@@ -275,8 +307,8 @@ for i=1,jobCount do j = cjson.decode(jobs[i]) if j['id'] == ARGV[2] then redis.call('zrem', KEYS[1], jobs[i]) - deletedCount = deletedCount + 1 - jobBytes = jobs[i] + deletedCount = deletedCount + 1 + jobBytes = jobs[i] end end return {deletedCount, jobBytes} From 71b1eed3f86e2419ef0b1155f65f3dd955d935bd Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Tue, 20 Jun 2017 22:31:19 -0400 Subject: [PATCH 5/9] ARGV[1] cleanup in Lua scripts --- dead_pool_reaper.go | 9 +++++---- redis.go | 32 +++++++++++++++----------------- worker.go | 9 +++++---- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index c6f140bb..2312ed29 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -126,14 +126,15 @@ func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) er } func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error { - numArgs := len(jobTypes) * 3 - redisRequeueScript := redis.NewScript(numArgs, redisLuaReenqueueJob) - var scriptArgs = make([]interface{}, 0, numArgs) + numKeys := len(jobTypes) * 2 + redisRequeueScript := redis.NewScript(numKeys, redisLuaReenqueueJob) + var scriptArgs = make([]interface{}, 0, numKeys+1) for _, jobType := range jobTypes { // pops from in progress, push into job queue and decrement the queue lock - scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType), poolID) + scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType)) // KEYS[1...] } + scriptArgs = append(scriptArgs, poolID) // ARGV[1] conn := r.pool.Get() defer conn.Close() diff --git a/redis.go b/redis.go index efe9e92d..3cd264df 100644 --- a/redis.go +++ b/redis.go @@ -122,16 +122,16 @@ local function getConcurrencyKey(jobQueue) end`, concurrencyKeySuffix) var redisLuaAcquireLock = fmt.Sprintf(` -local function acquireLock(jobQueue, lockID) +local function acquireLock(jobQueue, workerPoolID) redis.call('incr', getLockKey(jobQueue)) - redis.call('hincrby', getLockInfoKey(jobQueue), lockID, 1) + redis.call('hincrby', getLockInfoKey(jobQueue), workerPoolID, 1) end `) var redisLuaReleaseLock = fmt.Sprintf(` -local function releaseLock(jobQueue, lockID) +local function releaseLock(jobQueue, workerPoolID) redis.call('decr', getLockKey(jobQueue)) - redis.call('hincrby', getLockInfoKey(jobQueue), lockID, -1) + redis.call('hincrby', getLockInfoKey(jobQueue), workerPoolID, -1) end `) @@ -139,14 +139,12 @@ end // // KEYS[1] = the 1st job queue we want to try, eg, "work:jobs:emails" // KEYS[2] = the 1st job queue's in prog queue, eg, "work:jobs:emails:97c84119d13cb54119a38743:inprogress" -// KEYS[3] = the 1st job queue's lockId, currently worker pool Id eg, "97c84119d13cb54119a38743" -// KEYS[4] = the 2nd job queue... -// KEYS[5] = the 2nd job queue's in prog queue... -// KEYS[6] = the 2nd job queue's lockId... +// KEYS[3] = the 2nd job queue... +// KEYS[4] = the 2nd job queue's in prog queue... // ... // KEYS[N] = the last job queue... // KEYS[N+1] = the last job queue's in prog queue... -// KEYS[N+2] = the last job queue's lockId... 
+// ARGV[1] = job queue's workerPoolID var redisLuaFetchJob = fmt.Sprintf(` -- getPauseKey will be inserted below %s @@ -180,13 +178,13 @@ local function canRun(lockKey, maxConcurrency) end end -local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency, lockID +local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency, workerPoolID local keylen = #KEYS +workerPoolID = ARGV[1] -for i=1,keylen,3 do +for i=1,keylen,2 do jobQueue = KEYS[i] inProgQueue = KEYS[i+1] - lockID = KEYS[i+2] pauseKey = getPauseKey(jobQueue) lockKey = getLockKey(jobQueue) maxConcurrency = tonumber(redis.call('get', getConcurrencyKey(jobQueue))) @@ -194,7 +192,7 @@ for i=1,keylen,3 do if haveJobs(jobQueue) and not isPaused(pauseKey) and canRun(lockKey, maxConcurrency) then res = redis.call('rpoplpush', jobQueue, inProgQueue) if res then - acquireLock(jobQueue, lockID) + acquireLock(jobQueue, workerPoolID) return {res, jobQueue, inProgQueue} end end @@ -219,14 +217,14 @@ var redisLuaReenqueueJob = fmt.Sprintf(` %s local keylen = #KEYS -local res, jobQueue, inProgQueue, lockID -for i=1,keylen,3 do +local res, jobQueue, inProgQueue, workerPoolID +workerPoolID = ARGV[1] +for i=1,keylen,2 do inProgQueue = KEYS[i] jobQueue = KEYS[i+1] - lockID = KEYS[i+2] res = redis.call('rpoplpush', inProgQueue, jobQueue) if res then - releaseLock(jobQueue, lockID) + releaseLock(jobQueue, workerPoolID) return {res, inProgQueue, jobQueue} end end diff --git a/worker.go b/worker.go index 23c79be1..1b16ef39 100644 --- a/worker.go +++ b/worker.go @@ -63,7 +63,7 @@ func (w *worker) updateMiddlewareAndJobTypes(middleware []*middlewareHandler, jo } w.sampler = sampler w.jobTypes = jobTypes - w.redisFetchScript = redis.NewScript(len(jobTypes)*3, redisLuaFetchJob) + w.redisFetchScript = redis.NewScript(len(jobTypes)*2, redisLuaFetchJob) } func (w *worker) start() { @@ -135,12 +135,13 @@ func (w *worker) fetchJob() (*Job, error) { // resort queues // NOTE: we could optimize this to only resort every second, or something. w.sampler.sample() - var scriptArgs = make([]interface{}, 0, len(w.sampler.samples)*3) + numKeys := len(w.sampler.samples)*2 + var scriptArgs = make([]interface{}, 0, numKeys+1) for _, s := range w.sampler.samples { - scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, w.poolID) + scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg) // KEYS[1...] 
} - + scriptArgs = append(scriptArgs, w.poolID) // ARGV[1] conn := w.pool.Get() defer conn.Close() From 9f15c0132b488edbd97a17695ba74c674cb66aa7 Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Tue, 20 Jun 2017 22:50:39 -0400 Subject: [PATCH 6/9] reap stale locks from current list of jobs if heartbeat expires --- dead_pool_reaper.go | 18 ++++++++++++------ dead_pool_reaper_test.go | 31 ++++++++++++++++++++++++------- redis.go | 1 + worker_pool.go | 2 +- 4 files changed, 38 insertions(+), 14 deletions(-) diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index 2312ed29..a4dd11d0 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -16,21 +16,23 @@ const ( ) type deadPoolReaper struct { - namespace string - pool *redis.Pool - deadTime time.Duration - reapPeriod time.Duration + namespace string + pool *redis.Pool + deadTime time.Duration + reapPeriod time.Duration + curJobTypes []string stopChan chan struct{} doneStoppingChan chan struct{} } -func newDeadPoolReaper(namespace string, pool *redis.Pool) *deadPoolReaper { +func newDeadPoolReaper(namespace string, pool *redis.Pool, curJobTypes []string) *deadPoolReaper { return &deadPoolReaper{ namespace: namespace, pool: pool, deadTime: deadTime, reapPeriod: reapPeriod, + curJobTypes: curJobTypes, stopChan: make(chan struct{}), doneStoppingChan: make(chan struct{}), } @@ -86,19 +88,23 @@ func (r *deadPoolReaper) reap() error { // Cleanup all dead pools for deadPoolID, jobTypes := range deadPoolIDs { + lockJobTypes := jobTypes // if we found jobs from the heartbeat, requeue them and remove the heartbeat if len(jobTypes) > 0 { r.requeueInProgressJobs(deadPoolID, jobTypes) if _, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID)); err != nil { return err } + } else { + // try to clean up locks for the current set of jobs if heartbeat was not found + lockJobTypes = r.curJobTypes } // Remove dead pool from worker pools set if _, err = conn.Do("SREM", workerPoolsKey, deadPoolID); err != nil { return err } // Cleanup any stale lock info - if err = r.cleanStaleLockInfo(deadPoolID, jobTypes); err != nil { + if err = r.cleanStaleLockInfo(deadPoolID, lockJobTypes); err != nil { return err } } diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index b1003fce..46a8650f 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -47,7 +47,7 @@ func TestDeadPoolReaper(t *testing.T) { assert.NoError(t, err) // Test getting dead pool - reaper := newDeadPoolReaper(ns, pool) + reaper := newDeadPoolReaper(ns, pool, []string{}) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools) @@ -108,11 +108,20 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) { assert.NoError(t, err) err = conn.Send("SADD", workerPoolsKey, "3") assert.NoError(t, err) + // stale lock info + err = conn.Send("SET", redisKeyJobsLock(ns, "type1"), 3) + assert.NoError(t, err) + err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "1", 1) + assert.NoError(t, err) + err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "2", 1) + assert.NoError(t, err) + err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "3", 1) + assert.NoError(t, err) err = conn.Flush() assert.NoError(t, err) // Test getting dead pool ids - reaper := newDeadPoolReaper(ns, pool) + reaper := newDeadPoolReaper(ns, pool, []string{"type1"}) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) assert.Equal(t, deadPools, map[string][]string{"1": {}, 
"2": {}, "3": {}}) @@ -154,6 +163,13 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) { jobsCount, err = redis.Int(conn.Do("scard", redisKeyWorkerPools(ns))) assert.NoError(t, err) assert.Equal(t, 0, jobsCount) + + // Stale lock info was cleaned up using reap.curJobTypes + assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, "type1"))) + for _, poolID := range []string{"1", "2", "3"} { + v, _ := conn.Do("HGET", redisKeyJobsLockInfo(ns, "type1"), poolID) + assert.Nil(t, v) + } } func TestDeadPoolReaperNoJobTypes(t *testing.T) { @@ -188,7 +204,7 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) { assert.NoError(t, err) // Test getting dead pool - reaper := newDeadPoolReaper(ns, pool) + reaper := newDeadPoolReaper(ns, pool, []string{}) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) assert.Equal(t, map[string][]string{"2": {"type1", "type2"}}, deadPools) @@ -263,7 +279,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) { // setup a worker pool and start the reaper, which should restart the stale job above wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1}) - wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool) + wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"}) wp.deadPoolReaper.deadTime = expectedDeadTime wp.deadPoolReaper.start() // provide some initialization time @@ -284,6 +300,7 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { conn := pool.Get() defer conn.Close() job1, job2 := "type1", "type2" + jobNames := []string{job1, job2} workerPoolID1, workerPoolID2 := "1", "2" lock1 := redisKeyJobsLock(ns, job1) lock2 := redisKeyJobsLock(ns, job2) @@ -305,9 +322,9 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { err = conn.Flush() assert.NoError(t, err) - reaper := newDeadPoolReaper(ns, pool) + reaper := newDeadPoolReaper(ns, pool, jobNames) // clean lock info for workerPoolID1 - reaper.cleanStaleLockInfo(workerPoolID1, []string{job1, job2}) + reaper.cleanStaleLockInfo(workerPoolID1, jobNames) assert.NoError(t, err) assert.EqualValues(t, 2, getInt64(pool, lock1)) // job1 lock should be decr by 1 assert.EqualValues(t, 1, getInt64(pool, lock2)) // job2 lock is unchanged @@ -315,7 +332,7 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { assert.Nil(t, v) // now clean lock info for workerPoolID2 - reaper.cleanStaleLockInfo(workerPoolID2, []string{job1, job2}) + reaper.cleanStaleLockInfo(workerPoolID2, jobNames) assert.NoError(t, err) // both locks should be at 0 assert.EqualValues(t, 0, getInt64(pool, lock1)) diff --git a/redis.go b/redis.go index 3cd264df..4198ee92 100644 --- a/redis.go +++ b/redis.go @@ -208,6 +208,7 @@ return nil`, redisLuaJobsPausedKey, redisLuaJobsLockKey, redisLuaJobsLockInfoKey // ... 
// KEYS[N] = the last job's in progress queue // KEYS[N+1] = the last job's job queue +// ARGV[1] = workerPoolID for job queue var redisLuaReenqueueJob = fmt.Sprintf(` -- getLockKey inserted below %s diff --git a/worker_pool.go b/worker_pool.go index 04f19c9a..6a35d540 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -234,7 +234,7 @@ func (wp *WorkerPool) startRequeuers() { } wp.retrier = newRequeuer(wp.namespace, wp.pool, redisKeyRetry(wp.namespace), jobNames) wp.scheduler = newRequeuer(wp.namespace, wp.pool, redisKeyScheduled(wp.namespace), jobNames) - wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool) + wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, jobNames) wp.retrier.start() wp.scheduler.start() wp.deadPoolReaper.start() From 4499aa8354f64e4fd9ce3bd6dd4c3545dc1e124a Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Tue, 20 Jun 2017 23:06:35 -0400 Subject: [PATCH 7/9] small cleanup --- dead_pool_reaper.go | 5 ++--- dead_pool_reaper_test.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index a4dd11d0..70952107 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -174,11 +174,9 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) { deadPools := map[string][]string{} for _, workerPoolID := range workerPoolIDs { heartbeatKey := redisKeyHeartbeat(r.namespace, workerPoolID) - - // Check that last heartbeat was long enough ago to consider the pool dead heartbeatAt, err := redis.Int64(conn.Do("HGET", heartbeatKey, "heartbeat_at")) if err == redis.ErrNil { - // dead pool with no heartbeat + // heartbeat expired, save dead pool and use cur set of jobs from reaper deadPools[workerPoolID] = []string{} continue } @@ -186,6 +184,7 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) { return nil, err } + // Check that last heartbeat was long enough ago to consider the pool dead if time.Unix(heartbeatAt, 0).Add(r.deadTime).After(time.Now()) { continue } diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index 46a8650f..56fd7815 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -124,7 +124,7 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) { reaper := newDeadPoolReaper(ns, pool, []string{"type1"}) deadPools, err := reaper.findDeadPools() assert.NoError(t, err) - assert.Equal(t, deadPools, map[string][]string{"1": {}, "2": {}, "3": {}}) + assert.Equal(t, map[string][]string{"1": {}, "2": {}, "3": {}}, deadPools) // Test requeueing jobs _, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo") From 0eaa50a73b89b0c6b6464096c81c30c884ec1333 Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Wed, 21 Jun 2017 15:27:21 -0400 Subject: [PATCH 8/9] explicitly pass in KEYS to lua scripts for redis cluster support --- dead_pool_reaper.go | 13 ++--- dead_pool_reaper_test.go | 4 +- priority_sampler.go | 20 +++++--- priority_sampler_test.go | 15 ++++-- redis.go | 103 ++++++++++++--------------------------- worker.go | 16 ++++-- worker_pool_test.go | 4 +- 7 files changed, 78 insertions(+), 97 deletions(-) diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go index 70952107..f276f068 100644 --- a/dead_pool_reaper.go +++ b/dead_pool_reaper.go @@ -10,9 +10,10 @@ import ( ) const ( - deadTime = 5 * time.Minute - reapPeriod = 10 * time.Minute - reapJitterSecs = 30 + deadTime = 5 * time.Minute + reapPeriod = 10 * time.Minute + reapJitterSecs = 30 + requeueKeysPerJob = 4 ) type deadPoolReaper struct { @@ -120,7 +121,7 @@ func 
(r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) er for _, jobType := range jobTypes { scriptArgs = append(scriptArgs, redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType)) } - scriptArgs = append(scriptArgs, poolID) // ARGV[1] + scriptArgs = append(scriptArgs, poolID) // ARGV[1] conn := r.pool.Get() defer conn.Close() @@ -132,13 +133,13 @@ func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) er } func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error { - numKeys := len(jobTypes) * 2 + numKeys := len(jobTypes) * requeueKeysPerJob redisRequeueScript := redis.NewScript(numKeys, redisLuaReenqueueJob) var scriptArgs = make([]interface{}, 0, numKeys+1) for _, jobType := range jobTypes { // pops from in progress, push into job queue and decrement the queue lock - scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType)) // KEYS[1...] + scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType), redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType)) // KEYS[1-4 * N] } scriptArgs = append(scriptArgs, poolID) // ARGV[1] diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go index 56fd7815..fc31a949 100644 --- a/dead_pool_reaper_test.go +++ b/dead_pool_reaper_test.go @@ -326,8 +326,8 @@ func TestDeadPoolReaperCleanStaleLocks(t *testing.T) { // clean lock info for workerPoolID1 reaper.cleanStaleLockInfo(workerPoolID1, jobNames) assert.NoError(t, err) - assert.EqualValues(t, 2, getInt64(pool, lock1)) // job1 lock should be decr by 1 - assert.EqualValues(t, 1, getInt64(pool, lock2)) // job2 lock is unchanged + assert.EqualValues(t, 2, getInt64(pool, lock1)) // job1 lock should be decr by 1 + assert.EqualValues(t, 1, getInt64(pool, lock2)) // job2 lock is unchanged v, _ := conn.Do("HGET", lockInfo1, workerPoolID1) // workerPoolID1 removed from job1's lock info assert.Nil(t, v) diff --git a/priority_sampler.go b/priority_sampler.go index 9c1c2cbd..a31d017f 100644 --- a/priority_sampler.go +++ b/priority_sampler.go @@ -13,15 +13,23 @@ type sampleItem struct { priority uint // payload: - redisJobs string - redisJobsInProg string + redisJobs string + redisJobsInProg string + redisJobsPaused string + redisJobsLock string + redisJobsLockInfo string + redisJobsMaxConcurrency string } -func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg string) { +func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg, redisJobsPaused, redisJobsLock, redisJobsLockInfo, redisJobsMaxConcurrency string) { sample := sampleItem{ - priority: priority, - redisJobs: redisJobs, - redisJobsInProg: redisJobsInProg, + priority: priority, + redisJobs: redisJobs, + redisJobsInProg: redisJobsInProg, + redisJobsPaused: redisJobsPaused, + redisJobsLock: redisJobsLock, + redisJobsLockInfo: redisJobsLockInfo, + redisJobsMaxConcurrency: redisJobsMaxConcurrency, } s.samples = append(s.samples, sample) s.sum += priority diff --git a/priority_sampler_test.go b/priority_sampler_test.go index 3ab254da..97485aff 100644 --- a/priority_sampler_test.go +++ b/priority_sampler_test.go @@ -8,9 +8,10 @@ import ( func TestPrioritySampler(t *testing.T) { ps := prioritySampler{} - ps.add(5, "jobs.5", "jobsinprog.5") - ps.add(2, "jobs.2a", "jobsinprog.2a") - ps.add(1, "jobs.1b", "jobsinprog.1b") + + ps.add(5, "jobs.5", "jobsinprog.5", "jobspaused.5", 
"jobslock.5", "jobslockinfo.5", "jobsconcurrency.5") + ps.add(2, "jobs.2a", "jobsinprog.2a", "jobspaused.2a", "jobslock.2a", "jobslockinfo.2a", "jobsconcurrency.2a") + ps.add(1, "jobs.1b", "jobsinprog.1b", "jobspaused.1b", "jobslock.1b", "jobslockinfo.1b", "jobsconcurrency.1b") var c5 = 0 var c2 = 0 @@ -41,7 +42,13 @@ func TestPrioritySampler(t *testing.T) { func BenchmarkPrioritySampler(b *testing.B) { ps := prioritySampler{} for i := 0; i < 200; i++ { - ps.add(uint(i)+1, "jobs."+fmt.Sprint(i), "jobsinprog."+fmt.Sprint(i)) + ps.add(uint(i)+1, + "jobs."+fmt.Sprint(i), + "jobsinprog."+fmt.Sprint(i), + "jobspaused."+fmt.Sprint(i), + "jobslock."+fmt.Sprint(i), + "jobslockinfo."+fmt.Sprint(i), + "jobsmaxconcurrency."+fmt.Sprint(i)) } b.ResetTimer() diff --git a/redis.go b/redis.go index 4198ee92..628eb079 100644 --- a/redis.go +++ b/redis.go @@ -56,24 +56,20 @@ func redisKeyHeartbeat(namespace, workerPoolID string) string { return redisNamespacePrefix(namespace) + "worker_pools:" + workerPoolID } -var pauseKeySuffix = "paused" func redisKeyJobsPaused(namespace, jobName string) string { - return redisKeyJobs(namespace, jobName) + ":" + pauseKeySuffix + return redisKeyJobs(namespace, jobName) + ":paused" } -var lockKeySuffix = "lock" func redisKeyJobsLock(namespace, jobName string) string { - return redisKeyJobs(namespace, jobName) + ":" + lockKeySuffix + return redisKeyJobs(namespace, jobName) + ":lock" } -var lockInfoKeySuffix = "lock_info" func redisKeyJobsLockInfo(namespace, jobName string) string { - return redisKeyJobs(namespace, jobName) + ":" + lockInfoKeySuffix + return redisKeyJobs(namespace, jobName) + ":lock_info" } -var concurrencyKeySuffix = "max_concurrency" func redisKeyJobsConcurrency(namespace, jobName string) string { - return redisKeyJobs(namespace, jobName) + ":" + concurrencyKeySuffix + return redisKeyJobs(namespace, jobName) + ":max_concurrency" } func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) { @@ -98,43 +94,6 @@ func redisKeyLastPeriodicEnqueue(namespace string) string { return redisNamespacePrefix(namespace) + "last_periodic_enqueue" } -// Helpers functions used by Lua scripts below that need to match naming convention as redisKeyJobs* functions above -// note: all assume the local var jobQueue is in scope, which is the the val of redisKeyJobs() -// note: acquire/release lock functions assume getLockKey and getLockKeyInfo are in scope -var redisLuaJobsPausedKey = fmt.Sprintf(` -local function getPauseKey(jobQueue) - return string.format("%%s:%s", jobQueue) -end`, pauseKeySuffix) - -var redisLuaJobsLockKey = fmt.Sprintf(` -local function getLockKey(jobQueue) - return string.format("%%s:%s", jobQueue) -end`, lockKeySuffix) - -var redisLuaJobsLockInfoKey = fmt.Sprintf(` -local function getLockInfoKey(jobQueue) - return string.format("%%s:%s", jobQueue) -end`, lockInfoKeySuffix) - -var redisLuaJobsConcurrencyKey = fmt.Sprintf(` -local function getConcurrencyKey(jobQueue) - return string.format("%%s:%s", jobQueue) -end`, concurrencyKeySuffix) - -var redisLuaAcquireLock = fmt.Sprintf(` -local function acquireLock(jobQueue, workerPoolID) - redis.call('incr', getLockKey(jobQueue)) - redis.call('hincrby', getLockInfoKey(jobQueue), workerPoolID, 1) -end -`) - -var redisLuaReleaseLock = fmt.Sprintf(` -local function releaseLock(jobQueue, workerPoolID) - redis.call('decr', getLockKey(jobQueue)) - redis.call('hincrby', getLockInfoKey(jobQueue), workerPoolID, -1) -end -`) - // Used to fetch the next job to run // // KEYS[1] = the 1st 
job queue we want to try, eg, "work:jobs:emails" @@ -146,16 +105,10 @@ end // KEYS[N+1] = the last job queue's in prog queue... // ARGV[1] = job queue's workerPoolID var redisLuaFetchJob = fmt.Sprintf(` --- getPauseKey will be inserted below -%s --- getLockKey will be inserted below -%s --- getLockInfoKey will be inserted below -%s --- getConcurrencyKey will be inserted below -%s --- acquireLock will be inserted below -%s +local function acquireLock(lockKey, lockInfoKey, workerPoolID) + redis.call('incr', lockKey) + redis.call('hincrby', lockInfoKey, workerPoolID, 1) +end local function haveJobs(jobQueue) return redis.call('llen', jobQueue) > 0 @@ -178,26 +131,29 @@ local function canRun(lockKey, maxConcurrency) end end -local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency, workerPoolID +local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency, workerPoolID, concurrencyKey, lockInfoKey local keylen = #KEYS workerPoolID = ARGV[1] -for i=1,keylen,2 do +for i=1,keylen,%d do jobQueue = KEYS[i] inProgQueue = KEYS[i+1] - pauseKey = getPauseKey(jobQueue) - lockKey = getLockKey(jobQueue) - maxConcurrency = tonumber(redis.call('get', getConcurrencyKey(jobQueue))) + pauseKey = KEYS[i+2] + lockKey = KEYS[i+3] + lockInfoKey = KEYS[i+4] + concurrencyKey = KEYS[i+5] + + maxConcurrency = tonumber(redis.call('get', concurrencyKey)) if haveJobs(jobQueue) and not isPaused(pauseKey) and canRun(lockKey, maxConcurrency) then res = redis.call('rpoplpush', jobQueue, inProgQueue) if res then - acquireLock(jobQueue, workerPoolID) + acquireLock(lockKey, lockInfoKey, workerPoolID) return {res, jobQueue, inProgQueue} end end end -return nil`, redisLuaJobsPausedKey, redisLuaJobsLockKey, redisLuaJobsLockInfoKey, redisLuaJobsConcurrencyKey, redisLuaAcquireLock) +return nil`, fetchKeysPerJobType) // Used by the reaper to re-enqueue jobs that were in progress // @@ -210,26 +166,27 @@ return nil`, redisLuaJobsPausedKey, redisLuaJobsLockKey, redisLuaJobsLockInfoKey // KEYS[N+1] = the last job's job queue // ARGV[1] = workerPoolID for job queue var redisLuaReenqueueJob = fmt.Sprintf(` --- getLockKey inserted below -%s --- getLockInfoKey will be inserted below -%s --- releaseLock will be inserted below -%s +local function releaseLock(lockKey, lockInfoKey, workerPoolID) + redis.call('decr', lockKey) + redis.call('hincrby', lockInfoKey, workerPoolID, -1) +end local keylen = #KEYS -local res, jobQueue, inProgQueue, workerPoolID +local res, jobQueue, inProgQueue, workerPoolID, lockKey, lockInfoKey workerPoolID = ARGV[1] -for i=1,keylen,2 do + +for i=1,keylen,%d do inProgQueue = KEYS[i] jobQueue = KEYS[i+1] + lockKey = KEYS[i+2] + lockInfoKey = KEYS[i+3] res = redis.call('rpoplpush', inProgQueue, jobQueue) if res then - releaseLock(jobQueue, workerPoolID) + releaseLock(lockKey, lockInfoKey, workerPoolID) return {res, inProgQueue, jobQueue} end end -return nil`, redisLuaJobsLockKey, redisLuaJobsLockInfoKey, redisLuaReleaseLock) +return nil`, requeueKeysPerJob) // Used by the reaper to clean up stale locks // @@ -241,7 +198,7 @@ return nil`, redisLuaJobsLockKey, redisLuaJobsLockInfoKey, redisLuaReleaseLock) // KEYS[N] = the last job's lock // KEYS[N+1] = the last job's lock info haash // ARGV[1] = the dead worker pool id -var redisLuaReapStaleLocks = ` +var redisLuaReapStaleLocks = ` local keylen = #KEYS local lock, lockInfo, deadLockCount local deadPoolID = ARGV[1] diff --git a/worker.go b/worker.go index 1b16ef39..1a9c95dc 100644 --- a/worker.go +++ b/worker.go @@ -9,6 +9,8 @@ import ( 
"github.com/garyburd/redigo/redis" ) +const fetchKeysPerJobType = 6 + type worker struct { workerID string poolID string @@ -59,11 +61,17 @@ func (w *worker) updateMiddlewareAndJobTypes(middleware []*middlewareHandler, jo w.middleware = middleware sampler := prioritySampler{} for _, jt := range jobTypes { - sampler.add(jt.Priority, redisKeyJobs(w.namespace, jt.Name), redisKeyJobsInProgress(w.namespace, w.poolID, jt.Name)) + sampler.add(jt.Priority, + redisKeyJobs(w.namespace, jt.Name), + redisKeyJobsInProgress(w.namespace, w.poolID, jt.Name), + redisKeyJobsPaused(w.namespace, jt.Name), + redisKeyJobsLock(w.namespace, jt.Name), + redisKeyJobsLockInfo(w.namespace, jt.Name), + redisKeyJobsConcurrency(w.namespace, jt.Name)) } w.sampler = sampler w.jobTypes = jobTypes - w.redisFetchScript = redis.NewScript(len(jobTypes)*2, redisLuaFetchJob) + w.redisFetchScript = redis.NewScript(len(jobTypes)*fetchKeysPerJobType, redisLuaFetchJob) } func (w *worker) start() { @@ -135,11 +143,11 @@ func (w *worker) fetchJob() (*Job, error) { // resort queues // NOTE: we could optimize this to only resort every second, or something. w.sampler.sample() - numKeys := len(w.sampler.samples)*2 + numKeys := len(w.sampler.samples) * fetchKeysPerJobType var scriptArgs = make([]interface{}, 0, numKeys+1) for _, s := range w.sampler.samples { - scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg) // KEYS[1...] + scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, s.redisJobsPaused, s.redisJobsLock, s.redisJobsLockInfo, s.redisJobsMaxConcurrency) // KEYS[1-6 * N] } scriptArgs = append(scriptArgs, w.poolID) // ARGV[1] conn := w.pool.Get() diff --git a/worker_pool_test.go b/worker_pool_test.go index 052ffe1e..70444bde 100644 --- a/worker_pool_test.go +++ b/worker_pool_test.go @@ -134,7 +134,7 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) { // now make sure the during the duration of job execution there is never > 1 job in flight start := time.Now() - totalRuntime := time.Duration(sleepTime * numJobs) * time.Millisecond + totalRuntime := time.Duration(sleepTime*numJobs) * time.Millisecond time.Sleep(10 * time.Millisecond) for time.Since(start) < totalRuntime { // jobs in progress, lock count for the job and lock info for the pool should never exceed 1 @@ -183,7 +183,7 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) { // now make sure no jobs get started until we unpause start := time.Now() - totalRuntime := time.Duration(sleepTime * numJobs) * time.Millisecond + totalRuntime := time.Duration(sleepTime*numJobs) * time.Millisecond for time.Since(start) < totalRuntime { assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1))) // lock count for the job and lock info for the pool should both be at 1 while job is running From 8ac964461b21231ea3ab935624e165564d78ae17 Mon Sep 17 00:00:00 2001 From: Shaun Dunning Date: Wed, 21 Jun 2017 15:29:00 -0400 Subject: [PATCH 9/9] small cleanup to lua fetch fxn --- redis.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/redis.go b/redis.go index 628eb079..849b2264 100644 --- a/redis.go +++ b/redis.go @@ -146,11 +146,9 @@ for i=1,keylen,%d do maxConcurrency = tonumber(redis.call('get', concurrencyKey)) if haveJobs(jobQueue) and not isPaused(pauseKey) and canRun(lockKey, maxConcurrency) then + acquireLock(lockKey, lockInfoKey, workerPoolID) res = redis.call('rpoplpush', jobQueue, inProgQueue) - if res then - acquireLock(lockKey, lockInfoKey, workerPoolID) - return {res, jobQueue, inProgQueue} - 
end + return {res, jobQueue, inProgQueue} end end return nil`, fetchKeysPerJobType)
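
Usage sketch (illustrative only, not part of the patches in this series): the per-job MaxConcurrency cap exercised by the tests above is configured entirely through JobOptions; the new work:jobs:<name>:lock counter and work:jobs:<name>:lock_info hash are maintained internally by the Lua fetch/requeue scripts and the dead pool reaper. A minimal client of the public API used in these tests (NewWorkerPool, JobWithOptions, NewEnqueuer), assuming a local Redis at :6379 and a hypothetical "send_email" job, might look like this:

package main

import (
	"fmt"
	"time"

	"github.com/garyburd/redigo/redis"
	"github.com/gocraft/work"
)

type Context struct{}

// SendEmail is a stand-in handler; the job name and payload are assumptions for illustration.
func (c *Context) SendEmail(job *work.Job) error {
	fmt.Println("processing", job.Name)
	return nil
}

func main() {
	// Minimal redigo pool against a local Redis, mirroring newTestPool(":6379") in the tests.
	redisPool := &redis.Pool{
		MaxActive: 5,
		MaxIdle:   5,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", ":6379")
		},
	}

	// MaxConcurrency: 1 caps "send_email" to one in-flight job across all worker pools.
	// Under this series, the fetch script increments work:jobs:send_email:lock and records
	// the owning pool ID in work:jobs:send_email:lock_info, and the dead pool reaper
	// decrements both if this pool stops heartbeating.
	wp := work.NewWorkerPool(Context{}, 10, "work", redisPool)
	wp.JobWithOptions("send_email", work.JobOptions{Priority: 1, MaxConcurrency: 1}, (*Context).SendEmail)
	wp.Start()

	enqueuer := work.NewEnqueuer("work", redisPool)
	if _, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com"}); err != nil {
		fmt.Println("enqueue failed:", err)
	}

	// Give the worker a moment to pick up the job, then shut down cleanly.
	time.Sleep(time.Second)
	wp.Drain()
	wp.Stop()
}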