Skip to content

Commit

Permalink
fix(stalled): consider failParentOnFailure when moving child into fai…
Browse files Browse the repository at this point in the history
…led (#2526) fixes #2464 (python)
  • Loading branch information
roggervalf authored Apr 18, 2024
1 parent 99ab8c5 commit 5e31eb0
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
15 changes: 14 additions & 1 deletion src/commands/moveStalledJobsToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ local rcall = redis.call
--- @include "includes/addJobInTargetList"
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/moveParentFromWaitingChildrenToFailed"
--- @include "includes/removeJob"
--- @include "includes/removeJobsByMaxAge"
--- @include "includes/removeJobsByMaxCount"
Expand Down Expand Up @@ -81,7 +82,9 @@ if (#stalling > 0) then
local stalledCount =
rcall("HINCRBY", jobKey, "stalledCounter", 1)
if (stalledCount > MAX_STALLED_JOB_COUNT) then
local rawOpts = rcall("HGET", jobKey, "opts")
local jobAttributes = rcall("HMGET", jobKey, "opts", "parent")
local rawOpts = jobAttributes[1]
local rawParentData = jobAttributes[2]
local opts = cjson.decode(rawOpts)
local removeOnFailType = type(opts["removeOnFail"])
rcall("ZADD", failedKey, timestamp, jobId)
Expand All @@ -93,6 +96,16 @@ if (#stalling > 0) then
"failed", "jobId", jobId, 'prev', 'active',
'failedReason', failedReason)

if opts['fpof'] and rawParentData ~= false then
local parentData = cjson.decode(rawParentData)
moveParentFromWaitingChildrenToFailed(
parentData['queueKey'],
parentData['queueKey'] .. ':' .. parentData['id'],
parentData['id'],
jobKey,
timestamp
)
end
if removeOnFailType == "number" then
removeJobsByMaxCount(opts["removeOnFail"],
failedKey, queueKeyPrefix)
Expand Down
91 changes: 90 additions & 1 deletion tests/test_stalled_jobs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Queue, Worker, QueueEvents } from '../src/classes';
import { FlowProducer, Queue, Worker, QueueEvents } from '../src/classes';
import { delay, removeAllQueueData } from '../src/utils';
import { default as IORedis } from 'ioredis';
import { after } from 'lodash';
Expand Down Expand Up @@ -378,6 +378,95 @@ describe('stalled jobs', function () {
await queueEvents.close();
});

describe('when failParentOnFailure is provided as true', function () {
it('should move parent to failed when child is moved to failed', async function () {
this.timeout(6000);
const concurrency = 4;
const parentQueueName = `parent-queue-${v4()}`;

const parentQueue = new Queue(parentQueueName, {
connection,
prefix,
});

const flow = new FlowProducer({ connection, prefix });

const worker = new Worker(
queueName,
async () => {
return delay(10000);
},
{
connection,
prefix,
lockDuration: 1000,
stalledInterval: 100,
maxStalledCount: 0,
concurrency,
},
);

const allActive = new Promise(resolve => {
worker.on('active', after(concurrency, resolve));
});

await worker.waitUntilReady();

const { job: parent } = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{
name: 'test',
data: { foo: 'bar' },
queueName,
opts: { failParentOnFailure: true },
},
],
});

const jobs = Array.from(Array(3).keys()).map(index => ({
name: 'test',
data: { index },
}));

await queue.addBulk(jobs);
await allActive;
await worker.close(true);

const worker2 = new Worker(queueName, async job => {}, {
connection,
prefix,
stalledInterval: 100,
maxStalledCount: 0,
concurrency,
});

const errorMessage = 'job stalled more than allowable limit';
const allFailed = new Promise<void>(resolve => {
worker2.on(
'failed',
after(concurrency, async (job, failedReason, prev) => {
const parentState = await parent.getState();

expect(parentState).to.be.equal('failed');
expect(prev).to.be.equal('active');
expect(failedReason.message).to.be.equal(errorMessage);
resolve();
}),
);
});

await allFailed;

await worker2.close();
await parentQueue.close();
await flow.close();
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});
});

describe('when removeOnFail is provided as a number', function () {
it('keeps the specified number of jobs in failed', async function () {
this.timeout(6000);
Expand Down

0 comments on commit 5e31eb0

Please sign in to comment.