From b866ef25ebe97c1cf4fe421835291584cb738f41 Mon Sep 17 00:00:00 2001 From: Jason Walton Date: Thu, 26 Aug 2021 11:00:39 -0400 Subject: [PATCH] perf: Send messages to underlying channel in synchronous batches. --- src/ChannelWrapper.ts | 160 +++++++++++++++++++++++++----------------- 1 file changed, 97 insertions(+), 63 deletions(-) diff --git a/src/ChannelWrapper.ts b/src/ChannelWrapper.ts index a73f2a8..6a050f4 100644 --- a/src/ChannelWrapper.ts +++ b/src/ChannelWrapper.ts @@ -4,6 +4,8 @@ import { EventEmitter } from 'events'; import pb from 'promise-breaker'; import { IAmqpConnectionManager } from './AmqpConnectionManager.js'; +const MAX_MESSAGES_PER_BATCH = 1000; + export type SetupFunc = | ((channel: ConfirmChannel, callback: (error?: Error) => void) => void) | ((channel: ConfirmChannel) => Promise); @@ -106,6 +108,11 @@ export default class ChannelWrapper extends EventEmitter { */ private _workerNumber = 0; + /** + * True if the underlying channel has room for more messages. + */ + private _channelHasRoom = true; + public name?: string; addListener(event: string, listener: (...args: any[]) => void): this; @@ -337,7 +344,9 @@ export default class ChannelWrapper extends EventEmitter { const channel = await connection.createConfirmChannel(); this._channel = channel; + this._channelHasRoom = true; channel.on('close', () => this._onChannelClose(channel)); + channel.on('drain', () => this._onChannelDrain()); this._settingUp = Promise.all( this._setups.map((setupFn) => @@ -379,6 +388,12 @@ export default class ChannelWrapper extends EventEmitter { // Wait for another reconnect to create a new channel. } + /** Called whenever the channel drains. */ + private _onChannelDrain(): void { + this._channelHasRoom = true; + this._startWorker(); + } + // Called whenever we disconnect from the AMQP server. private _onDisconnect(ex: { err: Error & { code: number } }): void { this._irrecoverableCode = ex.err instanceof Error ? ex.err.code : undefined; @@ -425,7 +440,9 @@ export default class ChannelWrapper extends EventEmitter { } private _shouldPublish(): boolean { - return this._messages.length > 0 && !this._settingUp && !!this._channel; + return ( + this._messages.length > 0 && !this._settingUp && !!this._channel && this._channelHasRoom + ); } // Start publishing queued messages, if there isn't already a worker doing this. @@ -461,6 +478,27 @@ export default class ChannelWrapper extends EventEmitter { } } + private _getEncodedMessage(content: Message['content']): Buffer { + let encodedMessage: Buffer; + + if (this._json) { + encodedMessage = Buffer.from(JSON.stringify(content)); + } else if (typeof content === 'string') { + encodedMessage = Buffer.from(content); + } else if (content instanceof Buffer) { + encodedMessage = content; + } else if (typeof content === 'object' && typeof (content as any).toString === 'function') { + encodedMessage = Buffer.from((content as any).toString()); + } else { + console.warn( + 'amqp-connection-manager: Sending JSON message, but json option not speicifed' + ); + encodedMessage = Buffer.from(JSON.stringify(content)); + } + + return encodedMessage; + } + private _publishQueuedMessages(workerNumber: number): void { const channel = this._channel; if ( @@ -474,76 +512,72 @@ export default class ChannelWrapper extends EventEmitter { return; } - const message = this._messages.shift(); - if (message) { - this._unconfirmedMessages.push(message); - - try { - let encodedMessage: Buffer | undefined; - if (this._json) { - encodedMessage = Buffer.from(JSON.stringify(message.content)); - } else if (typeof message.content === 'string') { - encodedMessage = Buffer.from(message.content); - } else if (message.content instanceof Buffer) { - encodedMessage = message.content; - } else if ( - typeof message.content === 'object' && - typeof (message.content as any).toString === 'function' - ) { - encodedMessage = Buffer.from((message.content as any).toString()); - } else { - this._messageRejected(message, new Error('Invalid message content')); + try { + // Send messages in batches of 1000 - don't want to starve the event loop. + let sendsLeft = MAX_MESSAGES_PER_BATCH; + while (this._channelHasRoom && this._messages.length > 0 && sendsLeft > 0) { + sendsLeft--; + + const message = this._messages.shift(); + if (!message) { + break; } - let result = true; - if (encodedMessage) { - switch (message.type) { - case 'publish': - result = channel.publish( - message.exchange, - message.routingKey, - encodedMessage, - message.options, - (err) => { - if (err) { - this._messageRejected(message, err); - } else { - this._messageResolved(message, result); - } + this._unconfirmedMessages.push(message); + + const encodedMessage = this._getEncodedMessage(message.content); + + switch (message.type) { + case 'publish': { + let thisCanSend = true; + thisCanSend = this._channelHasRoom = channel.publish( + message.exchange, + message.routingKey, + encodedMessage, + message.options, + (err) => { + if (err) { + this._messageRejected(message, err); + } else { + this._messageResolved(message, thisCanSend); } - ); - break; - case 'sendToQueue': - result = channel.sendToQueue( - message.queue, - encodedMessage, - message.options, - (err) => { - if (err) { - this._messageRejected(message, err); - } else { - this._messageResolved(message, result); - } + } + ); + break; + } + case 'sendToQueue': { + let thisCanSend = true; + thisCanSend = this._channelHasRoom = channel.sendToQueue( + message.queue, + encodedMessage, + message.options, + (err) => { + if (err) { + this._messageRejected(message, err); + } else { + this._messageResolved(message, thisCanSend); } - ); - break; - /* istanbul ignore next */ - default: - throw new Error(`Unhandled message type ${(message as any).type}`); + } + ); + break; } + /* istanbul ignore next */ + default: + throw new Error(`Unhandled message type ${(message as any).type}`); } + } - if (result) { - setImmediate(() => this._publishQueuedMessages(workerNumber)); - } else { - channel.once('drain', () => this._publishQueuedMessages(workerNumber)); - } - - /* istanbul ignore next */ - } catch (err) { - this.emit('error', err); - this._working = false; + // If we didn't send all the messages, send some more... + if (this._channelHasRoom && this._messages.length > 0) { + setImmediate(() => this._publishQueuedMessages(workerNumber)); } + + this._working = false; + + /* istanbul ignore next */ + } catch (err) { + this._working = false; + this.emit('error', err); } }