diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index c1958d7937e9..72b8c87c5b2f 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -6552,26 +6552,26 @@ packages: dev: false resolution: integrity: sha512-+6uilZXSJGyiqVeHQI3Krv6NTAd8cWRCY2uyCxmzR4/5IFtBqqFem1HV2OiwSj0Gu7OFChIJDfH2JyjN7J0vRA== - /rhea-promise/2.0.0: + /rhea-promise/2.1.0: dependencies: debug: 3.2.7 - rhea: 2.0.2 + rhea: 2.0.3 tslib: 2.3.0 dev: false resolution: - integrity: sha512-hoK6pTrFyIFDp0jrC2FQXzZPFnJXG78OHikpSHeC2gjdKGT+ofoFsbbxA0FcYjBqU2r2hDKRZbacBBvGtGAzhw== + integrity: sha512-CRMwdJ/o4oO/xKcvAwAsd0AHy5fVvSlqso7AadRmaaLGzAzc9LCoW7FOFnucI8THasVmOeCnv5c/fH/n7FcNaA== /rhea/1.0.24: dependencies: debug: 3.2.7 dev: false resolution: integrity: sha512-PEl62U2EhxCO5wMUZ2/bCBcXAVKN9AdMSNQOrp3+R5b77TEaOSiy16MQ0sIOmzj/iqsgIAgPs1mt3FYfu1vIXA== - /rhea/2.0.2: + /rhea/2.0.3: dependencies: debug: 3.2.7 dev: false resolution: - integrity: sha512-G2QqyVzRnZvv+WkpKBmWrVmkeeLRX7xKZB3wutU2fs/qvr4PJvAqKO7ymSYX/0dm2xt1QhNJO/Af17x49f7FBw== + integrity: sha512-goQWv15ci6RdjtSpDezknlJ0PQDDkkJiMVxC3oS8DpECnzLFov01WMJ23cpXW+L3hlSQwyunqX9kc0JW6/lziw== /rimraf/2.7.1: dependencies: glob: 7.1.7 @@ -8347,7 +8347,7 @@ packages: dev: false name: '@rush-temp/ai-text-analytics' resolution: - integrity: sha512-xOTFTYbC7zkPZU+4uJ63+sFWxsXvaQQwQRRQpeH0x8TbxUpWfELGqZ7tmkaqiiA/kWNSjwRweMooEPznC2/0XA== + integrity: sha512-vf8Pnl7FTqnmhkjs7l9bLgV6pbmosqg0oPmo/hOA3kaE5Ekj+4sryjMjcwoi2mexR7wFEO2Ck2K1KBYPnyRTiQ== tarball: file:projects/ai-text-analytics.tgz version: 0.0.0 file:projects/app-configuration.tgz: @@ -8903,8 +8903,8 @@ packages: prettier: 1.19.1 process: 0.11.10 puppeteer: 3.3.0 - rhea: 2.0.2 - rhea-promise: 2.0.0 + rhea: 2.0.3 + rhea-promise: 2.1.0 rimraf: 3.0.2 rollup: 1.32.1 rollup-plugin-shim: 1.0.0 @@ -8921,7 +8921,7 @@ packages: dev: false name: '@rush-temp/core-amqp' resolution: - integrity: sha512-NsjbIvK0AepoWd0jHaFjQIsq2AIm10LaxkUXhc55cqQTJUCfPmPjw/OENA/4TcvI7c9lMe+5ZhGxyEbTiUI0Xw== + integrity: sha512-FHTxFxeancQjNBMEO+8O8eirRfSQp0TUzFfwBEBKy+/psSDQnFUCiVuYo9Q1DoXUDTFNX56lGOEhcuiG2fp7GA== tarball: file:projects/core-amqp.tgz version: 0.0.0 file:projects/core-asynciterator-polyfill.tgz: @@ -9732,6 +9732,7 @@ packages: buffer: 5.7.1 chai: 4.3.4 chai-as-promised: 7.1.1_chai@4.3.4 + chai-exclude: 2.0.3_chai@4.3.4 chai-string: 1.5.0_chai@4.3.4 cross-env: 7.0.3 debug: 4.3.1 @@ -9760,7 +9761,7 @@ packages: prettier: 1.19.1 process: 0.11.10 puppeteer: 3.3.0 - rhea-promise: 2.0.0 + rhea-promise: 2.1.0 rimraf: 3.0.2 rollup: 1.32.1 rollup-plugin-shim: 1.0.0 @@ -9776,7 +9777,7 @@ packages: dev: false name: '@rush-temp/event-hubs' resolution: - integrity: sha512-WNJyVRH8bzOhs/DZ6caEh2RTqDJ4yO/U/yvcrY1F8Vzdb9RA0wRHZ4J+iROn6z07M1uMw6SbP/O21YoiSdWb8g== + integrity: sha512-/EtEDC3Q1pKc+3/XrfBrbs7bBwQDV42aTIg9WMVRo0bjdBC6Wr8iIVvyCQMzgnQ9MbDQeHA/cILgbBZgfkgHLw== tarball: file:projects/event-hubs.tgz version: 0.0.0 file:projects/event-processor-host.tgz: @@ -10471,14 +10472,14 @@ packages: dotenv: 8.6.0 eslint: 7.29.0 prettier: 1.19.1 - rhea: 2.0.2 + rhea: 2.0.3 rimraf: 3.0.2 tslib: 2.3.0 typescript: 4.2.4 dev: false name: '@rush-temp/mock-hub' resolution: - integrity: sha512-w4gQTrqQney0Jkj0NMc1D9KrgNJ1upzYkheaNFdeA84uApywwVloe7o8mOhGMVhzR1uQMsq2zFlNGdDgbByDog== + integrity: sha512-K6rFTLoACIBX5It6x5v3r668Kpt7jjkgCeukzTyZ13cvPrjqJ4M7L9y0oU2ECIigGm59cbHx4sWOX/rHAhELhw== tarball: file:projects/mock-hub.tgz version: 0.0.0 file:projects/monitor-opentelemetry-exporter.tgz: @@ -11160,7 +11161,7 @@ packages: process: 0.11.10 promise: 8.1.0 puppeteer: 3.3.0 - rhea-promise: 2.0.0 + rhea-promise: 2.1.0 rimraf: 3.0.2 rollup: 1.32.1 rollup-plugin-shim: 1.0.0 @@ -11175,7 +11176,7 @@ packages: dev: false name: '@rush-temp/service-bus' resolution: - integrity: sha512-n1TTvwS5Z9hDPLyLYzZQbZONAM0ZqDz2pT8Q3XopMzeCYQIw9Iuj+fCtCDm1MQsVuo2rbVGGMzA6AqqivD2F8A== + integrity: sha512-UBMn4Zxni71LSfmuDpKr50cz6JFTgZxnuM8ESeHPfMa5Ibzx1b8lLbURKcqiqRaatEyxueFRzESf46JDUv78ug== tarball: file:projects/service-bus.tgz version: 0.0.0 file:projects/storage-blob-changefeed.tgz: diff --git a/sdk/core/core-amqp/CHANGELOG.md b/sdk/core/core-amqp/CHANGELOG.md index a26a03e169fd..65754897e780 100644 --- a/sdk/core/core-amqp/CHANGELOG.md +++ b/sdk/core/core-amqp/CHANGELOG.md @@ -1,15 +1,12 @@ # Release History -## 3.0.1 (Unreleased) - -### Features Added - -### Breaking Changes +## 3.1.0 (Unreleased) ### Key Bugs Fixed -### Fixed - +- Updated to use the latest version of the `rhea` package. + Part of a fix for PR#15989, where draining messages could sometimes lead to message loss with `receiver.receiveMessages()`. + [PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989) ## 3.0.0 (2021-06-09) diff --git a/sdk/core/core-amqp/package.json b/sdk/core/core-amqp/package.json index 02aa3ab9fba8..3d0424cf39a0 100644 --- a/sdk/core/core-amqp/package.json +++ b/sdk/core/core-amqp/package.json @@ -1,7 +1,7 @@ { "name": "@azure/core-amqp", "sdk-type": "client", - "version": "3.0.1", + "version": "3.1.0", "description": "Common library for amqp based azure sdks like @azure/event-hubs.", "author": "Microsoft Corporation", "license": "MIT", @@ -76,8 +76,8 @@ "events": "^3.0.0", "jssha": "^3.1.0", "process": "^0.11.10", - "rhea": "^2.0.2", - "rhea-promise": "^2.0.0", + "rhea": "^2.0.3", + "rhea-promise": "^2.1.0", "tslib": "^2.2.0", "url": "^0.11.0", "util": "^0.12.1" diff --git a/sdk/eventhub/event-hubs/package.json b/sdk/eventhub/event-hubs/package.json index 953a510a95a0..555d4e7c215b 100644 --- a/sdk/eventhub/event-hubs/package.json +++ b/sdk/eventhub/event-hubs/package.json @@ -115,7 +115,7 @@ "is-buffer": "^2.0.3", "jssha": "^3.1.0", "process": "^0.11.10", - "rhea-promise": "^2.0.0", + "rhea-promise": "^2.1.0", "tslib": "^2.2.0", "uuid": "^8.3.0" }, diff --git a/sdk/eventhub/mock-hub/package.json b/sdk/eventhub/mock-hub/package.json index 8fa980a59215..a4bc00a2a119 100644 --- a/sdk/eventhub/mock-hub/package.json +++ b/sdk/eventhub/mock-hub/package.json @@ -62,7 +62,7 @@ "dependencies": { "@azure/abort-controller": "^1.0.0", "@azure/core-asynciterator-polyfill": "^1.0.0", - "rhea": "^2.0.2", + "rhea": "^2.0.3", "tslib": "^2.2.0" }, "//sampleConfiguration": { diff --git a/sdk/servicebus/service-bus/CHANGELOG.md b/sdk/servicebus/service-bus/CHANGELOG.md index 1d22f6cf2102..99e2a24d3f8f 100644 --- a/sdk/servicebus/service-bus/CHANGELOG.md +++ b/sdk/servicebus/service-bus/CHANGELOG.md @@ -1,14 +1,14 @@ # Release History -## 7.3.0 (Unreleased) - +## 7.3.0 (2021-07-06) ### Features Added -- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details. - -### Breaking Changes +- With the dropping of support for Node.js versions that are no longer in LTS, the dependency on `@types/node` has been updated to version 12. Read our [support policy](https://github.com/Azure/azure-sdk-for-js/blob/main/SUPPORT.md) for more details. ### Key Bugs Fixed +- Fixed a bug that could lead to message loss in certain conditions when using `receiver.receiveMessages()`. + [PR#15989](https://github.com/Azure/azure-sdk-for-js/pull/15989) + ### Fixed - Fixing an issue where the internal link cache would not properly remove closed links. diff --git a/sdk/servicebus/service-bus/package.json b/sdk/servicebus/service-bus/package.json index bd84a044cbef..1f4b33d69e0d 100644 --- a/sdk/servicebus/service-bus/package.json +++ b/sdk/servicebus/service-bus/package.json @@ -126,7 +126,7 @@ "long": "^4.0.0", "process": "^0.11.10", "tslib": "^2.2.0", - "rhea-promise": "^2.0.0" + "rhea-promise": "^2.1.0" }, "devDependencies": { "@azure/dev-tool": "^1.0.0", diff --git a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts index 413203ff3d03..e21d125163c1 100644 --- a/sdk/servicebus/service-bus/src/core/batchingReceiver.ts +++ b/sdk/servicebus/service-bus/src/core/batchingReceiver.ts @@ -8,7 +8,7 @@ import { OnAmqpEvent, ReceiverEvents, SessionEvents, - Receiver, + Receiver as RheaPromiseReceiver, Session } from "rhea-promise"; import { ServiceBusMessageImpl } from "../serviceBusMessage"; @@ -191,7 +191,10 @@ export function getRemainingWaitTimeInMsFn( * * @internal */ -type EventEmitterLike = Pick; +type EventEmitterLike = Pick< + T, + "once" | "removeListener" | "on" +>; /** * The bare minimum needed to receive messages for batched @@ -199,8 +202,11 @@ type EventEmitterLike = Pick & - EventEmitterLike & { +export type MinimalReceiver = Pick< + RheaPromiseReceiver, + "name" | "isOpen" | "credit" | "addCredit" | "drain" | "drainCredit" +> & + EventEmitterLike & { session: EventEmitterLike; } & { connection: { @@ -269,6 +275,7 @@ export class BatchingReceiverLite { private _getRemainingWaitTimeInMsFn: typeof getRemainingWaitTimeInMsFn; private _closeHandler: ((connectionError?: AmqpError | Error) => void) | undefined; + private _finalAction: (() => void) | undefined; isReceivingMessages: boolean; @@ -389,16 +396,17 @@ export class BatchingReceiverLite { // - maxMessageCount is reached or // - maxWaitTime is passed or // - newMessageWaitTimeoutInSeconds is passed since the last message was received - const finalAction = (): void => { + this._finalAction = (): void => { + if (receiver.drain) { + // If a drain is already in process then we should let it complete. Some messages might still be in flight, but they will + // arrive before the drain completes. + return; + } + // Drain any pending credits. if (receiver.isOpen() && receiver.credit > 0) { logger.verbose(`${loggingPrefix} Draining leftover credits(${receiver.credit}).`); - - // setting .drain and combining it with .addCredit results in (eventually) sending - // a drain request to Service Bus. When the drain completes rhea will call `onReceiveDrain` - // at which point we'll wrap everything up and resolve the promise. - receiver.drain = true; - receiver.addCredit(1); + receiver.drainCredit(); } else { logger.verbose( `${loggingPrefix} Resolving receiveMessages() with ${brokeredMessages.length} messages.` @@ -429,15 +437,24 @@ export class BatchingReceiverLite { logger.verbose( `${loggingPrefix} Batching, waited for ${remainingWaitTimeInMs} milliseconds after receiving the first message.` ); - finalAction(); + this._finalAction!(); }, remainingWaitTimeInMs); } } try { const data: ServiceBusMessageImpl = this._createServiceBusMessage(context); - if (brokeredMessages.length < args.maxMessageCount) { - brokeredMessages.push(data); + brokeredMessages.push(data); + + // NOTE: we used to actually "lose" any extra messages. At this point I've fixed the areas that were causing us to receive + // extra messages but if this bug arises in some other way it's better to return the message than it would be to let it be + // silently dropped on the floor. + if (brokeredMessages.length > args.maxMessageCount) { + logger.warning( + `More messages arrived than were expected: ${ + args.maxMessageCount + } vs ${brokeredMessages.length + 1}` + ); } } catch (err) { const errObj = err instanceof Error ? err : new Error(JSON.stringify(err)); @@ -448,7 +465,7 @@ export class BatchingReceiverLite { reject(errObj); } if (brokeredMessages.length === args.maxMessageCount) { - finalAction(); + this._finalAction!(); } }; @@ -515,7 +532,7 @@ export class BatchingReceiverLite { logger.verbose( `${loggingPrefix} Batching, waited for max wait time ${args.maxWaitTimeInMs} milliseconds.` ); - finalAction(); + this._finalAction!(); }, args.maxWaitTimeInMs); receiver.on(ReceiverEvents.message, onReceiveMessage); diff --git a/sdk/servicebus/service-bus/src/core/receiverHelper.ts b/sdk/servicebus/service-bus/src/core/receiverHelper.ts index 68c3ea6fd405..80086cc03d73 100644 --- a/sdk/servicebus/service-bus/src/core/receiverHelper.ts +++ b/sdk/servicebus/service-bus/src/core/receiverHelper.ts @@ -132,10 +132,7 @@ export class ReceiverHelper { resolve(); }); - receiver.drain = true; - // this is not actually adding another credit - it'll just - // cause the drain call to start. - receiver.addCredit(1); + receiver.drainCredit(); }); return drainPromise; diff --git a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts index a5ee9c1fa6bc..f86470518e13 100644 --- a/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/batchReceiver.spec.ts @@ -1388,24 +1388,13 @@ function causeDisconnectDuringDrain( throw new Error("No active link for batching receiver"); } - const origAddCredit = link.addCredit; - - // We want to simulate a disconnect once the batching receiver is draining. - // We can detect when the receiver enters a draining state when `addCredit` is - // called while didRequestDrainResolver is called to resolve the promise. - const addCreditThatImmediatelyDetaches = function(credits: number): void { - origAddCredit.call(link, credits); - - if (link.drain && credits === 1) { - // initiate the detach now (prior to any possibilty of the 'drain' call being scheduled) - batchingReceiver - .onDetached(new Error("Test: fake connection failure")) - .then(() => resolveOnDetachedCallPromise()); - } + link["drainCredit"] = () => { + // don't send the drain request, we'll just detach. + batchingReceiver + .onDetached(new Error("Test: fake connection failure")) + .then(() => resolveOnDetachedCallPromise()); }; - link["addCredit"] = addCreditThatImmediatelyDetaches; - return { onDetachedCalledPromise }; diff --git a/sdk/servicebus/service-bus/test/internal/smoketest.spec.ts b/sdk/servicebus/service-bus/test/internal/smoketest.spec.ts index eb2182e64b03..6b5b6f1cf999 100644 --- a/sdk/servicebus/service-bus/test/internal/smoketest.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/smoketest.spec.ts @@ -20,7 +20,10 @@ import { chai.use(chaiAsPromised); const assert = chai.assert; -describe("Sample scenarios for track 2", () => { +/** + * A basic suite that exercises most of the core functionality. + */ +describe("Smoke tests", () => { let serviceBusClient: ServiceBusClientForTests; before(async () => { diff --git a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts index 111a7d1c782b..b22b09bf163b 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/batchingReceiver.spec.ts @@ -19,14 +19,12 @@ import { createAbortSignalForTest } from "../../public/utils/abortSignalTestUtil import { AbortController } from "@azure/abort-controller"; import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage"; import { - Receiver as RheaReceiver, + Receiver as RheaPromiseReceiver, ReceiverEvents, SessionEvents, EventContext, - Message as RheaMessage, - Receiver + Message as RheaMessage } from "rhea-promise"; -import { OnAmqpEventAsPromise } from "../../../src/core/messageReceiver"; import { ConnectionContext } from "../../../src/connectionContext"; import { ServiceBusReceiverImpl } from "../../../src/receivers/receiver"; import { OperationOptionsBase } from "../../../src/modelsToBeSharedWithEventHubs"; @@ -148,7 +146,7 @@ describe("BatchingReceiver unit tests", () => { listeners.add(eventType); } } - } as any) as RheaReceiver; + } as any) as RheaPromiseReceiver; abortController.abort(); }; @@ -190,7 +188,7 @@ describe("BatchingReceiver unit tests", () => { }); it("1. We received 'max messages'", async () => { - const receiver = new BatchingReceiver( + const batchingReceiver = new BatchingReceiver( createConnectionContextForTests(), "dummyEntityPath", { @@ -198,17 +196,15 @@ describe("BatchingReceiver unit tests", () => { lockRenewer: undefined } ); - closeables.push(receiver); + closeables.push(batchingReceiver); - const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver( - receiver - ); + const { receiveIsReady, rheaReceiver } = setupBatchingReceiver(batchingReceiver); - const receivePromise = receiver.receive(1, bigTimeout, bigTimeout, {}); + const receivePromise = batchingReceiver.receive(1, bigTimeout, bigTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... - emitter.emit(ReceiverEvents.message, { + rheaReceiver.emit(ReceiverEvents.message, { message: { body: "the message" } as RheaMessage } as EventContext); @@ -218,7 +214,7 @@ describe("BatchingReceiver unit tests", () => { ["the message"] ); - assert.isEmpty(remainingRegisteredListeners); + assertListenersRemoved(rheaReceiver); }).timeout(5 * 1000); // in the new world the overall timeout firing means we've received _no_ messages @@ -234,19 +230,19 @@ describe("BatchingReceiver unit tests", () => { ); closeables.push(receiver); - const { receiveIsReady, remainingRegisteredListeners } = setupBatchingReceiver(receiver); + const { receiveIsReady, rheaReceiver } = setupBatchingReceiver(receiver); const receivePromise = receiver.receive(1, littleTimeout, bigTimeout, {}); await receiveIsReady; // force the overall timeout to fire - clock.tick(littleTimeout); + clock.tick(littleTimeout + 1); const messages = await receivePromise; assert.isEmpty(messages); - assert.isEmpty(remainingRegisteredListeners); + assertListenersRemoved(rheaReceiver); }).timeout(5 * 1000); // TODO: there's a bug that needs some more investigation where receiveAndDelete loses messages if we're @@ -256,7 +252,7 @@ describe("BatchingReceiver unit tests", () => { (lockMode === "peekLock" ? it : it.skip)( `3a. (with idle timeout) We've received 1 message and _now_ have exceeded 'max wait time past first message'`, async () => { - const receiver = new BatchingReceiver( + const batchingReceiver = new BatchingReceiver( createConnectionContextForTests(), "dummyEntityPath", { @@ -264,18 +260,15 @@ describe("BatchingReceiver unit tests", () => { lockRenewer: undefined } ); - closeables.push(receiver); + closeables.push(batchingReceiver); - const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver( - receiver, - clock - ); + const { receiveIsReady, rheaReceiver } = setupBatchingReceiver(batchingReceiver, clock); - const receivePromise = receiver.receive(3, bigTimeout, littleTimeout, {}); + const receivePromise = batchingReceiver.receive(3, bigTimeout, littleTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... - emitter.emit(ReceiverEvents.message, { + rheaReceiver.emit(ReceiverEvents.message, { message: { body: "the first message" } as RheaMessage } as EventContext); @@ -285,7 +278,7 @@ describe("BatchingReceiver unit tests", () => { // now emit a second message - this second message should _not_ change any existing timers // or start new ones. - emitter.emit(ReceiverEvents.message, { + rheaReceiver.emit(ReceiverEvents.message, { message: { body: "the second message" } as RheaMessage } as EventContext); @@ -298,7 +291,7 @@ describe("BatchingReceiver unit tests", () => { ["the first message", "the second message"] ); - assert.isEmpty(remainingRegisteredListeners); + assertListenersRemoved(rheaReceiver); } ).timeout(5 * 1000); @@ -307,7 +300,7 @@ describe("BatchingReceiver unit tests", () => { // the duration of time given (or max messages) with no idle timer. // When we eliminate that bug we can remove this test in favor of the idle timeout test above. (lockMode === "receiveAndDelete" ? it : it.skip)(`3b. (without idle timeout)`, async () => { - const receiver = new BatchingReceiver( + const batchingReceiver = new BatchingReceiver( createConnectionContextForTests(), "dummyEntityPath", { @@ -315,17 +308,15 @@ describe("BatchingReceiver unit tests", () => { lockRenewer: undefined } ); - closeables.push(receiver); + closeables.push(batchingReceiver); - const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver( - receiver - ); + const { receiveIsReady, rheaReceiver } = setupBatchingReceiver(batchingReceiver); - const receivePromise = receiver.receive(3, bigTimeout, littleTimeout, {}); + const receivePromise = batchingReceiver.receive(3, bigTimeout, littleTimeout, {}); await receiveIsReady; // batch fulfillment is checked when we receive a message... - emitter.emit(ReceiverEvents.message, { + rheaReceiver.emit(ReceiverEvents.message, { message: { body: "the first message" } as RheaMessage @@ -337,7 +328,7 @@ describe("BatchingReceiver unit tests", () => { // ...and emit another message _after_ the idle timer would have fired. Now when we advance // the time all the way.... - emitter.emit(ReceiverEvents.message, { + rheaReceiver.emit(ReceiverEvents.message, { message: { body: "the second message" } as RheaMessage @@ -353,7 +344,7 @@ describe("BatchingReceiver unit tests", () => { ["the first message", "the second message"] ); - assert.isEmpty(remainingRegisteredListeners); + assertListenersRemoved(rheaReceiver); }).timeout(5 * 1000); // TODO: there's a bug that needs some more investigation where receiveAndDelete loses messages if we're @@ -363,7 +354,7 @@ describe("BatchingReceiver unit tests", () => { (lockMode === "peekLock" ? it : it.skip)( "4. sanity check that we're using getRemainingWaitTimeInMs", async () => { - const receiver = new BatchingReceiver( + const batchingReceiver = new BatchingReceiver( createConnectionContextForTests(), "dummyEntityPath", { @@ -371,10 +362,10 @@ describe("BatchingReceiver unit tests", () => { lockRenewer: undefined } ); - closeables.push(receiver); + closeables.push(batchingReceiver); - const { receiveIsReady, emitter, remainingRegisteredListeners } = setupBatchingReceiver( - receiver, + const { receiveIsReady, rheaReceiver: emitter } = setupBatchingReceiver( + batchingReceiver, clock ); @@ -382,7 +373,7 @@ describe("BatchingReceiver unit tests", () => { const arbitraryAmountOfTimeInMs = 40; - receiver["_batchingReceiverLite"]["_getRemainingWaitTimeInMsFn"] = ( + batchingReceiver["_batchingReceiverLite"]["_getRemainingWaitTimeInMsFn"] = ( maxWaitTimeInMs: number, maxTimeAfterFirstMessageMs: number ) => { @@ -398,7 +389,7 @@ describe("BatchingReceiver unit tests", () => { }; }; - const receivePromise = receiver.receive(3, bigTimeout + 1, bigTimeout + 2, {}); + const receivePromise = batchingReceiver.receive(3, bigTimeout + 1, bigTimeout + 2, {}); await receiveIsReady; emitter.emit(ReceiverEvents.message, { @@ -417,7 +408,7 @@ describe("BatchingReceiver unit tests", () => { assert.isTrue(wasCalled); - assert.isEmpty(remainingRegisteredListeners); + assertListenersRemoved(emitter); } ); @@ -426,17 +417,11 @@ describe("BatchingReceiver unit tests", () => { clock?: ReturnType ): { receiveIsReady: Promise; - emitter: EventEmitter; - remainingRegisteredListeners: Set; + rheaReceiver: RheaPromiseReceiver; } { - const { - fakeRheaReceiver, - emitter, - remainingRegisteredListeners, - receiveIsReady - } = createFakeReceiver(clock); + const rheaReceiver = createFakeReceiver(clock); - batchingReceiver["_link"] = fakeRheaReceiver; + batchingReceiver["_link"] = rheaReceiver; batchingReceiver["_batchingReceiverLite"]["_createServiceBusMessage"] = (eventContext) => { return { @@ -444,86 +429,47 @@ describe("BatchingReceiver unit tests", () => { } as ServiceBusMessageImpl; }; + const receiveIsReady = getReceiveIsReadyPromise(batchingReceiver["_batchingReceiverLite"]); + return { receiveIsReady, - emitter, - remainingRegisteredListeners + rheaReceiver }; } }); }); - function createFakeReceiver( - clock?: ReturnType - ): { - receiveIsReady: Promise; - emitter: EventEmitter; - remainingRegisteredListeners: Set; - fakeRheaReceiver: Receiver; - } { - const emitter = new EventEmitter(); - const { promise: receiveIsReady, resolve: resolvePromiseIsReady } = defer(); - let credits = 0; - - const remainingRegisteredListeners = new Set(); - - const fakeRheaReceiver = { - on(evt: ReceiverEvents, handler: OnAmqpEventAsPromise) { - emitter.on(evt, handler); - - if (evt === ReceiverEvents.message) { - --credits; - } + function createFakeReceiver(clock?: ReturnType): RheaPromiseReceiver { + const fakeRheaReceiver = new EventEmitter() as RheaPromiseReceiver; + fakeRheaReceiver.drain = false; - assert.isFalse(remainingRegisteredListeners.has(evt.toString())); - remainingRegisteredListeners.add(evt.toString()); - }, - removeListener(evt: ReceiverEvents, handler: OnAmqpEventAsPromise) { - remainingRegisteredListeners.delete(evt.toString()); - emitter.removeListener(evt, handler); - }, - session: { - on(evt: SessionEvents, handler: OnAmqpEventAsPromise) { - emitter.on(evt, handler); - - if (evt === SessionEvents.sessionClose) { - // this also happens to be the final thing the Promise does - // as part of it's initialization. - resolvePromiseIsReady(); - } + let credit = 0; - assert.isFalse(remainingRegisteredListeners.has(evt.toString())); - remainingRegisteredListeners.add(evt.toString()); - }, - removeListener(evt: SessionEvents, handler: OnAmqpEventAsPromise) { - remainingRegisteredListeners.delete(evt.toString()); - emitter.removeListener(evt, handler); - } - }, - isOpen: () => true, - addCredit: (_credits: number) => { - if (_credits === 1 && fakeRheaReceiver.drain === true) { - // special case - if we're draining we should initiate a drain - emitter.emit(ReceiverEvents.receiverDrained, undefined); - clock?.runAll(); - } else { - credits += _credits; - } - }, - get credit() { - return credits; - }, - connection: { - id: "connection-id" - } - } as RheaReceiver; + fakeRheaReceiver.on(ReceiverEvents.message, function creditRemoverForTests() { + --credit; + }); + (fakeRheaReceiver as any).session = new EventEmitter(); + + fakeRheaReceiver["isOpen"] = () => true; + fakeRheaReceiver["addCredit"] = (_credit: number) => { + credit += _credit; + }; - return { - receiveIsReady, - emitter, - remainingRegisteredListeners, - fakeRheaReceiver + fakeRheaReceiver["drainCredit"] = () => { + fakeRheaReceiver.drain = true; + fakeRheaReceiver.emit(ReceiverEvents.receiverDrained, undefined); + clock?.runAll(); }; + + Object.defineProperty(fakeRheaReceiver, "credit", { + get: () => credit + }); + + (fakeRheaReceiver as any)["connection"] = { + id: "connection-id" + }; + + return fakeRheaReceiver; } describe("getRemainingWaitTimeInMs", () => { @@ -572,9 +518,9 @@ describe("BatchingReceiver unit tests", () => { }); it("isReceivingMessages is properly set and unset when receiving operations run", async () => { - const { fakeRheaReceiver, receiveIsReady } = createFakeReceiver(); + const fakeRheaReceiver = createFakeReceiver(); - const receiver = new BatchingReceiverLite( + const batchingReceiver = new BatchingReceiverLite( createConnectionContextForTests(), "fakeEntityPath", async () => { @@ -583,27 +529,28 @@ describe("BatchingReceiver unit tests", () => { "peekLock" ); - assert.isFalse(receiver.isReceivingMessages); + assert.isFalse(batchingReceiver.isReceivingMessages); + const receiveIsReady = getReceiveIsReadyPromise(batchingReceiver); - const prm = receiver.receiveMessages({ + const prm = batchingReceiver.receiveMessages({ maxMessageCount: 1, - maxTimeAfterFirstMessageInMs: 1, - maxWaitTimeInMs: 1 + maxTimeAfterFirstMessageInMs: 20, + maxWaitTimeInMs: 10 }); - assert.isTrue(receiver.isReceivingMessages); + assert.isTrue(batchingReceiver.isReceivingMessages); await receiveIsReady; - await clock.tick(1); + await clock.tick(10 + 1); await prm; - assert.isFalse(receiver.isReceivingMessages); + assert.isFalse(batchingReceiver.isReceivingMessages); }); it("batchingReceiverLite.close(actual-error) - throws the error from the current receiverMessages() call", async () => { - const { fakeRheaReceiver, receiveIsReady } = createFakeReceiver(); + const fakeRheaReceiver = createFakeReceiver(); - const receiver = new BatchingReceiverLite( + const batchingReceiver = new BatchingReceiverLite( {} as ConnectionContext, "fakeEntityPath", async () => { @@ -612,18 +559,20 @@ describe("BatchingReceiver unit tests", () => { "peekLock" ); - assert.notExists(receiver["_closeHandler"]); + assert.notExists(batchingReceiver["_closeHandler"]); - const receiveMessagesPromise = receiver.receiveMessages({ + const receiveIsReady = getReceiveIsReadyPromise(batchingReceiver); + + const receiveMessagesPromise = batchingReceiver.receiveMessages({ maxMessageCount: 1, maxTimeAfterFirstMessageInMs: 1, maxWaitTimeInMs: 1 }); await receiveIsReady; - assert.exists(receiver["_closeHandler"]); + assert.exists(batchingReceiver["_closeHandler"]); - await receiver.terminate(new Error("actual error")); + await batchingReceiver.terminate(new Error("actual error")); try { await receiveMessagesPromise; @@ -634,9 +583,9 @@ describe("BatchingReceiver unit tests", () => { }); it("batchingReceiverLite.close() (ie, no error) just shuts down the current operation with no error", async () => { - const { fakeRheaReceiver } = createFakeReceiver(); + const fakeRheaReceiver = createFakeReceiver(); - const receiver = new BatchingReceiverLite( + const batchingReceiver = new BatchingReceiverLite( createConnectionContextForTests(), "fakeEntityPath", async () => { @@ -645,13 +594,13 @@ describe("BatchingReceiver unit tests", () => { "peekLock" ); - assert.notExists(receiver["_closeHandler"]); + assert.notExists(batchingReceiver["_closeHandler"]); let resolveWasCalled = false; let rejectWasCalled = false; - receiver["_receiveMessagesImpl"]( - (await receiver["_getCurrentReceiver"]())!, + batchingReceiver["_receiveMessagesImpl"]( + (await batchingReceiver["_getCurrentReceiver"]())!, { maxMessageCount: 1, maxTimeAfterFirstMessageInMs: 1, @@ -665,11 +614,11 @@ describe("BatchingReceiver unit tests", () => { } ); - assert.exists(receiver["_closeHandler"]); + assert.exists(batchingReceiver["_closeHandler"]); assert.isFalse(resolveWasCalled); assert.isFalse(rejectWasCalled); - receiver.terminate(); + batchingReceiver.terminate(); // these are still false because we used setTimeout() (and we're using sinon) // so the clock is "frozen" @@ -682,12 +631,71 @@ describe("BatchingReceiver unit tests", () => { assert.isTrue(resolveWasCalled); assert.isFalse(rejectWasCalled); }); + + it("finalAction prevents multiple concurrent drain calls", async () => { + // there are unintended side effects if multiple drains are requested (ie - you start to get + // mismatches between responses, resulting in this error message ("Received transfer + // when credit was 0") bring printed by rhea. + const fakeRheaReceiver = createFakeReceiver(); + + const batchingReceiverLite = new BatchingReceiverLite( + createConnectionContextForTests(), + "fakeEntityPath", + async () => { + return fakeRheaReceiver; + }, + "peekLock" + ); + + batchingReceiverLite["_receiveMessagesImpl"]( + fakeRheaReceiver, + { + maxMessageCount: 2, + maxTimeAfterFirstMessageInMs: 1, + maxWaitTimeInMs: 1 + }, + () => {}, + () => {} + ); + + assert.equal( + fakeRheaReceiver.credit, + 2, + "No messages received, nothing drained, should have all the credits from the start." + ); + + const finalAction = batchingReceiverLite["_finalAction"]; + + if (!finalAction) { + throw new Error("No finalAction defined!"); + } + + fakeRheaReceiver.removeAllListeners(ReceiverEvents.receiverDrained); + + // the first call (when there are no received messages) will initiate a drain + assert.isFalse(fakeRheaReceiver.drain); + + const drainCreditSpy = sinon.spy(fakeRheaReceiver, "drainCredit"); + + finalAction(); + + assert.isTrue(drainCreditSpy.calledOnceWith()); + + // also our fix should leave our # of credits untouched (ie, no +1 effect) + assert.equal(fakeRheaReceiver.credit, 2); + + drainCreditSpy.resetHistory(); + + // subsequent calls will not initiate drains. + finalAction(); + assert.isTrue(drainCreditSpy.notCalled); + }); }); it("drain doesn't resolve before message callbacks have completed", async () => { - const { fakeRheaReceiver, emitter, receiveIsReady } = createFakeReceiver(); + const fakeRheaReceiver = createFakeReceiver(); - const receiver = new BatchingReceiverLite( + const batchingReceiverLite = new BatchingReceiverLite( createConnectionContextForTests(), "fakeEntityPath", async () => { @@ -696,7 +704,9 @@ describe("BatchingReceiver unit tests", () => { "peekLock" ); - const receiveMessagesPromise = receiver + const receiveIsReady = getReceiveIsReadyPromise(batchingReceiverLite); + + const receiveMessagesPromise = batchingReceiverLite .receiveMessages({ maxMessageCount: 3, maxTimeAfterFirstMessageInMs: 5000, @@ -740,7 +750,7 @@ describe("BatchingReceiver unit tests", () => { // us to enter into the same task queue as all the message callbacks, and makes it so everything occurs in the // right order. setTimeout(() => { - emitter.emit(ReceiverEvents.message, { + fakeRheaReceiver.emit(ReceiverEvents.message, { message: { body: "the first message", message_annotations: { @@ -750,10 +760,61 @@ describe("BatchingReceiver unit tests", () => { } as EventContext); }); - emitter.emit(ReceiverEvents.receiverDrained, {} as EventContext); + fakeRheaReceiver.emit(ReceiverEvents.receiverDrained, {} as EventContext); const results = await receiveMessagesPromise; assert.equal(1, results.length); }); }); + +function getReceiveIsReadyPromise(batchingReceiverLite: BatchingReceiverLite): Promise { + // receiveMessagesImpl is the 'non-async' method that sets up the receiver and adds credits. So it's a + // perfect method to hook into to test the internals of the BatchingReceiver(Lite) + const orig = batchingReceiverLite["_receiveMessagesImpl"]; + const { resolve, promise } = defer(); + + batchingReceiverLite["_receiveMessagesImpl"] = (...args) => { + orig.call(batchingReceiverLite, ...args); + resolve(); + }; + + return promise; +} + +function assertListenersRemoved(rheaReceiver: RheaPromiseReceiver): void { + const shouldBeEmpty = [ + ReceiverEvents.receiverClose, + ReceiverEvents.receiverDrained, + ReceiverEvents.receiverError, + ReceiverEvents.receiverFlow, + ReceiverEvents.receiverOpen, + ReceiverEvents.settled, + SessionEvents.sessionClose, + SessionEvents.sessionError, + SessionEvents.sessionOpen, + SessionEvents.settled + ]; + + // we add a little credit remover for our tests. Ignore it. + assert.isEmpty( + rheaReceiver + .listeners(ReceiverEvents.message) + .filter((f) => f.name !== "creditRemoverForTests"), + `No listeners (aside from the test credit remover) should be registered for ${ReceiverEvents.message}` + ); + + for (const eventName of shouldBeEmpty) { + assert.isEmpty( + rheaReceiver.listeners(eventName), + `No listeners should be registered for ${eventName} on the receiver` + ); + assert.isEmpty( + rheaReceiver.session.listeners(eventName), + `No listeners should be registered for ${eventName} on the receiver.session` + ); + } + + // check the session as well + rheaReceiver.session; +} diff --git a/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts b/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts index b53c0a15a70d..b915d982377a 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/messageSession.spec.ts @@ -14,10 +14,10 @@ import sinon, { SinonSpy } from "sinon"; import { EventEmitter } from "events"; import { ReceiverEvents, - Receiver as RheaReceiver, EventContext, Message as RheaMessage, - SessionEvents + SessionEvents, + Receiver as RheaPromiseReceiver } from "rhea-promise"; import { OnAmqpEventAsPromise } from "../../../src/core/messageReceiver"; import { ServiceBusMessageImpl } from "../../../src/serviceBusMessage"; @@ -280,16 +280,16 @@ describe("Message session unit tests", () => { } { const emitter = new EventEmitter(); const { promise: receiveIsReady, resolve: resolvePromiseIsReady } = defer(); - let credits = 0; const remainingRegisteredListeners = new Set(); + let credit = 0; const fakeRheaReceiver = { on(evt: ReceiverEvents, handler: OnAmqpEventAsPromise) { emitter.on(evt, handler); if (evt === ReceiverEvents.message) { - --credits; + --credit; } assert.isFalse(remainingRegisteredListeners.has(evt.toString())); @@ -318,22 +318,20 @@ describe("Message session unit tests", () => { } }, isOpen: () => true, - addCredit: (_credits: number) => { - if (_credits === 1 && fakeRheaReceiver.drain === true) { - // special case - if we're draining we should initiate a drain - emitter.emit(ReceiverEvents.receiverDrained, undefined); - clock?.runAll(); - } else { - credits += _credits; - } + addCredit: (_credit: number) => { + credit += _credit; + }, + drainCredit: () => { + emitter.emit(ReceiverEvents.receiverDrained, undefined); + clock?.runAll(); }, get credit() { - return credits; + return credit; }, connection: { id: "connection-id" } - } as RheaReceiver; + } as RheaPromiseReceiver; batchingReceiver["_link"] = fakeRheaReceiver; diff --git a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts index 5453faf70643..67cf7eab4c97 100644 --- a/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts +++ b/sdk/servicebus/service-bus/test/internal/unit/unittestUtils.ts @@ -4,7 +4,7 @@ import { ConnectionContext } from "../../../src/connectionContext"; import { AwaitableSender, - Receiver as RheaReceiver, + Receiver as RheaPromiseReceiver, ReceiverEvents, ReceiverOptions } from "rhea-promise"; @@ -21,7 +21,7 @@ export interface CreateConnectionContextForTestsOptions { host?: string; entityPath?: string; onCreateAwaitableSenderCalled?: () => void; - onCreateReceiverCalled?: (receiver: RheaReceiver) => void; + onCreateReceiverCalled?: (receiver: RheaPromiseReceiver) => void; } /** @@ -79,7 +79,7 @@ export function createConnectionContextForTests( return testAwaitableSender; }, - createReceiver: async (): Promise => { + createReceiver: async (): Promise => { const receiver = createRheaReceiverForTests(); if (options?.onCreateReceiverCalled) { @@ -165,8 +165,8 @@ export function createConnectionContextForTestsWithSessionId( * - It handles draining (via the .drain = true/addCredit(1) combo of operations). * - It respects .close(), so the state of the receiver should be accurate for isOpen(). */ -export function createRheaReceiverForTests(options?: ReceiverOptions): RheaReceiver { - const receiver = new EventEmitter() as RheaReceiver; +export function createRheaReceiverForTests(options?: ReceiverOptions): RheaPromiseReceiver { + const receiver = new EventEmitter() as RheaPromiseReceiver; (receiver as any).name = options?.name == null ? getUniqueName("entity") : options.name; @@ -174,6 +174,23 @@ export function createRheaReceiverForTests(options?: ReceiverOptions): RheaRecei id: "connection-id" }; + const link = { + credit: 0, + drain_credit(): void { + // simulate drain + (receiver as any).credit = 0; + receiver.emit(ReceiverEvents.receiverDrained, undefined); + } + }; + + (receiver as any)["_link"] = link; + + receiver.drain = false; + + (receiver as any)["drainCredit"] = () => { + link.drain_credit(); + }; + (receiver as any).addCredit = (credit: number) => { if (!receiver.isOpen()) { throw new Error("TEST INCONSISTENCY: trying to .addCredit() to a closed receiver"); @@ -184,11 +201,6 @@ export function createRheaReceiverForTests(options?: ReceiverOptions): RheaRecei } (receiver as any).credit += credit; - - if (credit === 1 && receiver.drain) { - (receiver as any).credit = 0; - receiver.emit(ReceiverEvents.receiverDrained, undefined); - } }; mockLinkProperties(receiver); diff --git a/sdk/servicebus/service-bus/test/stress/scenarioLongRunning.ts b/sdk/servicebus/service-bus/test/stress/scenarioLongRunning.ts index d1f1abc22766..e848ea1a463b 100644 --- a/sdk/servicebus/service-bus/test/stress/scenarioLongRunning.ts +++ b/sdk/servicebus/service-bus/test/stress/scenarioLongRunning.ts @@ -1,5 +1,10 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + import { captureConsoleOutputToAppInsights, + createServiceBusClient, + loopForever as loopInfinitely, ServiceBusStressTester } from "./serviceBusStressTester"; import { AbortController, AbortSignalLike } from "@azure/abort-controller"; @@ -8,14 +13,6 @@ import { v4 as uuidv4 } from "uuid"; captureConsoleOutputToAppInsights(); -async function looper(fn: () => Promise, delay: number, abortSignal: AbortSignalLike) { - const timeout = () => new Promise((resolve) => setTimeout(() => resolve(true), delay)); - - while (!abortSignal.aborted && (await timeout())) { - await fn(); - } -} - async function sendMessagesForever( stressTest: ServiceBusStressTester, clientForSender: ServiceBusClient, @@ -25,7 +22,7 @@ async function sendMessagesForever( let sender: ServiceBusSender | undefined; - return looper( + return loopInfinitely( async () => { if (abortSignal.aborted) { console.log(`Aborting sending because of abortSignal`); @@ -48,7 +45,7 @@ async function sendMessagesForever( await sender.sendMessages(messagesToSend); } catch (err) { console.log(`Sending message failed: `, err); - stressTest.trackError("send", err); + stressTest.trackError("send", err as Error); sender = undefined; } }, @@ -67,7 +64,7 @@ async function main() { }); const operation = async () => { - const clientForReceiver = stressTest.createServiceBusClient(); + const clientForReceiver = createServiceBusClient(); const receiver = clientForReceiver.createReceiver(stressTest.queueName, { receiveMode: "peekLock" @@ -92,7 +89,7 @@ async function main() { } ); - const clientForSender = stressTest.createServiceBusClient(); + const clientForSender = createServiceBusClient(); await sendMessagesForever(stressTest, clientForSender, abortSignal); }; diff --git a/sdk/servicebus/service-bus/test/stress/scenarioShortLivedReceivers.ts b/sdk/servicebus/service-bus/test/stress/scenarioShortLivedReceivers.ts new file mode 100644 index 000000000000..800e1f6f28f0 --- /dev/null +++ b/sdk/servicebus/service-bus/test/stress/scenarioShortLivedReceivers.ts @@ -0,0 +1,347 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { + captureConsoleOutputToAppInsights, + createRandomQueue, + createServiceBusClient, + getUniqueQueueName, + isReceiveMode +} from "./serviceBusStressTester"; +import { defaultClient as appInsightsClient, Contracts } from "applicationinsights"; +import { + ServiceBusClient, + ServiceBusReceivedMessage, + ServiceBusReceiver +} from "@azure/service-bus"; +import { EventEmitter } from "stream"; +import { EventContext, ReceiverEvents } from "rhea-promise"; +import parsedArgs from "minimist"; +import { generateUuid } from "@azure/core-http"; + +const messageNumberPropertyName = "messageNumber"; + +/** + * This test is checking for a few boundary/edge conditions that we've had in the library when + * receiving messages over longer periods of time with receiveMessages(). The bugs would typically + * result in message loss, as well as rhea printing out 'Received transfer when credit was 0'. + */ +async function main() { + captureConsoleOutputToAppInsights(); + + appInsightsClient.commonProperties = { + // these will be reported with each event + testName: "scenarioShortLivedReceiver", + testRunId: generateUuid() + }; + + const { receiveMode, maxWaitTimeInMs, numMessagesToSend, messagesPerReceive } = { + ...parsedArgs<{ + receiveMode: string; + maxWaitTimeInMs: number; + numMessagesToSend: number; + messagesPerReceive: number; + }>(process.argv, { + default: { + receiveMode: "peekLock", + + // there's nothing particularly special about these numbers but they do a decent job of provoking the bug + // when targeted to a Service Bus in AUS, connecting from a consumer network in Redmond. + maxWaitTimeInMs: 500, + numMessagesToSend: 1000, + messagesPerReceive: 5 + } + }) + }; + + try { + const queueName = getUniqueQueueName(); + + appInsightsClient.trackEvent({ + name: "start", + properties: { + queueName, + receiveMode, + maxWaitTimeInMs, + numMessagesToSend, + messagesPerReceive + } + }); + + if (!isReceiveMode(receiveMode)) { + throw new TypeError(`Invalid receive mode: ${receiveMode}`); + } + + console.log(`Test run ID(${appInsightsClient.commonProperties.testRunId!})`, { + queueName, + receiveMode, + maxWaitTimeInMs, + numMessagesToSend, + messagesPerReceive + }); + + await createRandomQueue(queueName); + + // create our entity + const serviceBusClient = createServiceBusClient(); + + const receiver = serviceBusClient.createReceiver(queueName, { + receiveMode, + // auto lock renewal is just noise for this particular test, disabling. + maxAutoLockRenewalDurationInMs: 0 + }); + + const rheaMessageNumbers = new Set(); + const userMessageNumbers = new Set(); + + await addValidatingListener(receiver, rheaMessageNumbers); + + await sendTestMessages(serviceBusClient, queueName, numMessagesToSend); + + console.log(`Starting receiver...`); + + // this is just a fail-safe so we don't run forever if we somehow don't get all the messages. + let gotZeroMessagesCounter = 0; + + while (userMessageNumbers.size < numMessagesToSend && gotZeroMessagesCounter < 3) { + const messages = await receiver.receiveMessages(messagesPerReceive, { + maxWaitTimeInMs + }); + + if (messages.length === 0) { + ++gotZeroMessagesCounter; + } + + for (const message of messages) { + assertAndAddMessageNumber(message, userMessageNumbers); + + if (receiveMode === "peekLock") { + await receiver.completeMessage(message); + } + } + + console.log(`Total: ${userMessageNumbers.size} messages`); + + appInsightsClient.trackMetric({ + name: "totalReceivedMessages", + value: userMessageNumbers.size + }); + } + + await receiver.close(); + await serviceBusClient.close(); + + // validate nothing is missing + let missingUserVisibleMessages = 0; + let missingInternalMessages = 0; + + for (let i = 0; i < numMessagesToSend; ++i) { + if (!userMessageNumbers.has(i)) { + missingUserVisibleMessages++; + } + + if (!rheaMessageNumbers.has(i)) { + missingInternalMessages++; + } + } + + appInsightsClient.trackMetric({ + name: "totalMissingUserVisibleMessages", + value: missingUserVisibleMessages + }); + + appInsightsClient.trackMetric({ + name: "totalMissingInternalMessages", + value: missingInternalMessages + }); + + if (missingUserVisibleMessages > 0 || missingInternalMessages > 0) { + console.log( + `Messages were missing: user:${missingUserVisibleMessages}, internal:${missingInternalMessages}` + ); + process.exit(1); + } else { + console.log(`Success - all messages accounted for with no duplicates.`); + process.exit(0); + } + } catch (err) { + console.log(`Exception thrown: `, err); + + appInsightsClient.trackException({ + exception: err as any + }); + } finally { + appInsightsClient.trackEvent({ + name: "End" + }); + + appInsightsClient.flush(); + } + + function assertAndAddMessageNumber( + message: ServiceBusReceivedMessage, + receivedMessageIndices: Set + ) { + const messageNumber = message.applicationProperties?.[messageNumberPropertyName]; + + if (messageNumber == null) { + console.log(`Message with id of ${message.messageId} did not have a messageNumber`); + throw new Error(`Message with id of ${message.messageId} did not have a messageNumber`); + } + + if (typeof messageNumber !== "number") { + console.log( + `Message with id of ${ + message.messageId + } had a messageNumber property with an incorrect type (${typeof messageNumber})` + ); + throw new TypeError( + `Message with id of ${ + message.messageId + } had a messageNumber property with an incorrect type (${typeof messageNumber})` + ); + } + + if (receivedMessageIndices.has(messageNumber)) { + console.log( + `Message with id of ${message.messageId} and message number ${messageNumber} has already been received` + ); + throw new Error( + `Message with id of ${message.messageId} and message number ${messageNumber} has already been received` + ); + } + + receivedMessageIndices.add(messageNumber); + } +} + +main().catch((err) => { + console.log(`Fatal error, exiting...`, err); + process.exit(1); +}); + +/** + * Adds in (through undocumented means) an event listener for messages. This is meant to be a simple check + * that we're not somehow losing messages that were actually delivered through rhea but not surfaced to the + * caller of our API. + * + * NOTE: This method does a single receive, so we can add in our batching receiver hook. The queue should be empty or + * else it could result in message loss. + * + * ADDITIONAL NOTE: this method (and it's associated message listener) will terminate the test if it detects these conditions: + * - Message received when the queue should have been empty (ie, initial call) + * - Batching receiver not properly initialized (ie: internal details have changed and broken us) + * - Duplicate messages are arriving (ie: all assumptions are wrong) + * + * @param receiver A receiver. + * @param rawMessageNumbers A set to add the 'messageNumber' property value to. + */ +async function addValidatingListener( + receiver: ServiceBusReceiver, + rawMessageNumbers: Set +): Promise { + // warm up the receiver so the batching receiver will be available (and we can install our 'raw messages' hook + // for some bookkeeping. + const ignoredMessages = await receiver.receiveMessages(1); + + if (ignoredMessages.length > 0) { + // the queue should start off empty! + console.log("Got messages when the queue should have been empty"); + throw new Error("Got messages when the queue should have been empty"); + } + + const linkEntity = (receiver as any)?.["_batchingReceiver"]?.["_link"] as EventEmitter; + + if (linkEntity == null) { + console.log( + "[raw message callback] Couldn't get a receiver._batchingReceiver._link property in the passed in receiver" + ); + process.exit(1); + } + + linkEntity.addListener(ReceiverEvents.message, (eventContext: EventContext) => { + const message = eventContext.message; + + if (message == null) { + console.log( + "[raw message callback] Fatal test error - no message was on EventContext, but we got a message callback." + ); + process.exit(1); + } + + const messageNumber = message?.application_properties?.[messageNumberPropertyName]; + + if (messageNumber == null || typeof messageNumber !== "number") { + console.log( + `[raw message callback] Fatal test error - message arrived, but without the '${messageNumberPropertyName}' property, type: ${typeof messageNumber}` + ); + process.exit(1); + } + + if (rawMessageNumbers.has(messageNumber)) { + console.log( + `[raw message callback] Fatal test error - ${messageNumber} was already received - we're receiving duplicates in our raw message callback` + ); + process.exit(1); + } + + rawMessageNumbers.add(messageNumber); + }); +} + +/** + * Sends `numMessagesToSend` messages that are 1000 bytes apiece. Each message + * will have an `messageNumber` application property, which will be unique + * for each message sent in this batch. + */ +async function sendTestMessages( + serviceBusClient: ServiceBusClient, + queueName: string, + numMessagesToSend: number +): Promise { + console.log(`Starting to send ${numMessagesToSend} messages to ${queueName}`); + + const sender = serviceBusClient.createSender(queueName); + + try { + let batch = await sender.createMessageBatch(); + + const largeMessagePayload = new Array(1000).fill("a", 0); + + for (let i = 0; i < numMessagesToSend; ++i) { + const message = { + body: largeMessagePayload, + applicationProperties: { + messageNumber: i + } + }; + + const added = batch.tryAddMessage(message); + + if (!added) { + await sender.sendMessages(batch); + batch = await sender.createMessageBatch(); + + if (!batch.tryAddMessage(message)) { + console.log("Message was too big to fit in the array and can NEVER fit"); + throw new Error("Message was too big to fit in the array and can NEVER fit"); + } + } + } + + if (batch?.count > 0) { + await sender.sendMessages(batch); + } + + console.log(`Done sending messages to ${queueName}`); + } catch (err) { + console.log(`Exception thrown: `, err); + + appInsightsClient.trackException({ + exception: err as Error, + severity: Contracts.SeverityLevel.Critical + }); + } finally { + await sender.close(); + } +} diff --git a/sdk/servicebus/service-bus/test/stress/serviceBusStressTester.ts b/sdk/servicebus/service-bus/test/stress/serviceBusStressTester.ts index 90cc3ca9f05d..7dc825bae994 100644 --- a/sdk/servicebus/service-bus/test/stress/serviceBusStressTester.ts +++ b/sdk/servicebus/service-bus/test/stress/serviceBusStressTester.ts @@ -4,6 +4,7 @@ import { ProcessErrorArgs, ServiceBusAdministrationClient, ServiceBusClient, + ServiceBusClientOptions, ServiceBusMessage, ServiceBusReceivedMessage, ServiceBusReceiver, @@ -21,17 +22,20 @@ import { SnapshotOptions, TrackedMessageIdsInfo } from "./utils"; -import * as appInsights from "applicationinsights"; +import * as appInsights from "applicationinsights"; import * as dotenv from "dotenv"; +import { AbortSignalLike } from "@azure/abort-controller"; + dotenv.config(); appInsights .setup() + .setAutoCollectConsole(true) .setUseDiskRetryCaching(true) .start(); -export const defaultClient = appInsights.defaultClient; +const defaultClient = appInsights.defaultClient; export interface StressTestInitOptions { /** @@ -109,19 +113,6 @@ export class ServiceBusStressTester { this.snapshotTimer = setInterval(this.snapshot.bind(this), snapshotIntervalMs); } - /** - * Creates a ServiceBusClient using the connection string in the SERVICEBUS_CONNECTION_STRING environment variable. - */ - public createServiceBusClient(): ServiceBusClient { - if (!process.env.SERVICEBUS_CONNECTION_STRING) { - throw new Error( - "Failed to create a ServiceBusClient - no connection string defined in the environment" - ); - } - - return new ServiceBusClient(process.env.SERVICEBUS_CONNECTION_STRING); - } - private async _init(options?: StressTestInitOptions) { console.log(`[BEGIN]: Initializing...`); this.queueName = `queue` + `-${Math.ceil(Math.random() * 100000)}`; @@ -140,11 +131,7 @@ export class ServiceBusStressTester { } }); - await this.serviceBusAdministrationClient.createQueue( - this.queueName, - options?.createQueueOptions - ); - + await createRandomQueue(this.queueName, options?.createQueueOptions); console.log(`[END]: Initializing...`); } @@ -561,13 +548,12 @@ export class ServiceBusStressTester { try { try { // Define connection string and related Service Bus entity names here - const connectionString = process.env.SERVICEBUS_CONNECTION_STRING || ""; - serviceBusClient = new ServiceBusClient(connectionString); + serviceBusClient = createServiceBusClient(); await this._init(initOptions); } catch (err) { console.log(`ERROR: error thrown by init`, err); - this.trackError("init", err); + this.trackError("init", err as Error); defaultClient.flush(); throw err; } @@ -578,7 +564,7 @@ export class ServiceBusStressTester { } catch (err) { console.log(`ERROR: error thrown by test`, err); - this.trackError("test", err); + this.trackError("test", err as Error); defaultClient.flush(); } } finally { @@ -588,7 +574,7 @@ export class ServiceBusStressTester { await serviceBusClient?.close(); } catch (err) { defaultClient.trackException({ - exception: err, + exception: err as Error, properties: { from: "end" } @@ -600,3 +586,55 @@ export class ServiceBusStressTester { } } } + +export function getUniqueQueueName(): string { + return `queue` + `-${Math.ceil(Math.random() * 100000)}`; +} + +export async function createRandomQueue( + queueName: string, + queueOptions?: CreateQueueOptions +): Promise { + const serviceBusAdministrationClient = createAdminClient(); + await serviceBusAdministrationClient.createQueue(queueName, queueOptions); +} + +export function createAdminClient() { + const connectionString = process.env.SERVICEBUS_CONNECTION_STRING; + + if (!connectionString) { + throw new Error("SERVICEBUS_CONNECTION_STRING not defined in the environment!"); + } + + const serviceBusAdministrationClient = new ServiceBusAdministrationClient(connectionString); + return serviceBusAdministrationClient; +} + +export function createServiceBusClient(options?: ServiceBusClientOptions): ServiceBusClient { + const connectionString = process.env.SERVICEBUS_CONNECTION_STRING; + + if (!connectionString) { + throw new Error("SERVICEBUS_CONNECTION_STRING not defined in the environment!"); + } + + return new ServiceBusClient(connectionString, options); +} + +/** + * Loops infinitely with a delay between invocations. + */ +export async function loopForever( + fn: () => Promise, + delay: number, + abortSignal?: AbortSignalLike +) { + const timeout = () => new Promise((resolve) => setTimeout(() => resolve(true), delay)); + + while (abortSignal?.aborted === false && (await timeout())) { + await fn(); + } +} + +export function isReceiveMode(receiveMode: string): receiveMode is "peekLock" | "receiveAndDelete" { + return receiveMode === "peekLock" || receiveMode === "receiveAndDelete"; +}