diff --git a/dead_pool_reaper.go b/dead_pool_reaper.go
index 8ec3f864..f276f068 100644
--- a/dead_pool_reaper.go
+++ b/dead_pool_reaper.go
@@ -10,27 +10,30 @@ import (
 )
 
 const (
-	deadTime       = 5 * time.Minute
-	reapPeriod     = 10 * time.Minute
-	reapJitterSecs = 30
+	deadTime          = 5 * time.Minute
+	reapPeriod        = 10 * time.Minute
+	reapJitterSecs    = 30
+	requeueKeysPerJob = 4
 )
 
 type deadPoolReaper struct {
-	namespace  string
-	pool       *redis.Pool
-	deadTime   time.Duration
-	reapPeriod time.Duration
+	namespace   string
+	pool        *redis.Pool
+	deadTime    time.Duration
+	reapPeriod  time.Duration
+	curJobTypes []string
 
 	stopChan         chan struct{}
 	doneStoppingChan chan struct{}
 }
 
-func newDeadPoolReaper(namespace string, pool *redis.Pool) *deadPoolReaper {
+func newDeadPoolReaper(namespace string, pool *redis.Pool, curJobTypes []string) *deadPoolReaper {
 	return &deadPoolReaper{
 		namespace:        namespace,
 		pool:             pool,
 		deadTime:         deadTime,
 		reapPeriod:       reapPeriod,
+		curJobTypes:      curJobTypes,
 		stopChan:         make(chan struct{}),
 		doneStoppingChan: make(chan struct{}),
 	}
@@ -86,17 +89,23 @@ func (r *deadPoolReaper) reap() error {
 
 	// Cleanup all dead pools
 	for deadPoolID, jobTypes := range deadPoolIDs {
+		lockJobTypes := jobTypes
 		// if we found jobs from the heartbeat, requeue them and remove the heartbeat
 		if len(jobTypes) > 0 {
 			r.requeueInProgressJobs(deadPoolID, jobTypes)
 			if _, err = conn.Do("DEL", redisKeyHeartbeat(r.namespace, deadPoolID)); err != nil {
 				return err
 			}
+		} else {
+			// try to clean up locks for the current set of jobs if heartbeat was not found
+			lockJobTypes = r.curJobTypes
 		}
-
-		// Remove dead pool from worker pools set
-		_, err = conn.Do("SREM", workerPoolsKey, deadPoolID)
-		if err != nil {
+		if _, err = conn.Do("SREM", workerPoolsKey, deadPoolID); err != nil {
+			return err
+		}
+		// Cleanup any stale lock info
+		if err = r.cleanStaleLockInfo(deadPoolID, lockJobTypes); err != nil {
 			return err
 		}
 	}
@@ -104,15 +113,35 @@ func (r *deadPoolReaper) reap() error {
 	return nil
 }
 
+func (r *deadPoolReaper) cleanStaleLockInfo(poolID string, jobTypes []string) error {
+	numKeys := len(jobTypes) * 2
+	redisReapLocksScript := redis.NewScript(numKeys, redisLuaReapStaleLocks)
+	var scriptArgs = make([]interface{}, 0, numKeys+1) // +1 for ARGV[1]
+
+	for _, jobType := range jobTypes {
+		scriptArgs = append(scriptArgs, redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType))
+	}
+	scriptArgs = append(scriptArgs, poolID) // ARGV[1]
+
+	conn := r.pool.Get()
+	defer conn.Close()
+	if _, err := redisReapLocksScript.Do(conn, scriptArgs...); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (r *deadPoolReaper) requeueInProgressJobs(poolID string, jobTypes []string) error {
-	numArgs := len(jobTypes) * 2
-	redisRequeueScript := redis.NewScript(numArgs, redisLuaReenqueueJob)
-	var scriptArgs = make([]interface{}, 0, numArgs)
+	numKeys := len(jobTypes) * requeueKeysPerJob
+	redisRequeueScript := redis.NewScript(numKeys, redisLuaReenqueueJob)
+	var scriptArgs = make([]interface{}, 0, numKeys+1)
 
 	for _, jobType := range jobTypes {
 		// pops from in progress, push into job queue and decrement the queue lock
-		scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType))
+		scriptArgs = append(scriptArgs, redisKeyJobsInProgress(r.namespace, poolID, jobType), redisKeyJobs(r.namespace, jobType), redisKeyJobsLock(r.namespace, jobType), redisKeyJobsLockInfo(r.namespace, jobType)) // KEYS[1-4 * N]
 	}
+	scriptArgs = append(scriptArgs, poolID) // ARGV[1]
 
 	conn := r.pool.Get()
 	defer conn.Close()
@@ -146,11 +175,9 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) {
 	deadPools := map[string][]string{}
 
 	for _, workerPoolID := range workerPoolIDs {
 		heartbeatKey := redisKeyHeartbeat(r.namespace, workerPoolID)
-
-		// Check that last heartbeat was long enough ago to consider the pool dead
 		heartbeatAt, err := redis.Int64(conn.Do("HGET", heartbeatKey, "heartbeat_at"))
 		if err == redis.ErrNil {
-			// dead pool with no heartbeat
+			// heartbeat expired, save dead pool and use cur set of jobs from reaper
 			deadPools[workerPoolID] = []string{}
 			continue
 		}
@@ -158,6 +185,7 @@ func (r *deadPoolReaper) findDeadPools() (map[string][]string, error) {
 			return nil, err
 		}
 
+		// Check that last heartbeat was long enough ago to consider the pool dead
 		if time.Unix(heartbeatAt, 0).Add(r.deadTime).After(time.Now()) {
 			continue
 		}
diff --git a/dead_pool_reaper_test.go b/dead_pool_reaper_test.go
index c4bc38e2..fc31a949 100644
--- a/dead_pool_reaper_test.go
+++ b/dead_pool_reaper_test.go
@@ -47,7 +47,7 @@ func TestDeadPoolReaper(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Test getting dead pool
-	reaper := newDeadPoolReaper(ns, pool)
+	reaper := newDeadPoolReaper(ns, pool, []string{})
 	deadPools, err := reaper.findDeadPools()
 	assert.NoError(t, err)
 	assert.Equal(t, map[string][]string{"2": {"type1", "type2"}, "3": {"type1", "type2"}}, deadPools)
@@ -57,6 +57,8 @@ func TestDeadPoolReaper(t *testing.T) {
 	assert.NoError(t, err)
 	_, err = conn.Do("incr", redisKeyJobsLock(ns, "type1"))
 	assert.NoError(t, err)
+	_, err = conn.Do("hincrby", redisKeyJobsLockInfo(ns, "type1"), "2", 1) // worker pool 2 has lock
+	assert.NoError(t, err)
 
 	// Ensure 0 jobs in jobs queue
 	jobsCount, err := redis.Int(conn.Do("llen", redisKeyJobs(ns, "type1")))
@@ -82,10 +84,10 @@ func TestDeadPoolReaper(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, 0, jobsCount)
 
-	// Lock count should get decremented
-	lockCount, err := redis.Int(conn.Do("get", redisKeyJobsLock(ns, "type1")))
-	assert.NoError(t, err)
-	assert.Equal(t, 0, lockCount)
+	// Locks should get cleaned up
+	assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, "type1")))
+	v, _ := conn.Do("HGET", redisKeyJobsLockInfo(ns, "type1"), "2")
+	assert.Nil(t, v)
 }
 
 func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
@@ -106,14 +108,23 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
 	assert.NoError(t, err)
 	err = conn.Send("SADD", workerPoolsKey, "3")
 	assert.NoError(t, err)
+	// stale lock info
+	err = conn.Send("SET", redisKeyJobsLock(ns, "type1"), 3)
+	assert.NoError(t, err)
+	err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "1", 1)
+	assert.NoError(t, err)
+	err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "2", 1)
+	assert.NoError(t, err)
+	err = conn.Send("HSET", redisKeyJobsLockInfo(ns, "type1"), "3", 1)
+	assert.NoError(t, err)
 	err = conn.Flush()
 	assert.NoError(t, err)
 
 	// Test getting dead pool ids
-	reaper := newDeadPoolReaper(ns, pool)
+	reaper := newDeadPoolReaper(ns, pool, []string{"type1"})
 	deadPools, err := reaper.findDeadPools()
 	assert.NoError(t, err)
-	assert.Equal(t, deadPools, map[string][]string{"1": {}, "2": {}, "3": {}})
+	assert.Equal(t, map[string][]string{"1": {}, "2": {}, "3": {}}, deadPools)
 
 	// Test requeueing jobs
 	_, err = conn.Do("lpush", redisKeyJobsInProgress(ns, "2", "type1"), "foo")
@@ -152,6 +163,13 @@ func TestDeadPoolReaperNoHeartbeat(t *testing.T) {
 	jobsCount, err = redis.Int(conn.Do("scard", redisKeyWorkerPools(ns)))
 	assert.NoError(t, err)
 	assert.Equal(t, 0, jobsCount)
+
+	// Stale lock info was cleaned up using reaper.curJobTypes
+	assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, "type1")))
+	for _, poolID := range []string{"1", "2", "3"} {
+		v, _ := conn.Do("HGET", redisKeyJobsLockInfo(ns, "type1"), poolID)
+		assert.Nil(t, v)
+	}
 }
 
 func TestDeadPoolReaperNoJobTypes(t *testing.T) {
@@ -186,7 +204,7 @@ func TestDeadPoolReaperNoJobTypes(t *testing.T) {
 	assert.NoError(t, err)
 
 	// Test getting dead pool
-	reaper := newDeadPoolReaper(ns, pool)
+	reaper := newDeadPoolReaper(ns, pool, []string{})
 	deadPools, err := reaper.findDeadPools()
 	assert.NoError(t, err)
 	assert.Equal(t, map[string][]string{"2": {"type1", "type2"}}, deadPools)
@@ -261,7 +279,7 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
 
 	// setup a worker pool and start the reaper, which should restart the stale job above
 	wp := setupTestWorkerPool(pool, ns, job1, 1, JobOptions{Priority: 1})
-	wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool)
+	wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, []string{"job1"})
 	wp.deadPoolReaper.deadTime = expectedDeadTime
 	wp.deadPoolReaper.start()
 	// provide some initialization time
@@ -273,3 +291,55 @@ func TestDeadPoolReaperWithWorkerPools(t *testing.T) {
 	staleHeart.stop()
 	wp.deadPoolReaper.stop()
 }
+
+func TestDeadPoolReaperCleanStaleLocks(t *testing.T) {
+	pool := newTestPool(":6379")
+	ns := "work"
+	cleanKeyspace(ns, pool)
+
+	conn := pool.Get()
+	defer conn.Close()
+	job1, job2 := "type1", "type2"
+	jobNames := []string{job1, job2}
+	workerPoolID1, workerPoolID2 := "1", "2"
+	lock1 := redisKeyJobsLock(ns, job1)
+	lock2 := redisKeyJobsLock(ns, job2)
+	lockInfo1 := redisKeyJobsLockInfo(ns, job1)
+	lockInfo2 := redisKeyJobsLockInfo(ns, job2)
+
+	// Create redis data
+	var err error
+	err = conn.Send("SET", lock1, 3)
+	assert.NoError(t, err)
+	err = conn.Send("SET", lock2, 1)
+	assert.NoError(t, err)
+	err = conn.Send("HSET", lockInfo1, workerPoolID1, 1) // workerPoolID1 holds 1 lock on job1
+	assert.NoError(t, err)
+	err = conn.Send("HSET", lockInfo1, workerPoolID2, 2) // workerPoolID2 holds 2 locks on job1
+	assert.NoError(t, err)
+	err = conn.Send("HSET", lockInfo2, workerPoolID2, 2) // test that we don't go below 0 on job2 lock
+	assert.NoError(t, err)
+	err = conn.Flush()
+	assert.NoError(t, err)
+
+	reaper := newDeadPoolReaper(ns, pool, jobNames)
+	// clean lock info for workerPoolID1
+	err = reaper.cleanStaleLockInfo(workerPoolID1, jobNames)
+	assert.NoError(t, err)
+	assert.EqualValues(t, 2, getInt64(pool, lock1))   // job1 lock should be decr by 1
+	assert.EqualValues(t, 1, getInt64(pool, lock2))   // job2 lock is unchanged
+	v, _ := conn.Do("HGET", lockInfo1, workerPoolID1) // workerPoolID1 removed from job1's lock info
+	assert.Nil(t, v)
+
+	// now clean lock info for workerPoolID2
+	err = reaper.cleanStaleLockInfo(workerPoolID2, jobNames)
+	assert.NoError(t, err)
+	// both locks should be at 0
+	assert.EqualValues(t, 0, getInt64(pool, lock1))
+	assert.EqualValues(t, 0, getInt64(pool, lock2))
+	// worker pool ID 2 removed from both lock info hashes
+	v, _ = conn.Do("HGET", lockInfo1, workerPoolID2)
+	assert.Nil(t, v)
+	v, _ = conn.Do("HGET", lockInfo2, workerPoolID2)
+	assert.Nil(t, v)
+}
diff --git a/priority_sampler.go b/priority_sampler.go
index 9c1c2cbd..a31d017f 100644
--- a/priority_sampler.go
+++ b/priority_sampler.go
@@ -13,15 +13,23 @@ type sampleItem struct {
 	priority uint
 
 	// payload:
-	redisJobs       string
-	redisJobsInProg string
+	redisJobs               string
+	redisJobsInProg         string
+	redisJobsPaused         string
+	redisJobsLock           string
+	redisJobsLockInfo       string
+	redisJobsMaxConcurrency string
 }
 
-func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg string) {
+func (s *prioritySampler) add(priority uint, redisJobs, redisJobsInProg, redisJobsPaused, redisJobsLock, redisJobsLockInfo, redisJobsMaxConcurrency string) {
 	sample := sampleItem{
-		priority:        priority,
-		redisJobs:       redisJobs,
-		redisJobsInProg: redisJobsInProg,
+		priority:                priority,
+		redisJobs:               redisJobs,
+		redisJobsInProg:         redisJobsInProg,
+		redisJobsPaused:         redisJobsPaused,
+		redisJobsLock:           redisJobsLock,
+		redisJobsLockInfo:       redisJobsLockInfo,
+		redisJobsMaxConcurrency: redisJobsMaxConcurrency,
 	}
 	s.samples = append(s.samples, sample)
 	s.sum += priority
diff --git a/priority_sampler_test.go b/priority_sampler_test.go
index 3ab254da..97485aff 100644
--- a/priority_sampler_test.go
+++ b/priority_sampler_test.go
@@ -8,9 +8,10 @@ import (
 
 func TestPrioritySampler(t *testing.T) {
 	ps := prioritySampler{}
-	ps.add(5, "jobs.5", "jobsinprog.5")
-	ps.add(2, "jobs.2a", "jobsinprog.2a")
-	ps.add(1, "jobs.1b", "jobsinprog.1b")
+
+	ps.add(5, "jobs.5", "jobsinprog.5", "jobspaused.5", "jobslock.5", "jobslockinfo.5", "jobsconcurrency.5")
+	ps.add(2, "jobs.2a", "jobsinprog.2a", "jobspaused.2a", "jobslock.2a", "jobslockinfo.2a", "jobsconcurrency.2a")
+	ps.add(1, "jobs.1b", "jobsinprog.1b", "jobspaused.1b", "jobslock.1b", "jobslockinfo.1b", "jobsconcurrency.1b")
 
 	var c5 = 0
 	var c2 = 0
@@ -41,7 +42,13 @@ func TestPrioritySampler(t *testing.T) {
 func BenchmarkPrioritySampler(b *testing.B) {
 	ps := prioritySampler{}
 	for i := 0; i < 200; i++ {
-		ps.add(uint(i)+1, "jobs."+fmt.Sprint(i), "jobsinprog."+fmt.Sprint(i))
+		ps.add(uint(i)+1,
+			"jobs."+fmt.Sprint(i),
+			"jobsinprog."+fmt.Sprint(i),
+			"jobspaused."+fmt.Sprint(i),
+			"jobslock."+fmt.Sprint(i),
+			"jobslockinfo."+fmt.Sprint(i),
+			"jobsmaxconcurrency."+fmt.Sprint(i))
 	}
 
 	b.ResetTimer()
diff --git a/redis.go b/redis.go
index 9da02018..849b2264 100644
--- a/redis.go
+++ b/redis.go
@@ -56,19 +56,20 @@ func redisKeyHeartbeat(namespace, workerPoolID string) string {
 	return redisNamespacePrefix(namespace) + "worker_pools:" + workerPoolID
 }
 
-var pauseKeySuffix = "paused"
 func redisKeyJobsPaused(namespace, jobName string) string {
-	return redisKeyJobs(namespace, jobName) + ":" + pauseKeySuffix
+	return redisKeyJobs(namespace, jobName) + ":paused"
 }
 
-var lockKeySuffix = "lock"
 func redisKeyJobsLock(namespace, jobName string) string {
-	return redisKeyJobs(namespace, jobName) + ":" + lockKeySuffix
+	return redisKeyJobs(namespace, jobName) + ":lock"
+}
+
+func redisKeyJobsLockInfo(namespace, jobName string) string {
+	return redisKeyJobs(namespace, jobName) + ":lock_info"
 }
 
-var concurrencyKeySuffix = "max_concurrency"
 func redisKeyJobsConcurrency(namespace, jobName string) string {
-	return redisKeyJobs(namespace, jobName) + ":" + concurrencyKeySuffix
+	return redisKeyJobs(namespace, jobName) + ":max_concurrency"
 }
 
 func redisKeyUniqueJob(namespace, jobName string, args map[string]interface{}) (string, error) {
@@ -93,23 +94,6 @@ func redisKeyLastPeriodicEnqueue(namespace string) string {
 	return redisNamespacePrefix(namespace) + "last_periodic_enqueue"
 }
 
-// Used by Lua scripts below and needs to follow same naming convention as redisKeyJobs* functions above
-// note: all assume the local var jobQueue is present, which is the the val of redisKeyJobs()
-var redisLuaJobsPausedKey = fmt.Sprintf(`
-local function getPauseKey(jobQueue)
-	return string.format("%%s:%s", jobQueue)
-end`, pauseKeySuffix)
-
-var redisLuaJobsLockedKey = fmt.Sprintf(`
-local function getLockKey(jobQueue)
-	return string.format("%%s:%s", jobQueue)
-end`, lockKeySuffix)
-
-var redisLuaJobsConcurrencyKey = fmt.Sprintf(`
-local function getConcurrencyKey(jobQueue)
-	return string.format("%%s:%s", jobQueue)
-end`, concurrencyKeySuffix)
-
 // Used to fetch the next job to run
 //
 // KEYS[1] = the 1st job queue we want to try, eg, "work:jobs:emails"
 // KEYS[2] = the 1st job queue's in prog queue, eg, "work:jobs:emails:97c84119d13cb54119a38743:inprogress"
 // KEYS[3] = the 2nd job queue...
 // KEYS[4] = the 2nd job queue's in prog queue...
 // ...
 // KEYS[N] = the last job queue...
 // KEYS[N+1] = the last job queue's in prog queue...
+// ARGV[1] = job queue's workerPoolID
 var redisLuaFetchJob = fmt.Sprintf(`
---- getPauseKey will be inserted below
-%s
---- getLockKey will be inserted below
-%s
---- getConcurrencyKey will be inserted below
-%s
+local function acquireLock(lockKey, lockInfoKey, workerPoolID)
+  redis.call('incr', lockKey)
+  redis.call('hincrby', lockInfoKey, workerPoolID, 1)
+end
 
 local function haveJobs(jobQueue)
   return redis.call('llen', jobQueue) > 0
@@ -141,7 +124,6 @@ local function canRun(lockKey, maxConcurrency)
     -- default case: maxConcurrency not defined or set to 0 means no cap on concurrent jobs OR
     -- maxConcurrency set, but lock does not yet exist OR
     -- maxConcurrency set, lock is set, but not yet at max concurrency
-    redis.call('incr', lockKey)
     return true
   else
     -- we are at max capacity for running jobs
@@ -149,22 +131,27 @@ local function canRun(lockKey, maxConcurrency)
   end
 end
 
-local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency
+local res, jobQueue, inProgQueue, pauseKey, lockKey, maxConcurrency, workerPoolID, concurrencyKey, lockInfoKey
 local keylen = #KEYS
+workerPoolID = ARGV[1]
 
-for i=1,keylen,2 do
+for i=1,keylen,%d do
   jobQueue = KEYS[i]
   inProgQueue = KEYS[i+1]
-  pauseKey = getPauseKey(jobQueue)
-  lockKey = getLockKey(jobQueue)
-  maxConcurrency = tonumber(redis.call('get', getConcurrencyKey(jobQueue)))
+  pauseKey = KEYS[i+2]
+  lockKey = KEYS[i+3]
+  lockInfoKey = KEYS[i+4]
+  concurrencyKey = KEYS[i+5]
+
+  maxConcurrency = tonumber(redis.call('get', concurrencyKey))
 
   if haveJobs(jobQueue) and not isPaused(pauseKey) and canRun(lockKey, maxConcurrency) then
+    acquireLock(lockKey, lockInfoKey, workerPoolID)
     res = redis.call('rpoplpush', jobQueue, inProgQueue)
     return {res, jobQueue, inProgQueue}
   end
 end
-return nil`, redisLuaJobsPausedKey, redisLuaJobsLockedKey, redisLuaJobsConcurrencyKey)
+return nil`, fetchKeysPerJobType)
 
@@ -175,22 +162,61 @@ return nil`, redisLuaJobsPausedKey, redisLuaJobsLockedKey, redisLuaJobsConcurren
 // Used by the reaper to re-enqueue jobs that were in progress
 //
 // KEYS[1] = the 1st job's in progress queue
 // KEYS[2] = the 1st job's job queue
 // KEYS[3] = the 2nd job's in progress queue
 // KEYS[4] = the 2nd job's job queue
 // ...
 // KEYS[N] = the last job's in progress queue
 // KEYS[N+1] = the last job's job queue
+// ARGV[1] = workerPoolID for job queue
 var redisLuaReenqueueJob = fmt.Sprintf(`
--- getLockKey inserted below
-%s
+local function releaseLock(lockKey, lockInfoKey, workerPoolID)
+  redis.call('decr', lockKey)
+  redis.call('hincrby', lockInfoKey, workerPoolID, -1)
+end
 
 local keylen = #KEYS
-local res, jobQueue, inProgQueue
-for i=1,keylen,2 do
+local res, jobQueue, inProgQueue, workerPoolID, lockKey, lockInfoKey
+workerPoolID = ARGV[1]
+
+for i=1,keylen,%d do
   inProgQueue = KEYS[i]
   jobQueue = KEYS[i+1]
+  lockKey = KEYS[i+2]
+  lockInfoKey = KEYS[i+3]
 
   res = redis.call('rpoplpush', inProgQueue, jobQueue)
   if res then
-    redis.call('decr', getLockKey(jobQueue))
+    releaseLock(lockKey, lockInfoKey, workerPoolID)
     return {res, inProgQueue, jobQueue}
   end
 end
-return nil`, redisLuaJobsLockedKey)
+return nil`, requeueKeysPerJob)
+
+// Used by the reaper to clean up stale locks
+//
+// KEYS[1] = the 1st job's lock
+// KEYS[2] = the 1st job's lock info hash
+// KEYS[3] = the 2nd job's lock
+// KEYS[4] = the 2nd job's lock info hash
+// ...
+// KEYS[N] = the last job's lock
+// KEYS[N+1] = the last job's lock info hash
+// ARGV[1] = the dead worker pool id
+var redisLuaReapStaleLocks = `
+local keylen = #KEYS
+local lock, lockInfo, deadLockCount
+local deadPoolID = ARGV[1]
+
+for i=1,keylen,2 do
+  lock = KEYS[i]
+  lockInfo = KEYS[i+1]
+  deadLockCount = tonumber(redis.call('hget', lockInfo, deadPoolID))
+
+  if deadLockCount then
+    redis.call('decrby', lock, deadLockCount)
+    redis.call('hdel', lockInfo, deadPoolID)
+
+    if tonumber(redis.call('get', lock)) < 0 then
+      redis.call('set', lock, 0)
+    end
+  end
+end
+return nil
+`
 
 // KEYS[1] = zset of jobs (retry or scheduled), eg work:retry
 // KEYS[2] = zset of dead, eg work:dead. If we don't know the jobName of a job, we'll put it in dead.
@@ -235,8 +261,8 @@ for i=1,jobCount do
 	j = cjson.decode(jobs[i])
 	if j['id'] == ARGV[2] then
 		redis.call('zrem', KEYS[1], jobs[i])
-		deletedCount = deletedCount + 1
-		jobBytes = jobs[i]
+    deletedCount = deletedCount + 1
+    jobBytes = jobs[i]
 	end
 end
 return {deletedCount, jobBytes}
@@ -320,7 +346,7 @@ return requeuedCount
 `
 
 // KEYS[1] = job queue to push onto
-// KEYS[2] = Unique job's key. Test for existance and set if we push.
+// KEYS[2] = Unique job's key. Test for existence and set if we push.
 // ARGV[1] = job
 var redisLuaEnqueueUnique = `
 if redis.call('set', KEYS[2], '1', 'NX', 'EX', '86400') then
diff --git a/worker.go b/worker.go
index 90da5d3d..1a9c95dc 100644
--- a/worker.go
+++ b/worker.go
@@ -9,6 +9,8 @@ import (
 	"github.com/garyburd/redigo/redis"
 )
 
+const fetchKeysPerJobType = 6
+
 type worker struct {
 	workerID  string
 	poolID    string
@@ -59,11 +61,17 @@ func (w *worker) updateMiddlewareAndJobTypes(middleware []*middlewareHandler, jo
 	w.middleware = middleware
 	sampler := prioritySampler{}
 	for _, jt := range jobTypes {
-		sampler.add(jt.Priority, redisKeyJobs(w.namespace, jt.Name), redisKeyJobsInProgress(w.namespace, w.poolID, jt.Name))
+		sampler.add(jt.Priority,
+			redisKeyJobs(w.namespace, jt.Name),
+			redisKeyJobsInProgress(w.namespace, w.poolID, jt.Name),
+			redisKeyJobsPaused(w.namespace, jt.Name),
+			redisKeyJobsLock(w.namespace, jt.Name),
+			redisKeyJobsLockInfo(w.namespace, jt.Name),
+			redisKeyJobsConcurrency(w.namespace, jt.Name))
 	}
 	w.sampler = sampler
 	w.jobTypes = jobTypes
-	w.redisFetchScript = redis.NewScript(len(jobTypes)*2, redisLuaFetchJob)
+	w.redisFetchScript = redis.NewScript(len(jobTypes)*fetchKeysPerJobType, redisLuaFetchJob)
 }
 
 func (w *worker) start() {
@@ -135,12 +143,13 @@ func (w *worker) fetchJob() (*Job, error) {
 	// resort queues
 	// NOTE: we could optimize this to only resort every second, or something.
 	w.sampler.sample()
-	var scriptArgs = make([]interface{}, 0, len(w.sampler.samples)*2)
+	numKeys := len(w.sampler.samples) * fetchKeysPerJobType
+	var scriptArgs = make([]interface{}, 0, numKeys+1)
 
 	for _, s := range w.sampler.samples {
-		scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg)
+		scriptArgs = append(scriptArgs, s.redisJobs, s.redisJobsInProg, s.redisJobsPaused, s.redisJobsLock, s.redisJobsLockInfo, s.redisJobsMaxConcurrency) // KEYS[1-6 * N]
 	}
-
+	scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]
 	conn := w.pool.Get()
 	defer conn.Close()
@@ -224,6 +233,7 @@ func (w *worker) removeJobFromInProgress(job *Job) {
 	conn.Send("MULTI")
 	conn.Send("LREM", job.inProgQueue, 1, job.rawJSON)
 	conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name))
+	conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1)
 	if _, err := conn.Do("EXEC"); err != nil {
 		logError("worker.remove_job_from_in_progress.lrem", err)
 	}
@@ -265,6 +275,7 @@ func (w *worker) addToRetry(job *Job, runErr error) {
 	conn.Send("MULTI")
 	conn.Send("LREM", job.inProgQueue, 1, job.rawJSON)
 	conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name))
+	conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1)
 	conn.Send("ZADD", redisKeyRetry(w.namespace), nowEpochSeconds()+backoff(job), rawJSON)
 	if _, err = conn.Do("EXEC"); err != nil {
 		logError("worker.add_to_retry.exec", err)
@@ -290,6 +301,7 @@ func (w *worker) addToDead(job *Job, runErr error) {
 	conn.Send("MULTI")
 	conn.Send("LREM", job.inProgQueue, 1, job.rawJSON)
 	conn.Send("DECR", redisKeyJobsLock(w.namespace, job.Name))
+	conn.Send("HINCRBY", redisKeyJobsLockInfo(w.namespace, job.Name), w.poolID, -1)
 	conn.Send("ZADD", redisKeyDead(w.namespace), nowEpochSeconds(), rawJSON)
 	_, err = conn.Do("EXEC")
 	if err != nil {
diff --git a/worker_pool.go b/worker_pool.go
index 04f19c9a..6a35d540 100644
--- a/worker_pool.go
+++ b/worker_pool.go
@@ -234,7 +234,7 @@ func (wp *WorkerPool) startRequeuers() {
 	}
 	wp.retrier = newRequeuer(wp.namespace, wp.pool, redisKeyRetry(wp.namespace), jobNames)
 	wp.scheduler = newRequeuer(wp.namespace, wp.pool, redisKeyScheduled(wp.namespace), jobNames)
-	wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool)
+	wp.deadPoolReaper = newDeadPoolReaper(wp.namespace, wp.pool, jobNames)
 	wp.retrier.start()
 	wp.scheduler.start()
 	wp.deadPoolReaper.start()
diff --git a/worker_pool_test.go b/worker_pool_test.go
index 5b95ef11..70444bde 100644
--- a/worker_pool_test.go
+++ b/worker_pool_test.go
@@ -128,14 +128,24 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) {
 		_, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime})
 		assert.Nil(t, err)
 	}
-	assert.True(t, int64(numJobs) >= listSize(pool, redisKeyJobs(ns, job1)))
+
+	// make sure we've enough jobs queued up to make an interesting test
+	jobsQueued := listSize(pool, redisKeyJobs(ns, job1))
+	assert.True(t, jobsQueued > 3, "should be at least 3 jobs queued up, but only found %v", jobsQueued)
 
 	// now make sure the during the duration of job execution there is never > 1 job in flight
 	start := time.Now()
-	totalRuntime := time.Duration(sleepTime * numJobs) * time.Millisecond
+	totalRuntime := time.Duration(sleepTime*numJobs) * time.Millisecond
+	time.Sleep(10 * time.Millisecond)
 	for time.Since(start) < totalRuntime {
+		// jobs in progress, lock count for the job and lock info for the pool should never exceed 1
 		jobsInProgress := listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1))
-		assert.True(t, jobsInProgress <= 1, fmt.Sprintf("jobsInProgress should never exceed 1: actual=%d", jobsInProgress))
+		assert.True(t, jobsInProgress <= 1, "jobsInProgress should never exceed 1: actual=%d", jobsInProgress)
+
+		jobLockCount := getInt64(pool, redisKeyJobsLock(ns, job1))
+		assert.True(t, jobLockCount <= 1, "global lock count for job should never exceed 1, got: %v", jobLockCount)
+		wpLockCount := hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID)
+		assert.True(t, wpLockCount <= 1, "lock count for the worker pool should never exceed 1: actual=%v", wpLockCount)
 		time.Sleep(time.Duration(sleepTime) * time.Millisecond)
 	}
 	wp.Drain()
@@ -144,6 +154,8 @@ func TestWorkersPoolRunSingleThreaded(t *testing.T) {
 
 	// At this point it should all be empty.
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)))
+	assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
+	assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
 }
 
@@ -151,9 +163,7 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) {
 	pool := newTestPool(":6379")
 	ns, job1 := "work", "job1"
 	numJobs, concurrency, sleepTime := 5, 5, 2
 	wp := setupTestWorkerPool(pool, ns, job1, concurrency, JobOptions{Priority: 1, MaxConcurrency: 1})
-	// reset the backoff times to help with testing
-	sleepBackoffsInMilliseconds = []int64{10, 10, 10, 10, 10}
 	wp.Start()
 
 	// enqueue some jobs
 	enqueuer := NewEnqueuer(ns, pool)
@@ -161,19 +171,26 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) {
 		_, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime})
 		assert.Nil(t, err)
 	}
-	assert.True(t, int64(numJobs) >= listSize(pool, redisKeyJobs(ns, job1)))
+	// provide time for jobs to process
+	time.Sleep(10 * time.Millisecond)
 
-	// pause work and allow time for any outstanding jobs to finish
+	// pause work, provide time for outstanding jobs to finish and queue up another job
 	pauseJobs(ns, job1, pool)
-	time.Sleep(time.Duration(sleepTime * 2) * time.Millisecond)
+	time.Sleep(2 * time.Millisecond)
+	_, err := enqueuer.Enqueue(job1, Q{"sleep": sleepTime})
+	assert.Nil(t, err)
+
 	// check that we still have some jobs to process
-	assert.True(t, listSize(pool, redisKeyJobs(ns, job1)) > int64(0))
+	assert.True(t, listSize(pool, redisKeyJobs(ns, job1)) >= 1)
 
 	// now make sure no jobs get started until we unpause
 	start := time.Now()
-	totalRuntime := time.Duration(sleepTime * numJobs) * time.Millisecond
+	totalRuntime := time.Duration(sleepTime*numJobs) * time.Millisecond
 	for time.Since(start) < totalRuntime {
 		assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)))
+		// lock count for the job and lock info for the pool should stay at 0 while jobs are paused
+		assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
+		assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
 		time.Sleep(time.Duration(sleepTime) * time.Millisecond)
 	}
 
@@ -187,6 +204,8 @@ func TestWorkerPoolPauseSingleThreadedJobs(t *testing.T) {
 
 	// At this point it should all be empty.
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, wp.workerPoolID, job1)))
+	assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
+	assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), wp.workerPoolID))
 }
 
 // Test Helpers
@@ -203,5 +222,7 @@ func setupTestWorkerPool(pool *redis.Pool, namespace, jobName string, concurrenc
 	wp := NewWorkerPool(TestContext{}, uint(concurrency), namespace, pool)
 	wp.JobWithOptions(jobName, jobOpts, (*TestContext).SleepyJob)
+	// reset the backoff times to help with testing
+	sleepBackoffsInMilliseconds = []int64{10, 10, 10, 10, 10}
 	return wp
 }
diff --git a/worker_test.go b/worker_test.go
index 9f01c2e7..6ebaf64a 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -93,6 +93,7 @@ func TestWorkerInProgress(t *testing.T) {
 	job1 := "job1"
 	deleteQueue(pool, ns, job1)
 	deleteRetryAndDead(pool, ns)
+	deletePausedAndLockedKeys(ns, job1, pool)
 
 	jobTypes := make(map[string]*jobType)
 	jobTypes[job1] = &jobType{
@@ -117,6 +118,8 @@ func TestWorkerInProgress(t *testing.T) {
 	time.Sleep(10 * time.Millisecond)
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
 	assert.EqualValues(t, 1, listSize(pool, redisKeyJobsInProgress(ns, "1", job1)))
+	assert.EqualValues(t, 1, getInt64(pool, redisKeyJobsLock(ns, job1)))
+	assert.EqualValues(t, 1, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), w.poolID))
 
 	// nothing in the worker status
 	w.observer.drain()
@@ -143,6 +146,7 @@ func TestWorkerRetry(t *testing.T) {
 	job1 := "job1"
 	deleteQueue(pool, ns, job1)
 	deleteRetryAndDead(pool, ns)
+	deletePausedAndLockedKeys(ns, job1, pool)
 
 	jobTypes := make(map[string]*jobType)
 	jobTypes[job1] = &jobType{
@@ -167,6 +171,8 @@ func TestWorkerRetry(t *testing.T) {
 	assert.EqualValues(t, 0, zsetSize(pool, redisKeyDead(ns)))
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, "1", job1)))
+	assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
+	assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), w.poolID))
 
 	// Get the job on the retry queue
 	ts, job := jobOnZset(pool, redisKeyRetry(ns))
@@ -239,6 +245,7 @@ func TestWorkerDead(t *testing.T) {
 	deleteQueue(pool, ns, job1)
 	deleteQueue(pool, ns, job2)
 	deleteRetryAndDead(pool, ns)
+	deletePausedAndLockedKeys(ns, job1, pool)
 
 	jobTypes := make(map[string]*jobType)
 	jobTypes[job1] = &jobType{
@@ -273,8 +280,10 @@ func TestWorkerDead(t *testing.T) {
 	assert.EqualValues(t, 1, zsetSize(pool, redisKeyDead(ns)))
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobs(ns, job1)))
 	assert.EqualValues(t, 0, listSize(pool, redisKeyJobsInProgress(ns, "1", job1)))
+	assert.EqualValues(t, 0, getInt64(pool, redisKeyJobsLock(ns, job1)))
+	assert.EqualValues(t, 0, hgetInt64(pool, redisKeyJobsLockInfo(ns, job1), w.poolID))
 
-	// Get the job on the retry queue
+	// Get the job on the dead queue
 	ts, job := jobOnZset(pool, redisKeyDead(ns))
 
 	assert.True(t, ts <= nowEpochSeconds())
@@ -431,7 +440,7 @@ func zsetSize(pool *redis.Pool, key string) int64 {
 	v, err := redis.Int64(conn.Do("ZCARD", key))
 	if err != nil {
-		panic("could not delete retry/dead queue: " + err.Error())
+		panic("could not get ZSET size: " + err.Error())
 	}
 	return v
 }
@@ -442,7 +451,29 @@ func listSize(pool *redis.Pool, key string) int64 {
 	v, err := redis.Int64(conn.Do("LLEN", key))
 	if err != nil {
-		panic("could not delete retry/dead queue: " + err.Error())
+		panic("could not get list length: " + err.Error())
+	}
+	return v
+}
+
+func getInt64(pool *redis.Pool, key string) int64 {
+	conn := pool.Get()
+	defer conn.Close()
+
+	v, err := redis.Int64(conn.Do("GET", key))
+	if err != nil {
+		panic("could not GET int64: " + err.Error())
+	}
+	return v
+}
+
+func hgetInt64(pool *redis.Pool, redisKey, hashKey string) int64 {
+	conn := pool.Get()
+	defer conn.Close()
+
+	v, err := redis.Int64(conn.Do("HGET", redisKey, hashKey))
+	if err != nil {
+		panic("could not HGET int64: " + err.Error())
 	}
 	return v
 }
@@ -453,7 +484,7 @@ func jobOnZset(pool *redis.Pool, key string) (int64, *Job) {
 	v, err := conn.Do("ZRANGE", key, 0, 0, "WITHSCORES")
 	if err != nil {
-		panic("could not delete retry/dead queue: " + err.Error())
+		panic("ZRANGE error: " + err.Error())
 	}
 
 	vv := v.([]interface{})
@@ -478,7 +509,7 @@ func jobOnQueue(pool *redis.Pool, key string) *Job {
 	rawJSON, err := redis.Bytes(conn.Do("RPOP", key))
 	if err != nil {
-		panic("could not delete retry/dead queue: " + err.Error())
+		panic("could not RPOP from job queue: " + err.Error())
 	}
 
 	job, err := newJob(rawJSON, nil, nil)
@@ -545,5 +576,8 @@ func deletePausedAndLockedKeys(namespace, jobName string, pool *redis.Pool) erro
 	if _, err := conn.Do("DEL", redisKeyJobsLock(namespace, jobName)); err != nil {
 		return err
 	}
+	if _, err := conn.Do("DEL", redisKeyJobsLockInfo(namespace, jobName)); err != nil {
+		return err
+	}
 	return nil
 }
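
Reviewer note, not part of the patch: the invariant this diff maintains is that every increment of work:jobs:<name>:lock is paired with an HINCRBY on work:jobs:<name>:lock_info keyed by the acquiring pool's ID, and every decrement is paired with the inverse. A minimal sketch of that pairing in plain redigo calls — the helper names and key strings below are illustrative only, and in the patch these steps run atomically inside the Lua scripts (redisLuaFetchJob, redisLuaReenqueueJob) or a MULTI/EXEC block in worker.go:

package main

import (
	"fmt"

	"github.com/garyburd/redigo/redis"
)

// acquireLock mirrors what redisLuaFetchJob does when a worker claims a job:
// bump the global in-flight counter and this pool's slice of it.
func acquireLock(conn redis.Conn, lockKey, lockInfoKey, poolID string) error {
	conn.Send("MULTI")
	conn.Send("INCR", lockKey)                   // e.g. work:jobs:type1:lock
	conn.Send("HINCRBY", lockInfoKey, poolID, 1) // e.g. work:jobs:type1:lock_info
	_, err := conn.Do("EXEC")
	return err
}

// releaseLock mirrors removeJobFromInProgress and redisLuaReenqueueJob.
func releaseLock(conn redis.Conn, lockKey, lockInfoKey, poolID string) error {
	conn.Send("MULTI")
	conn.Send("DECR", lockKey)
	conn.Send("HINCRBY", lockInfoKey, poolID, -1)
	_, err := conn.Do("EXEC")
	return err
}

func main() {
	conn, err := redis.Dial("tcp", ":6379") // assumes a local Redis, like the tests
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	lock, lockInfo := "work:jobs:type1:lock", "work:jobs:type1:lock_info"
	if err := acquireLock(conn, lock, lockInfo, "pool-1"); err != nil {
		panic(err)
	}
	n, _ := redis.Int64(conn.Do("GET", lock))
	fmt.Println("in-flight:", n) // 1
	if err := releaseLock(conn, lock, lockInfo, "pool-1"); err != nil {
		panic(err)
	}
}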
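The lock_info hash is what lets the reaper subtract exactly the dead pool's share instead of guessing. A similar sketch of what redisLuaReapStaleLocks does per job type, again with hypothetical key names and without the script's atomicity — the real cleanup stays in Lua so the HGET/DECRBY/HDEL sequence cannot interleave with running workers:

package main

import (
	"fmt"

	"github.com/garyburd/redigo/redis"
)

// reapStaleLock sketches redisLuaReapStaleLocks for a single job type:
// read the dead pool's count from the lock_info hash, subtract it from the
// global lock, drop the hash field, and clamp the lock at zero.
// Unlike the Lua script, this Go version is not atomic — it only makes the
// script's steps explicit.
func reapStaleLock(conn redis.Conn, lockKey, lockInfoKey, deadPoolID string) error {
	deadLockCount, err := redis.Int64(conn.Do("HGET", lockInfoKey, deadPoolID))
	if err == redis.ErrNil {
		return nil // this pool held no locks for this job type
	}
	if err != nil {
		return err
	}
	if _, err := conn.Do("DECRBY", lockKey, deadLockCount); err != nil {
		return err
	}
	if _, err := conn.Do("HDEL", lockInfoKey, deadPoolID); err != nil {
		return err
	}
	// clamp at zero, matching the script's "if lock < 0 then set lock to 0"
	if n, err := redis.Int64(conn.Do("GET", lockKey)); err == nil && n < 0 {
		_, err = conn.Do("SET", lockKey, 0)
		return err
	}
	return nil
}

func main() {
	conn, err := redis.Dial("tcp", ":6379") // assumes a local Redis
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	conn.Do("SET", "work:jobs:type1:lock", 3)
	conn.Do("HSET", "work:jobs:type1:lock_info", "2", 2) // dead pool "2" held 2 locks

	if err := reapStaleLock(conn, "work:jobs:type1:lock", "work:jobs:type1:lock_info", "2"); err != nil {
		panic(err)
	}
	n, _ := redis.Int64(conn.Do("GET", "work:jobs:type1:lock"))
	fmt.Println("lock after reap:", n) // 1
}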
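Finally, the six-keys-per-job-type stride that fetchKeysPerJobType introduces: a sketch of the KEYS/ARGV layout that worker.fetchJob builds for redisLuaFetchJob. The namespace, pool ID, and key construction here are made up for illustration — the real code derives each key through the redisKey* helpers in redis.go:

package main

import "fmt"

// fetchScriptArgs lays out six keys per job type, in the order the Lua loop
// indexes them (KEYS[i] .. KEYS[i+5]), followed by the worker pool ID as
// ARGV[1]. "work" and "abc123" are placeholder values.
func fetchScriptArgs(namespace, poolID string, jobNames []string) []interface{} {
	base := func(name string) string { return namespace + ":jobs:" + name }
	args := make([]interface{}, 0, len(jobNames)*6+1)
	for _, name := range jobNames {
		args = append(args,
			base(name),                          // KEYS[i]   job queue
			base(name)+":"+poolID+":inprogress", // KEYS[i+1] in-progress queue
			base(name)+":paused",                // KEYS[i+2] pause flag
			base(name)+":lock",                  // KEYS[i+3] global lock count
			base(name)+":lock_info",             // KEYS[i+4] per-pool lock hash
			base(name)+":max_concurrency",       // KEYS[i+5] concurrency cap
		)
	}
	return append(args, poolID) // ARGV[1]
}

func main() {
	for _, a := range fetchScriptArgs("work", "abc123", []string{"emails", "exports"}) {
		fmt.Println(a)
	}
}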