diff --git a/package.json b/package.json index 3f9acf4eb6..2eb2f4de7d 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,7 @@ "tsc:all": "tsc && tsc -p tsconfig-cjs.json" }, "dependencies": { - "cron-parser": "^4.6.0", + "cron-parser": "^4.9.0", "ioredis": "^5.4.1", "msgpackr": "^1.11.2", "node-abort-controller": "^3.1.1", diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index e4ee4cbeae..a36359a946 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -1387,13 +1387,11 @@ export class Scripts { */ async moveJobFromActiveToWait(jobId: string, token: string) { const client = await this.queue.client; - const lockKey = `${this.queue.toKey(jobId)}:lock`; const keys: (string | number)[] = [ this.queue.keys.active, this.queue.keys.wait, this.queue.keys.stalled, - lockKey, this.queue.keys.paused, this.queue.keys.meta, this.queue.keys.limiter, @@ -1404,13 +1402,22 @@ export class Scripts { const args = [jobId, token, this.queue.toKey(jobId)]; - const pttl = await this.execCommand( + const result = await this.execCommand( client, 'moveJobFromActiveToWait', keys.concat(args), ); - return pttl < 0 ? 0 : pttl; + if (result < 0) { + throw this.finishedErrors({ + code: result, + jobId, + command: 'moveJobFromActiveToWait', + state: 'active', + }); + } + + return result; } async obliterate(opts: { force: boolean; count: number }): Promise { diff --git a/src/commands/moveJobFromActiveToWait-10.lua b/src/commands/moveJobFromActiveToWait-10.lua deleted file mode 100644 index e90d6d2d10..0000000000 --- a/src/commands/moveJobFromActiveToWait-10.lua +++ /dev/null @@ -1,60 +0,0 @@ ---[[ - Function to move job from active state to wait. - Input: - KEYS[1] active key - KEYS[2] wait key - - KEYS[3] stalled key - KEYS[4] job lock key - KEYS[5] paused key - KEYS[6] meta key - KEYS[7] limiter key - KEYS[8] prioritized key - KEYS[9] marker key - KEYS[10] event key - - ARGV[1] job id - ARGV[2] lock token - ARGV[3] job id key -]] -local rcall = redis.call - --- Includes ---- @include "includes/addJobInTargetList" ---- @include "includes/pushBackJobWithPriority" ---- @include "includes/getOrSetMaxEvents" ---- @include "includes/getTargetQueueList" - -local jobId = ARGV[1] -local token = ARGV[2] -local lockKey = KEYS[4] - -local lockToken = rcall("GET", lockKey) -local pttl = rcall("PTTL", KEYS[7]) -if lockToken == token then - local metaKey = KEYS[6] - local removed = rcall("LREM", KEYS[1], 1, jobId) - if removed > 0 then - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[5]) - - rcall("SREM", KEYS[3], jobId) - - local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0 - - if priority > 0 then - pushBackJobWithPriority(KEYS[8], priority, jobId) - else - addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId) - end - - rcall("DEL", lockKey) - - local maxEvents = getOrSetMaxEvents(metaKey) - - -- Emit waiting event - rcall("XADD", KEYS[10], "MAXLEN", "~", maxEvents, "*", "event", "waiting", - "jobId", jobId) - end -end - -return pttl diff --git a/src/commands/moveJobFromActiveToWait-9.lua b/src/commands/moveJobFromActiveToWait-9.lua new file mode 100644 index 0000000000..6c927a99ec --- /dev/null +++ b/src/commands/moveJobFromActiveToWait-9.lua @@ -0,0 +1,63 @@ +--[[ + Function to move job from active state to wait. + Input: + KEYS[1] active key + KEYS[2] wait key + + KEYS[3] stalled key + KEYS[4] paused key + KEYS[5] meta key + KEYS[6] limiter key + KEYS[7] prioritized key + KEYS[8] marker key + KEYS[9] event key + + ARGV[1] job id + ARGV[2] lock token + ARGV[3] job id key +]] +local rcall = redis.call + +-- Includes +--- @include "includes/addJobInTargetList" +--- @include "includes/pushBackJobWithPriority" +--- @include "includes/getOrSetMaxEvents" +--- @include "includes/getTargetQueueList" +--- @include "includes/removeLock" + +local jobId = ARGV[1] +local token = ARGV[2] +local jobKey = ARGV[3] + +local errorCode = removeLock(jobKey, KEYS[3], token, jobId) +if errorCode < 0 then + return errorCode +end + +local metaKey = KEYS[5] +local removed = rcall("LREM", KEYS[1], 1, jobId) +if removed > 0 then + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4]) + + local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0 + + if priority > 0 then + pushBackJobWithPriority(KEYS[7], priority, jobId) + else + addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId) + end + + local maxEvents = getOrSetMaxEvents(metaKey) + + -- Emit waiting event + rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting", + "jobId", jobId) +end + +local pttl = rcall("PTTL", KEYS[6]) + +if pttl > 0 then + return pttl +else + return 0 +end \ No newline at end of file diff --git a/tests/test_events.ts b/tests/test_events.ts index a1fd791d99..3ec45cc9ce 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -791,9 +791,15 @@ describe('events', function () { ); let deduplicatedCounter = 0; - queueEvents.on('deduplicated', ({ jobId }) => { - deduplicatedCounter++; + const deduplication = new Promise(resolve => { + queueEvents.on('deduplicated', () => { + deduplicatedCounter++; + if (deduplicatedCounter == 2) { + resolve(); + } + }); }); + await job.remove(); await queue.add( @@ -814,6 +820,7 @@ describe('events', function () { { deduplication: { id: 'a1' } }, ); await secondJob.remove(); + await deduplication; expect(deduplicatedCounter).to.be.equal(2); }); diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index 28e4c5c9ad..3a9f8c991d 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -423,6 +423,53 @@ describe('Rate Limiter', function () { await worker.close(); }); + describe('when job does not exist', () => { + it('should fail with job existence error', async () => { + const dynamicLimit = 250; + const duration = 100; + + const worker = new Worker( + queueName, + async job => { + if (job.attemptsStarted === 1) { + await queue.rateLimit(dynamicLimit); + await queue.obliterate({ force: true }); + throw Worker.RateLimitError(); + } + }, + { + autorun: false, + concurrency: 10, + drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker. + limiter: { + max: 2, + duration, + }, + connection, + prefix, + }, + ); + + await worker.waitUntilReady(); + + const failing = new Promise(resolve => { + worker.on('error', err => { + expect(err.message).to.be.equal( + `Missing lock for job ${job.id}. moveJobFromActiveToWait`, + ); + resolve(); + }); + }); + + const job = await queue.add('test', { foo: 'bar' }); + + worker.run(); + + await failing; + await worker.close(); + }).timeout(4000); + }); + describe('when rate limit is too low', () => { it('should move job to wait anyway', async function () { this.timeout(4000); diff --git a/yarn.lock b/yarn.lock index f68b48af72..6141eb518f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2241,7 +2241,7 @@ create-require@^1.1.0: resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333" integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ== -cron-parser@^4.6.0: +cron-parser@^4.9.0: version "4.9.0" resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5" integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==