From ab19f5c79783df88dea25208215692ab3cb8257b Mon Sep 17 00:00:00 2001 From: chradek Date: Thu, 11 Jun 2020 14:16:46 -0700 Subject: [PATCH] [event-hubs] fix subscription deadlock --- sdk/eventhub/event-hubs/CHANGELOG.md | 3 + sdk/eventhub/event-hubs/src/eventProcessor.ts | 19 +++++- .../test/eventHubConsumerClient.spec.ts | 59 +++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/event-hubs/CHANGELOG.md b/sdk/eventhub/event-hubs/CHANGELOG.md index e6254885c6a4..d9a75a77bc32 100644 --- a/sdk/eventhub/event-hubs/CHANGELOG.md +++ b/sdk/eventhub/event-hubs/CHANGELOG.md @@ -2,6 +2,9 @@ ## 5.2.2 (Unreleased) +- Fixes issue [#9289](https://github.com/Azure/azure-sdk-for-js/issues/9289) + where calling `await subscription.close()` inside of a subscription's `processError` + handler would cause the subscription to deadlock. ## 5.2.1 (2020-06-08) diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index b17fdc9cf5c1..e27a63f6db50 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -386,6 +386,18 @@ export class EventProcessor { loadBalancer: PartitionLoadBalancer, abortSignal: AbortSignalLike ): Promise { + let cancelLoopResolver; + // This provides a mechanism for exiting the loop early + // if the subscription has had `close` called. + const cancelLoopPromise = new Promise((resolve) => { + cancelLoopResolver = resolve; + if (abortSignal.aborted) { + return resolve(); + } + + abortSignal.addEventListener("abort", resolve); + }); + // periodically check if there is any partition not being processed and process it while (!abortSignal.aborted) { try { @@ -444,7 +456,8 @@ export class EventProcessor { } catch (err) { logger.warning(`[${this._id}] An error occured within the EventProcessor loop: ${err}`); logErrorStackTrace(err); - await this._handleSubscriptionError(err); + // Protect against the scenario where the user awaits on subscription.close() from inside processError. + await Promise.race([this._handleSubscriptionError(err), cancelLoopPromise]); } finally { // sleep for some time, then continue the loop again. logger.verbose( @@ -454,6 +467,10 @@ export class EventProcessor { await delayWithoutThrow(this._loopIntervalInMs, abortSignal); } } + + if (cancelLoopResolver) { + abortSignal.removeEventListener("abort", cancelLoopResolver); + } this._isRunning = false; } diff --git a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts index d7cad7f50b42..ac9771f46e92 100644 --- a/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts +++ b/sdk/eventhub/event-hubs/test/eventHubConsumerClient.spec.ts @@ -681,5 +681,64 @@ describe("EventHubConsumerClient", () => { "processClose was not called the same number of times as processInitialize." ); }); + + describe("processError", function(): void { + it("supports awaiting subscription.close on non partition-specific errors", async function(): Promise< + void + > { + // Use an invalid Event Hub name to trigger a non partition-specific error. + const client = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + "Fake-Hub" + ); + + let subscription: Subscription; + const caughtErr: Error = await new Promise((resolve) => { + subscription = client.subscribe({ + processEvents: async () => {}, + processError: async (err, context) => { + if (!context.partitionId) { + await subscription.close(); + resolve(err); + } + } + }); + }); + + should.exist(caughtErr); + + await client.close(); + }); + + it("supports awaiting subscription.close on partition-specific errors", async function(): Promise< + void + > { + // Use an invalid Event Hub name to trigger a non partition-specific error. + const client = new EventHubConsumerClient( + EventHubConsumerClient.defaultConsumerGroupName, + service.connectionString, + service.path + ); + + let subscription: Subscription; + const caughtErr: Error = await new Promise((resolve) => { + // Subscribe to an invalid partition id to trigger a partition-specific error. + subscription = client.subscribe("-1", { + processEvents: async () => {}, + processError: async (err, context) => { + if (context.partitionId) { + await subscription.close(); + resolve(err); + } + } + }); + }); + + should.exist(caughtErr); + + await client.close(); + }); + }); }); });