Skip to content

Commit

Permalink
feat(job-scheduler): add telemetry support to the job scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 19, 2024
1 parent 2129569 commit 72ea950
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 26 deletions.
77 changes: 51 additions & 26 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { JobsOptions, RepeatStrategy } from '../types';
import { Job } from './job';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';

export interface JobSchedulerJson {
key: string; // key is actually the job scheduler id
Expand All @@ -24,6 +25,7 @@ export class JobScheduler extends QueueBase {
opts: RepeatBaseOptions,
Connection?: typeof RedisConnection,
) {
console.log('JobScheduler constructor', name, opts);
super(name, opts, Connection);

this.repeatStrategy =
Expand All @@ -46,6 +48,12 @@ export class JobScheduler extends QueueBase {
);
}

if (!pattern && !every) {
throw new Error(
'Either .pattern or .every options must be defined for this repeatable job',
);
}

if (repeatOpts.immediately && repeatOpts.startDate) {
throw new Error(
'Both .immediately and .startDate options are defined for this repeatable job',
Expand Down Expand Up @@ -77,7 +85,7 @@ export class JobScheduler extends QueueBase {
now = startMillis > now ? startMillis : now;
}

let nextMillis;
let nextMillis: number;
if (every) {
nextMillis = prevMillis + every;

Expand All @@ -92,7 +100,7 @@ export class JobScheduler extends QueueBase {
const multi = (await this.client).multi();
if (nextMillis) {
if (override) {
await this.scripts.addJobScheduler(
this.scripts.addJobScheduler(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
Expand All @@ -105,37 +113,54 @@ export class JobScheduler extends QueueBase {
},
);
} else {
await this.scripts.updateJobSchedulerNextMillis(
this.scripts.updateJobSchedulerNextMillis(
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
);
}

const job = this.createNextJob<T, R, N>(
(<unknown>multi) as RedisClient,
jobName,
nextMillis,
jobSchedulerId,
{ ...opts, repeat: filteredRepeatOpts },
jobData,
iterationCount,
return this.trace<Job<T, R, N>>(
SpanKind.PRODUCER,
'add',
`${this.name}.${jobName}`,
async (span, srcPropagationMedatada) => {
const job = this.createNextJob<T, R, N>(
(<unknown>multi) as RedisClient,
jobName,
nextMillis,
jobSchedulerId,
{
...opts,
repeat: filteredRepeatOpts,
telemetryMetadata: srcPropagationMedatada,
},
jobData,
iterationCount,
);

const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]

// Check if there are any errors
const erroredResult = results.find(result => result[0]);
if (erroredResult) {
throw new Error(
`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`,
);
}

// Get last result with the job id
const lastResult = results.pop();
job.id = lastResult[1] as string;

span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});

return job;
},
);

const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]

// Check if there are any errors
const erroredResult = results.find(result => result[0]);
if (erroredResult) {
throw new Error(
`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`,
);
}

// Get last result with the job id
const lastResult = results.pop();
job.id = lastResult[1] as string;
return job;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/enums/telemetry-attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export enum TelemetryAttributes {
JobResult = 'bullmq.job.result',
JobFailedReason = 'bullmq.job.failed.reason',
FlowName = 'bullmq.flow.name',
JobSchedulerId = 'bullmq.job.scheduler.id',
}

export enum SpanKind {
Expand Down
6 changes: 6 additions & 0 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1809,6 +1809,12 @@ describe('Job Scheduler', function () {
);
});

it('should throw an error when not specifying .pattern or .every', async function () {
await expect(queue.upsertJobScheduler('repeat', {})).to.be.rejectedWith(
'Either .pattern or .every options must be defined for this repeatable job',
);
});

it('should throw an error when using .immediately and .startDate simultaneously', async function () {
await expect(
queue.upsertJobScheduler('repeat', {
Expand Down
50 changes: 50 additions & 0 deletions tests/test_telemetry_interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,56 @@ describe('Telemetry', () => {
});
});

describe('Queue.upsertJobScheduler', async () => {
it('should correctly interact with telemetry when adding a job scheduler', async () => {
const jobSchedulerId = 'testJobScheduler';
const data = { foo: 'bar' };

await queue.upsertJobScheduler(
jobSchedulerId,
{ every: 1000, endDate: Date.now() + 1000 },
{ name: 'repeatable-job', data },
);

const activeContext = telemetryClient.contextManager.active();
const span = activeContext.getSpan?.() as MockSpan;
expect(span).to.be.an.instanceOf(MockSpan);
expect(span.name).to.equal(`add ${queueName}.repeatable-job`);
expect(span.options?.kind).to.equal(SpanKind.PRODUCER);
expect(span.attributes[TelemetryAttributes.JobSchedulerId]).to.equal(
jobSchedulerId,
);
expect(span.attributes[TelemetryAttributes.JobId]).to.be.a('string');
expect(span.attributes[TelemetryAttributes.JobId]).to.include(
`repeat:${jobSchedulerId}:`,
);
});

it('should correctly handle errors and record them in telemetry for upsertJobScheduler', async () => {
const recordExceptionSpy = sinon.spy(
MockSpan.prototype,
'recordException',
);

try {
await queue.upsertJobScheduler(
'testJobScheduler',
{ endDate: 0 },
{ data: { foo: 'bar' } },
);
} catch (e) {
assert(recordExceptionSpy.calledOnce);
const recordedError = recordExceptionSpy.firstCall.args[0];
assert.equal(
recordedError.message,
'End date must be greater than current timestamp',
);
} finally {
recordExceptionSpy.restore();
}
});
});

describe('Worker.processJob', async () => {
it('should correctly interact with telemetry when processing a job', async () => {
const job = await queue.add('testJob', { foo: 'bar' });
Expand Down

0 comments on commit 72ea950

Please sign in to comment.