Skip to content

Commit

Permalink
feat: reconnect and cancelAll consumers
Browse files Browse the repository at this point in the history
Provided that they consumed with the channel wrapper function
`consume`
  • Loading branch information
luddd3 committed Aug 27, 2021
1 parent b2c89ac commit fb0c00b
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 4 deletions.
108 changes: 104 additions & 4 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ interface SendToQueueMessage {
reject: (err: Error) => void;
}

interface Consumer {
consumerTag: string | null;
queue: string;
onMessage: (msg: amqplib.ConsumeMessage) => void;
consumeOptions?: amqplib.Options.Consume;
options?: { prefetch?: number };
}

type Message = PublishMessage | SendToQueueMessage;

const IRRECOVERABLE_ERRORS = [
Expand Down Expand Up @@ -87,6 +95,8 @@ export default class ChannelWrapper extends EventEmitter {
private _unconfirmedMessages: Message[] = [];
/** Reason code during publish or sendtoqueue messages. */
private _irrecoverableCode: number | undefined;
/** Consumers which will be reconnected on channel errors etc. */
private _consumers: Consumer[] = [];

/**
* The currently connected channel. Note that not all setup functions
Expand Down Expand Up @@ -324,6 +334,8 @@ export default class ChannelWrapper extends EventEmitter {

// Array of setup functions to call.
this._setups = [];
this._consumers = [];

if (options.setup) {
this._setups.push(options.setup);
}
Expand Down Expand Up @@ -359,10 +371,13 @@ export default class ChannelWrapper extends EventEmitter {
this.emit('error', err, { name: this.name });
})
)
).then(() => {
this._settingUp = undefined;
});

)
.then(() => {
return Promise.all(this._consumers.map((c) => this._reconnectConsumer(c)));
})
.then(() => {
this._settingUp = undefined;
});
await this._settingUp;

if (!this._channel) {
Expand Down Expand Up @@ -581,6 +596,91 @@ export default class ChannelWrapper extends EventEmitter {
}
}

/**
* Setup a consumer
* This consumer will be reconnected on cancellation and channel errors.
*/
async consume(
queue: string,
onMessage: Consumer['onMessage'],
consumeOptions?: Consumer['consumeOptions'],
options?: Consumer['options']
): Promise<void> {
const consumer: Consumer = {
consumerTag: null,
queue,
onMessage,
consumeOptions,
options,
};
this._consumers.push(consumer);
await this._consume(consumer);
}

private async _consume(consumer: Consumer): Promise<void> {
if (!this._channel) {
return;
}

const options = consumer.options;
if (options?.prefetch) {
this._channel.prefetch(options.prefetch, false);
}

const { consumerTag } = await this._channel.consume(
consumer.queue,
(msg) => {
if (!msg) {
consumer.consumerTag = null;
this._reconnectConsumer(consumer).catch((err) => {
if (err.isOperational && err.message.includes('BasicConsume; 404')) {
// Ignore errors caused by queue not declared. In
// those cases the connection will reconnect and
// then consumers reestablished. The full reconnect
// might be avoided if we assert the queue again
// before starting to consume.
return;
}
throw err;
});
return;
}
consumer.onMessage(msg);
},
consumer.consumeOptions
);
consumer.consumerTag = consumerTag;
}

private async _reconnectConsumer(consumer: Consumer): Promise<void> {
if (!this._consumers.includes(consumer)) {
// Intentionally canceled
return;
}
await this._consume(consumer);
}

/**
* Cancel all consumers
*/
async cancelAll(): Promise<void> {
const consumers = this._consumers;
this._consumers = [];
if (!this._channel) {
return;
}

const channel = this._channel;
await Promise.all(
consumers.reduce<any[]>((acc, consumer) => {
if (consumer.consumerTag) {
acc.push(channel.cancel(consumer.consumerTag));
}
return acc;
}, [])
);
}

/** Send an `ack` to the underlying channel. */
ack(message: amqplib.Message, allUpTo?: boolean): void {
this._channel && this._channel.ack(message, allUpTo);
Expand Down
175 changes: 175 additions & 0 deletions test/ChannelWrapperTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,181 @@ describe('ChannelWrapper', function () {
// Final message should have been published to the underlying queue.
expect(queue.length).to.equal(2);
});

it('should consume messages', async function () {
let onMessage: any = null;

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => {
onMessage = onMsg;
return Promise.resolve({ consumerTag: 'abc' });
});
},
});
await channelWrapper.waitForConnect();

const messages: any[] = [];
await channelWrapper.consume(
'queue',
(msg) => {
messages.push(msg);
},
{ noAck: true }
);

onMessage(1);
onMessage(2);
onMessage(3);
expect(messages).to.deep.equal([1, 2, 3]);
});

