Skip to content

Commit

Permalink
fix(queue): fix generics to be able to properly be extended
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 14, 2024
1 parent 68cf88a commit f2495e5
Showing 1 changed file with 72 additions and 20 deletions.
92 changes: 72 additions & 20 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export interface ObliterateOpts {
count?: number;
}

export interface QueueListener<DataType, ResultType, NameType extends string>
export interface QueueListener<JobBase extends Job = Job>
extends IoredisListener {
/**
* Listen to 'cleaned' event.
Expand Down Expand Up @@ -57,17 +57,14 @@ export interface QueueListener<DataType, ResultType, NameType extends string>
*
* This event is triggered when the job updates its progress.
*/
progress: (
job: Job<DataType, ResultType, NameType>,
progress: number | object,
) => void;
progress: (job: JobBase, progress: number | object) => void;

/**
* Listen to 'removed' event.
*
* This event is triggered when a job is removed.
*/
removed: (job: Job<DataType, ResultType, NameType>) => void;
removed: (job: JobBase) => void;

/**
* Listen to 'resumed' event.
Expand All @@ -81,21 +78,74 @@ export interface QueueListener<DataType, ResultType, NameType extends string>
*
* This event is triggered when the queue creates a new job.
*/
waiting: (job: Job<DataType, ResultType, NameType>) => void;
waiting: (job: JobBase) => void;
}

// Helper for JobBase type
type JobBase<T, ResultType, NameType extends string> = T extends Job<
any,
any,
any
>
? T
: Job<T, ResultType, NameType>;

// Helper types to extract DataType, ResultType, and NameType
type ExtractDataType<DataTypeOrJob, Default> = DataTypeOrJob extends Job<
infer D,
any,
any
>
? D
: Default;

type ExtractResultType<DataTypeOrJob, Default> = DataTypeOrJob extends Job<
any,
infer R,
any
>
? R
: Default;

type ExtractNameType<
DataTypeOrJob,
Default extends string,
> = DataTypeOrJob extends Job<any, any, infer N> ? N : Default;

/**
* Queue
*
* This class provides methods to add jobs to a queue and some other high-level
* administration such as pausing or deleting queues.
*
* @template DataType - The type of the data that the job will process.
* @template ResultType - The type of the result of the job.
* @template NameType - The type of the name of the job.
*
* @example
*
* ```typescript
* import { Queue } from 'bullmq';
*
* interface MyDataType {
* foo: string;
* }
*
* interface MyResultType {
* bar: string;
* }
*
* const queue = new Queue<MyDataType, MyResultType, "blue" | "brown">('myQueue');
* ```
*/
export class Queue<
DataType = any,
ResultType = any,
NameType extends string = string,
> extends QueueGetters<Job<DataType, ResultType, NameType>> {
DataTypeOrJob = any,
DefaultResultType = any,
DefaultNameType extends string = string,
DataType = ExtractDataType<DataTypeOrJob, DataTypeOrJob>,
ResultType = ExtractResultType<DataTypeOrJob, DefaultResultType>,
NameType extends string = ExtractNameType<DataTypeOrJob, DefaultNameType>,
> extends QueueGetters<JobBase<DataTypeOrJob, ResultType, NameType>> {
token = v4();
jobsOpts: BaseJobOptions;
opts: QueueOptions;
Expand Down Expand Up @@ -132,32 +182,34 @@ export class Queue<
});
}

emit<U extends keyof QueueListener<DataType, ResultType, NameType>>(
emit<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(
event: U,
...args: Parameters<QueueListener<DataType, ResultType, NameType>[U]>
...args: Parameters<
QueueListener<JobBase<DataType, ResultType, NameType>>[U]
>
): boolean {
return super.emit(event, ...args);
}

off<U extends keyof QueueListener<DataType, ResultType, NameType>>(
off<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(
eventName: U,
listener: QueueListener<DataType, ResultType, NameType>[U],
listener: QueueListener<JobBase<DataType, ResultType, NameType>>[U],
): this {
super.off(eventName, listener);
return this;
}

on<U extends keyof QueueListener<DataType, ResultType, NameType>>(
on<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(
event: U,
listener: QueueListener<DataType, ResultType, NameType>[U],
listener: QueueListener<JobBase<DataType, ResultType, NameType>>[U],
): this {
super.on(event, listener);
return this;
}

once<U extends keyof QueueListener<DataType, ResultType, NameType>>(
once<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(
event: U,
listener: QueueListener<DataType, ResultType, NameType>[U],
listener: QueueListener<JobBase<DataType, ResultType, NameType>>[U],
): this {
super.once(event, listener);
return this;
Expand Down Expand Up @@ -290,7 +342,7 @@ export class Queue<
jobId,
},
);
this.emit('waiting', job);
this.emit('waiting', job as JobBase<DataType, ResultType, NameType>);

span?.setAttributes({
[TelemetryAttributes.JobId]: job.id,
Expand Down

0 comments on commit f2495e5

Please sign in to comment.