Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancellation support to all async operations #66

Merged
merged 18 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 58 additions & 7 deletions lib/awaitableSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Session } from "./session";
import {
OperationTimeoutError, InsufficientCreditError, SendOperationFailedError
} from "./errorDefinitions";
import { AbortSignalLike, createAbortError } from "./util/utils";

/**
* Describes the interface for the send operation Promise which contains a reference to resolve,
Expand Down Expand Up @@ -41,6 +42,20 @@ export interface AwaitableSenderOptions extends BaseSenderOptions {
sendTimeoutInSeconds?: number;
}

export interface AwaitableSendOptions {
/**
* The duration in which the promise to send the message should complete (resolve/reject).
* If it is not completed, then the Promise will be rejected after timeout occurs.
* Default: `20 seconds`.
*/
timeoutInSeconds?: number;
/**
* A signal to cancel the send operation. This does not guarantee that the message will not be
* sent. It only stops listening for an acknowledgement from the remote endpoint.
*/
abortSignal?: AbortSignalLike;
}

/**
* Describes the sender where one can await on the message being sent.
* @class AwaitableSender
Expand Down Expand Up @@ -170,17 +185,25 @@ export class AwaitableSender extends BaseSender {
* @param {number} [format] The message format. Specify this if a message with custom format needs
* to be sent. `0` implies the standard AMQP 1.0 defined format. If no value is provided, then the
* given message is assumed to be of type Message interface and encoded appropriately.
* @param {number} [timeoutInSeconds] If provided, this timeout overrides the `sendTimeoutInSeconds`
* that is set when the `AwaitableSender` is created. This timeout represents the duration in which
* the promise to send the message should complete (resolve/reject). If not, the Promise will be
* rejected after timeout.
* @param {AwaitableSendOptions} [options] Options to configure the timeout and cancellation for
* the send operation.
* @returns {Promise<Delivery>} Promise<Delivery> The delivery information about the sent message.
*/
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, timeoutInSeconds?: number): Promise<Delivery> {
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, options?: AwaitableSendOptions): Promise<Delivery> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not considered a breaking change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeoutInSeconds was introduced in #53 which has not been released yet. So, no breaking changes

return new Promise<Delivery>((resolve, reject) => {
log.sender("[%s] Sender '%s' on amqp session '%s', credit: %d available: %d",
this.connection.id, this.name, this.session.id, this.credit,
this.session.outgoing.available());

const abortSignal = options && options.abortSignal;
const timeoutInSeconds = options && options.timeoutInSeconds;

if (abortSignal && abortSignal.aborted) {
const err = createAbortError("Send request has been cancelled.");
log.error("[%s] %s", this.connection.id, err.message);
return reject(err);
}

if (this.sendable()) {
let sendTimeoutInSeconds = this.sendTimeoutInSeconds;
if (typeof timeoutInSeconds === "number" && timeoutInSeconds > 0) sendTimeoutInSeconds = timeoutInSeconds;
Expand All @@ -193,12 +216,40 @@ export class AwaitableSender extends BaseSender {
return reject(new OperationTimeoutError(message));
}, sendTimeoutInSeconds * 1000);

const onAbort = () => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this into a function rather than inlining it here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you thinking of a function that accepts delivery as an argument, and then still in-lining, something like:

const onAbort = () => {
  this.abort(delivery);
}

chradek marked this conversation as resolved.
Show resolved Hide resolved
if (this.deliveryDispositionMap.has(delivery.id)) {
const promise = this.deliveryDispositionMap.get(delivery.id) as PromiseLike;
clearTimeout(promise.timer);
const deleteResult = this.deliveryDispositionMap.delete(delivery.id);
log.sender(
"[%s] Event: 'abort', Successfully deleted the delivery with id %d from the " +
" map of sender '%s' on amqp session '%s' and cleared the timer: %s.",
this.connection.id, delivery.id, this.name, this.session.id, deleteResult
);
const err = createAbortError("Send request has been cancelled.");
log.error("[%s] %s", this.connection.id, err.message);
promise.reject(err);
}
};

const removeAbortListener = () => {
if (abortSignal) { abortSignal.removeEventListener("abort", onAbort); }
};

const delivery = (this._link as RheaSender).send(msg, tag, format);
this.deliveryDispositionMap.set(delivery.id, {
resolve: resolve,
reject: reject,
resolve: () => {
resolve();
removeAbortListener();
},
reject: (reason?: any) => {
reject(reason);
removeAbortListener();
},
timer: timer
});

if (abortSignal) { abortSignal.addEventListener("abort", onAbort); }
} else {
// Please send the message after some time.
const msg =
Expand Down
Loading