Skip to content

Commit

Permalink
feat: plain channel
Browse files Browse the repository at this point in the history
Options to use channel without confirms
  • Loading branch information
luddd3 committed Dec 29, 2021
1 parent 4ede1b2 commit 328d31d
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 74 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ Options:
Note that `this` inside the setup function will the returned ChannelWrapper.
The ChannelWrapper has a special `context` member you can use to store
arbitrary data in.
- `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()`
- `options.json` - if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()`
are plain JSON objects. These will be encoded automatically before being sent.
- `options.confirm` - if true (default), the created channel will be a ConfirmChannel
- `options.publishTimeout` - a default timeout for messages published to this channel.

### AmqpConnectionManager#isConnected()
Expand Down
157 changes: 98 additions & 59 deletions src/ChannelWrapper.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import * as amqplib from 'amqplib';
import { ConfirmChannel, Options } from 'amqplib';
import type * as amqplib from 'amqplib';
import { Options } from 'amqplib';
import { EventEmitter } from 'events';
import pb from 'promise-breaker';
import { IAmqpConnectionManager } from './AmqpConnectionManager.js';

const MAX_MESSAGES_PER_BATCH = 1000;

export type Channel = amqplib.ConfirmChannel | amqplib.Channel;

export type SetupFunc =
| ((channel: ConfirmChannel, callback: (error?: Error) => void) => void)
| ((channel: ConfirmChannel) => Promise<void>);
| ((channel: Channel, callback: (error?: Error) => void) => void)
| ((channel: Channel) => Promise<void>)
| ((channel: amqplib.ConfirmChannel, callback: (error?: Error) => void) => void)
| ((channel: amqplib.ConfirmChannel) => Promise<void>);

export interface CreateChannelOpts {
/** Name for this channel. Used for debugging. */
Expand All @@ -18,6 +22,10 @@ export interface CreateChannelOpts {
* This function should either accept a callback, or return a Promise. See addSetup below
*/
setup?: SetupFunc;
/**
* True to create a ConfirmChannel (default). False to create a regular Channel.
*/
confirm?: boolean;
/**
* if true, then ChannelWrapper assumes all messages passed to publish() and sendToQueue() are plain JSON objects.
* These will be encoded automatically before being sent.
Expand All @@ -33,7 +41,7 @@ interface PublishMessage {
type: 'publish';
exchange: string;
routingKey: string;
content: Buffer | string | unknown;
content: Buffer;
options?: amqplib.Options.Publish;
resolve: (result: boolean) => void;
reject: (err: Error) => void;
Expand All @@ -44,7 +52,7 @@ interface PublishMessage {
interface SendToQueueMessage {
type: 'sendToQueue';
queue: string;
content: Buffer | string | unknown;
content: Buffer;
options?: amqplib.Options.Publish;
resolve: (result: boolean) => void;
reject: (err: Error) => void;
Expand Down Expand Up @@ -119,8 +127,12 @@ export default class ChannelWrapper extends EventEmitter {
* have been run on this channel until `@_settingUp` is either null or
* resolved.
*/
private _channel?: amqplib.ConfirmChannel;
private _channel?: Channel;

/**
* True to create a ConfirmChannel. False to create a regular Channel.
*/
private _confirm = true;
/**
* True if the "worker" is busy sending messages. False if we need to
* start the worker to get stuff done.
Expand Down Expand Up @@ -234,11 +246,7 @@ export default class ChannelWrapper extends EventEmitter {
* @param {function} [done] - Optional callback.
* @returns {void | Promise} - Resolves when complete.
*/
removeSetup(
setup: SetupFunc,
teardown?: SetupFunc,
done?: pb.Callback<void>
): Promise<void> {
removeSetup(setup: SetupFunc, teardown?: SetupFunc, done?: pb.Callback<void>): Promise<void> {
return pb.addCallback(done, () => {
this._setups = this._setups.filter((s) => s !== setup);

Expand Down Expand Up @@ -291,7 +299,7 @@ export default class ChannelWrapper extends EventEmitter {
type: 'publish',
exchange,
routingKey,
content,
content: this._getEncodedMessage(content),
resolve,
reject,
options: opts,
Expand Down Expand Up @@ -319,6 +327,8 @@ export default class ChannelWrapper extends EventEmitter {
options?: PublishOptions,
done?: pb.Callback<boolean>
): Promise<boolean> {
const encodedContent = this._getEncodedMessage(content);

return pb.addCallback(
done,
new Promise<boolean>((resolve, reject) => {
Expand All @@ -327,7 +337,7 @@ export default class ChannelWrapper extends EventEmitter {
{
type: 'sendToQueue',
queue,
content,
content: encodedContent,
resolve,
reject,
options: opts,
Expand Down Expand Up @@ -378,6 +388,7 @@ export default class ChannelWrapper extends EventEmitter {
this._onConnect = this._onConnect.bind(this);
this._onDisconnect = this._onDisconnect.bind(this);
this._connectionManager = connectionManager;
this._confirm = options.confirm ?? true;
this.name = options.name;

this._publishTimeout = options.publishTimeout;
Expand All @@ -404,7 +415,12 @@ export default class ChannelWrapper extends EventEmitter {
this._irrecoverableCode = undefined;

try {
const channel = await connection.createConfirmChannel();
let channel: Channel;
if (this._confirm) {
channel = await connection.createConfirmChannel();
} else {
channel = await connection.createChannel();
}

this._channel = channel;
this._channelHasRoom = true;
Expand Down Expand Up @@ -447,7 +463,7 @@ export default class ChannelWrapper extends EventEmitter {
}

// Called whenever the channel closes.
private _onChannelClose(channel: amqplib.ConfirmChannel): void {
private _onChannelClose(channel: Channel): void {
if (this._channel === channel) {
this._channel = undefined;
}
Expand Down Expand Up @@ -552,7 +568,7 @@ export default class ChannelWrapper extends EventEmitter {
}
}

private _getEncodedMessage(content: Message['content']): Buffer {
private _getEncodedMessage(content: Buffer | string | unknown): Buffer {
let encodedMessage: Buffer;

if (this._json) {
Expand Down Expand Up @@ -597,58 +613,81 @@ export default class ChannelWrapper extends EventEmitter {
break;
}

this._unconfirmedMessages.push(message);

const encodedMessage = this._getEncodedMessage(message.content);
let thisCanSend = true;

switch (message.type) {
case 'publish': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.publish(
message.exchange,
message.routingKey,
encodedMessage,
message.options,
(err) => {
if (message.isTimedout) {
return;
}

if (message.timeout) {
clearTimeout(message.timeout);
}

if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, thisCanSend);
if (this._confirm) {
this._unconfirmedMessages.push(message);
thisCanSend = this._channelHasRoom = channel.publish(
message.exchange,
message.routingKey,
message.content,
message.options,
(err) => {
if (message.isTimedout) {
return;
}

if (message.timeout) {
clearTimeout(message.timeout);
}

if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, thisCanSend);
}
}
);
} else {
if (message.timeout) {
clearTimeout(message.timeout);
}
);
thisCanSend = this._channelHasRoom = channel.publish(
message.exchange,
message.routingKey,
message.content,
message.options
);
message.resolve(thisCanSend);
}
break;
}
case 'sendToQueue': {
let thisCanSend = true;
thisCanSend = this._channelHasRoom = channel.sendToQueue(
message.queue,
encodedMessage,
message.options,
(err) => {
if (message.isTimedout) {
return;
}

if (message.timeout) {
clearTimeout(message.timeout);
}

if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, thisCanSend);
if (this._confirm) {
this._unconfirmedMessages.push(message);
thisCanSend = this._channelHasRoom = channel.sendToQueue(
message.queue,
message.content,
message.options,
(err) => {
if (message.isTimedout) {
return;
}

if (message.timeout) {
clearTimeout(message.timeout);
}

if (err) {
this._messageRejected(message, err);
} else {
this._messageResolved(message, thisCanSend);
}
}
);
} else {
if (message.timeout) {
clearTimeout(message.timeout);
}
);
thisCanSend = this._channelHasRoom = channel.sendToQueue(
message.queue,
message.content,
message.options
);
message.resolve(thisCanSend);
}
break;
}
/* istanbul ignore next */
Expand Down
7 changes: 6 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ export type {
ConnectionUrl,
IAmqpConnectionManager as AmqpConnectionManager,
} from './AmqpConnectionManager.js';
export type { CreateChannelOpts, default as ChannelWrapper, SetupFunc } from './ChannelWrapper.js';
export type {
CreateChannelOpts,
default as ChannelWrapper,
SetupFunc,
Channel,
} from './ChannelWrapper.js';

export function connect(
urls: ConnectionUrl | ConnectionUrl[] | undefined | null,
Expand Down
17 changes: 16 additions & 1 deletion test/ChannelWrapperTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ function makeMessage(content: string): amqplib.Message {
};
}

function getUnderlyingChannel(channelWrapper: ChannelWrapper): fixtures.FakeConfirmChannel {
function getUnderlyingChannel(
channelWrapper: ChannelWrapper
): fixtures.FakeConfirmChannel | fixtures.FakeChannel {
const channel = (channelWrapper as any)._channel;
if (!channel) {
throw new Error('No underlying channel');
Expand Down Expand Up @@ -217,6 +219,19 @@ describe('ChannelWrapper', function () {
expect(lastArgs(errorHandler)?.[0]?.message).to.equal('No channel for you!');
});

it('should create plain channel', async function () {
const setup = jest.fn().mockImplementation(() => promiseTools.delay(10));

connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager, {
setup,
confirm: false,
});
await channelWrapper.waitForConnect();

expect(setup).to.have.beenCalledTimes(1);
});

it('should work if there are no setup functions', async function () {
connectionManager.simulateConnect();
const channelWrapper = new ChannelWrapper(connectionManager);
Expand Down
Loading

0 comments on commit 328d31d

Please sign in to comment.