Skip to content

Commit

Permalink
[ServiceBus] throw error earlier when timing out (Azure#27308)
Browse files Browse the repository at this point in the history
Related: issue #9775, Azure#27010

This PR fixed the following timing issue when trying to establish link
for next session but there are none available.

what's happening is

- SDK is trying to create a link for next avaiable session
(`options.source.filter: { com.microsoft:session-filter': undefined }`)
- Because no sessions are available, after one minute, a link is
established, but with `"source":null,"target":null`. The call to create
link resolves.
- However, the service immediately use that link to ask SDK to close the
link, with an error of `"com.microsoft:timeout"`/`The operation did not
complete within the allotted timeout of 00:00:59.9130000.`
- SDK respects the request and start acquiring a lock to close the link
- Meanwhile, SDK gets a link, nice! It will continue as the link has
been created
- SDK gets lock to close link and sets `this._link` to `undefined` and
start closing receiver and amqp session
- SDK null-check the link and throws INTERNAL ERROR

Instead of returning the link in this case, This PR checks whether we
got a session id back. If not and there's an error from service, we
rethrow the error. Some of the existing error handling is also moved
before returning the link, instead of afterwards.

### Are there test cases added in this PR? _(If not, why?)_
There's one test covers this scenario and verifies two possible errors.
https://github.com/Azure/azure-sdk-for-js/blob/28cbcd053daabb7f86816014d8cb8f8004bbc18f/sdk/servicebus/service-bus/test/public/sessionsTests.spec.ts#L81
  • Loading branch information
jeremymeng authored Oct 4, 2023
1 parent 912ed6f commit 57cd99f
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 29 deletions.
2 changes: 2 additions & 0 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### Bugs Fixed

- Fix an INTERNAL ERROR due to timing [PR #27308](https://github.com/Azure/azure-sdk-for-js/pull/27308)

### Other Changes

## 7.9.1 (2023-09-12)
Expand Down
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/src/serviceBusError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export const wellKnownMessageCodesToServiceBusCodes: Map<string, ServiceBusError
["ServerBusyError", "ServiceBusy"],

["OperationTimeoutError", "ServiceTimeout"],
["ServiceUnavailableError", "ServiceTimeout"],
["ServiceCommunicationError", "ServiceCommunicationProblem"],
["SessionCannotBeLockedError", "SessionCannotBeLocked"],
["SessionLockLostError", "SessionLockLost"],
Expand Down
76 changes: 47 additions & 29 deletions sdk/servicebus/service-bus/src/session/messageSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import {
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { ServiceBusError, translateServiceBusError } from "../serviceBusError";
import { abandonMessage, completeMessage } from "../receivers/receiverCommon";
import { isDefined } from "@azure/core-util";
import { delay, isDefined } from "@azure/core-util";

/**
* Describes the options that need to be provided while creating a message session receiver link.
Expand Down Expand Up @@ -257,11 +257,50 @@ export class MessageSession extends LinkEntity<Receiver> {
}
}

protected createRheaLink(
protected async createRheaLink(
options: ReceiverOptions,
_abortSignal?: AbortSignalLike
): Promise<Receiver> {
return this._context.connection.createReceiver(options);
this._lastSBError = undefined;
let errorMessage: string = "";

const link = await this._context.connection.createReceiver(options);

const receivedSessionId = link.source?.filter?.[Constants.sessionFilterName];
if (!this._providedSessionId && !receivedSessionId) {
// When we ask for any sessions (passing option of session-filter: undefined),
// but don't receive one back, check whether service has sent any error.
if (
options.source &&
typeof options.source !== "string" &&
options.source.filter &&
Constants.sessionFilterName in options.source.filter &&
options.source.filter![Constants.sessionFilterName] === undefined
) {
await delay(1); // yield to eventloop
if (this._lastSBError) {
throw this._lastSBError;
}
}
// Ideally this code path should never be reached as `MessageSession.createReceiver()` should fail instead
// TODO: https://github.com/Azure/azure-sdk-for-js/issues/9775 to figure out why this code path indeed gets hit.
errorMessage = `Failed to create a receiver. No unlocked sessions available.`;
} else if (this._providedSessionId && receivedSessionId !== this._providedSessionId) {
// This code path is reached if the session is already locked by another receiver.
// TODO: Check why the service would not throw an error or just timeout instead of giving a misleading successful receiver
errorMessage = `Failed to create a receiver for the requested session '${this._providedSessionId}'. It may be locked by another receiver.`;
}

if (errorMessage) {
const error = translateServiceBusError({
description: errorMessage,
condition: ErrorNameConditionMapper.SessionCannotBeLockedError,
});
logger.logError(error, this.logPrefix);
throw error;
}

return link;
}

/**
Expand All @@ -274,36 +313,13 @@ export class MessageSession extends LinkEntity<Receiver> {
const sessionOptions = this._createMessageSessionOptions(this.identifier, opts.timeoutInMs);
await this.initLink(sessionOptions, opts.abortSignal);

if (this.link == null) {
if (!this.link) {
throw new Error("INTERNAL ERROR: failed to create receiver but without an error.");
}

const receivedSessionId =
this.link.source &&
this.link.source.filter &&
this.link.source.filter[Constants.sessionFilterName];

let errorMessage: string = "";

if (this._providedSessionId == null && receivedSessionId == null) {
// Ideally this code path should never be reached as `MessageSession.createReceiver()` should fail instead
// TODO: https://github.com/Azure/azure-sdk-for-js/issues/9775 to figure out why this code path indeed gets hit.
errorMessage = `Failed to create a receiver. No unlocked sessions available.`;
} else if (this._providedSessionId != null && receivedSessionId !== this._providedSessionId) {
// This code path is reached if the session is already locked by another receiver.
// TODO: Check why the service would not throw an error or just timeout instead of giving a misleading successful receiver
errorMessage = `Failed to create a receiver for the requested session '${this._providedSessionId}'. It may be locked by another receiver.`;
}
const receivedSessionId = this.link.source?.filter?.[Constants.sessionFilterName];

if (errorMessage) {
const error = translateServiceBusError({
description: errorMessage,
condition: ErrorNameConditionMapper.SessionCannotBeLockedError,
});
logger.logError(error, this.logPrefix);
throw error;
}
if (this._providedSessionId == null) this.sessionId = receivedSessionId;
if (!this._providedSessionId) this.sessionId = receivedSessionId;
this.sessionLockedUntilUtc = convertTicksToDate(
this.link.properties["com.microsoft:locked-until-utc"]
);
Expand Down Expand Up @@ -371,6 +387,7 @@ export class MessageSession extends LinkEntity<Receiver> {
}

private _retryOptions: RetryOptions | undefined;
private _lastSBError: Error | ServiceBusError | undefined;

/**
* Constructs a MessageSession instance which lets you receive messages as batches
Expand Down Expand Up @@ -444,6 +461,7 @@ export class MessageSession extends LinkEntity<Receiver> {
if (sbError.code === "SessionLockLostError") {
sbError.message = `The session lock has expired on the session with id ${this.sessionId}.`;
}
this._lastSBError = sbError;
logger.logError(sbError, "%s An error occurred for Receiver", this.logPrefix);
this._notifyError({
error: sbError,
Expand Down

0 comments on commit 57cd99f

Please sign in to comment.