Skip to content

Commit

Permalink
fix(connection): require connection to be passed (#2335)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Dec 19, 2023
1 parent 11bf83c commit 1867dd1
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 39 deletions.
16 changes: 3 additions & 13 deletions src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class FlowProducer extends EventEmitter {
protected connection: RedisConnection;

constructor(
public opts: QueueBaseOptions = {},
public opts: QueueBaseOptions = { connection: {} },
Connection: typeof RedisConnection = RedisConnection,
) {
super();
Expand All @@ -108,19 +108,9 @@ export class FlowProducer extends EventEmitter {
...opts,
};

if (!opts.connection) {
console.warn(
[
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer',
'without providing explicitly a connection or connection options is deprecated. This behaviour will',
'be removed in the next major release',
].join(' '),
);
}

this.connection = new Connection(
opts.connection,
isRedisInstance(opts?.connection),
isRedisInstance(opts.connection),
false,
opts.skipVersionCheck,
);
Expand Down Expand Up @@ -459,7 +449,7 @@ export class FlowProducer extends EventEmitter {
name: node.queueName,
keys: queueKeys.getKeys(node.queueName),
toKey: (type: string) => queueKeys.toKey(node.queueName, type),
opts: { prefix },
opts: { prefix, connection: {} },
qualifiedName: queueKeys.getQueueQualifiedName(node.queueName),
closing: this.closing,
waitUntilReady: async () => this.connection.client,
Expand Down
14 changes: 2 additions & 12 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
*/
constructor(
public readonly name: string,
public opts: QueueBaseOptions = {},
public opts: QueueBaseOptions = { connection: {} },
Connection: typeof RedisConnection = RedisConnection,
) {
super();
Expand All @@ -53,19 +53,9 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
throw new Error('Queue name must be provided');
}

if (!opts.connection) {
console.warn(
[
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer',
'without providing explicitly a connection or connection options is deprecated. This behaviour will',
'be removed in the next major release',
].join(' '),
);
}

this.connection = new Connection(
opts.connection,
isRedisInstance(opts?.connection),
isRedisInstance(opts.connection),
opts.blockingConnection,
opts.skipVersionCheck,
);
Expand Down
4 changes: 3 additions & 1 deletion src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ export class QueueEvents extends QueueBase {

constructor(
name: string,
{ connection, autorun = true, ...opts }: QueueEventsOptions = {},
{ connection, autorun = true, ...opts }: QueueEventsOptions = {
connection: {},
},
Connection?: typeof RedisConnection,
) {
super(
Expand Down
20 changes: 13 additions & 7 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ const overrideMessage = [
'and will be overridden by BullMQ.',
].join(' ');

const deprecationMessage = [
'BullMQ: DEPRECATION WARNING! Your redis options maxRetriesPerRequest must be null.',
'On the next versions having this settings will throw an exception',
].join(' ');
const deprecationMessage =
'BullMQ: Your redis options maxRetriesPerRequest must be null.';

interface RedisCapabilities {
canDoubleTimeout: boolean;
Expand Down Expand Up @@ -94,7 +92,7 @@ export class RedisConnection extends EventEmitter {
this.opts = this._client.options;
}

this.checkBlockingOptions(deprecationMessage, this.opts);
this.checkBlockingOptions(deprecationMessage, this.opts, true);
}

this.skipVersionCheck =
Expand All @@ -115,9 +113,17 @@ export class RedisConnection extends EventEmitter {
this.initializing.catch(err => this.emit('error', err));
}

private checkBlockingOptions(msg: string, options?: RedisOptions) {
private checkBlockingOptions(
msg: string,
options?: RedisOptions,
throwError = false,
) {
if (this.blocking && options && options.maxRetriesPerRequest) {
console.error(msg);
if (throwError) {
throw new Error(msg);
} else {
console.error(msg);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export class Worker<
constructor(
name: string,
processor?: string | null | Processor<DataType, ResultType, NameType>,
opts: WorkerOptions = {},
opts: WorkerOptions = { connection: {} },
Connection?: typeof RedisConnection,
) {
super(
Expand Down
2 changes: 1 addition & 1 deletion src/interfaces/queue-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface QueueBaseOptions {
/**
* Options for connecting to a Redis instance.
*/
connection?: ConnectionOptions;
connection: ConnectionOptions;

/**
* Denotes commands should retry indefinitely.
Expand Down
8 changes: 4 additions & 4 deletions tests/test_script_loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ describe('scriptLoader', () => {
const info = cache.get(path.basename(path.resolve(fixture), '.lua'));

expect(info).to.not.eql(undefined);
expect(info.includes.length).to.eql(1);
expect(info?.includes.length).to.eql(1);

const include = info.includes[0];
expect(include.name).to.eql('math');
expect(include.path.startsWith(includePath)).to.be.true;
const include = info?.includes[0];
expect(include?.name).to.eql('math');
expect(include?.path.startsWith(includePath)).to.be.true;
});

it('supports path mapping and globs simultaneously', async () => {
Expand Down

0 comments on commit 1867dd1

Please sign in to comment.