Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Sender.createBatch cancellation support #9233

65 changes: 33 additions & 32 deletions sdk/servicebus/service-bus/.vscode/launch.json
Original file line number Diff line number Diff line change
@@ -1,34 +1,35 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Debug sample",
"program": "${file}",
"preLaunchTask": "npm: build:samples",
"outFiles": [
"${workspaceFolder}/dist-esm/**/*.js"
]
},
{
"type": "node",
"request": "launch",
"name": "Debug Unit Tests",
"program": "${workspaceFolder}/node_modules/mocha/bin/_mocha",
"args": [
"-u",
"tdd",
"--timeout",
"999999",
"--colors",
"${workspaceFolder}/test-dist/index.js"
],
"internalConsoleOptions": "openOnSessionStart",
"preLaunchTask": "npm: build:test"
}
]
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Debug sample",
"program": "${file}",
"preLaunchTask": "npm: build:samples",
"outFiles": ["${workspaceFolder}/dist-esm/**/*.js"]
},
{
"type": "node",
"request": "launch",
"name": "Debug Unit Tests",
"program": "${workspaceFolder}/node_modules/mocha/bin/_mocha",
"args": [
"-r",
"esm",
"--require",
"source-map-support/register",
"--timeout",
"120000",
"--full-trace",
"dist-esm/test/*.spec.js",
"dist-esm/test/**/*.spec.js"
],
"internalConsoleOptions": "openOnSessionStart",
"preLaunchTask": "npm: build:test"
}
]
}
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ export class LinkEntity {
this.address
);
if (setTokenRenewal) {
await this._ensureTokenRenewal();
this._ensureTokenRenewal();
}
}

