Skip to content

Commit

Permalink
fix(stalled): consider adding marker when moving job back to wait (#2384
Browse files Browse the repository at this point in the history
)
  • Loading branch information
roggervalf authored Jan 20, 2024
1 parent b289819 commit 4914df8
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 29 deletions.
4 changes: 2 additions & 2 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-7.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
Expand Down Expand Up @@ -554,7 +554,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None

def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int):
keys = self.getKeys(['stalled', 'wait', 'active', 'failed',
'stalled-check', 'meta', 'paused', 'events'])
'stalled-check', 'meta', 'paused', 'marker', 'events'])
args = [maxStalledCount, self.keys[''], round(
time.time() * 1000), stalledInterval]
return self.commands["moveStalledJobsToWait"](keys, args)
Expand Down
1 change: 1 addition & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ export class Scripts {
this.queue.keys['stalled-check'],
this.queue.keys.meta,
this.queue.keys.paused,
this.queue.keys.marker,
this.queue.keys.events,
];
const args = [
Expand Down
8 changes: 2 additions & 6 deletions src/commands/addStandardJob-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ local parent = args[8]
local parentData

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/storeJob"
Expand Down Expand Up @@ -100,14 +101,9 @@ storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,

local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[2])

if not paused then
-- mark that a job is available
rcall("ZADD", KEYS[7], 0, "0")
end

-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
rcall(pushCmd, target, jobId)
addJobInTargetList(target, KEYS[7], pushCmd, paused, jobId)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
Expand Down
9 changes: 9 additions & 0 deletions src/commands/includes/addBaseMarkerIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
--[[
Add marker if needed when a job is available.
]]

local function addBaseMarkerIfNeeded(markerKey, isPaused)
if not isPaused then
rcall("ZADD", markerKey, 0, "0")
end
end
11 changes: 11 additions & 0 deletions src/commands/includes/addJobInTargetList.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
--[[
Function to add job in target list and add marker if needed.
]]

-- Includes
--- @include "addBaseMarkerIfNeeded"

local function addJobInTargetList(targetKey, markerKey, pushCmd, isPaused, jobId)
rcall(pushCmd, targetKey, jobId)
addBaseMarkerIfNeeded(markerKey, isPaused)
end
7 changes: 4 additions & 3 deletions src/commands/includes/addJobWithPriority.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
Function to add job considering priority.
]]

-- Includes
--- @include "addBaseMarkerIfNeeded"

local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused)
local prioCounter = rcall("INCR", priorityCounterKey)
local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff)
rcall("ZADD", prioritizedKey, score, jobId)
if not isPaused then
rcall("ZADD", markerKey, 0, "0")
end
addBaseMarkerIfNeeded(markerKey, isPaused)
end
7 changes: 4 additions & 3 deletions src/commands/includes/moveParentToWaitIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@

-- Includes
--- @include "addDelayMarkerIfNeeded"
--- @include "addJobInTargetList"
--- @include "addJobWithPriority"
--- @include "isQueuePaused"
--- @include "getTargetQueueList"

local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,
parentKey, parentId, timestamp)
local isParentActive = rcall("ZSCORE",
Expand All @@ -33,11 +35,10 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,
addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
else
if priority == 0 then
local parentTarget, _paused =
local parentTarget, isParentPaused =
getTargetQueueList(parentMetaKey, parentWaitKey,
parentPausedKey)
rcall("RPUSH", parentTarget, parentId)
rcall("ZADD", parentMarkerKey, 0, "0")
addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPaused, parentId)
else
local isPaused = isQueuePaused(parentMetaKey)
addJobWithPriority(parentMarkerKey,
Expand Down
6 changes: 2 additions & 4 deletions src/commands/includes/promoteDelayedJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
]]

-- Includes
--- @include "addJobInTargetList"
--- @include "addJobWithPriority"

-- Try to get as much as 1000 jobs at once
Expand All @@ -24,10 +25,7 @@ local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedK

if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", targetKey, jobId)
if not isPaused then
rcall("ZADD", markerKey, 0, "0")
end
addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId)
else
addJobWithPriority(markerKey, prioritizedKey, priority,
jobId, priorityCounterKey, isPaused)
Expand Down
5 changes: 2 additions & 3 deletions src/commands/moveJobsToWait-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ local timestamp = tonumber(ARGV[2])
local rcall = redis.call;

-- Includes
--- @include "includes/addBaseMarkerIfNeeded"
--- @include "includes/batches"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
Expand Down Expand Up @@ -61,9 +62,7 @@ if (#jobs > 0) then
rcall("LPUSH", target, unpack(jobs, from, to))
end

if not paused then
rcall("ZADD", KEYS[7], 0, "0")
end
addBaseMarkerIfNeeded(KEYS[7], paused)
end

maxCount = maxCount - #jobs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
--[[
Move stalled jobs to wait.
Input:
KEYS[1] 'stalled' (SET)
KEYS[2] 'wait', (LIST)
Expand All @@ -8,19 +9,22 @@
KEYS[5] 'stalled-check', (KEY)
KEYS[6] 'meta', (KEY)
KEYS[7] 'paused', (LIST)
KEYS[8] 'event stream' (STREAM)
KEYS[8] 'marker'
KEYS[9] 'event stream' (STREAM)
ARGV[1] Max stalled job count
ARGV[2] queue.toKey('')
ARGV[3] timestamp
ARGV[4] max check time
Events:
'stalled' with stalled job id.
]]

local rcall = redis.call

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/removeJob"
Expand All @@ -35,7 +39,8 @@ local failedKey = KEYS[4]
local stalledCheckKey = KEYS[5]
local metaKey = KEYS[6]
local pausedKey = KEYS[7]
local eventStreamKey = KEYS[8]
local markerKey = KEYS[8]
local eventStreamKey = KEYS[9]
local maxStalledJobCount = ARGV[1]
local queueKeyPrefix = ARGV[2]
local timestamp = ARGV[3]
Expand Down Expand Up @@ -113,11 +118,12 @@ if (#stalling > 0) then

table.insert(failed, jobId)
else
local target =
local target, isPaused=
getTargetQueueList(metaKey, waitKey, pausedKey)

-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
rcall("RPUSH", target, jobId)
addJobInTargetList(target, markerKey, "RPUSH", isPaused, jobId)

rcall("XADD", eventStreamKey, "*", "event",
"waiting", "jobId", jobId, 'prev', 'active')

Expand Down
4 changes: 2 additions & 2 deletions src/commands/promote-8.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ local rcall = redis.call
local jobId = ARGV[2]

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"

Expand All @@ -42,8 +43,7 @@ if rcall("ZREM", KEYS[1], jobId) == 1 then

if priority == 0 then
-- LIFO or FIFO
rcall("LPUSH", target, jobId)
if not paused then rcall("ZADD", KEYS[8], 0, "0") end
addJobInTargetList(target, KEYS[8], "LPUSH", paused, jobId)
else
addJobWithPriority(KEYS[8], KEYS[5], priority, jobId, KEYS[6], paused)
end
Expand Down
4 changes: 2 additions & 2 deletions tests/test_stalled_jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ describe('stalled jobs', function () {
});

it('process stalled jobs when starting a queue', async function () {
this.timeout(10000);
this.timeout(5000);

const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
Expand Down Expand Up @@ -298,7 +298,7 @@ describe('stalled jobs', function () {
});

it('moves jobs to failed with maxStalledCount > 1', async function () {
this.timeout(60000);
this.timeout(8000);

const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();
Expand Down

0 comments on commit 4914df8

Please sign in to comment.