Skip to content

Commit

Permalink
[service-bus] pass abortSignal to link initialization and awaitableSe…
Browse files Browse the repository at this point in the history
…nder (Azure#15349)

* [service-bus] pass abortSignal to link initialization and awaitableSender

* [service-bus] add changelog entry

* [core-amqp] remove AsyncLock!

* [service-bus] fix test after the great merge

* npm run format

* fix abort on send after great merge
  • Loading branch information
chradek authored Jun 9, 2021
1 parent 1fea6c0 commit 3866111
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 104 deletions.
3 changes: 2 additions & 1 deletion sdk/core/core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# Release History

## 3.0.0 (Unreleased)
## 3.0.0 (2021-06-09)

### Breaking changes

- Updates the `rhea-promise` and `rhea` dependencies to version 2.x. `rhea` contains a breaking change that changes deserialization of timestamps from numbers to Date objects.
- Removes the `AsyncLock` and `defaultLock` exports. `defaultCancellableLock` should be used instead.

## 2.3.0 (2021-04-29)

Expand Down
2 changes: 0 additions & 2 deletions sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@
"@azure/abort-controller": "^1.0.0",
"@azure/core-auth": "^1.3.0",
"@azure/logger": "^1.0.0",
"@types/async-lock": "^1.1.0",
"async-lock": "^1.1.3",
"buffer": "^5.2.1",
"events": "^3.0.0",
"jssha": "^3.1.0",
Expand Down
6 changes: 0 additions & 6 deletions sdk/core/core-amqp/review/core-amqp.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import { AbortSignalLike } from '@azure/abort-controller';
import { AccessToken } from '@azure/core-auth';
import { AmqpError } from 'rhea-promise';
import AsyncLock from 'async-lock';
import { Connection } from 'rhea-promise';
import { Message } from 'rhea-promise';
import { MessageHeader } from 'rhea-promise';
Expand Down Expand Up @@ -91,8 +90,6 @@ export const AmqpMessageProperties: {
fromRheaMessageProperties(props: MessageProperties): AmqpMessageProperties;
};

export { AsyncLock }

// @public
export interface CancellableAsyncLock {
acquire<T = void>(key: string, task: (...args: any[]) => Promise<T>, properties: AcquireLockProperties): Promise<T>;
Expand Down Expand Up @@ -368,9 +365,6 @@ export function createSasTokenProvider(data: {
// @public
export const defaultCancellableLock: CancellableAsyncLock;

// @public
export const defaultLock: AsyncLock;

// @public
export function delay<T>(delayInMs: number, abortSignal?: AbortSignalLike, abortErrorMsg?: string, value?: T): Promise<T | void>;

Expand Down
2 changes: 0 additions & 2 deletions sdk/core/core-amqp/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ export {
delay,
parseConnectionString,
defaultCancellableLock,
defaultLock,
ParsedOutput,
AsyncLock,
WebSocketOptions
} from "./util/utils";
export { AmqpAnnotatedMessage } from "./amqpAnnotatedMessage";
Expand Down
18 changes: 0 additions & 18 deletions sdk/core/core-amqp/src/util/utils.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import AsyncLock from "async-lock";
import { AbortError, AbortSignalLike } from "@azure/abort-controller";
import { WebSocketImpl } from "rhea-promise";
import { isDefined } from "./typeGuards";
import { StandardAbortMessage } from "../errors";
import { CancellableAsyncLock, CancellableAsyncLockImpl } from "./lock";

export { AsyncLock };
/**
* @internal
*
Expand Down Expand Up @@ -113,22 +111,6 @@ export function parseConnectionString<T>(connectionString: string): ParsedOutput
return output as any;
}

/**
* @internal
*
* Gets a new instance of the async lock with desired settings.
* @param options - The async lock options.
* @returns AsyncLock
*/
export function getNewAsyncLock(options?: AsyncLockOptions): AsyncLock {
return new AsyncLock(options);
}

/**
* The async lock instance with default settings.
*/
export const defaultLock: AsyncLock = new AsyncLock({ maxPending: 10000 });

/**
* The cancellable async lock instance.
*/
Expand Down
22 changes: 13 additions & 9 deletions sdk/core/core-amqp/test/cbs.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import { assert } from "chai";
import { AbortController } from "@azure/abort-controller";
import { CbsClient, defaultLock, TokenType } from "../src";
import { CbsClient, defaultCancellableLock, TokenType } from "../src";
import { createConnectionStub } from "./utils/createConnectionStub";
import { Connection } from "rhea-promise";
import { stub } from "sinon";
Expand Down Expand Up @@ -38,14 +38,18 @@ describe("CbsClient", function() {

// Make the existing `init` invocation wait until the abortSignal
// is aborted before acquiring it's lock.
await defaultLock.acquire(lock, () => {
return new Promise<void>((resolve) => {
setTimeout(() => {
controller.abort();
resolve();
}, 0);
});
});
await defaultCancellableLock.acquire(
lock,
() => {
return new Promise<void>((resolve) => {
setTimeout(() => {
controller.abort();
resolve();
}, 0);
});
},
{ abortSignal: undefined, timeoutInMs: undefined }
);

try {
await cbsClient.init({ abortSignal: signal });
Expand Down
3 changes: 3 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 7.2.0-beta.2 (Unreleased)

- Improves cancellation support when sending messages or initializing a connection to the service.
Resolves [#15311](https://github.com/Azure/azure-sdk-for-js/issues/15311) and [#13504](https://github.com/Azure/azure-sdk-for-js/issues/13504).

### Bug fixes

- ServiceBusSender could throw an error (`TypeError: Cannot read property 'maxMessageSize' of undefined`) if a link was being restarted while calling sendMessages().
Expand Down
98 changes: 76 additions & 22 deletions sdk/servicebus/service-bus/src/core/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import {
Constants,
TokenType,
defaultLock,
defaultCancellableLock,
RequestResponseLink,
StandardAbortMessage,
isSasTokenProvider
Expand Down Expand Up @@ -217,12 +217,19 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this._logger.verbose(
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for initializing link`
);
return defaultLock.acquire(this._openLock, () => {
this._logger.verbose(
`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`
);
return this._initLinkImpl(options, abortSignal);
});
return defaultCancellableLock.acquire(
this._openLock,
() => {
this._logger.verbose(
`${this._logPrefix} Lock ${this._openLock} acquired for initializing link`
);
return this._initLinkImpl(options, abortSignal);
},
{
abortSignal: abortSignal,
timeoutInMs: Constants.defaultOperationTimeoutInMs
}
);
}

private async _initLinkImpl(
Expand Down Expand Up @@ -258,7 +265,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
);

try {
await this._negotiateClaim();
await this._negotiateClaim({
abortSignal,
setTokenRenewal: false,
timeoutInMs: Constants.defaultOperationTimeoutInMs
});

checkAborted();
this.checkIfConnectionReady();
Expand Down Expand Up @@ -324,10 +335,14 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this._logger.verbose(
`${this._logPrefix} Attempting to acquire lock token ${this._openLock} for closing link`
);
return defaultLock.acquire(this._openLock, () => {
this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
return this.closeLinkImpl();
});
return defaultCancellableLock.acquire(
this._openLock,
() => {
this._logger.verbose(`${this._logPrefix} Lock ${this._openLock} acquired for closing link`);
return this.closeLinkImpl();
},
{ abortSignal: undefined, timeoutInMs: undefined }
);
}

private async closeLinkImpl(): Promise<void> {
Expand Down Expand Up @@ -375,7 +390,15 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
* Negotiates the cbs claim for the ClientEntity.
* @param setTokenRenewal - Set the token renewal timer. Default false.
*/
private async _negotiateClaim(setTokenRenewal?: boolean): Promise<void> {
private async _negotiateClaim({
abortSignal,
setTokenRenewal,
timeoutInMs
}: {
setTokenRenewal: boolean;
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}): Promise<void> {
this._logger.verbose(`${this._logPrefix} negotiateclaim() has been called`);

// Wait for the connectionContext to be ready to open the link.
Expand All @@ -394,10 +417,22 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
this.name,
this.address
);
await defaultLock.acquire(this._context.cbsSession.cbsLock, async () => {
this.checkIfConnectionReady();
return this._context.cbsSession.init();
});

const startTime = Date.now();
if (!this._context.cbsSession.isOpen()) {
await defaultCancellableLock.acquire(
this._context.cbsSession.cbsLock,
() => {
this.checkIfConnectionReady();
return this._context.cbsSession.init({ abortSignal, timeoutInMs });
},
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
}

let tokenObject: AccessToken;
let tokenType: TokenType;
if (isSasTokenProvider(this._context.tokenCredential)) {
Expand Down Expand Up @@ -433,10 +468,25 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
if (!tokenObject) {
throw new Error("Token cannot be null");
}
await defaultLock.acquire(this._context.negotiateClaimLock, () => {
this.checkIfConnectionReady();
return this._context.cbsSession.negotiateClaim(this.audience, tokenObject.token, tokenType);
});
await defaultCancellableLock.acquire(
this._context.negotiateClaimLock,
() => {
this.checkIfConnectionReady();
return this._context.cbsSession.negotiateClaim(
this.audience,
tokenObject.token,
tokenType,
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
},
{
abortSignal,
timeoutInMs: timeoutInMs - (Date.now() - startTime)
}
);
this._logger.verbose(
"%s Negotiated claim for %s '%s' with with address: %s",
this.logPrefix,
Expand Down Expand Up @@ -485,7 +535,11 @@ export abstract class LinkEntity<LinkT extends Receiver | AwaitableSender | Requ
}
this._tokenRenewalTimer = setTimeout(async () => {
try {
await this._negotiateClaim(true);
await this._negotiateClaim({
setTokenRenewal: true,
abortSignal: undefined,
timeoutInMs: Constants.defaultOperationTimeoutInMs
});
} catch (err) {
this._logger.logError(
err,
Expand Down
3 changes: 2 additions & 1 deletion sdk/servicebus/service-bus/src/core/messageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
try {
const delivery = await this.link!.send(encodedMessage, {
format: sendBatch ? 0x80013700 : 0,
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000
timeoutInSeconds: (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000,
abortSignal
});
logger.verbose(
"%s Sender '%s', sent message with delivery id: %d",
Expand Down
Loading

0 comments on commit 3866111

Please sign in to comment.