Skip to content

Commit

Permalink
fix(flow): allow using removeOnFail and failParentOnFailure in parents (
Browse files Browse the repository at this point in the history
#2947) fixes #2229
  • Loading branch information
roggervalf authored Nov 30, 2024
1 parent d0c473f commit 85f6f6f
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 30 deletions.
12 changes: 10 additions & 2 deletions src/commands/includes/moveParentFromWaitingChildrenToFailed.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
-- Includes
--- @include "moveParentToWaitIfNeeded"
--- @include "removeDeduplicationKeyIfNeeded"
--- @include "removeJobsOnFail"

local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, parentId, jobIdKey, timestamp)
if rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) == 1 then
rcall("ZADD", parentQueueKey .. ":failed", timestamp, parentId)
local parentQueuePrefix = parentQueueKey .. ":"
local parentFailedKey = parentQueueKey .. ":failed"
rcall("ZADD", parentFailedKey, timestamp, parentId)
local failedReason = "child " .. jobIdKey .. " failed"
rcall("HMSET", parentKey, "failedReason", failedReason, "finishedOn", timestamp)
rcall("XADD", parentQueueKey .. ":events", "*", "event", "failed", "jobId", parentId, "failedReason",
failedReason, "prev", "waiting-children")

local jobAttributes = rcall("HMGET", parentKey, "parent", "deid")
local jobAttributes = rcall("HMGET", parentKey, "parent", "deid", "opts")

removeDeduplicationKeyIfNeeded(parentQueueKey .. ":", jobAttributes[2])

Expand All @@ -41,5 +44,10 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey,
end
end
end

local parentRawOpts = jobAttributes[3]
local parentOpts = cjson.decode(parentRawOpts)

removeJobsOnFail(parentQueuePrefix, parentFailedKey, parentId, parentOpts, timestamp)
end
end
36 changes: 36 additions & 0 deletions src/commands/includes/removeJobsOnFail.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
--[[
Functions to remove jobs when removeOnFail option is provided.
]]

-- Includes
--- @include "removeJob"
--- @include "removeJobsByMaxAge"
--- @include "removeJobsByMaxCount"

local function removeJobsOnFail(queueKeyPrefix, failedKey, jobId, opts, timestamp)
local removeOnFailType = type(opts["removeOnFail"])
if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
failedKey, queueKeyPrefix)
elseif removeOnFailType == "boolean" then
if opts["removeOnFail"] then
removeJob(jobId, false, queueKeyPrefix,
false --[[remove debounce key]])
rcall("ZREM", failedKey, jobId)
end
elseif removeOnFailType ~= "nil" then
local maxAge = opts["removeOnFail"]["age"]
local maxCount = opts["removeOnFail"]["count"]

if maxAge ~= nil then
removeJobsByMaxAge(timestamp, maxAge,
failedKey, queueKeyPrefix)
end

if maxCount ~= nil and maxCount > 0 then
removeJobsByMaxCount(maxCount, failedKey,
queueKeyPrefix)
end
end
end

29 changes: 2 additions & 27 deletions src/commands/moveStalledJobsToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ local rcall = redis.call
--- @include "includes/moveParentFromWaitingChildrenToFailed"
--- @include "includes/moveParentToWaitIfNeeded"
--- @include "includes/removeDeduplicationKeyIfNeeded"
--- @include "includes/removeJob"
--- @include "includes/removeJobsByMaxAge"
--- @include "includes/removeJobsByMaxCount"
--- @include "includes/removeJobsOnFail"
--- @include "includes/trimEvents"

