Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(repeatable): new repeatables structure #2617

Merged
merged 10 commits into from
Jul 16, 2024
41 changes: 41 additions & 0 deletions docs/gitbook/guide/jobs/repeatable.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,47 @@ As you may notice, the repeat strategy setting should be provided in `Queue` and
The repeat strategy function receives an optional `jobName` third parameter.
{% endhint %}

### Custom Repeatable Key

By default, we are generating repeatable keys base on repeat options and job name.

In some cases, it is desired to pass a custom key to be able to differentiate your repeatable jobs even when they have same repeat options:

```typescript
import { Queue } from 'bullmq';

const myQueue = new Queue('Paint', { connection });

// Repeat job every 10 seconds
await myQueue.add(
'bird',
{ color: 'bird' },
{
repeat: {
every: 1000,
},
key: 'colibri',
},
);

// Repeat job every 10 seconds
await myQueue.add(
'bird',
{ color: 'bird' },
{
repeat: {
every: 1000,
},
key: 'eagle',
},
);

```

{% hint style="warning" %}
While adding a new repeatable job with same key but different repeat options, you will override your previous record.
{% endhint %}

### Read more:

* 💡 [Repeat Strategy API Reference](https://api.docs.bullmq.io/types/v5.RepeatStrategy.html)
Expand Down
109 changes: 74 additions & 35 deletions src/classes/repeat.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import { parseExpression } from 'cron-parser';
import { createHash } from 'crypto';
import { RepeatBaseOptions, RepeatableJob, RepeatOptions } from '../interfaces';
import {
RedisClient,
RepeatBaseOptions,
RepeatableJob,
RepeatOptions,
} from '../interfaces';
import { JobsOptions, RepeatStrategy } from '../types';
import { Job } from './job';
import { QueueBase } from './queue-base';
Expand Down Expand Up @@ -31,7 +36,7 @@ export class Repeat extends QueueBase {
skipCheckExists?: boolean,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you remember the use case where this boolean was needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was inspecting the code and here is false by default https://github.com/taskforcesh/bullmq/blob/master/src/classes/worker.ts#L721 in worker class (check the existence in case we need to stop adding the next delayed job), here is true https://github.com/taskforcesh/bullmq/blob/master/src/classes/queue.ts#L209 (where we are not checking the existence of that repeatable job and we add the repeatable job)

): Promise<Job<T, R, N> | undefined> {
// HACK: This is a temporary fix to enable easy migration from bullmq <3.0.0
// to >= 3.0.0. It should be removed when moving to 4.x.
// to >= 3.0.0. TODO: It should be removed when moving to 4.x.
const repeatOpts: RepeatOptions & { cron?: string } = { ...opts.repeat };
repeatOpts.pattern ??= repeatOpts.cron;
delete repeatOpts.cron;
Expand Down Expand Up @@ -70,23 +75,27 @@ export class Repeat extends QueueBase {
repeatOpts.jobId = opts.jobId;
}

const repeatJobKey = getRepeatKey(name, repeatOpts);
const qualifiedName = getRepeatCocatOptions(name, repeatOpts);

let repeatableExists = true;
const repeatJobKey = await this.scripts.addRepeatableJob(
opts.repeat.key ?? this.hash(qualifiedName),
nextMillis,
{
name,
endDate: repeatOpts.endDate
? new Date(repeatOpts.endDate).getTime()
: undefined,
tz: repeatOpts.tz,
pattern: repeatOpts.pattern,
every: repeatOpts.every,
},
qualifiedName,
skipCheckExists,
);

if (!skipCheckExists) {
// Check that the repeatable job hasn't been removed
// TODO: a lua script would be better here
const client = await this.client;
repeatableExists = !!(await client.zscore(
this.keys.repeat,
repeatJobKey,
));
}
const { immediately, ...filteredRepeatOpts } = repeatOpts;

// The job could have been deleted since this check
if (repeatableExists) {
if (repeatJobKey) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment The job could have been deleted since this check is a bit confusing now since the "repeatableExists" boolean is not used anymore.

return this.createNextJob<T, R, N>(
name,
nextMillis,
Expand All @@ -109,17 +118,12 @@ export class Repeat extends QueueBase {
currentCount: number,
hasImmediately: boolean,
) {
const client = await this.client;

//
// Generate unique job id for this iteration.
//
const jobId = this.getRepeatJobId({
name,
const jobId = this.getRepeatDelayedJobId({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is important that this jobId is generated in the same way as in the previous version to avoid breaking changes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it won't be a breaking change in this case as the jobId is only used for new delayed jobs and set it once. Also when removing old cron jobs I'm considering checking the old format before the new one

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the edge case if it adds a delayed job with a different jobId but the same delay time, then you will end up having 2 reputable jobs at least in one iteration, but maybe this edge case cannot happen in practice.

customKey: repeatJobKey,
nextMillis,
namespace: this.hash(repeatJobKey),
jobId: opts.repeat.jobId,
key: opts.repeat.key,
});
const now = Date.now();
const delay =
Expand All @@ -136,8 +140,6 @@ export class Repeat extends QueueBase {

mergedOpts.repeat = { ...opts.repeat, count: currentCount };

await client.zadd(this.keys.repeat, nextMillis.toString(), repeatJobKey);

return this.Job.create<T, R, N>(this, name, data, mergedOpts);
}

Expand All @@ -146,29 +148,56 @@ export class Repeat extends QueueBase {
repeat: RepeatOptions,
jobId?: string,
): Promise<number> {
const repeatJobKey = getRepeatKey(name, { ...repeat, jobId });
const repeatJobId = this.getRepeatJobId({
const qualifiedName = getRepeatCocatOptions(name, { ...repeat, jobId });
const repeatJobKey = repeat.key ?? this.hash(qualifiedName);
const legacyRepeatJobId = this.getRepeatJobId({
name,
nextMillis: '',
namespace: this.hash(repeatJobKey),
namespace: this.hash(qualifiedName),
jobId: jobId ?? repeat.jobId,
key: repeat.key,
});

return this.scripts.removeRepeatable(repeatJobId, repeatJobKey);
return this.scripts.removeRepeatable(
legacyRepeatJobId,
qualifiedName,
repeatJobKey,
);
}

async removeRepeatableByKey(repeatJobKey: string): Promise<number> {
const data = this.keyToData(repeatJobKey);

const repeatJobId = this.getRepeatJobId({
const legacyRepeatJobId = this.getRepeatJobId({
name: data.name,
nextMillis: '',
namespace: this.hash(repeatJobKey),
jobId: data.id,
});

return this.scripts.removeRepeatable(repeatJobId, repeatJobKey);
return this.scripts.removeRepeatable(legacyRepeatJobId, '', repeatJobKey);
}

private async getRepeatableData(
client: RedisClient,
key: string,
next?: number,
): Promise<RepeatableJob> {
const jobData = await client.hgetall(this.toKey('repeat:' + key));

if (jobData) {
return {
key,
name: jobData.name,
endDate: parseInt(jobData.endDate) || null,
tz: jobData.tz || null,
pattern: jobData.pattern || null,
every: jobData.every || null,
next,
};
}

return this.keyToData(key, next);
}

private keyToData(key: string, next?: number): RepeatableJob {
Expand Down Expand Up @@ -200,9 +229,11 @@ export class Repeat extends QueueBase {

const jobs = [];
for (let i = 0; i < result.length; i += 2) {
jobs.push(this.keyToData(result[i], parseInt(result[i + 1])));
jobs.push(
this.getRepeatableData(client, result[i], parseInt(result[i + 1])),
);
}
return jobs;
return Promise.all(jobs);
}

async getRepeatableCount(): Promise<number> {
Expand All @@ -214,6 +245,16 @@ export class Repeat extends QueueBase {
return createHash(this.repeatKeyHashAlgorithm).update(str).digest('hex');
}

private getRepeatDelayedJobId({
nextMillis,
customKey,
}: {
customKey: string;
nextMillis: number | string;
}) {
return `repeat:${customKey}:${nextMillis}`;
}

private getRepeatJobId({
name,
nextMillis,
Expand All @@ -229,12 +270,10 @@ export class Repeat extends QueueBase {
}) {
const checksum = key ?? this.hash(`${name}${jobId || ''}${namespace}`);
return `repeat:${checksum}:${nextMillis}`;
// return `repeat:${jobId || ''}:${name}:${namespace}:${nextMillis}`;
//return `repeat:${name}:${namespace}:${nextMillis}`;
}
}

function getRepeatKey(name: string, repeat: RepeatOptions) {
function getRepeatCocatOptions(name: string, repeat: RepeatOptions) {
const endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : '';
const tz = repeat.tz || '';
const pattern = repeat.pattern;
Expand Down
62 changes: 58 additions & 4 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
WorkerOptions,
KeepJobs,
MoveToDelayedOpts,
RepeatableOptions,
} from '../interfaces';
import {
JobState,
Expand Down Expand Up @@ -259,25 +260,78 @@ export class Scripts {
return (<any>client).pause(args);
}

protected addRepeatableJobArgs(
customKey: string,
nextMillis: number,
opts: RepeatableOptions,
legacyCustomKey: string,
skipCheckExists: boolean,
): (string | number | Buffer)[] {
const keys: (string | number | Buffer)[] = [
this.queue.keys.repeat,
customKey,
];

const args = [
nextMillis,
pack(opts),
legacyCustomKey,
skipCheckExists ? '1' : '0',
];

return keys.concat(args);
}

async addRepeatableJob(
customKey: string,
nextMillis: number,
opts: RepeatableOptions,
legacyCustomKey: string,
skipCheckExists: boolean,
): Promise<string> {
const client = await this.queue.client;

const args = this.addRepeatableJobArgs(
customKey,
nextMillis,
opts,
legacyCustomKey,
skipCheckExists,
);

return (<any>client).addRepeatableJob(args);
}

private removeRepeatableArgs(
repeatJobId: string,
legacyRepeatJobId: string,
qualifiedName: string,
repeatJobKey: string,
): string[] {
const queueKeys = this.queue.keys;

const keys = [queueKeys.repeat, queueKeys.delayed];

const args = [repeatJobId, repeatJobKey, queueKeys['']];
const args = [
legacyRepeatJobId,
qualifiedName,
repeatJobKey,
queueKeys[''],
];

return keys.concat(args);
}

async removeRepeatable(
repeatJobId: string,
legacyRepeatJobId: string,
qualifiedName: string,
repeatJobKey: string,
): Promise<number> {
const client = await this.queue.client;
const args = this.removeRepeatableArgs(repeatJobId, repeatJobKey);
const args = this.removeRepeatableArgs(
legacyRepeatJobId,
qualifiedName,
repeatJobKey,
);

return (<any>client).removeRepeatable(args);
}
Expand Down
Loading
Loading