Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancellation support to all async operations #66

Merged
merged 18 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions lib/awaitableSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Session } from "./session";
import {
OperationTimeoutError, InsufficientCreditError, SendOperationFailedError
} from "./errorDefinitions";
import { AbortSignalLike, createAbortError } from "./util/utils";

/**
* Describes the interface for the send operation Promise which contains a reference to resolve,
Expand Down Expand Up @@ -170,9 +171,12 @@ export class AwaitableSender extends BaseSender {
* @param {number} [format] The message format. Specify this if a message with custom format needs
* to be sent. `0` implies the standard AMQP 1.0 defined format. If no value is provided, then the
* given message is assumed to be of type Message interface and encoded appropriately.
* @param {AbortSignalLike} abortSignal A signal to cancel the send operation. This does not
* guarantee that the message will not be sent. It only stops listening for an acknowledgement from
* the remote endpoint.
* @returns {Promise<Delivery>} Promise<Delivery> The delivery information about the sent message.
*/
send(msg: Message | Buffer, tag?: Buffer | string, format?: number): Promise<Delivery> {
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, abortSignal?: AbortSignalLike): Promise<Delivery> {
return new Promise<Delivery>((resolve, reject) => {
log.sender("[%s] Sender '%s' on amqp session '%s', credit: %d available: %d",
this.connection.id, this.name, this.session.id, this.credit,
Expand All @@ -187,12 +191,44 @@ export class AwaitableSender extends BaseSender {
return reject(new OperationTimeoutError(message));
}, this.sendTimeoutInSeconds * 1000);

const onAbort = () => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this into a function rather than inlining it here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you thinking of a function that accepts delivery as an argument, and then still in-lining, something like:

const onAbort = () => {
  this.abort(delivery);
}

chradek marked this conversation as resolved.
Show resolved Hide resolved
const promise = this.deliveryDispositionMap.get(delivery.id) as PromiseLike;
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
clearTimeout(promise.timer);
const deleteResult = this.deliveryDispositionMap.delete(delivery.id);
log.sender(
"[%s] Event: 'abort', Successfully deleted the delivery with id %d from the " +
" map of sender '%s' on amqp session '%s' and cleared the timer: %s.",
this.connection.id, delivery.id, this.name, this.session.id, deleteResult
);
const err = createAbortError("Send request has been cancelled.");
log.error("[%s] %s", this.connection.id, err.message);
promise.reject(err);
};

const removeAbortListener = () => {
abortSignal?.removeEventListener("abort", onAbort);
};

const delivery = (this._link as RheaSender).send(msg, tag, format);
this.deliveryDispositionMap.set(delivery.id, {
resolve: resolve,
reject: reject,
resolve: () => {
resolve();
removeAbortListener();
},
reject: (reason?: any) => {
reject(reason);
removeAbortListener();
},
timer: timer
});

if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
} else {
// Please send the message after some time.
const msg =
Expand Down
133 changes: 116 additions & 17 deletions lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Sender, SenderOptions } from "./sender";
import { Receiver, ReceiverOptions } from "./receiver";
import { Container } from "./container";
import { defaultOperationTimeoutInSeconds } from "./util/constants";
import { Func, EmitParameters, emitEvent } from "./util/utils";
import { Func, EmitParameters, emitEvent, AbortSignalLike, createAbortError } from "./util/utils";
import {
ConnectionEvents, SessionEvents, SenderEvents, ReceiverEvents, create_connection, websocket_connect,
ConnectionOptions as RheaConnectionOptions, Connection as RheaConnection, AmqpError, Dictionary,
Expand Down Expand Up @@ -48,6 +48,36 @@ export interface ReceiverOptionsWithSession extends ReceiverOptions {
session?: Session;
}

/**
* Set of options to use when running Connection.open()
*/
export interface ConnectionOpenOptions {
/**
* A signal used to cancel the Connection.open() operation.
*/
abortSignal?: AbortSignalLike;
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Set of options to use when running Connection.close()
*/
export interface ConnectionCloseOptions {
/**
* A signal used to cancel the Connection.close() operation.
*/
abortSignal?: AbortSignalLike;
}

/**
* Set of options to use when running Connection.createSession()
*/
export interface SessionCreateOptions {
/**
* A signal used to cancel the Connection.createSession() operation.
*/
abortSignal?: AbortSignalLike;
}

/**
* Describes the options that can be provided while creating an AMQP connection.
* @interface ConnectionOptions
Expand Down Expand Up @@ -264,17 +294,20 @@ export class Connection extends Entity {

/**
* Creates a new amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* @return {Promise<Connection>} Promise<Connection>
* - **Resolves** the promise with the Connection object when rhea emits the "connection_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_close" event
* while trying to establish an amqp connection.
* while trying to establish an amqp connection or with an AbortError if the operation was cancelled.
*/
open(): Promise<Connection> {
open(options?: ConnectionOpenOptions): Promise<Connection> {
return new Promise((resolve, reject) => {
if (!this.isOpen()) {

let onOpen: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options?.abortSignal;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose this must work or else CI wouldn't run but the package.json says we're using TS 3.5.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's because 3.5.1 is the minimum version 😄 But that does remind me that before a new version is published, we should evaluate if down-leveled types are needed.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right! :)

It's a dev dependency anyways - so should we bump it up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed optional chaining in c12aeed to avoid rocking the boat regarding TS versions

let waitTimer: any;

const removeListeners: Function = () => {
Expand All @@ -283,6 +316,7 @@ export class Connection extends Entity {
this._connection.removeListener(ConnectionEvents.connectionOpen, onOpen);
this._connection.removeListener(ConnectionEvents.connectionClose, onClose);
this._connection.removeListener(ConnectionEvents.disconnected, onClose);
abortSignal?.removeEventListener("abort", onAbort);
};

onOpen = (context: RheaEventContext) => {
Expand All @@ -299,6 +333,14 @@ export class Connection extends Entity {
return reject(err);
};

onAbort = () => {
removeListeners();
this._connection.close();
chradek marked this conversation as resolved.
Show resolved Hide resolved
const err = createAbortError("Connection open request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};

const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to open the amqp connection "${this.id}" due to operation timeout.`;
Expand All @@ -314,6 +356,14 @@ export class Connection extends Entity {
log.connection("[%s] Trying to create a new amqp connection.", this.id);
this._connection.connect();
this.actionInitiated++;

if (abortSignal) {
ramya-rao-a marked this conversation as resolved.
Show resolved Hide resolved
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
} else {
return resolve(this);
}
Expand All @@ -323,25 +373,33 @@ export class Connection extends Entity {

/**
* Closes the amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* When the abort signal in the options is fired, the local endpoint is closed.
* This does not guarantee that the remote has closed as well. It only stops listening for
* an acknowledgement that the remote endpoint is closed as well.
* @return {Promise<void>} Promise<void>
* - **Resolves** the promise when rhea emits the "connection_close" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "connection_error" event while
* trying to close an amqp connection.
* trying to close an amqp connection or with an AbortError if the operation was cancelled.
*/
close(): Promise<void> {
close(options?: ConnectionCloseOptions): Promise<void> {
return new Promise<void>((resolve, reject) => {
log.error("[%s] The connection is open ? -> %s", this.id, this.isOpen());
if (this.isOpen()) {
let onClose: Func<RheaEventContext, void>;
let onError: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options?.abortSignal;
let waitTimer: any;

const removeListeners = () => {
clearTimeout(waitTimer);
this.actionInitiated--;
this._connection.removeListener(ConnectionEvents.connectionError, onError);
this._connection.removeListener(ConnectionEvents.connectionClose, onClose);
this._connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
abortSignal?.removeEventListener("abort", onAbort);
};

onClose = (context: RheaEventContext) => {
Expand All @@ -366,6 +424,13 @@ export class Connection extends Entity {
log.error("[%s] Connection got disconnected while closing itself: %O.", this.id, error);
};

onAbort = () => {
removeListeners();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious here as well what happens if the server does close the connection with an error.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the way rhea is set up is to fire events and not throw errors. If there are listeners, they respond, else the fired event gets ignored. There is no unhandled exceptions

const err = createAbortError("Connection close request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};

const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to close the amqp connection "${this.id}" due to operation timeout.`;
Expand All @@ -380,6 +445,14 @@ export class Connection extends Entity {
waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);
this._connection.close();
this.actionInitiated++;

if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
} else {
return resolve();
}
Expand Down Expand Up @@ -452,19 +525,22 @@ export class Connection extends Entity {

/**
* Creates an amqp session on the provided amqp connection.
* @param options A set of options including a signal used to cancel the operation.
* @return {Promise<Session>} Promise<Session>
* - **Resolves** the promise with the Session object when rhea emits the "session_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "session_close" event while
* trying to create an amqp session.
* trying to create an amqp session or with an AbortError if the operation was cancelled.
*/
createSession(): Promise<Session> {
createSession(options?: SessionCreateOptions): Promise<Session> {
return new Promise((resolve, reject) => {
const rheaSession = this._connection.create_session();
const session = new Session(this, rheaSession);
session.actionInitiated++;
let onOpen: Func<RheaEventContext, void>;
let onClose: Func<RheaEventContext, void>;
let onDisconnected: Func<RheaEventContext, void>;
let onAbort: Func<void, void>;
const abortSignal = options?.abortSignal;
let waitTimer: any;

const removeListeners = () => {
Expand All @@ -473,6 +549,7 @@ export class Connection extends Entity {
rheaSession.removeListener(SessionEvents.sessionOpen, onOpen);
rheaSession.removeListener(SessionEvents.sessionClose, onClose);
rheaSession.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
abortSignal?.removeEventListener("abort", onAbort);
};

onOpen = (context: RheaEventContext) => {
Expand All @@ -498,6 +575,14 @@ export class Connection extends Entity {
return reject(error);
};

onAbort = () => {
removeListeners();
rheaSession.close();
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
const err = createAbortError("Create session request has been cancelled.");
log.error("[%s] [%s]", this.id, err.message);
return reject(err);
};

const actionAfterTimeout = () => {
removeListeners();
const msg: string = `Unable to create the amqp session due to operation timeout.`;
Expand All @@ -512,19 +597,30 @@ export class Connection extends Entity {
log.session("[%s] Calling amqp session.begin().", this.id);
waitTimer = setTimeout(actionAfterTimeout, this.options!.operationTimeoutInSeconds! * 1000);
rheaSession.begin();

if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
});
}

/**
* Creates an amqp sender link. It either uses the provided session or creates a new one.
* - **Resolves** the promise with the Sender object when rhea emits the "sender_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "sender_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {SenderOptionsWithSession} options Optional parameters to create a sender link.
* @return {Promise<Sender>} Promise<Sender>.
*/
async createSender(options?: SenderOptionsWithSession): Promise<Sender> {
async createSender(options?: SenderOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<Sender> {
if (options && options.session && options.session.createSender) {
return options.session.createSender(options);
}
const session = await this.createSession();
const session = await this.createSession({ abortSignal: options?.abortSignal });
return session.createSender(options);
}

Expand All @@ -540,24 +636,27 @@ export class Connection extends Entity {
*
* @return Promise<AwaitableSender>.
*/
async createAwaitableSender(options?: AwaitableSenderOptionsWithSession): Promise<AwaitableSender> {
async createAwaitableSender(options?: AwaitableSenderOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<AwaitableSender> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't abortSignal just in the AwaitableSenderOptionsWithSession?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AwaitableSenderOptionsWithSession extends AwaitableSenderOptions which extends BaseSenderOptions
SenderOptionsWithSession extends SenderOptions which extends BaseSenderOptions.
ReceiverOptionsWithSession extends ReceiverOptions

So, the right place to add the abortSignal would have been BaseSenderOptions and ReceiverOptions

But then, createRequestResponseLink() takes both SenderOptions and ReceiverOptions. Both options supporting abortSignal would be not be desirable for this method. So, I could either do the Pick/Omit magic, or keep abortSignal separate as done here. I chose the latter.

if (options && options.session && options.session.createAwaitableSender) {
return options.session.createAwaitableSender(options);
}
const session = await this.createSession();
const session = await this.createSession({ abortSignal: options?.abortSignal });
return session.createAwaitableSender(options);
}

/**
* Creates an amqp receiver link. It either uses the provided session or creates a new one.
* - **Resolves** the promise with the Sender object when rhea emits the "receiver_open" event.
* - **Rejects** the promise with an AmqpError when rhea emits the "receiver_close" event while
* trying to create an amqp session or with an AbortError if the operation was cancelled.
* @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link.
* @return {Promise<Receiver>} Promise<Receiver>.
*/
async createReceiver(options?: ReceiverOptionsWithSession): Promise<Receiver> {
async createReceiver(options?: ReceiverOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<Receiver> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't abortSignal just in the ReceiverOptionsWithSession?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (options && options.session && options.session.createReceiver) {
return options.session.createReceiver(options);
}
const session = await this.createSession();
const session = await this.createSession({ abortSignal: options?.abortSignal });
return session.createReceiver(options);
}

Expand All @@ -572,17 +671,17 @@ export class Connection extends Entity {
* @return {Promise<ReqResLink>} Promise<ReqResLink>
*/
async createRequestResponseLink(senderOptions: SenderOptions, receiverOptions: ReceiverOptions,
providedSession?: Session): Promise<ReqResLink> {
providedSession?: Session, abortSignal?: AbortSignal): Promise<ReqResLink> {
if (!senderOptions) {
throw new Error(`Please provide sender options.`);
}
if (!receiverOptions) {
throw new Error(`Please provide receiver options.`);
}
const session = providedSession || await this.createSession();
const session = providedSession || await this.createSession({ abortSignal });
const [sender, receiver] = await Promise.all([
session.createSender(senderOptions),
session.createReceiver(receiverOptions)
session.createSender({ ...senderOptions, abortSignal }),
session.createReceiver({ ...receiverOptions, abortSignal })
]);
log.connection("[%s] Successfully created the sender '%s' and receiver '%s' on the same " +
"amqp session '%s'.", this.id, sender.name, receiver.name, session.id);
Expand Down
Loading