Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[service-bus] Reliability improvements and testing updates #15098

Merged
merged 21 commits into from
May 8, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion sdk/servicebus/service-bus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Release History

## 7.1.0-beta.1 (Unreleased)
## 7.0.6 (Unreleased)

### New Features

Expand All @@ -16,6 +16,12 @@
- Settling messages now use the `retryOptions` passed to `ServiceBusClient`, making it more resilient against network failures.
[PR#14867](https://github.com/Azure/azure-sdk-for-js/pull/14867/files)

## 7.0.5 (2021-04-06)

### Bug fixes

- Some of the queue properties such as "forwardTo" and "autoDeleteOnIdle" were not being set as requested through the `ServiceBusAdministrationClient.createQueue` method because of a bug with regards to the ordering of XML properties. The issue has been fixed in [#14692](https://github.com/Azure/azure-sdk-for-js/pull/14692).

## 7.0.4 (2021-03-31)

### Bug fixes
Expand Down
4 changes: 2 additions & 2 deletions sdk/servicebus/service-bus/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@azure/service-bus",
"sdk-type": "client",
"author": "Microsoft Corporation",
"version": "7.1.0-beta.1",
"version": "7.0.6",
"license": "MIT",
"description": "Azure Service Bus SDK for JavaScript",
"homepage": "https://github.com/Azure/azure-sdk-for-js/tree/master/sdk/servicebus/service-bus/",
Expand Down Expand Up @@ -73,7 +73,7 @@
"test:node": "npm run clean && npm run build:test:node && npm run integration-test:node",
"test": "npm run test:node && npm run test:browser",
"unit-test:browser": "echo skipped",
"unit-test:node": "mocha -r esm --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"dist-esm/test/internal/unit/*.spec.js\" \"dist-esm/test/internal/node/*.spec.js\"",
"unit-test:node": "mocha -r esm -r ts-node/register --require source-map-support/register --reporter ../../../common/tools/mocha-multi-reporter.js --timeout 120000 --full-trace \"test/internal/unit/*.spec.ts\" \"test/internal/node/*.spec.ts\"",
"unit-test": "npm run unit-test:node && npm run unit-test:browser",
"docs": "typedoc --excludePrivate --excludeNotExported --excludeExternals --stripInternal --mode file --out ./dist/docs ./src"
},
Expand Down
10 changes: 0 additions & 10 deletions sdk/servicebus/service-bus/src/core/messageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,6 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
number,
DeferredPromiseAndTimer
>();
/**
* The message handler provided by the user that will be wrapped
* inside _onAmqpMessage.
*/
protected _onMessage!: OnMessage;
/**
* The error handler provided by the user that will be wrapped
* inside _onAmqpError.
*/
protected _onError?: OnError;

/**
* A lock renewer that handles message lock auto-renewal. This is undefined unless the user
Expand Down
78 changes: 53 additions & 25 deletions sdk/servicebus/service-bus/src/core/receiverHelper.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { AbortError } from "@azure/abort-controller";
import { Receiver, ReceiverEvents } from "rhea-promise";
import { receiverLogger as logger } from "../log";
import { ServiceBusError } from "../serviceBusError";

/**
* Wraps the receiver with some higher level operations for managing state
Expand All @@ -11,37 +13,61 @@ import { receiverLogger as logger } from "../log";
* @internal
*/
export class ReceiverHelper {
private _isSuspended: boolean = false;
private _isSuspended: boolean = true;
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved

constructor(
private _getCurrentReceiver: () => { receiver: Receiver | undefined; logPrefix: string }
) {}

private _getCurrentReceiverOrError():
| "is undefined"
| "is not open"
| "is suspended"
| { receiver: Receiver | undefined; logPrefix: string } {
const currentReceiverData = this._getCurrentReceiver();

if (currentReceiverData.receiver == null) {
return "is undefined";
}

if (!currentReceiverData.receiver.isOpen()) {
return "is not open";
}

if (this._isSuspended) {
return "is suspended";
}

return currentReceiverData;
}

/**
* Adds credits to the receiver, respecting any state that
* indicates the receiver is closed or should not continue
* to receive more messages.
*
* @param credits - Number of credits to add.
* @returns true if credits were added, false if there is no current receiver instance
* or `stopReceivingMessages` has been called.
*/
addCredit(credits: number): boolean {
const { receiver, logPrefix } = this._getCurrentReceiver();
addCredit(credits: number): void {
richardpark-msft marked this conversation as resolved.
Show resolved Hide resolved
const currentReceiverOrError = this._getCurrentReceiverOrError();

if (!this.canReceiveMessages()) {
logger.verbose(
`${logPrefix} Asked to add ${credits} credits but the receiver is not able to receive messages`
);
return false;
}
if (typeof currentReceiverOrError === "string") {
const errorMessage = `Cannot request messages on the receiver since it ${currentReceiverOrError}.`;

if (receiver != null) {
logger.verbose(`${logPrefix} Adding ${credits} credits`);
receiver.addCredit(credits);
if (currentReceiverOrError === "is suspended") {
// if a user has suspended the receiver we should consider this a non-retryable
// error since it absolutely requires user intervention.
throw new AbortError(errorMessage);
chradek marked this conversation as resolved.
Show resolved Hide resolved
}

throw new ServiceBusError(errorMessage, "GeneralError");
}

return true;
if (currentReceiverOrError.receiver != null) {
logger.verbose(`${currentReceiverOrError.logPrefix} Adding ${credits} credits`);
currentReceiverOrError.receiver.addCredit(credits);
}
}

/**
Expand All @@ -60,36 +86,38 @@ export class ReceiverHelper {
logger.verbose(
`${logPrefix} User has requested to stop receiving new messages, attempting to drain.`
);

return this.drain();
}

/**
* Resets tracking so `addCredit` works again.
* Resets tracking so `addCredit` works again by toggling the `_isSuspended` flag.
*/
resume(): void {
this._isSuspended = false;
}

/**
* Whether the receiver can receive messages.
*
* This checks if the the caller has decided to disable adding
* credits via 'suspend' as well as whether the receiver itself is
* still open.
*/
canReceiveMessages(): boolean {
const { receiver } = this._getCurrentReceiver();
return !this._isSuspended && this._isValidReceiver(receiver);
isSuspended(): boolean {
return this._isSuspended;
}

/**
* Initiates a drain for the current receiver and resolves when
* the drain has completed.
*
* NOTE: This method returns immediately if the receiver is not valid or if there
* are no pending credits on the receiver (ie: `receiver.credit === 0`).
*/
async drain(): Promise<void> {
const { receiver, logPrefix } = this._getCurrentReceiver();

if (!this._isValidReceiver(receiver)) {
// TODO: should we throw?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess draining is technically done?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this comment was me thinking "aloud".

I think we could consider it a candidate to throw on but it seemed less important. The spot where it's used would basically do nothing useful if the receiver was invalid anyways but it'd be nice to go through here again. I think we have some future work to add in a "drain with timeout" for another bug in our tests, so perhaps we could lump in the investigation there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would consider this a no-op and not an error scenario. The need for drain comes in order to avoid receiving messages when we are not ready for them. Absence of the receiver achieves that goal. So, I would combine the two checks here

if (!receiver || !received.isOpen() || receiver.credit === 0) {
   return;
   }

return;
}

if (receiver.credit === 0) {
// nothing to drain
return;
}

Expand Down
Loading