/**
* Ensures that the token is renewed within the predefined renewal margin.
* @returns {void}
*/
protected async _ensureTokenRenewal(): Promise<void> {
protected _ensureTokenRenewal(): void {
if (!this._tokenTimeout) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/managementClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ export class ManagementClient extends LinkEntity {
this._mgmtReqResLink!.sender.name,
this._mgmtReqResLink!.receiver.name
);
await this._ensureTokenRenewal();
this._ensureTokenRenewal();
}
} catch (err) {
err = translate(err);
Expand Down
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ export class MessageReceiver extends LinkEntity {
} else if (this.receiverType === ReceiverType.batching && !this._context.batchingReceiver) {
this._context.batchingReceiver = this as any;
}
await this._ensureTokenRenewal();
this._ensureTokenRenewal();
} else {
log.error(
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +
Expand Down
105 changes: 39 additions & 66 deletions sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
} from "../serviceBusMessage";
import { ClientEntityContext } from "../clientEntityContext";
import { LinkEntity } from "./linkEntity";
import { getUniqueName } from "../util/utils";
import { getUniqueName, waitForTimeoutOrAbortOrResolve } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
Expand Down Expand Up @@ -261,33 +261,19 @@ export class MessageSender extends LinkEntity {

const sendEventPromise = () =>
new Promise<void>(async (resolve, reject) => {
let initTimeoutTimer: any;

this._checkAndSetupAbortSignalCleanup(
abortSignal,
() => clearTimeout(initTimeoutTimer),
reject
);

const initStartTime = Date.now();
if (!this.isOpen()) {
const initTimeoutPromise = new Promise((_res, rejectInitTimeoutPromise) => {
initTimeoutTimer = setTimeout(() => {
const desc: string =
try {
await waitForTimeoutOrAbortOrResolve({
actionFn: () => this.open(undefined, options?.abortSignal),
abortMessage: "The send operation has been cancelled by the user.",
abortSignal: options?.abortSignal,
timeoutMs: timeoutInMs,
timeoutMessage:
`[${this._context.namespace.connectionId}] Sender "${this.name}" ` +
`with address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return rejectInitTimeoutPromise(translate(e));
}, timeoutInMs);
});

try {
await Promise.race([this.open(), initTimeoutPromise]);
`to operation timeout.`
});
} catch (err) {
err = translate(err);
log.warning(
Expand All @@ -297,8 +283,6 @@ export class MessageSender extends LinkEntity {
err
);
return reject(err);
} finally {
clearTimeout(initTimeoutTimer);
}
}
const timeTakenByInit = Date.now() - initStartTime;
Expand Down Expand Up @@ -392,7 +376,18 @@ export class MessageSender extends LinkEntity {
/**
* Initializes the sender session on the connection.
*/
public async open(options?: AwaitableSenderOptions): Promise<void> {
public async open(
options?: AwaitableSenderOptions,
abortSignal?: AbortSignalLike
): Promise<void> {
const checkAborted = (): void => {
if (abortSignal?.aborted) {
throw new AbortError("Sender creation was cancelled by the user.");
}
};

checkAborted();
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved

if (this.isOpen()) {
return;
}
Expand All @@ -402,8 +397,10 @@ export class MessageSender extends LinkEntity {
this.openLock
);

return await defaultLock.acquire(this.openLock, async () => {
return defaultLock.acquire(this.openLock, async () => {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
try {
checkAborted();

// isOpen isConnecting Should establish
// true false No
// true true No
Expand All @@ -419,6 +416,8 @@ export class MessageSender extends LinkEntity {
);
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
this.isConnecting = true;
await this._negotiateClaim();
checkAborted();
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved

log.error(
"[%s] Trying to create sender '%s'...",
this._context.namespace.connectionId,
Expand All @@ -427,8 +426,10 @@ export class MessageSender extends LinkEntity {
if (!options) {
options = this._createSenderOptions(Constants.defaultOperationTimeoutInMs);
}

this._sender = await this._context.namespace.connection.createAwaitableSender(options);
this.isConnecting = false;
checkAborted();
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved

log.error(
"[%s] Sender '%s' with address '%s' has established itself.",
this._context.namespace.connectionId,
Expand All @@ -450,7 +451,7 @@ export class MessageSender extends LinkEntity {
// It is possible for someone to close the sender and then start it again.
// Thus make sure that the sender is present in the client cache.
if (!this._sender) this._context.sender = this;
await this._ensureTokenRenewal();
this._ensureTokenRenewal();
}
} catch (err) {
err = translate(err);
Expand All @@ -461,6 +462,8 @@ export class MessageSender extends LinkEntity {
err
);
throw err;
} finally {
this.isConnecting = false;
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
}
});
}
Expand Down Expand Up @@ -729,7 +732,7 @@ export class MessageSender extends LinkEntity {
async getMaxMessageSize(
options: {
retryOptions?: RetryOptions;
} = {}
} & Pick<OperationOptions, "abortSignal"> = {}
): Promise<number> {
const retryOptions = options.retryOptions || {};
if (this.isOpen()) {
Expand All @@ -738,10 +741,11 @@ export class MessageSender extends LinkEntity {
return new Promise<number>(async (resolve, reject) => {
try {
const config: RetryConfig<void> = {
operation: () => this.open(),
operation: () => this.open(undefined, options?.abortSignal),
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
connectionId: this._context.namespace.connectionId,
operationType: RetryOperationType.senderLink,
retryOptions: retryOptions
retryOptions: retryOptions,
abortSignal: options?.abortSignal
};

await retry<void>(config);
Expand All @@ -756,7 +760,8 @@ export class MessageSender extends LinkEntity {
async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
throwErrorIfConnectionClosed(this._context.namespace);
let maxMessageSize = await this.getMaxMessageSize({
retryOptions: this._retryOptions
retryOptions: this._retryOptions,
abortSignal: options?.abortSignal
});
if (options?.maxSizeInBytes) {
if (options.maxSizeInBytes > maxMessageSize!) {
Expand Down Expand Up @@ -792,38 +797,6 @@ export class MessageSender extends LinkEntity {
}
}

private _checkAndSetupAbortSignalCleanup(
abortSignal: AbortSignalLike | undefined,
clearStateFn: () => void,
reject: (err: Error) => void
) {
if (abortSignal == null) {
return;
}

const rejectOnAbort = () => {
const desc: string =
`[${this._context.namespace.connectionId}] The send operation on the Sender "${this.name}" with ` +
`address "${this.address}" has been cancelled by the user.`;
// Cancellation is user-intended, so log to info instead of warning.
log.error(desc);
return reject(new AbortError("The send operation has been cancelled by the user."));
};

if (abortSignal.aborted) {
// operation has been cancelled, so exit quickly
return rejectOnAbort();
}

const onAborted = () => {
clearStateFn();
abortSignal.removeEventListener("abort", onAborted);
return rejectOnAbort();
};

abortSignal.addEventListener("abort", onAborted);
}

/**
* Creates a new sender to the specific ServiceBus entity, and optionally to a given
* partition if it is not present in the context or returns the one present in the context.
Expand Down
13 changes: 11 additions & 2 deletions sdk/servicebus/service-bus/src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
retry
} from "@azure/core-amqp";
import { OperationOptions } from "./modelsToBeSharedWithEventHubs";
import { AbortError } from "@azure/abort-controller";

/**
* A Sender can be used to send messages, schedule messages to be sent at a later time
Expand Down Expand Up @@ -259,7 +260,15 @@ export class SenderImpl implements Sender {

async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
this._throwIfSenderOrConnectionClosed();
return this._sender.createBatch(options);
try {
return await this._sender.createBatch(options);
} catch (err) {
if (err.name === "AbortError") {
throw new AbortError("The createBatch operation has been cancelled by the user.");
}

throw err;
}
}

/**
Expand Down Expand Up @@ -417,7 +426,7 @@ export class SenderImpl implements Sender {
this._throwIfSenderOrConnectionClosed();

const config: RetryConfig<void> = {
operation: () => this._sender.open(),
operation: () => this._sender.open(undefined, options?.abortSignal),
connectionId: this._context.namespace.connectionId,
operationType: RetryOperationType.senderLink,
retryOptions: this._retryOptions,
Expand Down
14 changes: 9 additions & 5 deletions sdk/servicebus/service-bus/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,11 @@ export async function waitForTimeoutOrAbortOrResolve<T>(args: {
timeoutMessage: string;
abortMessage: string;
abortSignal?: AbortSignalLike;
// these are optional and only here for testing.
timeoutFunctions?: {
setTimeoutFn: (callback: (...args: any[]) => void, ms: number, ...args: any[]) => any;
clearTimeoutFn: (timeoutId: any) => void;
};
}): Promise<T> {
if (args.abortSignal && args.abortSignal.aborted) {
throw new AbortError(args.abortMessage);
Expand All @@ -509,25 +514,24 @@ export async function waitForTimeoutOrAbortOrResolve<T>(args: {
let clearAbortSignal: (() => void) | undefined = undefined;

const clearAbortSignalAndTimer = (): void => {
clearTimeout(timer);
(args.timeoutFunctions?.clearTimeoutFn ?? clearTimeout)(timer);

if (clearAbortSignal) {
clearAbortSignal();
}
};

// eslint-disable-next-line promise/param-names
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
const abortOrTimeoutPromise = new Promise<T>((_resolve, reject) => {
clearAbortSignal = checkAndRegisterWithAbortSignal(reject, args.abortMessage, args.abortSignal);

// using a named function here so we can identify it in our unit tests
timer = setTimeout(function timeoutCallback() {
timer = (args.timeoutFunctions?.setTimeoutFn ?? setTimeout)(() => {
reject(new OperationTimeoutError(args.timeoutMessage));
}, args.timeoutMs);
});

const actionPromise = args.actionFn();
try {
return await Promise.race([abortOrTimeoutPromise, actionPromise]);
return await Promise.race([abortOrTimeoutPromise, args.actionFn()]);
} finally {
clearAbortSignalAndTimer();
}
Expand Down
Loading