Skip to content

Commit

Permalink
fix(repeatable): consider removing legacy repeatable job (#2658) fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 18, 2024
1 parent 9d8f874 commit a6764ae
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 12 deletions.
19 changes: 11 additions & 8 deletions src/classes/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ export class Repeat extends QueueBase {
repeatOpts.jobId = opts.jobId;
}

const qualifiedName = getRepeatCocatOptions(name, repeatOpts);
const repeatConcatOptions = getRepeatConcatOptions(name, repeatOpts);

const repeatJobKey = await this.scripts.addRepeatableJob(
opts.repeat.key ?? this.hash(qualifiedName),
opts.repeat.key ?? this.hash(repeatConcatOptions),
nextMillis,
{
name,
Expand All @@ -89,7 +89,7 @@ export class Repeat extends QueueBase {
pattern: repeatOpts.pattern,
every: repeatOpts.every,
},
qualifiedName,
repeatConcatOptions,
skipCheckExists,
);

Expand Down Expand Up @@ -148,19 +148,22 @@ export class Repeat extends QueueBase {
repeat: RepeatOptions,
jobId?: string,
): Promise<number> {
const qualifiedName = getRepeatCocatOptions(name, { ...repeat, jobId });
const repeatJobKey = repeat.key ?? this.hash(qualifiedName);
const repeatConcatOptions = getRepeatConcatOptions(name, {
...repeat,
jobId,
});
const repeatJobKey = repeat.key ?? this.hash(repeatConcatOptions);
const legacyRepeatJobId = this.getRepeatJobId({
name,
nextMillis: '',
namespace: this.hash(qualifiedName),
namespace: this.hash(repeatConcatOptions),
jobId: jobId ?? repeat.jobId,
key: repeat.key,
});

return this.scripts.removeRepeatable(
legacyRepeatJobId,
qualifiedName,
repeatConcatOptions,
repeatJobKey,
);
}
Expand Down Expand Up @@ -273,7 +276,7 @@ export class Repeat extends QueueBase {
}
}

function getRepeatCocatOptions(name: string, repeat: RepeatOptions) {
function getRepeatConcatOptions(name: string, repeat: RepeatOptions) {
const endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : '';
const tz = repeat.tz || '';
const pattern = repeat.pattern;
Expand Down
17 changes: 13 additions & 4 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ export class Scripts {

private removeRepeatableArgs(
legacyRepeatJobId: string,
qualifiedName: string,
repeatConcatOptions: string,
repeatJobKey: string,
): string[] {
const queueKeys = this.queue.keys;
Expand All @@ -311,23 +311,32 @@ export class Scripts {

const args = [
legacyRepeatJobId,
qualifiedName,
this.getRepeatConcatOptions(repeatConcatOptions, repeatJobKey),
repeatJobKey,
queueKeys[''],
];

return keys.concat(args);
}

// TODO: remove this check in next breaking change
getRepeatConcatOptions(repeatConcatOptions: string, repeatJobKey: string) {
if (repeatJobKey && repeatJobKey.split(':').length > 2) {
return repeatJobKey;
}

return repeatConcatOptions;
}

async removeRepeatable(
legacyRepeatJobId: string,
qualifiedName: string,
repeatConcatOptions: string,
repeatJobKey: string,
): Promise<number> {
const client = await this.queue.client;
const args = this.removeRepeatableArgs(
legacyRepeatJobId,
qualifiedName,
repeatConcatOptions,
repeatJobKey,
);

Expand Down
82 changes: 82 additions & 0 deletions tests/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,88 @@ describe('repeat', function () {
expect(repeatableJobsAfterRemove).to.have.length(0);
});

describe('when legacy repeatable format is present', function () {
it('should be able to remove legacy repeatable jobs', async () => {
const client = await queue.client;
await client.hmset(
`${prefix}:${queue.name}:repeat:839d4be40c8b2f30fca6f860d0cf76f7:1735711200000`,
'priority',
0,
'delay',
14524061394,
'data',
'{}',
'timestamp',
1721187138606,
'rjk',
'remove::::* 1 * 1 *',
'name',
'remove',
);
await client.zadd(
`${prefix}:${queue.name}:repeat`,
1735711200000,
'remove::::* 1 * 1 *',
);
await client.zadd(
`${prefix}:${queue.name}:delayed`,
1735711200000,
'repeat:839d4be40c8b2f30fca6f860d0cf76f7:1735711200000',
);

const repeat = { pattern: '* 1 * 1 *' };

const repeatableJobs = await queue.getRepeatableJobs();
expect(repeatableJobs).to.have.length(1);
const removed = await queue.removeRepeatable('remove', repeat);

const delayedCount = await queue.getJobCountByTypes('delayed');
expect(delayedCount).to.be.equal(0);
expect(removed).to.be.true;
const repeatableJobsAfterRemove = await queue.getRepeatableJobs();
expect(repeatableJobsAfterRemove).to.have.length(0);
});

it('should be able to remove legacy repeatable jobs by key', async () => {
const client = await queue.client;
await client.hmset(
`${prefix}:${queue.name}:repeat:839d4be40c8b2f30fca6f860d0cf76f7:1735711200000`,
'priority',
0,
'delay',
14524061394,
'data',
'{}',
'timestamp',
1721187138606,
'rjk',
'remove::::* 1 * 1 *',
'name',
'remove',
);
await client.zadd(
`${prefix}:${queue.name}:repeat`,
1735711200000,
'remove::::* 1 * 1 *',
);
await client.zadd(
`${prefix}:${queue.name}:delayed`,
1735711200000,
'repeat:839d4be40c8b2f30fca6f860d0cf76f7:1735711200000',
);

const repeatableJobs = await queue.getRepeatableJobs();
expect(repeatableJobs).to.have.length(1);
const removed = await queue.removeRepeatableByKey('remove::::* 1 * 1 *');

const delayedCount = await queue.getJobCountByTypes('delayed');
expect(delayedCount).to.be.equal(0);
expect(removed).to.be.true;
const repeatableJobsAfterRemove = await queue.getRepeatableJobs();
expect(repeatableJobsAfterRemove).to.have.length(0);
});
});

describe('when repeatable job does not exist', function () {
it('returns false', async () => {
const repeat = { pattern: '*/2 * * * * *' };
Expand Down

0 comments on commit a6764ae

Please sign in to comment.