local stalledKey = KEYS[1]
Expand Down Expand Up @@ -86,7 +84,6 @@ if (#stalling > 0) then
local rawOpts = jobAttributes[1]
local rawParentData = jobAttributes[2]
local opts = cjson.decode(rawOpts)
local removeOnFailType = type(opts["removeOnFail"])
rcall("ZADD", failedKey, timestamp, jobId)
removeDeduplicationKeyIfNeeded(queueKeyPrefix, jobAttributes[3])

Expand Down Expand Up @@ -123,29 +120,7 @@ if (#stalling > 0) then
end
end

if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
failedKey, queueKeyPrefix)
elseif removeOnFailType == "boolean" then
if opts["removeOnFail"] then
removeJob(jobId, false, queueKeyPrefix,
false --[[remove debounce key]])
rcall("ZREM", failedKey, jobId)
end
elseif removeOnFailType ~= "nil" then
local maxAge = opts["removeOnFail"]["age"]
local maxCount = opts["removeOnFail"]["count"]

if maxAge ~= nil then
removeJobsByMaxAge(timestamp, maxAge,
failedKey, queueKeyPrefix)
end

if maxCount ~= nil and maxCount > 0 then
removeJobsByMaxCount(maxCount, failedKey,
queueKeyPrefix)
end
end
removeJobsOnFail(queueKeyPrefix, failedKey, jobId, opts, timestamp)

table.insert(failed, jobId)
else
Expand Down
137 changes: 136 additions & 1 deletion tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2239,7 +2239,142 @@ describe('flows', () => {

await removeAllQueueData(new IORedis(redisHost), parentQueueName);
await removeAllQueueData(new IORedis(redisHost), grandChildrenQueueName);
}).timeout(8000);
});

describe('when removeOnFail option is provided', async () => {
it('should remove parent when child is moved to failed', async () => {
const name = 'child-job';

const parentQueueName = `parent-queue-${v4()}`;
const grandChildrenQueueName = `grand-children-queue-${v4()}`;

const parentQueue = new Queue(parentQueueName, {
connection,
prefix,
});
const grandChildrenQueue = new Queue(grandChildrenQueueName, {
connection,
prefix,
});
const queueEvents = new QueueEvents(parentQueueName, {
connection,
prefix,
});
await queueEvents.waitUntilReady();

let grandChildrenProcessor,
processedGrandChildren = 0;
const processingChildren = new Promise<void>(resolve => {
grandChildrenProcessor = async () => {
processedGrandChildren++;

if (processedGrandChildren === 2) {
return resolve();
}

await delay(200);

throw new Error('failed');
};
});

const grandChildrenWorker = new Worker(
grandChildrenQueueName,
grandChildrenProcessor,
{ connection, prefix },
);

await grandChildrenWorker.waitUntilReady();

const flow = new FlowProducer({ connection, prefix });
const tree = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{
name,
data: { foo: 'bar' },
queueName,
},
{
name,
data: { foo: 'qux' },
queueName,
opts: { failParentOnFailure: true, removeOnFail: true },
children: [
{
name,
data: { foo: 'bar' },
queueName: grandChildrenQueueName,
opts: { failParentOnFailure: true },
},
{
name,
data: { foo: 'baz' },
queueName: grandChildrenQueueName,
},
],
},
],
});

const failed = new Promise<void>(resolve => {
queueEvents.on('failed', async ({ jobId, failedReason, prev }) => {
if (jobId === tree.job.id) {
expect(prev).to.be.equal('waiting-children');
expect(failedReason).to.be.equal(
`child ${prefix}:${queueName}:${tree.children[1].job.id} failed`,
);
resolve();
}
});
});

expect(tree).to.have.property('job');
expect(tree).to.have.property('children');

const { children, job } = tree;
const parentState = await job.getState();

expect(parentState).to.be.eql('waiting-children');

await processingChildren;
await failed;

const { children: grandChildren } = children[1];
const updatedGrandchildJob = await grandChildrenQueue.getJob(
grandChildren[0].job.id,
);
const grandChildState = await updatedGrandchildJob.getState();

expect(grandChildState).to.be.eql('failed');
expect(updatedGrandchildJob.failedReason).to.be.eql('failed');

const updatedParentJob = await queue.getJob(children[1].job.id);
expect(updatedParentJob).to.be.undefined;

const updatedGrandparentJob = await parentQueue.getJob(job.id);
const updatedGrandparentState = await updatedGrandparentJob.getState();

expect(updatedGrandparentState).to.be.eql('failed');
expect(updatedGrandparentJob.failedReason).to.be.eql(
`child ${prefix}:${queueName}:${children[1].job.id} failed`,
);

await parentQueue.close();
await grandChildrenQueue.close();
await grandChildrenWorker.close();
await flow.close();
await queueEvents.close();

await removeAllQueueData(new IORedis(redisHost), parentQueueName);
await removeAllQueueData(
new IORedis(redisHost),
grandChildrenQueueName,
);
});
});

describe('when removeDependencyOnFailure is provided', async () => {
it('moves parent to wait after children fail', async () => {
Expand Down

0 comments on commit 85f6f6f

Please sign in to comment.