Skip to content

Commit

Permalink
fix: Fix handling of resending messages during a disconnect.
Browse files Browse the repository at this point in the history
The unit tests for this had a fatal flaw - they assumed that if the
connection dropped, we'd get nothing back for any in-flight messages.
This isn't true, though - we'd actually get back an error from amqplib
when the underlying connection fails.  This fixes the tests to reflect
this.  If we rely on amqplib to reject such messages, then moving all
messages from _unconfirmedMessages to _messages on a
reconnect now becomes superfluous.

I also reworked `_publishQueuedMessages()` to be more synchronous.
As it stood, I had to add a lot of pointless delays in my tests to make
sure that the `then` after publishing a message had time to run.

fix #152
  • Loading branch information
jwalton committed Aug 26, 2021
1 parent 8b1338b commit e1457a5
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 121 deletions.
178 changes: 82 additions & 96 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,14 +361,6 @@ export default class ChannelWrapper extends EventEmitter {
return;
}

if (this._unconfirmedMessages.length > 0) {
// requeue any messages that were left unconfirmed when connection was lost
let message: Message | undefined;
while ((message = this._unconfirmedMessages.shift())) {
this._messages.push(message);
}
}

// Since we just connected, publish any queued messages
this._startWorker();
this.emit('connect');
Expand Down Expand Up @@ -450,6 +442,25 @@ export default class ChannelWrapper extends EventEmitter {
return !this._irrecoverableCode || !IRRECOVERABLE_ERRORS.includes(this._irrecoverableCode);
}

private _messageResolved(message: Message, result: boolean) {
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.resolve(result);
}

private _messageRejected(message: Message, err: Error) {
if (!this._channel && this._canWaitReconnection()) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when
// we reconnect.
removeUnconfirmedMessage(this._unconfirmedMessages, message);
this._messages.push(message);
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be
// the broker rejected the message. Either way, reject it back
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.reject(err);
}
}

private _publishQueuedMessages(workerNumber: number): void {
const channel = this._channel;
if (
Expand All @@ -467,97 +478,72 @@ export default class ChannelWrapper extends EventEmitter {
if (message) {
this._unconfirmedMessages.push(message);

Promise.resolve()
.then(() => {
let encodedMessage: Buffer;
if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(message.content));
} else if (typeof message.content === 'string') {
encodedMessage = Buffer.from(message.content);
} else if (message.content instanceof Buffer) {
encodedMessage = message.content;
} else if (
typeof message.content === 'object' &&
typeof (message.content as any).toString === 'function'
) {
encodedMessage = Buffer.from((message.content as any).toString());
} else {
throw new Error('Invalid message content');
try {
let encodedMessage: Buffer | undefined;
if (this._json) {
encodedMessage = Buffer.from(JSON.stringify(message.content));
} else if (typeof message.content === 'string') {
encodedMessage = Buffer.from(message.content);
} else if (message.content instanceof Buffer) {
encodedMessage = message.content;
} else if (
typeof message.content === 'object' &&
typeof (message.content as any).toString === 'function'
) {
encodedMessage = Buffer.from((message.content as any).toString());
} else {
this._messageRejected(message, new Error('Invalid message content'));
}

let result = true;
if (encodedMessage) {
switch (message.type) {
case 'publish':
result = channel.publish(
message.exchange,
message.routingKey,
encodedMessage,
message.options,
(err) => {
if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, result);
}
}
);
break;
case 'sendToQueue':
result = channel.sendToQueue(
message.queue,
encodedMessage,
message.options,
(err) => {
if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, result);
}
}
);
break;
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${(message as any).type}`);
}
}

let result = true;
const sendPromise = (() => {
switch (message.type) {
case 'publish':
return new Promise<boolean>(function (resolve, reject) {
result = channel.publish(
message.exchange,
message.routingKey,
encodedMessage,
message.options,
(err) => {
if (err) {
reject(err);
} else {
resolve(result);
}
}
);
});
case 'sendToQueue':
return new Promise<boolean>(function (resolve, reject) {
result = channel.sendToQueue(
message.queue,
encodedMessage,
message.options,
(err) => {
if (err) {
reject(err);
} else {
resolve(result);
}
}
);
});
/* istanbul ignore next */
default:
throw new Error(`Unhandled message type ${(message as any).type}`);
}
})();
if (result) {
setImmediate(() => this._publishQueuedMessages(workerNumber));
} else {
channel.once('drain', () => this._publishQueuedMessages(workerNumber));
}

if (result) {
this._publishQueuedMessages(workerNumber);
} else {
channel.once('drain', () => this._publishQueuedMessages(workerNumber));
}
return sendPromise;
})
.then(
(result) => {
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.resolve(result);
},

(err) => {
if (!this._channel && this._canWaitReconnection()) {
// Tried to write to a closed channel. Leave the message in the queue and we'll try again when
// we reconnect.
removeUnconfirmedMessage(this._unconfirmedMessages, message);
this._messages.push(message);
} else {
// Something went wrong trying to send this message - could be JSON.stringify failed, could be
// the broker rejected the message. Either way, reject it back
removeUnconfirmedMessage(this._unconfirmedMessages, message);
message.reject(err);
}
}
)
.catch(
/* istanbul ignore next */ (err) => {
this.emit('error', err);
this._working = false;
}
);
/* istanbul ignore next */
} catch (err) {
this.emit('error', err);
this._working = false;
}
}
}

Expand Down
Loading

0 comments on commit e1457a5

Please sign in to comment.