Skip to content

Commit

Permalink
feat(queue): refactor a protected addJob method allowing telemetry ex…
Browse files Browse the repository at this point in the history
…tensions
  • Loading branch information
manast committed Nov 22, 2024
1 parent 4d3eb88 commit 09f2571
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 37 deletions.
91 changes: 55 additions & 36 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,47 +311,66 @@ export class Queue<
opts = { ...opts, telemetryMetadata: srcPropagationMedatada };
}

if (opts && opts.repeat) {
if (opts.repeat.endDate) {
if (+new Date(opts.repeat.endDate) < Date.now()) {
throw new Error(
'End date must be greater than current timestamp',
);
}
}
const job = await this.addJob(name, data, opts);

return (await this.repeat).updateRepeatableJob<
DataType,
ResultType,
NameType
>(name, data, { ...this.jobsOpts, ...opts }, { override: true });
} else {
const jobId = opts?.jobId;
span?.setAttributes({
[TelemetryAttributes.JobName]: name,
[TelemetryAttributes.JobId]: job.id,
});

if (jobId == '0' || jobId?.startsWith('0:')) {
throw new Error("JobId cannot be '0' or start with 0:");
}
return job;
},
);
}

const job = await this.Job.create<DataType, ResultType, NameType>(
this as MinimalQueue,
name,
data,
{
...this.jobsOpts,
...opts,
jobId,
},
);
this.emit('waiting', job as JobBase<DataType, ResultType, NameType>);
/**
* addJob is a telemetry free version of the add method, useful in order to wrap it
* with custom telemetry on subclasses.
*
* @param name
* @param data
* @param opts
*
* @returns Job
*/
protected async addJob(
name: NameType,
data: DataType,
opts?: JobsOptions,
): Promise<Job<DataType, ResultType, NameType>> {
if (opts && opts.repeat) {
if (opts.repeat.endDate) {
if (+new Date(opts.repeat.endDate) < Date.now()) {
throw new Error('End date must be greater than current timestamp');
}
}

span?.setAttributes({
[TelemetryAttributes.JobId]: job.id,
});
return (await this.repeat).updateRepeatableJob<
DataType,
ResultType,
NameType
>(name, data, { ...this.jobsOpts, ...opts }, { override: true });
} else {
const jobId = opts?.jobId;

return job;
}
},
);
if (jobId == '0' || jobId?.startsWith('0:')) {
throw new Error("JobId cannot be '0' or start with 0:");
}

const job = await this.Job.create<DataType, ResultType, NameType>(
this as MinimalQueue,
name,
data,
{
...this.jobsOpts,
...opts,
jobId,
},
);
this.emit('waiting', job as JobBase<DataType, ResultType, NameType>);

return job;
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ will never work with more accuracy than 1ms. */
* This method waits for current jobs to finalize before returning.
*
* @param force - Use force boolean parameter if you do not want to wait for
* current jobs to be processed. When using telemetry, be mindful that it can
* current jobs to be processed. When using telemetry, be mindful that it can
* interfere with the proper closure of spans, potentially preventing them from being exported.
*
* @returns Promise that resolves when the worker has been closed.
Expand Down

0 comments on commit 09f2571

Please sign in to comment.