From bdac23d8e348245660c0724974d32e4b1d496d38 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Tue, 15 Sep 2020 17:00:20 -0700 Subject: [PATCH 1/3] Adding in abortSignal (and also tracing options) to the message handler options. As part of this I can remove it from the SubscribeOptions and SessionSubscribeOptions since they already inherit from MessageHandlerOptionsBase. --- sdk/servicebus/service-bus/review/service-bus.api.md | 6 +++--- sdk/servicebus/service-bus/src/models.ts | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/service-bus/review/service-bus.api.md b/sdk/servicebus/service-bus/review/service-bus.api.md index cc90d35423fe..e3bd17e72f09 100644 --- a/sdk/servicebus/service-bus/review/service-bus.api.md +++ b/sdk/servicebus/service-bus/review/service-bus.api.md @@ -189,7 +189,7 @@ export interface MessageHandlerOptions extends MessageHandlerOptionsBase { } // @public -export interface MessageHandlerOptionsBase { +export interface MessageHandlerOptionsBase extends OperationOptionsBase { autoComplete?: boolean; maxConcurrentCalls?: number; } @@ -459,7 +459,7 @@ export interface ServiceBusSessionReceiver Date: Tue, 15 Sep 2020 17:02:12 -0700 Subject: [PATCH 2/3] Check the abortSignal in message session so sessionReceiver.subscribe() will respect an abort signal like Receiver.subscribe() does. Adding in tests for receiver and sessionreceiver. --- .../service-bus/src/session/messageSession.ts | 13 ++- .../test/internal/abortSignal.spec.ts | 88 +++++++++++++++++++ 2 files changed, 99 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/service-bus/src/session/messageSession.ts b/sdk/servicebus/service-bus/src/session/messageSession.ts index 07daf3222ace..6232af22163f 100644 --- a/sdk/servicebus/service-bus/src/session/messageSession.ts +++ b/sdk/servicebus/service-bus/src/session/messageSession.ts @@ -18,10 +18,14 @@ import { OnAmqpEventAsPromise, OnError, OnMessage } from "../core/messageReceive import { logger } from "../log"; import { DispositionType, InternalReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage"; import { logError, throwErrorIfConnectionClosed } from "../util/errors"; -import { calculateRenewAfterDuration, convertTicksToDate } from "../util/utils"; +import { + calculateRenewAfterDuration, + convertTicksToDate, + StandardAbortMessage +} from "../util/utils"; import { BatchingReceiverLite, MinimalReceiver } from "../core/batchingReceiver"; import { onMessageSettled, DeferredPromiseAndTimer } from "../core/shared"; -import { AbortSignalLike } from "@azure/abort-controller"; +import { AbortError, AbortSignalLike } from "@azure/abort-controller"; import { ReceiverHelper } from "../core/receiverHelper"; import { CreateSessionReceiverOptions, MessageHandlerOptionsBase } from "../models"; @@ -618,6 +622,11 @@ export class MessageSession extends LinkEntity { */ subscribe(onMessage: OnMessage, onError: OnError, options?: MessageHandlerOptionsBase): void { if (!options) options = {}; + + if (options.abortSignal?.aborted) { + throw new AbortError(StandardAbortMessage); + } + this._isReceivingMessagesForSubscriber = true; if (typeof options.maxConcurrentCalls === "number" && options.maxConcurrentCalls > 0) { this.maxConcurrentCalls = options.maxConcurrentCalls; diff --git a/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts index 7d0eceaef068..d9962bb3af41 100644 --- a/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts @@ -18,6 +18,9 @@ import { import { createConnectionContextForTests } from "./unittestUtils"; import { StandardAbortMessage } from "../../src/util/utils"; import { isLinkLocked } from "../utils/misc"; +import { ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver"; +import { Constants } from "@azure/core-amqp"; +import { ServiceBusReceiverImpl } from "../../src/receivers/receiver"; describe("AbortSignal", () => { const testMessageThatDoesntMatter = { @@ -314,4 +317,89 @@ describe("AbortSignal", () => { assert.isFalse(isLinkLocked(messageReceiver)); }); }); + + describe("subscribe", () => { + /** + * SessionReceiver is a bit of an odd duck because it doesn't do initialization + * in its subscribe() call (like non-session receiver) so our normal abortSignal + * code isn't running there. So we have to check this _and_ Receiver. + */ + it("SessionReceiver.subscribe", async () => { + const session = await ServiceBusSessionReceiverImpl.createInitializedSessionReceiver( + createConnectionContextForTests({ + onCreateReceiverCalled: (receiver) => { + (receiver as any).source = { + filter: { + [Constants.sessionFilterName]: "hello" + } + }; + + (receiver as any).properties = { + ["com.microsoft:locked-until-utc"]: Date.now() + }; + } + }), + "entityPath", + "peekLock", + { + sessionId: "hello" + } + ); + + try { + const abortSignal = createAbortSignalForTest(true); + const receivedErrors: Error[] = []; + + session.subscribe( + { + processMessage: async (_msg) => {}, + processError: async (err) => { + receivedErrors.push(err); + } + }, + { + abortSignal + } + ); + + assert.equal(receivedErrors[0].message, "The operation was aborted."); + assert.equal(receivedErrors[0].name, "AbortError"); + } finally { + await session.close(); + } + }); + + it("Receiver.subscribe", async () => { + const receiver = new ServiceBusReceiverImpl( + createConnectionContextForTests(), + "entityPath", + "peekLock" + ); + + try { + const abortSignal = createAbortSignalForTest(true); + const receivedErrors: Error[] = []; + + await new Promise((resolve) => { + receiver.subscribe( + { + processMessage: async (_msg: any) => {}, + processError: async (err: Error) => { + resolve(); + receivedErrors.push(err); + } + }, + { + abortSignal + } + ); + }); + + assert.equal(receivedErrors[0].message, "The operation was aborted."); + assert.equal(receivedErrors[0].name, "AbortError"); + } finally { + await receiver.close(); + } + }); + }); }); From 42a40a91d8a3da9f6e077284d6b18478d4ff6fd9 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Tue, 15 Sep 2020 17:04:59 -0700 Subject: [PATCH 3/3] Fixing caption --- sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts b/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts index d9962bb3af41..bd6c38b3b746 100644 --- a/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts @@ -322,7 +322,7 @@ describe("AbortSignal", () => { /** * SessionReceiver is a bit of an odd duck because it doesn't do initialization * in its subscribe() call (like non-session receiver) so our normal abortSignal - * code isn't running there. So we have to check this _and_ Receiver. + * code isn't running there. So we have to check this separately from Receiver. */ it("SessionReceiver.subscribe", async () => { const session = await ServiceBusSessionReceiverImpl.createInitializedSessionReceiver(