diff --git a/README.md b/README.md index 8c43eb6..a1aef30 100644 --- a/README.md +++ b/README.md @@ -132,8 +132,9 @@ Options: Note that `this` inside the setup function will the returned ChannelWrapper. The ChannelWrapper has a special `context` member you can use to store arbitrary data in. -- `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()` +- `options.json` - if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()` are plain JSON objects. These will be encoded automatically before being sent. +- `options.confirm` - if true (default), the created channel will be a ConfirmChannel - `options.publishTimeout` - a default timeout for messages published to this channel. ### AmqpConnectionManager#isConnected() diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index 1c271f8..263c5bb 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -1,14 +1,18 @@ -import * as amqplib from 'amqplib'; -import { ConfirmChannel, Options } from 'amqplib'; +import type * as amqplib from 'amqplib'; +import { Options } from 'amqplib'; import { EventEmitter } from 'events'; import pb from 'promise-breaker'; import { IAmqpConnectionManager } from './AmqpConnectionManager.js'; const MAX_MESSAGES_PER_BATCH = 1000; +export type Channel = amqplib.ConfirmChannel | amqplib.Channel; + export type SetupFunc = - | ((channel: ConfirmChannel, callback: (error?: Error) => void) => void) - | ((channel: ConfirmChannel) => Promise); + | ((channel: Channel, callback: (error?: Error) => void) => void) + | ((channel: Channel) => Promise) + | ((channel: amqplib.ConfirmChannel, callback: (error?: Error) => void) => void) + | ((channel: amqplib.ConfirmChannel) => Promise); export interface CreateChannelOpts { /** Name for this channel. Used for debugging. */ @@ -18,6 +22,10 @@ export interface CreateChannelOpts { * This function should either accept a callback, or return a Promise. See addSetup below */ setup?: SetupFunc; + /** + * True to create a ConfirmChannel (default). False to create a regular Channel. + */ + confirm?: boolean; /** * if true, then ChannelWrapper assumes all messages passed to publish() and sendToQueue() are plain JSON objects. * These will be encoded automatically before being sent. @@ -33,7 +41,7 @@ interface PublishMessage { type: 'publish'; exchange: string; routingKey: string; - content: Buffer | string | unknown; + content: Buffer; options?: amqplib.Options.Publish; resolve: (result: boolean) => void; reject: (err: Error) => void; @@ -44,7 +52,7 @@ interface PublishMessage { interface SendToQueueMessage { type: 'sendToQueue'; queue: string; - content: Buffer | string | unknown; + content: Buffer; options?: amqplib.Options.Publish; resolve: (result: boolean) => void; reject: (err: Error) => void; @@ -119,8 +127,12 @@ export default class ChannelWrapper extends EventEmitter { * have been run on this channel until `@_settingUp` is either null or * resolved. */ - private _channel?: amqplib.ConfirmChannel; + private _channel?: Channel; + /** + * True to create a ConfirmChannel. False to create a regular Channel. + */ + private _confirm = true; /** * True if the "worker" is busy sending messages. False if we need to * start the worker to get stuff done. @@ -234,11 +246,7 @@ export default class ChannelWrapper extends EventEmitter { * @param {function} [done] - Optional callback. * @returns {void | Promise} - Resolves when complete. */ - removeSetup( - setup: SetupFunc, - teardown?: SetupFunc, - done?: pb.Callback - ): Promise { + removeSetup(setup: SetupFunc, teardown?: SetupFunc, done?: pb.Callback): Promise { return pb.addCallback(done, () => { this._setups = this._setups.filter((s) => s !== setup); @@ -291,7 +299,7 @@ export default class ChannelWrapper extends EventEmitter { type: 'publish', exchange, routingKey, - content, + content: this._getEncodedMessage(content), resolve, reject, options: opts, @@ -319,6 +327,8 @@ export default class ChannelWrapper extends EventEmitter { options?: PublishOptions, done?: pb.Callback ): Promise { + const encodedContent = this._getEncodedMessage(content); + return pb.addCallback( done, new Promise((resolve, reject) => { @@ -327,7 +337,7 @@ export default class ChannelWrapper extends EventEmitter { { type: 'sendToQueue', queue, - content, + content: encodedContent, resolve, reject, options: opts, @@ -378,6 +388,7 @@ export default class ChannelWrapper extends EventEmitter { this._onConnect = this._onConnect.bind(this); this._onDisconnect = this._onDisconnect.bind(this); this._connectionManager = connectionManager; + this._confirm = options.confirm ?? true; this.name = options.name; this._publishTimeout = options.publishTimeout; @@ -404,7 +415,12 @@ export default class ChannelWrapper extends EventEmitter { this._irrecoverableCode = undefined; try { - const channel = await connection.createConfirmChannel(); + let channel: Channel; + if (this._confirm) { + channel = await connection.createConfirmChannel(); + } else { + channel = await connection.createChannel(); + } this._channel = channel; this._channelHasRoom = true; @@ -447,7 +463,7 @@ export default class ChannelWrapper extends EventEmitter { } // Called whenever the channel closes. - private _onChannelClose(channel: amqplib.ConfirmChannel): void { + private _onChannelClose(channel: Channel): void { if (this._channel === channel) { this._channel = undefined; } @@ -552,7 +568,7 @@ export default class ChannelWrapper extends EventEmitter { } } - private _getEncodedMessage(content: Message['content']): Buffer { + private _getEncodedMessage(content: Buffer | string | unknown): Buffer { let encodedMessage: Buffer; if (this._json) { @@ -597,58 +613,81 @@ export default class ChannelWrapper extends EventEmitter { break; } - this._unconfirmedMessages.push(message); - - const encodedMessage = this._getEncodedMessage(message.content); + let thisCanSend = true; switch (message.type) { case 'publish': { - let thisCanSend = true; - thisCanSend = this._channelHasRoom = channel.publish( - message.exchange, - message.routingKey, - encodedMessage, - message.options, - (err) => { - if (message.isTimedout) { - return; - } - - if (message.timeout) { - clearTimeout(message.timeout); - } - - if (err) { - this._messageRejected(message, err); - } else { - this._messageResolved(message, thisCanSend); + if (this._confirm) { + this._unconfirmedMessages.push(message); + thisCanSend = this._channelHasRoom = channel.publish( + message.exchange, + message.routingKey, + message.content, + message.options, + (err) => { + if (message.isTimedout) { + return; + } + + if (message.timeout) { + clearTimeout(message.timeout); + } + + if (err) { + this._messageRejected(message, err); + } else { + this._messageResolved(message, thisCanSend); + } } + ); + } else { + if (message.timeout) { + clearTimeout(message.timeout); } - ); + thisCanSend = this._channelHasRoom = channel.publish( + message.exchange, + message.routingKey, + message.content, + message.options + ); + message.resolve(thisCanSend); + } break; } case 'sendToQueue': { - let thisCanSend = true; - thisCanSend = this._channelHasRoom = channel.sendToQueue( - message.queue, - encodedMessage, - message.options, - (err) => { - if (message.isTimedout) { - return; - } - - if (message.timeout) { - clearTimeout(message.timeout); - } - - if (err) { - this._messageRejected(message, err); - } else { - this._messageResolved(message, thisCanSend); + if (this._confirm) { + this._unconfirmedMessages.push(message); + thisCanSend = this._channelHasRoom = channel.sendToQueue( + message.queue, + message.content, + message.options, + (err) => { + if (message.isTimedout) { + return; + } + + if (message.timeout) { + clearTimeout(message.timeout); + } + + if (err) { + this._messageRejected(message, err); + } else { + this._messageResolved(message, thisCanSend); + } } + ); + } else { + if (message.timeout) { + clearTimeout(message.timeout); } - ); + thisCanSend = this._channelHasRoom = channel.sendToQueue( + message.queue, + message.content, + message.options + ); + message.resolve(thisCanSend); + } break; } /* istanbul ignore next */ diff --git a/src/index.ts b/src/index.ts index 01e7224..3281ba8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,7 +9,12 @@ export type { ConnectionUrl, IAmqpConnectionManager as AmqpConnectionManager, } from './AmqpConnectionManager.js'; -export type { CreateChannelOpts, default as ChannelWrapper, SetupFunc } from './ChannelWrapper.js'; +export type { + CreateChannelOpts, + default as ChannelWrapper, + SetupFunc, + Channel, +} from './ChannelWrapper.js'; export function connect( urls: ConnectionUrl | ConnectionUrl[] | undefined | null, diff --git a/test/ChannelWrapperTest.ts b/test/ChannelWrapperTest.ts index 8e3f23e..643c36e 100644 --- a/test/ChannelWrapperTest.ts +++ b/test/ChannelWrapperTest.ts @@ -29,7 +29,9 @@ function makeMessage(content: string): amqplib.Message { }; } -function getUnderlyingChannel(channelWrapper: ChannelWrapper): fixtures.FakeConfirmChannel { +function getUnderlyingChannel( + channelWrapper: ChannelWrapper +): fixtures.FakeConfirmChannel | fixtures.FakeChannel { const channel = (channelWrapper as any)._channel; if (!channel) { throw new Error('No underlying channel'); @@ -217,6 +219,19 @@ describe('ChannelWrapper', function () { expect(lastArgs(errorHandler)?.[0]?.message).to.equal('No channel for you!'); }); + it('should create plain channel', async function () { + const setup = jest.fn().mockImplementation(() => promiseTools.delay(10)); + + connectionManager.simulateConnect(); + const channelWrapper = new ChannelWrapper(connectionManager, { + setup, + confirm: false, + }); + await channelWrapper.waitForConnect(); + + expect(setup).to.have.beenCalledTimes(1); + }); + it('should work if there are no setup functions', async function () { connectionManager.simulateConnect(); const channelWrapper = new ChannelWrapper(connectionManager); diff --git a/test/fixtures.ts b/test/fixtures.ts index ff75607..d5a89c9 100644 --- a/test/fixtures.ts +++ b/test/fixtures.ts @@ -63,7 +63,7 @@ export class FakeAmqp { } } -export class FakeConfirmChannel extends EventEmitter { +export class FakeChannel extends EventEmitter { publish = jest .fn() .mockImplementation( @@ -71,11 +71,9 @@ export class FakeConfirmChannel extends EventEmitter { _exchange: string, _routingKey: string, content: Buffer, - _options?: Options.Publish, - callback?: (err: any, ok: Replies.Empty) => void + _options?: Options.Publish ): boolean => { this.emit('publish', content); - callback?.(null, {}); return true; } ); @@ -83,14 +81,8 @@ export class FakeConfirmChannel extends EventEmitter { sendToQueue = jest .fn() .mockImplementation( - ( - _queue: string, - content: Buffer, - _options?: Options.Publish, - callback?: (err: any, ok: Replies.Empty) => void - ): boolean => { + (_queue: string, content: Buffer, _options?: Options.Publish): boolean => { this.emit('sendToQueue', content); - callback?.(null, {}); return true; } ); @@ -154,6 +146,39 @@ export class FakeConfirmChannel extends EventEmitter { prefetch = jest.fn().mockImplementation((_prefetch: number, _isGlobal: boolean): void => {}); } +export class FakeConfirmChannel extends FakeChannel { + publish = jest + .fn() + .mockImplementation( + ( + _exchange: string, + _routingKey: string, + content: Buffer, + _options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void + ): boolean => { + this.emit('publish', content); + callback?.(null, {}); + return true; + } + ); + + sendToQueue = jest + .fn() + .mockImplementation( + ( + _queue: string, + content: Buffer, + _options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void + ): boolean => { + this.emit('sendToQueue', content); + callback?.(null, {}); + return true; + } + ); +} + export class FakeConnection extends EventEmitter { url: string; _closed = false; @@ -164,6 +189,10 @@ export class FakeConnection extends EventEmitter { this._closed = false; } + createChannel() { + return Promise.resolve(new exports.FakeChannel()); + } + createConfirmChannel() { return Promise.resolve(new exports.FakeConfirmChannel()); } diff --git a/test/integrationTest.ts b/test/integrationTest.ts index 061befc..b4389b5 100644 --- a/test/integrationTest.ts +++ b/test/integrationTest.ts @@ -1,4 +1,4 @@ -import { ConfirmChannel, ConsumeMessage } from 'amqplib'; +import { Channel, ConfirmChannel, ConsumeMessage } from 'amqplib'; import chai from 'chai'; import chaiJest from 'chai-jest'; import pEvent from 'p-event'; @@ -135,6 +135,41 @@ describe('Integration tests', () => { await receiveWrapper.close(); }); + it('send and receive messages with plain channel', async () => { + const queueName = 'testQueue2'; + const content = `hello world - ${Date.now()}`; + + connection = new AmqpConnectionManager('amqp://localhost'); + const sendChannel = connection.createChannel({ + confirm: false, + setup: async (channel: Channel) => { + await channel.assertQueue(queueName, { durable: false, autoDelete: true }); + }, + }); + + const receiveChannel = connection.createChannel({ + confirm: false, + setup: async (channel: Channel) => { + await channel.assertQueue(queueName, { durable: false, autoDelete: true }); + }, + }); + + await connection.connect(); + + const rxPromise = defer(); + await receiveChannel.consume(queueName, (message) => { + rxPromise.resolve(message); + }); + + await sendChannel.sendToQueue(queueName, content); + + const result = await timeout(rxPromise.promise, 3000); + expect(result.content.toString()).to.equal(content); + + await sendChannel.close(); + await receiveChannel.close(); + }); + it('RPC', async () => { const queueName = 'testQueueRpc';