Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Check abortSignal in SessionReceiver.subscribe() #11281

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions sdk/servicebus/service-bus/review/service-bus.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ export interface MessageHandlerOptions extends MessageHandlerOptionsBase {
}

// @public
export interface MessageHandlerOptionsBase {
export interface MessageHandlerOptionsBase extends OperationOptionsBase {
autoComplete?: boolean;
maxConcurrentCalls?: number;
}
Expand Down Expand Up @@ -459,7 +459,7 @@ export interface ServiceBusSessionReceiver<ReceivedMessageT extends ServiceBusRe
}

// @public
export interface SessionSubscribeOptions extends OperationOptionsBase, MessageHandlerOptionsBase {
export interface SessionSubscribeOptions extends MessageHandlerOptionsBase {
}

// @public
Expand All @@ -482,7 +482,7 @@ export interface SqlRuleFilter {
export type SubQueue = "deadLetter" | "transferDeadLetter";

// @public
export interface SubscribeOptions extends OperationOptionsBase, MessageHandlerOptions {
export interface SubscribeOptions extends MessageHandlerOptions {
}

// @public
Expand Down
6 changes: 3 additions & 3 deletions sdk/servicebus/service-bus/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,18 @@ export interface GetMessageIteratorOptions extends OperationOptionsBase {}
/**
* Options used when subscribing to a Service Bus queue or subscription.
*/
export interface SubscribeOptions extends OperationOptionsBase, MessageHandlerOptions {}
export interface SubscribeOptions extends MessageHandlerOptions {}

/**
* Options used when subscribing to a Service Bus queue or subscription.
*/
export interface SessionSubscribeOptions extends OperationOptionsBase, MessageHandlerOptionsBase {}
export interface SessionSubscribeOptions extends MessageHandlerOptionsBase {}

/**
* Describes the options passed to `registerMessageHandler` method when receiving messages from a
* Queue/Subscription.
*/
export interface MessageHandlerOptionsBase {
export interface MessageHandlerOptionsBase extends OperationOptionsBase {
/**
* @property Indicates whether the `complete()` method on the message should automatically be
* called by the sdk after the user provided onMessage handler has been executed.
Expand Down
13 changes: 11 additions & 2 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ import { OnAmqpEventAsPromise, OnError, OnMessage } from "../core/messageReceive
import { logger } from "../log";
import { DispositionType, InternalReceiveMode, ServiceBusMessageImpl } from "../serviceBusMessage";
import { logError, throwErrorIfConnectionClosed } from "../util/errors";
import { calculateRenewAfterDuration, convertTicksToDate } from "../util/utils";
import {
calculateRenewAfterDuration,
convertTicksToDate,
StandardAbortMessage
} from "../util/utils";
import { BatchingReceiverLite, MinimalReceiver } from "../core/batchingReceiver";
import { onMessageSettled, DeferredPromiseAndTimer } from "../core/shared";
import { AbortSignalLike } from "@azure/abort-controller";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { ReceiverHelper } from "../core/receiverHelper";
import { CreateSessionReceiverOptions, MessageHandlerOptionsBase } from "../models";

Expand Down Expand Up @@ -618,6 +622,11 @@ export class MessageSession extends LinkEntity<Receiver> {
*/
subscribe(onMessage: OnMessage, onError: OnError, options?: MessageHandlerOptionsBase): void {
if (!options) options = {};

if (options.abortSignal?.aborted) {
throw new AbortError(StandardAbortMessage);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the only net change... and is very deceptively simple...

I see how the abort signal is passed to the link creation time and the one with the receiveMessages().
This change ensures that an already aborted signal will result in cancelling the subscribe() method.

How are we ensuring that once subscribe() is called and then abort signal is fired that we cancel the operation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to check - is this the scenario you're talking about with regards to cancellation?

https://github.com/Azure/azure-sdk-for-js/pull/11281/files#diff-220199656e7c07d2ec0c2ab9378ccb28R321

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline, @richardpark-msft will be logging an issue for the concern I raised above

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(for summary)

The abortSignal passed into subscribe, currently, only applies logically to the initialization of the subscription itself. If the subscription starts (ie, handlers are registered for receiving messages, credits on the line) the abortSignal no longer applies.

The way we have it, as is, has tripped up a few people in discussions so we want to revisit this: #11296


this._isReceivingMessagesForSubscriber = true;
if (typeof options.maxConcurrentCalls === "number" && options.maxConcurrentCalls > 0) {
this.maxConcurrentCalls = options.maxConcurrentCalls;
Expand Down
88 changes: 88 additions & 0 deletions sdk/servicebus/service-bus/test/internal/abortSignal.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import {
import { createConnectionContextForTests } from "./unittestUtils";
import { StandardAbortMessage } from "../../src/util/utils";
import { isLinkLocked } from "../utils/misc";
import { ServiceBusSessionReceiverImpl } from "../../src/receivers/sessionReceiver";
import { Constants } from "@azure/core-amqp";
import { ServiceBusReceiverImpl } from "../../src/receivers/receiver";

describe("AbortSignal", () => {
const testMessageThatDoesntMatter = {
Expand Down Expand Up @@ -314,4 +317,89 @@ describe("AbortSignal", () => {
assert.isFalse(isLinkLocked(messageReceiver));
});
});

describe("subscribe", () => {
/**
* SessionReceiver is a bit of an odd duck because it doesn't do initialization
* in its subscribe() call (like non-session receiver) so our normal abortSignal
* code isn't running there. So we have to check this separately from Receiver.
*/
it("SessionReceiver.subscribe", async () => {
const session = await ServiceBusSessionReceiverImpl.createInitializedSessionReceiver(
createConnectionContextForTests({
onCreateReceiverCalled: (receiver) => {
(receiver as any).source = {
filter: {
[Constants.sessionFilterName]: "hello"
}
};

(receiver as any).properties = {
["com.microsoft:locked-until-utc"]: Date.now()
};
}
}),
"entityPath",
"peekLock",
{
sessionId: "hello"
}
);

try {
const abortSignal = createAbortSignalForTest(true);
const receivedErrors: Error[] = [];

session.subscribe(
{
processMessage: async (_msg) => {},
processError: async (err) => {
receivedErrors.push(err);
}
},
{
abortSignal
}
);

assert.equal(receivedErrors[0].message, "The operation was aborted.");
assert.equal(receivedErrors[0].name, "AbortError");
} finally {
await session.close();
}
});

it("Receiver.subscribe", async () => {
const receiver = new ServiceBusReceiverImpl(
createConnectionContextForTests(),
"entityPath",
"peekLock"
);

try {
const abortSignal = createAbortSignalForTest(true);
const receivedErrors: Error[] = [];

await new Promise<void>((resolve) => {
receiver.subscribe(
{
processMessage: async (_msg: any) => {},
processError: async (err: Error) => {
resolve();
receivedErrors.push(err);
}
},
{
abortSignal
}
);
});

assert.equal(receivedErrors[0].message, "The operation was aborted.");
assert.equal(receivedErrors[0].name, "AbortError");
} finally {
await receiver.close();
}
});
});
});