[service-bus] Fix message loss issues with peekLock and receiveAndDelete (#15989)

Fixing an issue where we could lose messages or provoke an alarming message from rhea (`Received transfer when credit was 0`)
    
The message loss issue is related to how we trigger 'drain' using 'addCredit(1)'. Our 'receiver.drain = true; receiver.addCredit(1)' pattern actually does add a credit, which shows up in the flow frame that gets sent for our drain. As a result, we occasionally received more messages than we intended.
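
To illustrate the difference between the two patterns, here is a minimal sketch using a simplified model of a receiver link. `ModelReceiver` and `FlowFrame` are hypothetical names invented for this example, and the model only assumes what the description above states: `addCredit` raises the credit carried by the next flow frame, while `drainCredit` requests a drain without adding credit. It is not rhea's actual implementation.

```typescript
// Hypothetical, simplified model of a receiver link. NOT rhea's implementation.
interface FlowFrame {
  linkCredit: number;
  drain: boolean;
}

class ModelReceiver {
  drain = false;
  credit: number;
  readonly sentFlows: FlowFrame[] = [];

  constructor(initialCredit: number) {
    this.credit = initialCredit;
  }

  // Adds credit, then sends a flow frame carrying the new total.
  addCredit(count: number): void {
    this.credit += count;
    this.sendFlow();
  }

  // Assumed behavior: requests a drain without changing the outstanding credit.
  drainCredit(): void {
    this.drain = true;
    this.sendFlow();
  }

  private sendFlow(): void {
    this.sentFlows.push({ linkCredit: this.credit, drain: this.drain });
  }
}

// Old pattern: the drain request goes out with one more credit than intended.
const oldStyle = new ModelReceiver(2);
oldStyle.drain = true;
oldStyle.addCredit(1);
const oldFlow = oldStyle.sentFlows[0];

// New pattern: the drain request carries only the credit that was already outstanding.
const newStyle = new ModelReceiver(2);
newStyle.drainCredit();
const newFlow = newStyle.sentFlows[0];
```

In this model the old pattern advertises a credit of 3 where only 2 were outstanding, which is the extra credit that could let an unrequested message through.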
    
The second part of this was that we were masking this error, because we had code that specifically threw out messages if more arrived than were requested. If the message's lock was being auto-renewed, the message could appear to be missing, and if we were in receiveAndDelete the message was effectively lost at that point. That code is now removed (we defer to just allowing the extra message, should a bug arise that causes that) and we log a warning indicating it did happen.
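
The change in behavior can be sketched as follows. Both helper functions are hypothetical and exist only for this illustration; the real logic lives in the `onReceiveMessage` handler of `BatchingReceiverLite`, and the warning text here is only modeled on it.

```typescript
// Hypothetical helper: old behavior, extra messages are silently discarded.
function collectDroppingExtras<T>(arrived: T[], maxMessageCount: number): T[] {
  const batch: T[] = [];
  for (const message of arrived) {
    if (batch.length < maxMessageCount) {
      batch.push(message);
    }
    // Anything beyond maxMessageCount never reaches the caller.
  }
  return batch;
}

// Hypothetical helper: new behavior, extra messages are kept and a warning is logged.
function collectKeepingExtras<T>(
  arrived: T[],
  maxMessageCount: number,
  warn: (text: string) => void
): T[] {
  const batch: T[] = [];
  for (const message of arrived) {
    batch.push(message);
    if (batch.length > maxMessageCount) {
      warn(`More messages arrived than were expected: ${maxMessageCount} vs ${batch.length}`);
    }
  }
  return batch;
}

// Three messages arrive although only two were requested.
const warnings: string[] = [];
const dropped = collectDroppingExtras(["a", "b", "c"], 2);
const kept = collectKeepingExtras(["a", "b", "c"], 2, (text) => warnings.push(text));
```

With the old approach the third message disappears without a trace; with the new one the caller receives all three and the log records that something unexpected happened.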
    
The rhea error message appeared to be triggered by our accidentally allowing multiple overlapping 'drain's to occur (finalAction did not check to see if we were _already_ draining and would allow it to happen multiple times). Removing the concurrent drains fixed this issue but I didn't fully investigate why.
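
A minimal sketch of the guard against overlapping drains, assuming the link's `drain` flag stays `true` while a drain is in progress. `DrainableLink`, `makeFinalAction`, and the fake link are hypothetical names for this example; only the `drain`, `credit`, `isOpen`, and `drainCredit` members mirror what the batching receiver uses.

```typescript
// Hypothetical subset of the receiver surface needed for this sketch.
interface DrainableLink {
  drain: boolean;
  credit: number;
  isOpen(): boolean;
  drainCredit(): void;
}

// Builds a final action that starts at most one drain at a time.
function makeFinalAction(link: DrainableLink, resolveBatch: () => void): () => void {
  return () => {
    if (link.drain) {
      // A drain is already in progress, so let it complete.
      return;
    }
    if (link.isOpen() && link.credit > 0) {
      link.drainCredit();
    } else {
      resolveBatch();
    }
  };
}

// Fake link that counts how many drain requests it receives.
const counters = { drainRequests: 0, resolved: 0 };
const fakeLink: DrainableLink = {
  drain: false,
  credit: 5,
  isOpen: () => true,
  drainCredit: () => {
    fakeLink.drain = true; // assumption: the flag is set once a drain starts
    counters.drainRequests++;
  }
};

const finalAction = makeFinalAction(fakeLink, () => {
  counters.resolved++;
});

// Two triggers (for example, a timer and a full batch) fire back to back.
finalAction();
finalAction();
```

Only the first call issues a drain request; the second returns early because the link is already draining.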

Fixes #15606, #15115
richardpark-msft authored Jul 1, 2021
1 parent cec69b6 commit d75f119
Showing 17 changed files with 735 additions and 278 deletions.
31 changes: 16 additions & 15 deletions common/config/rush/pnpm-lock.yaml

Generated file; diff not rendered by default.

11 changes: 4 additions & 7 deletions sdk/core/core-amqp/CHANGELOG.md
@@ -1,15 +1,12 @@
# Release History

## 3.0.1 (Unreleased)

### Features Added

### Breaking Changes
## 3.1.0 (Unreleased)

### Key Bugs Fixed

### Fixed

- Updated to use the latest version of the `rhea` package.
Part of a fix for PR#15989, where draining messages could sometimes lead to message loss with `receiver.receiveMessages()`.
[PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989)

## 3.0.0 (2021-06-09)

6 changes: 3 additions & 3 deletions sdk/core/core-amqp/package.json
@@ -1,7 +1,7 @@
{
"name": "@azure/core-amqp",
"sdk-type": "client",
"version": "3.0.1",
"version": "3.1.0",
"description": "Common library for amqp based azure sdks like @azure/event-hubs.",
"author": "Microsoft Corporation",
"license": "MIT",
@@ -76,8 +76,8 @@
"events": "^3.0.0",
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea": "^2.0.2",
"rhea-promise": "^2.0.0",
"rhea": "^2.0.3",
"rhea-promise": "^2.1.0",
"tslib": "^2.2.0",
"url": "^0.11.0",
"util": "^0.12.1"
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/package.json
@@ -115,7 +115,7 @@
"is-buffer": "^2.0.3",
"jssha": "^3.1.0",
"process": "^0.11.10",
"rhea-promise": "^2.0.0",
"rhea-promise": "^2.1.0",
"tslib": "^2.2.0",
"uuid": "^8.3.0"
},
2 changes: 1 addition & 1 deletion sdk/eventhub/mock-hub/package.json
@@ -62,7 +62,7 @@
"dependencies": {
"@azure/abort-controller": "^1.0.0",
"@azure/core-asynciterator-polyfill": "^1.0.0",
"rhea": "^2.0.2",
"rhea": "^2.0.3",
"tslib": "^2.2.0"
},
"//sampleConfiguration": {
10 changes: 5 additions & 5 deletions sdk/servicebus/service-bus/CHANGELOG.md
@@ -1,14 +1,14 @@
# Release History

## 7.3.0 (Unreleased)

## 7.3.0 (2021-07-06)
### Features Added
- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.

### Breaking Changes

- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details.
### Key Bugs Fixed

- Fixed a bug that could lead to message loss in certain conditions when using `receiver.receiveMessages()`.
[PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989)

### Fixed

- Fixing an issue where the internal link cache would not properly remove closed links.
2 changes: 1 addition & 1 deletion sdk/servicebus/service-bus/package.json
@@ -126,7 +126,7 @@
"long": "^4.0.0",
"process": "^0.11.10",
"tslib": "^2.2.0",
"rhea-promise": "^2.0.0"
"rhea-promise": "^2.1.0"
},
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
49 changes: 33 additions & 16 deletions sdk/servicebus/service-bus/src/core/batchingReceiver.ts
@@ -8,7 +8,7 @@ import {
OnAmqpEvent,
ReceiverEvents,
SessionEvents,
Receiver,
Receiver as RheaPromiseReceiver,
Session
} from "rhea-promise";
import { ServiceBusMessageImpl } from "../serviceBusMessage";
@@ -191,16 +191,22 @@ export function getRemainingWaitTimeInMsFn(
*
* @internal
*/
type EventEmitterLike<T extends Receiver | Session> = Pick<T, "once" | "removeListener" | "on">;
type EventEmitterLike<T extends RheaPromiseReceiver | Session> = Pick<
T,
"once" | "removeListener" | "on"
>;

/**
* The bare minimum needed to receive messages for batched
* message receiving.
*
* @internal
*/
export type MinimalReceiver = Pick<Receiver, "name" | "isOpen" | "credit" | "addCredit" | "drain"> &
EventEmitterLike<Receiver> & {
export type MinimalReceiver = Pick<
RheaPromiseReceiver,
"name" | "isOpen" | "credit" | "addCredit" | "drain" | "drainCredit"
> &
EventEmitterLike<RheaPromiseReceiver> & {
session: EventEmitterLike<Session>;
} & {
connection: {
@@ -269,6 +275,7 @@ export class BatchingReceiverLite {

private _getRemainingWaitTimeInMsFn: typeof getRemainingWaitTimeInMsFn;
private _closeHandler: ((connectionError?: AmqpError | Error) => void) | undefined;
private _finalAction: (() => void) | undefined;

isReceivingMessages: boolean;

@@ -389,16 +396,17 @@ export class BatchingReceiverLite {
// - maxMessageCount is reached or
// - maxWaitTime is passed or
// - newMessageWaitTimeoutInSeconds is passed since the last message was received
const finalAction = (): void => {
this._finalAction = (): void => {
if (receiver.drain) {
// If a drain is already in process then we should let it complete. Some messages might still be in flight, but they will
// arrive before the drain completes.
return;
}

// Drain any pending credits.
if (receiver.isOpen() && receiver.credit > 0) {
logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`);

// setting .drain and combining it with .addCredit results in (eventually) sending
// a drain request to Service Bus. When the drain completes rhea will call `onReceiveDrain`
// at which point we'll wrap everything up and resolve the promise.
receiver.drain = true;
receiver.addCredit(1);
receiver.drainCredit();
} else {
logger.verbose(
`${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.`
@@ -429,15 +437,24 @@ export class BatchingReceiverLite {
logger.verbose(
`${loggingPrefix} Batching, waited for ${remainingWaitTimeInMs} milliseconds after receiving the first message.`
);
finalAction();
this._finalAction!();
}, remainingWaitTimeInMs);
}
}

try {
const data: ServiceBusMessageImpl = this._createServiceBusMessage(context);
if (brokeredMessages.length < args.maxMessageCount) {
brokeredMessages.push(data);
brokeredMessages.push(data);

// NOTE: we used to actually "lose" any extra messages. At this point I've fixed the areas that were causing us to receive
// extra messages but if this bug arises in some other way it's better to return the message than it would be to let it be
// silently dropped on the floor.
if (brokeredMessages.length > args.maxMessageCount) {
logger.warning(
`More messages arrived than were expected: ${
args.maxMessageCount
} vs ${brokeredMessages.length + 1}`
);
}
} catch (err) {
const errObj = err instanceof Error ? err : new Error(JSON.stringify(err));
@@ -448,7 +465,7 @@ export class BatchingReceiverLite {
reject(errObj);
}
if (brokeredMessages.length === args.maxMessageCount) {
finalAction();
this._finalAction!();
}
};

@@ -515,7 +532,7 @@ export class BatchingReceiverLite {
logger.verbose(
`${loggingPrefix} Batching, waited for max wait time ${args.maxWaitTimeInMs} milliseconds.`
);
finalAction();
this._finalAction!();
}, args.maxWaitTimeInMs);

receiver.on(ReceiverEvents.message, onReceiveMessage);
5 changes: 1 addition & 4 deletions sdk/servicebus/service-bus/src/core/receiverHelper.ts
@@ -132,10 +132,7 @@ export class ReceiverHelper {
resolve();
});

receiver.drain = true;
// this is not actually adding another credit - it'll just
// cause the drain call to start.
receiver.addCredit(1);
receiver.drainCredit();
});

return drainPromise;
21 changes: 5 additions & 16 deletions sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts
@@ -1388,24 +1388,13 @@ function causeDisconnectDuringDrain(
throw new Error("No active link for batching receiver");
}

const origAddCredit = link.addCredit;

// We want to simulate a disconnect once the batching receiver is draining.
// We can detect when the receiver enters a draining state when `addCredit` is
// called while didRequestDrainResolver is called to resolve the promise.
const addCreditThatImmediatelyDetaches = function(credits: number): void {
origAddCredit.call(link, credits);

if (link.drain && credits === 1) {
// initiate the detach now (prior to any possibility of the 'drain' call being scheduled)
batchingReceiver
.onDetached(new Error("Test: fake connection failure"))
.then(() => resolveOnDetachedCallPromise());
}
link["drainCredit"] = () => {
// don't send the drain request, we'll just detach.
batchingReceiver
.onDetached(new Error("Test: fake connection failure"))
.then(() => resolveOnDetachedCallPromise());
};

link["addCredit"] = addCreditThatImmediatelyDetaches;

return {
onDetachedCalledPromise
};
5 changes: 4 additions & 1 deletion sdk/servicebus/service-bus/test/internal/smoketest.spec.ts
@@ -20,7 +20,10 @@ import {
chai.use(chaiAsPromised);
const assert = chai.assert;

describe("Sample scenarios for track 2", () => {
/**
* A basic suite that exercises most of the core functionality.
*/
describe("Smoke tests", () => {
let serviceBusClient: ServiceBusClientForTests;

before(async () => {