Skip to content

Commit

Permalink
[service-bus] Reliability improvements and testing updates (#15098)
Browse files Browse the repository at this point in the history
This PR has a few changes in it, primarily to improve our robustness and our reporting:

General reliability improvements:
- Migrates to a workflow that treats subscription start as a retryable entity, rather than just link creation (which is what had previously). 
- It checks and throws exceptions on much more granular conditions, particularly in addCredit
- Error checking and handling has been migrated to be in far fewer spots and to be more unconditional, which should hopefully eliminate any areas where an exception or error could occur but it never gets forwarded or seen.

SDK debugging:
- Adds a new SDK only flag (forwardInternalErrors) which will make it so areas that used to eat errors now can forward them to processError. Prior to this the errors were only logged, but that meant they could be missed. Most of these would probably be considered cosmetic by customers so this is primarly for debugging purposes within the SDK itself.
- The internal `processInitialize` handler has been split into two (still internal) handlers - preInitialize and postInitialize. preInitialize runs before init(), and postInitialize runs after init() but before addCredit. This lets us write more reliable tests. These are not exposed to customers.

Fixes #14535
  • Loading branch information
richardpark-msft authored May 8, 2021
1 parent 9e856ac commit e2b5a62
Show file tree
Hide file tree
Showing 23 changed files with 1,263 additions and 1,070 deletions.
12 changes: 10 additions & 2 deletions sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 7.1.0-beta.1 (Unreleased)
## 7.0.6 (Unreleased)

### New Features

Expand All @@ -14,7 +14,15 @@
- [Bug Fix] `expiresAtUtc` is `Invalid Date` in the received message when the ttl is not defined. Has been fixed in [#13543](https://github.com/Azure/azure-sdk-for-js/pull/13543)
- Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug w.r.t the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692).
- Settling messages now use the `retryOptions` passed to `ServiceBusClient`, making it more resilient against network failures.
[PR#14867](https://github.com/Azure/azure-sdk-for-js/pull/14867/files)
[PR#14867](https://github.com/Azure/azure-sdk-for-js/pull/14867)
- Fixes an issue where receiver link recovery/creation could fail, resulting in a receiver that was no longer receiving messages.
[PR#15098](https://github.com/Azure/azure-sdk-for-js/pull/15098)

## 7.0.5 (2021-04-06)

### Bug fixes

- Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug with regards to the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692).

## 7.0.4 (2021-03-31)

Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@azure/service-bus",
"sdk-type": "client",
"author": "Microsoft Corporation",
"version": "7.1.0-beta.1",
"version": "7.0.6",
"license": "MIT",
"description": "Azure Service Bus SDK for JavaScript",
"homepage": "https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/servicebus/service-bus/",
Expand Down Expand Up @@ -73,7 +73,7 @@
"test:node": "npm run clean && npm run build:test:node && npm run integration-test:node",
"test": "npm run test:node && npm run test:browser",
"unit-test:browser": "echo skipped",
"unit-test:node": "mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"dist-esm/test/internal/unit/*.spec.js\" \"dist-esm/test/internal/node/*.spec.js\"",
"unit-test:node": "mocha -r esm -r ts-node/register --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"test/internal/unit/*.spec.ts\" \"test/internal/node/*.spec.ts\"",
"unit-test": "npm run unit-test:node && npm run unit-test:browser",
"docs": "typedoc --excludePrivate --excludeNotExported --excludeExternals --stripInternal --mode file --out ./dist/docs ./src"
},
Expand Down
10 changes: 0 additions & 10 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,6 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
number,
DeferredPromiseAndTimer
>();
/**
* The message handler provided by the user that will be wrapped
* inside _onAmqpMessage.
*/
protected _onMessage!: OnMessage;
/**
* The error handler provided by the user that will be wrapped
* inside _onAmqpError.
*/
protected _onError?: OnError;

/**
* A lock renewer that handles message lock auto-renewal. This is undefined unless the user
Expand Down
78 changes: 53 additions & 25 deletions sdk/servicebus/service-bus/src/core/receiverHelper.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { AbortError } from "@azure/abort-controller";
import { Receiver, ReceiverEvents } from "rhea-promise";
import { receiverLogger as logger } from "../log";
import { ServiceBusError } from "../serviceBusError";

/**
* Wraps the receiver with some higher level operations for managing state
Expand All @@ -11,37 +13,61 @@ import { receiverLogger as logger } from "../log";
* @internal
*/
export class ReceiverHelper {
private _isSuspended: boolean = false;
private _isSuspended: boolean = true;

constructor(
private _getCurrentReceiver: () => { receiver: Receiver | undefined; logPrefix: string }
) {}

private _getCurrentReceiverOrError():
| "is undefined"
| "is not open"
| "is suspended"
| { receiver: Receiver | undefined; logPrefix: string } {
const currentReceiverData = this._getCurrentReceiver();

if (currentReceiverData.receiver == null) {
return "is undefined";
}

if (!currentReceiverData.receiver.isOpen()) {
return "is not open";
}

if (this._isSuspended) {
return "is suspended";
}

return currentReceiverData;
}

/**
* Adds credits to the receiver, respecting any state that
* indicates the receiver is closed or should not continue
* to receive more messages.
*
* @param credits - Number of credits to add.
* @returns true if credits were added, false if there is no current receiver instance
* or `stopReceivingMessages` has been called.
*/
addCredit(credits: number): boolean {
const { receiver, logPrefix } = this._getCurrentReceiver();
addCredit(credits: number): void {
const currentReceiverOrError = this._getCurrentReceiverOrError();

if (!this.canReceiveMessages()) {
logger.verbose(
`${logPrefix} Asked to add ${credits} credits but the receiver is not able to receive messages`
);
return false;
}
if (typeof currentReceiverOrError === "string") {
const errorMessage = `Cannot request messages on the receiver since it ${currentReceiverOrError}.`;

if (receiver != null) {
logger.verbose(`${logPrefix} Adding ${credits} credits`);
receiver.addCredit(credits);
if (currentReceiverOrError === "is suspended") {
// if a user has suspended the receiver we should consider this a non-retryable
// error since it absolutely requires user intervention.
throw new AbortError(errorMessage);
}

throw new ServiceBusError(errorMessage, "GeneralError");
}

return true;
if (currentReceiverOrError.receiver != null) {
logger.verbose(`${currentReceiverOrError.logPrefix} Adding ${credits} credits`);
currentReceiverOrError.receiver.addCredit(credits);
}
}

/**
Expand All @@ -60,36 +86,38 @@ export class ReceiverHelper {
logger.verbose(
`${logPrefix} User has requested to stop receiving new messages, attempting to drain.`
);

return this.drain();
}

/**
* Resets tracking so `addCredit` works again.
* Resets tracking so `addCredit` works again by toggling the `_isSuspended` flag.
*/
resume(): void {
this._isSuspended = false;
}

/**
* Whether the receiver can receive messages.
*
* This checks if the the caller has decided to disable adding
* credits via 'suspend' as well as whether the receiver itself is
* still open.
*/
canReceiveMessages(): boolean {
const { receiver } = this._getCurrentReceiver();
return !this._isSuspended && this._isValidReceiver(receiver);
isSuspended(): boolean {
return this._isSuspended;
}

/**
* Initiates a drain for the current receiver and resolves when
* the drain has completed.
*
* NOTE: This method returns immediately if the receiver is not valid or if there
* are no pending credits on the receiver (ie: `receiver.credit === 0`).
*/
async drain(): Promise<void> {
const { receiver, logPrefix } = this._getCurrentReceiver();

if (!this._isValidReceiver(receiver)) {
// TODO: should we throw?
return;
}

if (receiver.credit === 0) {
// nothing to drain
return;
}

Expand Down
Loading

0 comments on commit e2b5a62

Please sign in to comment.