Skip to content

Commit

Permalink
fix(retry): handle pause queue status
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Feb 9, 2023
1 parent 6c6ecb2 commit 9f945d6
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 105 deletions.
42 changes: 0 additions & 42 deletions lib/commands/reprocessJob-4.lua

This file was deleted.

52 changes: 52 additions & 0 deletions lib/commands/reprocessJob-6.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
--[[
Attempts to reprocess a job
Input:
KEYS[1] job key
KEYS[2] job lock key
KEYS[3] job state
KEYS[4] wait key
KEYS[5] meta-pause
KEYS[6] paused key
ARGV[1] job.id,
ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH'
ARGV[3] token
ARGV[4] timestamp
Output:
1 means the operation was a success
0 means the job does not exist
-1 means the job is currently locked and can't be retried.
-2 means the job was not found in the expected set.
]]
local rcall = redis.call;
if (rcall("EXISTS", KEYS[1]) == 1) then
if (rcall("EXISTS", KEYS[2]) == 0) then
rcall("HDEL", KEYS[1], "finishedOn", "processedOn", "failedReason")
rcall("HSET", KEYS[1], "retriedOn", ARGV[4])

if (rcall("ZREM", KEYS[3], ARGV[1]) == 1) then
local target
if rcall("EXISTS", KEYS[5]) ~= 1 then
target = KEYS[4]
else
target = KEYS[6]
end

rcall(ARGV[2], target, ARGV[1])

-- Emit waiting event (wait..ing@token)
rcall("PUBLISH", KEYS[4] .. "ing@" .. ARGV[3], ARGV[1])
return 1
else
return -2
end
else
return -1
end
else
return 0
end
12 changes: 11 additions & 1 deletion lib/commands/retryJob-3.lua → lib/commands/retryJob-5.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
KEYS[1] 'active',
KEYS[2] 'wait'
KEYS[3] jobId
KEYS[4] 'meta-paused'
KEYS[5] 'paused'
ARGV[1] pushCmd
ARGV[2] jobId
Expand All @@ -30,7 +32,15 @@ if redis.call("EXISTS", KEYS[3]) == 1 then
end

redis.call("LREM", KEYS[1], 0, ARGV[2])
redis.call(ARGV[1], KEYS[2], ARGV[2])

local target
if rcall("EXISTS", KEYS[4]) ~= 1 then
target = KEYS[2]
else
target = KEYS[5]
end

redis.call(ARGV[1], target, ARGV[2])

return 0
else
Expand Down
57 changes: 0 additions & 57 deletions lib/commands/retryJobs-3.lua

This file was deleted.

64 changes: 64 additions & 0 deletions lib/commands/retryJobs-5.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
--[[
Attempts to retry all failed jobs
Input:
KEYS[1] base key
KEYS[2] failed state key
KEYS[3] wait state key
KEYS[4] 'meta-paused'
KEYS[5] 'paused'
ARGV[1] count
Output:
1 means the operation is not completed
0 means the operation is completed
]]
local baseKey = KEYS[1]
local maxCount = tonumber(ARGV[1])

local rcall = redis.call;

local function batches(n, batchSize)
local i = 0

return function()
local from = i * batchSize + 1
i = i + 1
if (from <= n) then
local to = math.min(from + batchSize - 1, n)
return from, to
end
end
end

local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

local jobs = getZSetItems(KEYS[2], maxCount)

if (#jobs > 0) then
for i, key in ipairs(jobs) do
local jobKey = baseKey .. key
rcall("HDEL", jobKey, "finishedOn", "processedOn", "failedReason")
end

local target
if rcall("EXISTS", KEYS[4]) ~= 1 then
target = KEYS[3]
else
target = KEYS[5]
end

for from, to in batches(#jobs, 7000) do
rcall("ZREM", KEYS[2], unpack(jobs, from, to))
rcall("LPUSH", target, unpack(jobs, from, to))
end
end

maxCount = maxCount - #jobs

if (maxCount <= 0) then return 1 end

return 0
21 changes: 16 additions & 5 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ const scripts = {
},

retryJobsArgs(queue, count) {
const keys = [queue.toKey(''), queue.toKey('failed'), queue.toKey('wait')];
const keys = [
queue.toKey(''),
queue.toKey('failed'),
queue.toKey('wait'),
queue.toKey('meta-paused'),
queue.toKey('paused')
];

const args = [count];

Expand Down Expand Up @@ -455,9 +461,12 @@ const scripts = {
const queue = job.queue;
const jobId = job.id;

const keys = _.map(['active', 'wait', jobId], name => {
return queue.toKey(name);
});
const keys = _.map(
['active', 'wait', jobId, 'meta-paused', 'paused'],
name => {
return queue.toKey(name);
}
);

const pushCmd = (job.opts.lifo ? 'R' : 'L') + 'PUSH';

Expand Down Expand Up @@ -485,7 +494,9 @@ const scripts = {
queue.toKey(job.id),
queue.toKey(job.id) + ':lock',
queue.toKey(options.state),
queue.toKey('wait')
queue.toKey('wait'),
queue.toKey('meta-paused'),
queue.toKey('paused')
];

const args = [
Expand Down
Loading

0 comments on commit 9f945d6

Please sign in to comment.