Skip to content

Commit

Permalink
fix(job): consider changing priority to 0 (#2599)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jun 28, 2024
1 parent 3fd6bca commit 4dba122
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 90 deletions.
38 changes: 24 additions & 14 deletions src/commands/changePriority-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,39 @@ local priority = tonumber(ARGV[1])
local rcall = redis.call

-- Includes
--- @include "includes/isQueuePaused"
--- @include "includes/addJobInTargetList"
--- @include "includes/addJobWithPriority"
--- @include "includes/getTargetQueueList"
--- @include "includes/pushBackJobWithPriority"

local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey,
priorityCounter, lifo, priority, jobId, paused)
if priority == 0 then
local pushCmd = lifo and 'RPUSH' or 'LPUSH'
addJobInTargetList(targetKey, markerKey, pushCmd, paused, jobId)
else
if lifo then
pushBackJobWithPriority(prioritizedKey, priority, jobId)
else
addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
priorityCounter, paused)
end
end
end

if rcall("EXISTS", jobKey) == 1 then
local metaKey = KEYS[3]
local isPaused = isQueuePaused(metaKey)
local target, isPaused = getTargetQueueList(metaKey, KEYS[1], KEYS[2])
local markerKey = KEYS[6]
local prioritizedKey = KEYS[4]

-- Re-add with the new priority
if rcall("ZREM", KEYS[4], jobId) > 0 then
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, KEYS[5],
isPaused)
-- If the new priority is 0, then just leave the job where it is in the wait list.
elseif priority > 0 then
-- Job is already in the wait list, we need to re-add it with the new priority.
local target = isPaused and KEYS[2] or KEYS[1]

local numRemovedElements = rcall("LREM", target, -1, jobId)
if numRemovedElements > 0 then
addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
KEYS[5], isPaused)
end
reAddJobWithNewPriority( prioritizedKey, markerKey, target,
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused)
elseif rcall("LREM", target, -1, jobId) > 0 then
reAddJobWithNewPriority( prioritizedKey, markerKey, target,
KEYS[5], ARGV[4] == '1', priority, jobId, isPaused)
end

rcall("HSET", jobKey, "priority", priority)
Expand Down
201 changes: 125 additions & 76 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -985,46 +985,96 @@ describe('Job', function () {
});

describe('.changePriority', () => {
it('can change priority of a job', async function () {
await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 });
const job = await Job.create(
queue,
'test2',
{ foo: 'bar' },
{ priority: 16 },
);
describe('when job is in wait state', () => {
describe('when lifo option is provided as true', () => {
it('moves job to the head of wait list', async () => {
await queue.pause();
await Job.create(queue, 'test1', { foo: 'bar' });
const job = await Job.create(
queue,
'test2',
{ foo: 'bar' },
{ priority: 16 },
);

await job.changePriority({
priority: 1,
});
await job.changePriority({
priority: 0,
lifo: true,
});

const worker = new Worker(
queueName,
async () => {
await delay(20);
},
{ connection, prefix },
);
await worker.waitUntilReady();
const worker = new Worker(
queueName,
async () => {
await delay(20);
},
{ connection, prefix },
);
await worker.waitUntilReady();

const completing = new Promise<void>(resolve => {
worker.on(
'completed',
after(2, job => {
expect(job.name).to.be.eql('test1');
resolve();
}),
);
});

const completing = new Promise<void>(resolve => {
worker.on(
'completed',
after(2, job => {
expect(job.name).to.be.eql('test1');
resolve();
}),
);
await queue.resume();

await completing;

await worker.close();
});
});

await completing;
describe('when lifo option is provided as false', () => {
it('moves job to the tail of wait list and has more priority', async () => {
await queue.pause();
const job = await Job.create(
queue,
'test1',
{ foo: 'bar' },
{ priority: 8 },
);
await Job.create(queue, 'test2', { foo: 'bar' });

await worker.close();
await job.changePriority({
priority: 0,
lifo: false,
});

const worker = new Worker(
queueName,
async () => {
await delay(20);
},
{ connection, prefix },
);
await worker.waitUntilReady();

const completing = new Promise<void>(resolve => {
worker.on(
'completed',
after(2, job => {
expect(job.name).to.be.eql('test1');
resolve();
}),
);
});

await queue.resume();

await completing;

await worker.close();
});
});
});

describe('when queue is paused', () => {
it('respects new priority', async () => {
await queue.pause();
describe('when job is in prioritized state', () => {
it('can change priority of a job', async function () {
await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 });
const job = await Job.create(
queue,
Expand Down Expand Up @@ -1056,69 +1106,68 @@ describe('Job', function () {
);
});

await queue.resume();

await completing;

await worker.close();
});
});

describe('when lifo option is provided as true', () => {
it('moves job to the head of wait list', async () => {
await queue.pause();
await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 });
const job = await Job.create(
queue,
'test2',
{ foo: 'bar' },
{ priority: 16 },
);

await job.changePriority({
lifo: true,
});
describe('when lifo option is provided as true', () => {
it('moves job to the head of prioritized jobs with same priority', async () => {
await queue.pause();
await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 16 });
const job = await Job.create(
queue,
'test2',
{ foo: 'bar' },
{ priority: 16 },
);

const worker = new Worker(
queueName,
async () => {
await delay(20);
},
{ connection, prefix },
);
await worker.waitUntilReady();
await job.changePriority({
priority: 16,
lifo: true,
});

const completing = new Promise<void>(resolve => {
worker.on(
'completed',
after(2, job => {
expect(job.name).to.be.eql('test1');
resolve();
}),
const worker = new Worker(
queueName,
async () => {
await delay(20);
},
{ connection, prefix },
);
});
await worker.waitUntilReady();

const completing = new Promise<void>(resolve => {
worker.on(
'completed',
after(2, job => {
expect(job.name).to.be.eql('test1');
resolve();
}),
);
});

await queue.resume();
await queue.resume();

await completing;
await completing;

await worker.close();
await worker.close();
});
});
});

describe('when lifo option is provided as false', () => {
it('moves job to the tail of wait list and has more priority', async () => {
describe('when queue is paused', () => {
it('respects new priority', async () => {
await queue.pause();
await Job.create(queue, 'test1', { foo: 'bar' }, { priority: 8 });
const job = await Job.create(
queue,
'test1',
'test2',
{ foo: 'bar' },
{ priority: 8 },
{ priority: 16 },
);
await Job.create(queue, 'test2', { foo: 'bar' }, { priority: 16 });

await job.changePriority({
lifo: false,
priority: 1,
});

const worker = new Worker(
Expand All @@ -1134,7 +1183,7 @@ describe('Job', function () {
worker.on(
'completed',
after(2, job => {
expect(job.name).to.be.eql('test2');
expect(job.name).to.be.eql('test1');
resolve();
}),
);
Expand All @@ -1148,7 +1197,7 @@ describe('Job', function () {
});
});

describe('when job is not in wait state', () => {
describe('when job is not in wait or prioritized state', () => {
it('does not add a record in priority zset', async () => {
const job = await Job.create(
queue,
Expand Down

0 comments on commit 4dba122

Please sign in to comment.