it('should reconnect consumer on consumer cancellation', async function () {
let onMessage: any = null;
let consumerTag = 0;

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((_queue, onMsg, _options) => {
onMessage = onMsg;
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
},
});
await channelWrapper.waitForConnect();

const messages: any[] = [];
await channelWrapper.consume('queue', (msg) => {
messages.push(msg);
});

onMessage(1);
onMessage(null); // simulate consumer cancel
onMessage(2);
onMessage(null); // simulate second cancel
onMessage(3);

expect(messages).to.deep.equal([1, 2, 3]);
expect(consumerTag).to.equal(3);
});

it('should reconnect consumers on channel error', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
let consumerTag = 0;

// Define a prefetch function here, because it will otherwise be
// unique for each new channel
const prefetchFn = jest
.fn()
.mockImplementation((_prefetch: number, _isGlobal: boolean) => {});

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.prefetch = prefetchFn;
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
await channelWrapper.consume(
'queue1',
(msg) => {
queue1.push(msg);
},
{ noAck: true },
{ prefetch: 10 }
);

const queue2: any[] = [];
await channelWrapper.consume('queue2', (msg) => {
queue2.push(msg);
});

onQueue1(1);
onQueue2(1);

connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

onQueue1(2);
onQueue2(2);

expect(queue1).to.deep.equal([1, 2]);
expect(queue2).to.deep.equal([1, 2]);
expect(consumerTag).to.equal(4);
expect(prefetchFn).to.have.beenCalledTimes(2);
expect(prefetchFn).to.have.beenNthCalledWith(1, 10, false);
expect(prefetchFn).to.have.beenNthCalledWith(2, 10, false);
});

it('should be able to cancel all consumers', async function () {
let onQueue1: any = null;
let onQueue2: any = null;
let consumerTag = 0;
const canceledTags: number[] = [];

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
async setup(channel: amqplib.ConfirmChannel) {
channel.consume = jest.fn().mockImplementation((queue, onMsg, _options) => {
if (queue === 'queue1') {
onQueue1 = onMsg;
} else {
onQueue2 = onMsg;
}
return Promise.resolve({ consumerTag: `${consumerTag++}` });
});
channel.cancel = jest.fn().mockImplementation((consumerTag) => {
canceledTags.push(consumerTag);
if (consumerTag === '0') {
onQueue1(null);
} else if (consumerTag === '1') {
onQueue2(null);
}
return Promise.resolve();
});
},
});
await channelWrapper.waitForConnect();

const queue1: any[] = [];
await channelWrapper.consume('queue1', (msg) => {
queue1.push(msg);
});

const queue2: any[] = [];
await channelWrapper.consume('queue2', (msg) => {
queue2.push(msg);
});

onQueue1(1);
onQueue2(1);

await channelWrapper.cancelAll();

// Consumers shouldn't be resumed after reconnect when canceled
connectionManager.simulateDisconnect();
connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

expect(queue1).to.deep.equal([1]);
expect(queue2).to.deep.equal([1]);
expect(consumerTag).to.equal(2);
expect(canceledTags).to.deep.equal(['0', '1']);
});
});

/** Returns the arguments of the most recent call to this mock. */
Expand Down
6 changes: 6 additions & 0 deletions test/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ export class FakeConfirmChannel extends EventEmitter {
close = jest.fn().mockImplementation(async (): Promise<void> => {
this.emit('close');
});

consume = jest.fn().mockImplementation(async (): Promise<Replies.Consume> => {
return { consumerTag: 'abc' };
});

prefetch = jest.fn().mockImplementation((_prefetch: number, _isGlobal: boolean): void => {});
}

export class FakeConnection extends EventEmitter {
Expand Down

0 comments on commit fb0c00b

Please sign in to comment.