Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dynamic-rate-limit): validate job lock cases #2975

Merged
merged 4 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"tsc:all": "tsc && tsc -p tsconfig-cjs.json"
},
"dependencies": {
"cron-parser": "^4.6.0",
"cron-parser": "^4.9.0",
"ioredis": "^5.4.1",
"msgpackr": "^1.11.2",
"node-abort-controller": "^3.1.1",
Expand Down
15 changes: 11 additions & 4 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1387,13 +1387,11 @@ export class Scripts {
*/
async moveJobFromActiveToWait(jobId: string, token: string) {
const client = await this.queue.client;
const lockKey = `${this.queue.toKey(jobId)}:lock`;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we generate the key here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removeLock include concats lock suffix to jobKey that is passed as an argv, so no need to pass lockKey


const keys: (string | number)[] = [
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.stalled,
lockKey,
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.limiter,
Expand All @@ -1404,13 +1402,22 @@ export class Scripts {

const args = [jobId, token, this.queue.toKey(jobId)];

const pttl = await this.execCommand(
const result = await this.execCommand(
client,
'moveJobFromActiveToWait',
keys.concat(args),
);

return pttl < 0 ? 0 : pttl;
if (result < 0) {
throw this.finishedErrors({
code: result,
jobId,
command: 'moveJobFromActiveToWait',
state: 'active',
});
}

return result;
}

async obliterate(opts: { force: boolean; count: number }): Promise<number> {
Expand Down
60 changes: 0 additions & 60 deletions src/commands/moveJobFromActiveToWait-10.lua

This file was deleted.

63 changes: 63 additions & 0 deletions src/commands/moveJobFromActiveToWait-9.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
--[[
Function to move job from active state to wait.
Input:
KEYS[1] active key
KEYS[2] wait key

KEYS[3] stalled key
KEYS[4] paused key
KEYS[5] meta key
KEYS[6] limiter key
KEYS[7] prioritized key
KEYS[8] marker key
KEYS[9] event key

ARGV[1] job id
ARGV[2] lock token
ARGV[3] job id key
]]
local rcall = redis.call

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/pushBackJobWithPriority"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/removeLock"

local jobId = ARGV[1]
local token = ARGV[2]
local jobKey = ARGV[3]

local errorCode = removeLock(jobKey, KEYS[3], token, jobId)
if errorCode < 0 then
return errorCode
end

local metaKey = KEYS[5]
local removed = rcall("LREM", KEYS[1], 1, jobId)
if removed > 0 then
local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4])

local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0

if priority > 0 then
pushBackJobWithPriority(KEYS[7], priority, jobId)
else
addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId)
end

local maxEvents = getOrSetMaxEvents(metaKey)

-- Emit waiting event
rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
"jobId", jobId)
end

local pttl = rcall("PTTL", KEYS[6])

if pttl > 0 then
return pttl
else
return 0
end
11 changes: 9 additions & 2 deletions tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,9 +791,15 @@ describe('events', function () {
);

let deduplicatedCounter = 0;
queueEvents.on('deduplicated', ({ jobId }) => {
deduplicatedCounter++;
const deduplication = new Promise<void>(resolve => {
queueEvents.on('deduplicated', () => {
deduplicatedCounter++;
if (deduplicatedCounter == 2) {
resolve();
}
});
});

await job.remove();

await queue.add(
Expand All @@ -814,6 +820,7 @@ describe('events', function () {
{ deduplication: { id: 'a1' } },
);
await secondJob.remove();
await deduplication;

expect(deduplicatedCounter).to.be.equal(2);
});
Expand Down
47 changes: 47 additions & 0 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,53 @@ describe('Rate Limiter', function () {
await worker.close();
});

describe('when job does not exist', () => {
it('should fail with job existence error', async () => {
const dynamicLimit = 250;
const duration = 100;

const worker = new Worker(
queueName,
async job => {
if (job.attemptsStarted === 1) {
await queue.rateLimit(dynamicLimit);
await queue.obliterate({ force: true });
throw Worker.RateLimitError();
}
},
{
autorun: false,
concurrency: 10,
drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker.
limiter: {
max: 2,
duration,
},
connection,
prefix,
},
);

await worker.waitUntilReady();

const failing = new Promise<void>(resolve => {
worker.on('error', err => {
expect(err.message).to.be.equal(
`Missing lock for job ${job.id}. moveJobFromActiveToWait`,
);
resolve();
});
});

const job = await queue.add('test', { foo: 'bar' });

worker.run();

await failing;
await worker.close();
}).timeout(4000);
});

describe('when rate limit is too low', () => {
it('should move job to wait anyway', async function () {
this.timeout(4000);
Expand Down
2 changes: 1 addition & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,7 @@ create-require@^1.1.0:
resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333"
integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==

cron-parser@^4.6.0:
cron-parser@^4.9.0:
version "4.9.0"
resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5"
integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==
Expand Down
Loading