Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Update send operation to include initialization #4319

Merged
merged 27 commits into from
Jul 22, 2019
Merged
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
260b0e6
[Event Hubs] Introduce timeoutInMs on RetryOptions (#4239)
ramya0820 Jul 11, 2019
c74f5e4
Merge branch 'master' of https://github.com/ramya0820/azure-sdk-for-js
Jul 13, 2019
00016cb
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 15, 2019
4916ce3
Move init() to within sendOperation()
ramya0820 Jul 15, 2019
af5d8c4
Update operation timeout error in send() to throw the newly defined O…
ramya0820 Jul 15, 2019
a66272a
Update log message
ramya0820 Jul 15, 2019
a74824d
Fix operation timeout handling
ramya0820 Jul 15, 2019
1607ddd
Merge from master
ramya0820 Jul 16, 2019
0631ceb
Fix timer clearance
ramya0820 Jul 16, 2019
60fa5ae
Simplify promise handling using async await
ramya0820 Jul 16, 2019
3168339
Fix few merge errors
ramya0820 Jul 16, 2019
b449cef
Simplify changes
ramya0820 Jul 16, 2019
e0c8e92
Undo prettier changes, fix merge
ramya0820 Jul 16, 2019
85c2ffc
Nit: whitespaces
ramya0820 Jul 16, 2019
1212ae7
Fix code move around and error name
ramya0820 Jul 16, 2019
f28c566
Fix timer handling
ramya0820 Jul 17, 2019
66127ee
Refactor to simplify callback definitions
ramya0820 Jul 17, 2019
0e54d61
Remove jitter
ramya0820 Jul 17, 2019
3ec9c9d
Update try catch scope
ramya0820 Jul 17, 2019
d9d38a3
Fix timer clearance
ramya0820 Jul 17, 2019
f85e112
Remove comment
ramya0820 Jul 17, 2019
f5b54eb
Return reject
ramya0820 Jul 17, 2019
ddb6195
Fix build error
ramya0820 Jul 17, 2019
6e6ef92
Merge branch 'master' into issue-2835-p1
ramya0820 Jul 18, 2019
eaaf17e
Revert refactoring
ramya0820 Jul 19, 2019
78c0b4a
Merge branch 'issue-2835-p1' of https://github.com/ramya0820/azure-sd…
ramya0820 Jul 19, 2019
f6e7271
Handle abort around init
ramya0820 Jul 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 138 additions & 125 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import {
ErrorNameConditionMapper,
RetryConfig,
RetryOperationType,
Constants,
randomNumberFromInterval
Constants
} from "@azure/core-amqp";
import { EventData, toAmqpMessage } from "./eventData";
import { ConnectionContext } from "./connectionContext";
Expand Down Expand Up @@ -414,16 +413,6 @@ export class EventHubSender extends LinkEntity {
throw error;
}

if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
}
log.sender(
"[%s] Sender '%s', trying to send EventData[].",
this._context.connectionId,
Expand Down Expand Up @@ -512,146 +501,171 @@ export class EventHubSender extends LinkEntity {
): Promise<void> {
const abortSignal: AbortSignalLike | undefined = options.abortSignal;
const sendEventPromise = () =>
new Promise<void>((resolve, reject) => {
new Promise<void>(async (resolve, reject) => {
let waitTimer: any;

let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
let onAccepted: Func<EventContext, void>;
let onAborted: () => void;

const rejectOnAbort = () => {
const desc: string =
`[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` +
`address "${this.address}" has been cancelled by the user.`;
`[${this._context.connectionId}] The send operation on the Sender "${
this.name
}" with ` + `address "${this.address}" has been cancelled by the user.`;
log.error(desc);
reject(new AbortError("The send operation has been cancelled by the user."));
return reject(new AbortError("The send operation has been cancelled by the user."));
};

if (abortSignal && abortSignal.aborted) {
// operation has been cancelled, so exit quickly
return rejectOnAbort();
}

let waitTimer: any;
log.sender(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session.outgoing.available()
);
if (this._sender!.sendable()) {
onAborted = () => {
ramya0820 marked this conversation as resolved.
Show resolved Hide resolved
removeListeners();
rejectOnAbort();
};

onAccepted = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', sending message with id '%s'.",
"[%s] Sender '%s', got event accepted.",
this._context.connectionId,
this.name
);
let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
let onAccepted: Func<EventContext, void>;
let onAborted: () => void;
resolve();
};

const removeListeners = (): void => {
clearTimeout(waitTimer);
// When `removeListeners` is called on timeout, the sender might be closed and cleared
// So, check if it exists, before removing listeners from it.
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};
onRejected = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name);
const err = translate(context!.delivery!.remote_state!.error);
log.error(err);
reject(err);
};

onAborted = () => {
removeListeners();
rejectOnAbort();
};
onAccepted = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', got event accepted.",
this._context.connectionId,
this.name
onReleased = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
resolve();
};
onRejected = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event rejected.",
this._context.connectionId,
this.name
}
log.error(err);
reject(err);
};

