Skip to content

Commit

Permalink
[event-hubs] fixes sendBatch race condition causing TypeError to be t…
Browse files Browse the repository at this point in the history
…hrown (Azure#15021)

* [event-hubs] fixes sendBatch race condition causing TypeError to be thrown

* [event-hubs] add changelog entry for 14606 bug fix

* [event-hubs] rename EventHubSender _createLinkIfNotOpen -> _getLink

* [event-hubs] cleanup EventHubSender _init if/else statements

* [event-hubs] centralize setting of this._sender link
  • Loading branch information
chradek authored Apr 26, 2021
1 parent 958f2fa commit b3db04d
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 38 deletions.
3 changes: 3 additions & 0 deletions sdk/eventhub/event-hubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 5.5.1 (Unreleased)

- Fixes issue [#14606](https://github.com/Azure/azure-sdk-for-js/issues/14606) where the `EventHubConsumerClient` could call subscribe's `processError` callback with a "Too much pending tasks" error. This could occur if the consumer was unable to connect to the service for an extended period of time.

- Fixes issue [#15002](https://github.com/Azure/azure-sdk-for-js/issues/15002) where in rare cases an unexpected `TypeError` could be thrown from `EventHubProducerClient.sendBatch` when the connection was disconnected while sending events was in progress.

## 5.5.0 (2021-04-06)

Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ export class EventHubReceiver extends LinkEntity {
// store the underlying link in a cache
this._context.receivers[this.name] = this;

await this._ensureTokenRenewal();
this._ensureTokenRenewal();
} else {
logger.verbose(
"[%s] The receiver '%s' with address '%s' is open -> %s and is connecting " +
Expand Down
68 changes: 32 additions & 36 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ export class EventHubSender extends LinkEntity {
* @returns boolean
*/
isOpen(): boolean {
const result: boolean = this._sender! && this._sender!.isOpen();
const result = Boolean(this._sender && this._sender.isOpen());
logger.verbose(
"[%s] Sender '%s' with address '%s' is open? -> %s",
this._context.connectionId,
Expand All @@ -216,9 +216,9 @@ export class EventHubSender extends LinkEntity {
abortSignal?: AbortSignalLike;
} = {}
): Promise<number> {
await this._createLinkIfNotOpen(options);
const sender = await this._getLink(options);

return this._sender!.maxMessageSize;
return sender.maxMessageSize;
}

/**
Expand Down Expand Up @@ -339,21 +339,20 @@ export class EventHubSender extends LinkEntity {
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
retryOptions.timeoutInMs = timeoutInMs;

const initStartTime = Date.now();
await this._createLinkIfNotOpen(options);
const timeTakenByInit = Date.now() - initStartTime;

const sendEventPromise = async (): Promise<void> => {
const initStartTime = Date.now();
const sender = await this._getLink(options);
const timeTakenByInit = Date.now() - initStartTime;
logger.verbose(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session.outgoing.available()
sender.credit,
sender.session.outgoing.available()
);

let waitTimeForSendable = 1000;
if (!this._sender!.sendable() && timeoutInMs - timeTakenByInit > waitTimeForSendable) {
if (!sender.sendable() && timeoutInMs - timeTakenByInit > waitTimeForSendable) {
logger.verbose(
"%s Sender '%s', waiting for 1 second for sender to become sendable",
this._context.connectionId,
Expand All @@ -366,14 +365,14 @@ export class EventHubSender extends LinkEntity {
"%s Sender '%s' after waiting for a second, credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session?.outgoing?.available()
sender.credit,
sender.session?.outgoing?.available()
);
} else {
waitTimeForSendable = 0;
}

if (!this._sender!.sendable()) {
if (!sender.sendable()) {
// let us retry to send the message after some time.
const msg =
`[${this._context.connectionId}] Sender "${this.name}", ` +
Expand Down Expand Up @@ -404,10 +403,9 @@ export class EventHubSender extends LinkEntity {
throw translate(e);
}

this._sender!.sendTimeoutInSeconds =
(timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
sender.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit - waitTimeForSendable) / 1000;
try {
const delivery = await this._sender!.send(rheaMessage, undefined, 0x80013700, {
const delivery = await sender.send(rheaMessage, undefined, 0x80013700, {
abortSignal
});
logger.info(
Expand Down Expand Up @@ -444,22 +442,22 @@ export class EventHubSender extends LinkEntity {
}
}

private async _createLinkIfNotOpen(
private async _getLink(
options: {
retryOptions?: RetryOptions;
abortSignal?: AbortSignalLike;
} = {}
): Promise<void> {
if (this.isOpen()) {
return;
): Promise<AwaitableSender> {
if (this.isOpen() && this._sender) {
return this._sender;
}
const retryOptions = options.retryOptions || {};
const timeoutInMs = getRetryAttemptTimeoutInMs(retryOptions);
retryOptions.timeoutInMs = timeoutInMs;
const senderOptions = this._createSenderOptions(timeoutInMs);

const startTime = Date.now();
const createLinkPromise = async (): Promise<void> => {
const createLinkPromise = async (): Promise<AwaitableSender> => {
return defaultCancellableLock.acquire(
this.senderLock,
() => {
Expand All @@ -475,7 +473,7 @@ export class EventHubSender extends LinkEntity {
);
};

const config: RetryConfig<void> = {
const config: RetryConfig<AwaitableSender> = {
operation: createLinkPromise,
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink,
Expand All @@ -484,7 +482,7 @@ export class EventHubSender extends LinkEntity {
};

try {
await retry<void>(config);
return await retry<AwaitableSender>(config);
} catch (err) {
const translatedError = translate(err);
logger.warning(
Expand All @@ -500,18 +498,17 @@ export class EventHubSender extends LinkEntity {

/**
* Initializes the sender session on the connection.
* Should only be called from _createLinkIfNotOpen
* @hidden
*/
private async _init(
options: AwaitableSenderOptions & {
abortSignal: AbortSignalLike | undefined;
timeoutInMs: number;
}
): Promise<void> {
): Promise<AwaitableSender> {
try {
if (!this.isOpen() && !this.isConnecting) {
this.isConnecting = true;

if (!this.isOpen() || !this._sender) {
// Wait for the connectionContext to be ready to open the link.
await this._context.readyToOpenLink();
await this._negotiateClaim({
Expand All @@ -526,33 +523,32 @@ export class EventHubSender extends LinkEntity {
this.name
);

this._sender = await this._context.connection.createAwaitableSender(options);
this.isConnecting = false;
const sender = await this._context.connection.createAwaitableSender(options);
this._sender = sender;
logger.verbose(
"[%s] Sender '%s' created with sender options: %O",
this._context.connectionId,
this.name,
options
);
this._sender.setMaxListeners(1000);
sender.setMaxListeners(1000);

// It is possible for someone to close the sender and then start it again.
// Thus make sure that the sender is present in the client cache.
if (!this._context.senders[this.name]) this._context.senders[this.name] = this;
await this._ensureTokenRenewal();
this._ensureTokenRenewal();
return sender;
} else {
logger.verbose(
"[%s] The sender '%s' with address '%s' is open -> %s and is connecting " +
"-> %s. Hence not reconnecting.",
"[%s] The sender '%s' with address '%s' is open -> %s. Hence not reconnecting.",
this._context.connectionId,
this.name,
this.address,
this.isOpen(),
this.isConnecting
this.isOpen()
);
return this._sender;
}
} catch (err) {
this.isConnecting = false;
const translatedError = translate(err);
logger.warning(
"[%s] An error occurred while creating the sender %s: %s",
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/linkEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ export class LinkEntity {
* Ensures that the token is renewed within the predefined renewal margin.
* @hidden
*/
protected async _ensureTokenRenewal(): Promise<void> {
protected _ensureTokenRenewal(): void {
if (!this._tokenTimeoutInMs) {
return;
}
Expand Down
65 changes: 65 additions & 0 deletions sdk/eventhub/event-hubs/test/internal/node/disconnect.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import chai from "chai";
const should = chai.should();
import chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import { EnvVarKeys, getEnvVars } from "../../public/utils/testUtils";
import { EventHubSender } from "../../../src/eventHubSender";
import { createConnectionContext } from "../../../src/connectionContext";
import { stub } from "sinon";
import { MessagingError } from "@azure/core-amqp";
const env = getEnvVars();

describe("disconnected", function() {
const service = {
connectionString: env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
path: env[EnvVarKeys.EVENTHUB_NAME]
};
before("validate environment", function(): void {
should.exist(
env[EnvVarKeys.EVENTHUB_CONNECTION_STRING],
"define EVENTHUB_CONNECTION_STRING in your environment before running integration tests."
);
should.exist(
env[EnvVarKeys.EVENTHUB_NAME],
"define EVENTHUB_NAME in your environment before running integration tests."
);
});

describe("EventHubSender", function() {
/**
* Test added for issue https://github.com/Azure/azure-sdk-for-js/issues/15002
* Prior to fixing this issue, a TypeError would be thrown when this test was ran.
*/
it("send works after disconnect", async () => {
const context = createConnectionContext(service.connectionString, service.path);
const sender = EventHubSender.create(context);

// Create the sender link via getMaxMessageSize() so we can check when 'send' is about to be called on it.
await sender.getMaxMessageSize();
should.equal(sender.isOpen(), true, "Expected sender to be open.");

// Here we stub out the 'send' call on the AwaitableSender.
// We do 2 things:
// 1. Call `idle()` on the underlying rhea connection so that a disconnect is triggered.
// 2. Reject with a MessagingError.
// The MessagingError is thrown so that the send operation will be retried.
// The disconnect that's triggered will cause the existing AwaitableSender to be closed.

// If everything works as expected, then a new AwaitableSender should be created on the next
// retry attempt and the event should be successfully sent.
const senderLink = sender["_sender"]!;
const sendStub = stub(senderLink, "send");
sendStub.callsFake(async () => {
context.connection["_connection"].idle();
throw new MessagingError("Fake rejection!");
});

await sender.send([{ body: "foo" }]);

await context.close();
});
});
});

0 comments on commit b3db04d

Please sign in to comment.