Skip to content

Commit

Permalink
perf(change-delay): add delay marker when needed (#2411)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Feb 5, 2024
1 parent b3ab285 commit 8b62d28
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 63 deletions.
11 changes: 0 additions & 11 deletions docs/gitbook/python/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
## v2.2.1 (2024-01-16)
### Fix
* **retry-jobs:** Add marker when needed ([#2374](https://github.com/taskforcesh/bullmq/issues/2374)) ([`1813d5f`](https://github.com/taskforcesh/bullmq/commit/1813d5fa12b7db69ee6c8c09273729cda8e3e3b5))
* **security:** Upgrade msgpackr https://github.com/advisories/GHSA-7hpj-7hhx-2fgx ([`7ae0953`](https://github.com/taskforcesh/bullmq/commit/7ae095357fddbdaacc286cbe5782946b95160d55))

### Documentation
* **changelog:** Split changelog ([#2381](https://github.com/taskforcesh/bullmq/issues/2381)) ([`368b5a1`](https://github.com/taskforcesh/bullmq/commit/368b5a104b632fa181b2c19cc5e3530387f38ae4))
* **summary:** Add remove dependency section ([#2378](https://github.com/taskforcesh/bullmq/issues/2378)) ([`03e1451`](https://github.com/taskforcesh/bullmq/commit/03e1451f54edf56f11f9e74f9b4095efe522bb97))

## v2.2.0 (2024-01-14)
### Feature
Expand All @@ -33,12 +28,6 @@
* **redis:** Upgrade to v5 [python] ([#2364](https://github.com/taskforcesh/bullmq/issues/2364)) ([`d5113c8`](https://github.com/taskforcesh/bullmq/commit/d5113c88ad108b281b292e2890e0eef3be41c8fb))
* **worker:** Worker can be closed if Redis is down ([#2350](https://github.com/taskforcesh/bullmq/issues/2350)) ([`888dcc2`](https://github.com/taskforcesh/bullmq/commit/888dcc2dd40571e05fe1f4a5c81161ed062f4542))

### Documentation
* **sandbox:** Add URL support section (#2373) ref #2326 #2372 ([`3a38a47`](https://github.com/taskforcesh/bullmq/commit/3a38a471cbeda70ac9d4d9744b199090dc6f0a12))
* **bullmq-pro:** Update changelog to v6.9.0 ([#2359](https://github.com/taskforcesh/bullmq/issues/2359)) ([`66d9469`](https://github.com/taskforcesh/bullmq/commit/66d9469b3b40b0b5b43601308fa063707cb41b91))
* **workers:** Add auto removal jobs section ([#2355](https://github.com/taskforcesh/bullmq/issues/2355)) ([`dddd2c8`](https://github.com/taskforcesh/bullmq/commit/dddd2c89132c0b3216a8788e1672e4433b98a436))
* **changelog:** Format v2 changes docs [python] ([#2353](https://github.com/taskforcesh/bullmq/issues/2353)) ([`97837f2`](https://github.com/taskforcesh/bullmq/commit/97837f22ec4abbe20c4026221801f51156f4861b))

## v2.0.0 (2023-12-23)
### Feature
* **job:** Add isActive method [python] ([#2352](https://github.com/taskforcesh/bullmq/issues/2352)) ([`afb5e31`](https://github.com/taskforcesh/bullmq/commit/afb5e31484ed2e5a1c381c732321225c0a8b78ff))
Expand Down
17 changes: 12 additions & 5 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,12 +622,19 @@ export class Scripts {
timestamp = timestamp * 0x1000 + (+jobId & 0xfff);
}

const keys: (string | number)[] = ['delayed', jobId].map(name => {
return this.queue.toKey(name);
});
keys.push.apply(keys, [this.queue.keys.events]);
const keys: (string | number)[] = [
this.queue.keys.delayed,
this.queue.keys.meta,
this.queue.keys.marker,
this.queue.keys.events,
];

return keys.concat([delay, JSON.stringify(timestamp), jobId]);
return keys.concat([
delay,
JSON.stringify(timestamp),
jobId,
this.queue.toKey(jobId),
]);
}

async changePriority(
Expand Down
41 changes: 0 additions & 41 deletions src/commands/changeDelay-3.lua

This file was deleted.

57 changes: 57 additions & 0 deletions src/commands/changeDelay-4.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
--[[
Change job delay when it is in delayed set.
Input:
KEYS[1] delayed key
KEYS[2] meta key
KEYS[3] marker key
KEYS[4] events stream
ARGV[1] delay
ARGV[2] delayedTimestamp
ARGV[3] the id of the job
ARGV[4] job key
Output:
0 - OK
-1 - Missing job.
-3 - Job not in delayed set.
Events:
- delayed key.
]]
local rcall = redis.call

-- Includes
--- @include "includes/addDelayMarkerIfNeeded"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"

if rcall("EXISTS", ARGV[4]) == 1 then
local jobId = ARGV[3]
local score = tonumber(ARGV[2])
local delayedTimestamp = (score / 0x1000)

local numRemovedElements = rcall("ZREM", KEYS[1], jobId)

if numRemovedElements < 1 then
return -3
end

rcall("HSET", ARGV[4], "delay", tonumber(ARGV[1]))
rcall("ZADD", KEYS[1], score, jobId)

local maxEvents = getOrSetMaxEvents(KEYS[2])

rcall("XADD", KEYS[4], "MAXLEN", "~", maxEvents, "*", "event", "delayed",
"jobId", jobId, "delay", delayedTimestamp)

-- mark that a delayed job is available
local isPaused = isQueuePaused(KEYS[2])
if not isPaused then
addDelayMarkerIfNeeded(KEYS[3], KEYS[1])
end

return 0
else
return -1
end
6 changes: 4 additions & 2 deletions src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
--- @include "getTargetQueueList"

local function moveParentToWait(parentPrefix, parentId, emitEvent)
local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait", parentPrefix .. "paused")
local parentTarget, isPaused = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "wait",
parentPrefix .. "paused")
addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPaused, parentId)

if emitEvent then
Expand Down Expand Up @@ -48,7 +49,8 @@ local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey)
end
else
local missedParentKey = rcall("HGET", jobKey, "parentKey")
if( (type(missedParentKey) == "string") and missedParentKey ~= "" and (rcall("EXISTS", missedParentKey) == 1)) then
if( (type(missedParentKey) == "string") and missedParentKey ~= ""
and (rcall("EXISTS", missedParentKey) == 1)) then
local parentDependenciesKey = missedParentKey .. ":dependencies"
local result = rcall("SREM", parentDependenciesKey, jobKey)
if result > 0 then
Expand Down
8 changes: 4 additions & 4 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ describe('Job', function () {
const completing = new Promise<void>(resolve => {
worker.on('completed', async () => {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte(4000);
expect(timeDiff).to.be.gte(2000);
resolve();
});
});
Expand All @@ -856,17 +856,17 @@ describe('Job', function () {
queue,
'test',
{ foo: 'bar' },
{ delay: 2000 },
{ delay: 8000 },
);

const isDelayed = await job.isDelayed();
expect(isDelayed).to.be.equal(true);

await job.changeDelay(4000);
await job.changeDelay(2000);

const isDelayedAfterChangeDelay = await job.isDelayed();
expect(isDelayedAfterChangeDelay).to.be.equal(true);
expect(job.delay).to.be.equal(4000);
expect(job.delay).to.be.equal(2000);

await completing;

Expand Down
2 changes: 2 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1505,10 +1505,12 @@ describe('workers', function () {
connection,
prefix,
});
await worker1.waitUntilReady();
const worker2 = new Worker(queueName2, null, {
connection,
prefix,
});
await worker2.waitUntilReady();

try {
// There is no point into checking the ready status after closing
Expand Down

0 comments on commit 8b62d28

Please sign in to comment.