onModified = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
const err = translate(context!.delivery!.remote_state!.error);
log.error(err);
reject(err);
}
log.error(err);
reject(err);
};

const removeListeners = (): void => {
clearTimeout(waitTimer);
// When `removeListeners` is called on timeout, the sender might be closed and cleared
// So, check if it exists, before removing listeners from it.
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" with ` +
`address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: Error = {
name: "OperationTimeoutError",
message: desc
};
onReleased = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event released.",
this._context.connectionId,
this.name
);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
return reject(translate(e));
};

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}

waitTimer = setTimeout(
ramya0820 marked this conversation as resolved.
Show resolved Hide resolved
actionAfterTimeout,
getRetryAttemptTimeoutInMs(options.retryOptions)
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the re-arrangement of the timer, if if (abortSignal && abortSignal.aborted) below returns true, then we end up returning a rejected promise (which is right) without clearing the timer.

Please consider keeping the previous order of the callbacks/helper-functions and only adding the link creation part at the right place

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering in the recent commit of eaaf17e is better, but still needs work.

In the current setup, if the abort signal is fired when the async process of init() is in progress, then the promise is being rejected only after init() is complete. This is because init() is being called before the event handler for abort event was added.

Please make the below changes

  • Add the event handler for abort before calling init() and before starting the timer
  • Remove the event handler for abort before rejecting the promise when init() fails

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#4322 (comment) thread specifically clarifies about abort during init().

This scenario if we want to address would then need to apply for the managementRequest as well correct? Or did we specifically exclude this because of complexity involved? (In not having access to removeListeners() on sender link from the SDK?) It just feels like UX is not consistent in both cases.

Okay to implement the optimization and new corner cases from this.

Copy link
Contributor

@ramya-rao-a ramya-rao-a Jul 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This scenario applies to managementRequest as well i.e. the scenario of abort signal being fired when the async process of init() is in progress.

I have updated the comment thread for managementRequest. Please see https://github.com/Azure/azure-sdk-for-js/pull/4322/files#r305599979


if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);

try {
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
} catch (err) {
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
log.error(err);
reject(err);
};
onModified = (context: EventContext) => {
removeListeners();
clearTimeout(waitTimer);
err = translate(err);
log.error(
"[%s] Sender '%s', got event modified.",
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name
this.name,
err
);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};
return reject(err);
}
}

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" with ` +
`address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
};
log.sender(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
this.name,
this._sender!.credit,
this._sender!.session.outgoing.available()
);
if (this._sender!.sendable()) {
log.sender(
"[%s] Sender '%s', sending message with id '%s'.",
this._context.connectionId,
this.name
);

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
}
this._sender!.on(SenderEvents.accepted, onAccepted);
this._sender!.on(SenderEvents.rejected, onRejected);
this._sender!.on(SenderEvents.modified, onModified);
this._sender!.on(SenderEvents.released, onReleased);
waitTimer = setTimeout(
actionAfterTimeout,
getRetryAttemptTimeoutInMs(options.retryOptions)
);

const delivery = this._sender!.send(message, undefined, 0x80013700);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
Expand All @@ -673,7 +687,6 @@ export class EventHubSender extends LinkEntity {
}
});

const jitterInSeconds = randomNumberFromInterval(1, 4);
const maxRetries = options.retryOptions && options.retryOptions.maxRetries;
const delayInSeconds =
options.retryOptions &&
Expand All @@ -686,7 +699,7 @@ export class EventHubSender extends LinkEntity {
connectionId: this._context.connectionId,
operationType: RetryOperationType.sendMessage,
maxRetries: maxRetries,
delayInSeconds: delayInSeconds + jitterInSeconds
delayInSeconds: delayInSeconds
};
return retry<void>(config);
}
Expand Down