Skip to content

Commit

Permalink
feat(job-queue-plugin): Update bullmq & redis dependencies (#2020)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: The minimum Redis recommended version is 6.2.0
  • Loading branch information
alexisvigoureux authored Mar 20, 2023
1 parent 57793cf commit eb0b73f
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,21 @@ export type CachedSession = {
* @example
* ```TypeScript
* import { CachedSession, Logger, SessionCacheStrategy, VendurePlugin } from '\@vendure/core';
* import IORedis from 'ioredis';
* import { Redis, RedisOptions } from 'ioredis';
*
* export interface RedisSessionCachePluginOptions {
* namespace?: string;
* redisOptions?: IORedis.RedisOptions;
* redisOptions?: RedisOptions;
* }
* const loggerCtx = 'RedisSessionCacheStrategy';
* const DEFAULT_NAMESPACE = 'vendure-session-cache';
*
* export class RedisSessionCacheStrategy implements SessionCacheStrategy {
* private client: IORedis.Redis;
* private client: Redis;
* constructor(private options: RedisSessionCachePluginOptions) {}
*
* init() {
* this.client = new IORedis(this.options.redisOptions);
* this.client = new Redis(this.options.redisOptions as RedisOptions);
* this.client.on('error', err => Logger.error(err.message, loggerCtx, err.stack));
* }
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { CachedSession, Logger, SessionCacheStrategy, VendurePlugin } from '@vendure/core';
import IORedis from 'ioredis';
import { Redis, RedisOptions } from 'ioredis';

const loggerCtx = 'RedisSessionCacheStrategy';
const DEFAULT_NAMESPACE = 'vendure-session-cache';

export class RedisSessionCacheStrategy implements SessionCacheStrategy {
private client: IORedis.Redis;
private client: Redis;

constructor(private options: RedisSessionCachePluginOptions) {}

init() {
this.client = new IORedis(this.options.redisOptions);
this.client = new Redis(this.options.redisOptions as RedisOptions);
this.client.on('error', err => Logger.error(err.message, loggerCtx, err.stack));
}

Expand Down Expand Up @@ -44,7 +44,7 @@ export class RedisSessionCacheStrategy implements SessionCacheStrategy {

export interface RedisSessionCachePluginOptions {
namespace?: string;
redisOptions?: IORedis.RedisOptions;
redisOptions?: RedisOptions;
}

@VendurePlugin({
Expand Down
4 changes: 2 additions & 2 deletions packages/job-queue-plugin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
},
"devDependencies": {
"@google-cloud/pubsub": "^2.8.0",
"@types/ioredis": "^4.28.10",
"ioredis": "^5.3.0",
"@vendure/common": "^2.0.0-next.28",
"@vendure/core": "^2.0.0-next.28",
"bullmq": "^1.86.7",
"bullmq": "^3.6.1",
"rimraf": "^3.0.2",
"typescript": "4.9.5"
}
Expand Down
30 changes: 10 additions & 20 deletions packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import Bull, {
JobType,
Processor,
Queue,
QueueScheduler,
Worker,
WorkerOptions,
} from 'bullmq';
Expand All @@ -41,7 +40,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
private connectionOptions: ConnectionOptions;
private queue: Queue;
private worker: Worker;
private scheduler: QueueScheduler;
private workerProcessor: Processor;
private options: BullMQPluginOptions;
private queueNameProcessFnMap = new Map<string, (job: Job) => Promise<any>>();
Expand Down Expand Up @@ -106,20 +104,10 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
}
throw new InternalServerError(`No processor defined for the queue "${queueName}"`);
};

this.scheduler = new QueueScheduler(QUEUE_NAME, {
...options.schedulerOptions,
connection: this.redisConnection,
})
.on('error', (e: any) =>
Logger.error(`BullMQ Scheduler error: ${JSON.stringify(e.message)}`, loggerCtx, e.stack),
)
.on('stalled', jobId => Logger.warn(`BullMQ Scheduler stalled on job ${jobId}`, loggerCtx))
.on('failed', jobId => Logger.warn(`BullMQ Scheduler failed on job ${jobId}`, loggerCtx));
}

async destroy() {
await Promise.all([this.queue.close(), this.worker?.close(), this.scheduler.close()]);
await Promise.all([this.queue.close(), this.worker?.close()]);
}

async add<Data extends JobData<Data> = object>(job: Job<Data>): Promise<Job<Data>> {
Expand Down Expand Up @@ -250,16 +238,19 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
this.worker = new Worker(QUEUE_NAME, this.workerProcessor, options)
.on('error', e => Logger.error(`BullMQ Worker error: ${e.message}`, loggerCtx, e.stack))
.on('closing', e => Logger.verbose(`BullMQ Worker closing: ${e}`, loggerCtx))
.on('closed', () => Logger.verbose('BullMQ Worker closed'))
.on('failed', (job: Bull.Job, failedReason) => {
.on('closed', () => Logger.verbose(`BullMQ Worker closed`))
.on('failed', (job: Bull.Job | undefined, error) => {
Logger.warn(
`Job ${job.id ?? ''} [${job.name}] failed (attempt ${job.attemptsMade} of ${
job.opts.attempts ?? 1
`Job ${job?.id} [${job?.name}] failed (attempt ${job?.attemptsMade} of ${
job?.opts.attempts ?? 1
})`,
);
})
.on('completed', (job: Bull.Job, failedReason: string) => {
Logger.debug(`Job ${job.id ?? ''} [${job.name}] completed`);
.on('stalled', (jobId: string) => {
Logger.warn(`BullMQ Worker: job ${jobId} stalled`, loggerCtx);
})
.on('completed', (job: Bull.Job) => {
Logger.debug(`Job ${job.id} [${job.name}] completed`);
});
}
}
Expand All @@ -273,7 +264,6 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
this.stopped = true;
try {
await Promise.all([
this.scheduler.disconnect(),
this.queue.disconnect(),
this.worker.disconnect(),
]);
Expand Down
9 changes: 1 addition & 8 deletions packages/job-queue-plugin/src/bullmq/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Job } from '@vendure/core';
import { ConnectionOptions, QueueSchedulerOptions, WorkerOptions } from 'bullmq';
import { ConnectionOptions, WorkerOptions } from 'bullmq';
import { QueueOptions } from 'bullmq';

/**
Expand Down Expand Up @@ -34,13 +34,6 @@ export interface BullMQPluginOptions {
* See the [BullMQ WorkerOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.workeroptions.md)
*/
workerOptions?: Exclude<WorkerOptions, 'connection'>;
/**
* @description
* Additional options used when instantiating the BullMQ
* QueueScheduler instance.
* See the [BullMQ QueueSchedulerOptions docs](https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.queuescheduleroptions.md)
*/
schedulerOptions?: Exclude<QueueSchedulerOptions, 'connection'>;
/**
* @description
* When a job is added to the JobQueue using `JobQueue.add()`, the calling
Expand Down

0 comments on commit eb0b73f

Please sign in to comment.