Skip to content

Commit

Permalink
Add support to cancel send()
Browse files Browse the repository at this point in the history
  • Loading branch information
ramya-rao-a committed Aug 3, 2020
1 parent 6978967 commit a01480c
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 4 deletions.
49 changes: 45 additions & 4 deletions lib/awaitableSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Session } from "./session";
import {
OperationTimeoutError, InsufficientCreditError, SendOperationFailedError
} from "./errorDefinitions";
import { AbortSignalLike, abortErrorName } from "./util/utils";

/**
* Describes the interface for the send operation Promise which contains a reference to resolve,
Expand All @@ -22,6 +23,7 @@ export interface PromiseLike {
resolve: (value?: any) => void;
reject: (reason?: any) => void;
timer: NodeJS.Timer;
onAbort: () => void;
}

/**
Expand Down Expand Up @@ -170,9 +172,12 @@ export class AwaitableSender extends BaseSender {
* @param {number} [format] The message format. Specify this if a message with custom format needs
* to be sent. `0` implies the standard AMQP 1.0 defined format. If no value is provided, then the
* given message is assumed to be of type Message interface and encoded appropriately.
* @param {AbortSignalLike} abortSignal A signal to cancel the send operation. This does not
* guarantee that the message will not be sent. It only stops listening for an acknowledgement from
* the remote endpoint.
* @returns {Promise<Delivery>} Promise<Delivery> The delivery information about the sent message.
*/
send(msg: Message | Buffer, tag?: Buffer | string, format?: number): Promise<Delivery> {
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, abortSignal?: AbortSignalLike): Promise<Delivery> {
return new Promise<Delivery>((resolve, reject) => {
log.sender("[%s] Sender '%s' on amqp session '%s', credit: %d available: %d",
this.connection.id, this.name, this.session.id, this.credit,
Expand All @@ -187,12 +192,48 @@ export class AwaitableSender extends BaseSender {
return reject(new OperationTimeoutError(message));
}, this.sendTimeoutInSeconds * 1000);

const onAbort = () => {
const promise = this.deliveryDispositionMap.get(delivery.id) as PromiseLike;
clearTimeout(promise.timer);
const deleteResult = this.deliveryDispositionMap.delete(delivery.id);
log.sender(
"[%s] Event: 'abort', Successfully deleted the delivery with id %d from the " +
" map of sender '%s' on amqp session '%s' and cleared the timer: %s.",
this.connection.id, delivery.id, this.name, this.session.id, deleteResult
);
const err = new Error("Send request has been cancelled.");
err.name = abortErrorName;
log.error("[%s] %s", this.connection.id, err.message);
promise.reject(err);
};

const removeAbortListener = () => {
if (abortSignal) {
abortSignal.removeEventListener("abort", onAbort);
}
};

const delivery = (this._link as RheaSender).send(msg, tag, format);
this.deliveryDispositionMap.set(delivery.id, {
resolve: resolve,
reject: reject,
timer: timer
resolve: () => {
resolve();
removeAbortListener();
},
reject: (reason?: any) => {
reject(reason);
removeAbortListener();
},
timer: timer,
onAbort
});

if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
} else {
abortSignal.addEventListener("abort", onAbort);
}
}
} else {
// Please send the message after some time.
const msg =
Expand Down
55 changes: 55 additions & 0 deletions test/sender.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as rhea from "rhea";
import { assert } from "chai";
import { Connection } from "../lib/index";
import { AbortController } from "@azure/abort-controller";
import { abortErrorName } from "../lib/util/utils";

describe("Sender", () => {
let mockService: rhea.Container;
Expand Down Expand Up @@ -116,4 +118,57 @@ describe("Sender", () => {
}
});
});

describe("AbortSignal", () => {
it("send() fails with aborted signal", async () => {
const connection = new Connection({
port: mockServiceListener.address().port,
});
await connection.open();
const sender = await connection.createAwaitableSender();

const abortController = new AbortController();
const abortSignal = abortController.signal;

// Pass an already aborted signal to send()
abortController.abort();
const sendPromise = sender.send({ body: "hello" }, undefined, undefined, abortSignal);

let abortErrorThrown = false;
try {
await sendPromise;
} catch (error) {
abortErrorThrown = error.name === abortErrorName;
}

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
await connection.close();
});

it("send() fails when abort signal is fired", async () => {
const connection = new Connection({
port: mockServiceListener.address().port,
});
await connection.open();
const sender = await connection.createAwaitableSender();

const abortController = new AbortController();
const abortSignal = abortController.signal;

// Fire abort signal after passing it to send()
const sendPromise = sender.send({ body: "hello" }, undefined, undefined, abortSignal);
abortController.abort();


let abortErrorThrown = false;
try {
await sendPromise;
} catch (error) {
abortErrorThrown = error.name === abortErrorName;
}

assert.isTrue(abortErrorThrown, "AbortError should have been thrown.");
await connection.close();
});
})
});

0 comments on commit a01480c

Please sign in to comment.