Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Update send operation to include initialization #4319

Merged
merged 27 commits into from
Jul 22, 2019
Merged
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
260b0e6
[Event Hubs] Introduce timeoutInMs on RetryOptions (#4239)
ramya0820 Jul 11, 2019
c74f5e4
Merge branch 'master' of https://github.com/ramya0820/azure-sdk-for-js
Jul 13, 2019
00016cb
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-js
ramya0820 Jul 15, 2019
4916ce3
Move init() to within sendOperation()
ramya0820 Jul 15, 2019
af5d8c4
Update operation timeout error in send() to throw the newly defined O…
ramya0820 Jul 15, 2019
a66272a
Update log message
ramya0820 Jul 15, 2019
a74824d
Fix operation timeout handling
ramya0820 Jul 15, 2019
1607ddd
Merge from master
ramya0820 Jul 16, 2019
0631ceb
Fix timer clearance
ramya0820 Jul 16, 2019
60fa5ae
Simplify promise handling using async await
ramya0820 Jul 16, 2019
3168339
Fix few merge errors
ramya0820 Jul 16, 2019
b449cef
Simplify changes
ramya0820 Jul 16, 2019
e0c8e92
Undo prettier changes, fix merge
ramya0820 Jul 16, 2019
85c2ffc
Nit: whitespaces
ramya0820 Jul 16, 2019
1212ae7
Fix code move around and error name
ramya0820 Jul 16, 2019
f28c566
Fix timer handling
ramya0820 Jul 17, 2019
66127ee
Refactor to simplify callback definitions
ramya0820 Jul 17, 2019
0e54d61
Remove jitter
ramya0820 Jul 17, 2019
3ec9c9d
Update try catch scope
ramya0820 Jul 17, 2019
d9d38a3
Fix timer clearance
ramya0820 Jul 17, 2019
f85e112
Remove comment
ramya0820 Jul 17, 2019
f5b54eb
Return reject
ramya0820 Jul 17, 2019
ddb6195
Fix build error
ramya0820 Jul 17, 2019
6e6ef92
Merge branch 'master' into issue-2835-p1
ramya0820 Jul 18, 2019
eaaf17e
Revert refactoring
ramya0820 Jul 19, 2019
78c0b4a
Merge branch 'issue-2835-p1' of https://github.com/ramya0820/azure-sd…
ramya0820 Jul 19, 2019
f6e7271
Handle abort around init
ramya0820 Jul 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 124 additions & 119 deletions sdk/eventhub/event-hubs/src/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ export class EventHubSender extends LinkEntity {

if (events instanceof EventDataBatch && options && options.partitionKey) {
// throw an error if partition key is different than the one provided in the options.
const error = new Error("Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead.");
const error = new Error(
"Partition key is not supported when sending a batch message. Pass the partition key when creating the batch message instead."
);
log.error(
"[%s] Partition key is not supported when using createBatch(). %O",
this._context.connectionId,
Expand All @@ -412,16 +414,6 @@ export class EventHubSender extends LinkEntity {
throw error;
}

if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
}
log.sender(
"[%s] Sender '%s', trying to send EventData[].",
this._context.connectionId,
Expand Down Expand Up @@ -508,24 +500,139 @@ export class EventHubSender extends LinkEntity {
message: AmqpMessage | Buffer,
options: SendOptions & EventHubProducerOptions = {}
): Promise<void> {

const abortSignal: AbortSignalLike | undefined = options.abortSignal;
const sendEventPromise = () =>
new Promise<void>((resolve, reject) => {
new Promise<void>(async (resolve, reject) => {
const rejectOnAbort = () => {
const desc: string =
`[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` +
`address "${this.address}" has been cancelled by the user.`;
`[${this._context.connectionId}] The send operation on the Sender "${
this.name
}" with ` + `address "${this.address}" has been cancelled by the user.`;
log.error(desc);
reject(new AbortError("The send operation has been cancelled by the user."));
};

const onRejected: Func<EventContext, void> = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event rejected.", this._context.connectionId, this.name);
const err = translate(context!.delivery!.remote_state!.error);
log.error(err);
reject(err);
};

const onAccepted: Func<EventContext, void> = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', got event accepted.",
this._context.connectionId,
this.name
);
resolve();
};

const onAborted = () => {
removeListeners();
rejectOnAbort();
};

const onReleased = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event released.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};

const onModified = (context: EventContext) => {
removeListeners();
log.error("[%s] Sender '%s', got event modified.", this._context.connectionId, this.name);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};

const removeListeners = (): void => {
clearTimeout(waitTimer);
// When `removeListeners` is called on timeout, the sender might be closed and cleared
// So, check if it exists, before removing listeners from it.
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" with ` +
`address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: Error = {
name: "OperationTimeoutError",
message: desc
};
return reject(translate(e));
};

const waitTimer = setTimeout(
actionAfterTimeout,
getRetryAttemptTimeoutInMs(options.retryOptions)
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the re-arrangement of the timer, if if (abortSignal && abortSignal.aborted) below returns true, then we end up returning a rejected promise (which is right) without clearing the timer.

Please consider keeping the previous order of the callbacks/helper-functions and only adding the link creation part at the right place

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ordering in the recent commit of eaaf17e is better, but still needs work.

In the current setup, if the abort signal is fired when the async process of init() is in progress, then the promise is being rejected only after init() is complete. This is because init() is being called before the event handler for abort event was added.

Please make the below changes

  • Add the event handler for abort before calling init() and before starting the timer
  • Remove the event handler for abort before rejecting the promise when init() fails

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#4322 (comment) thread specifically clarifies about abort during init().

This scenario if we want to address would then need to apply for the managementRequest as well correct? Or did we specifically exclude this because of complexity involved? (In not having access to removeListeners() on sender link from the SDK?) It just feels like UX is not consistent in both cases.

Okay to implement the optimization and new corner cases from this.

Copy link
Contributor

@ramya-rao-a ramya-rao-a Jul 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This scenario applies to managementRequest as well i.e. the scenario of abort signal being fired when the async process of init() is in progress.

I have updated the comment thread for managementRequest. Please see https://github.com/Azure/azure-sdk-for-js/pull/4322/files#r305599979


if (abortSignal && abortSignal.aborted) {
ramya0820 marked this conversation as resolved.
Show resolved Hide resolved
// operation has been cancelled, so exit quickly
return rejectOnAbort();
}

let waitTimer: any;
if (!this.isOpen()) {
log.sender(
"Acquiring lock %s for initializing the session, sender and " +
"possibly the connection.",
this.senderLock
);

try {
await defaultLock.acquire(this.senderLock, () => {
return this._init();
});
} catch (err) {
clearTimeout(waitTimer);
err = translate(err);
log.error(
"[%s] An error occurred while creating the sender %s",
this._context.connectionId,
this.name,
err
);
return reject(err);
}
}

log.sender(
"[%s] Sender '%s', credit: %d available: %d",
this._context.connectionId,
Expand All @@ -539,106 +646,6 @@ export class EventHubSender extends LinkEntity {
this._context.connectionId,
this.name
);
let onRejected: Func<EventContext, void>;
let onReleased: Func<EventContext, void>;
let onModified: Func<EventContext, void>;
let onAccepted: Func<EventContext, void>;
let onAborted: () => void;

const removeListeners = (): void => {
clearTimeout(waitTimer);
// When `removeListeners` is called on timeout, the sender might be closed and cleared
// So, check if it exists, before removing listeners from it.
if (abortSignal) {
abortSignal.removeEventListener("abort", onAborted);
}
if (this._sender) {
this._sender.removeListener(SenderEvents.rejected, onRejected);
this._sender.removeListener(SenderEvents.accepted, onAccepted);
this._sender.removeListener(SenderEvents.released, onReleased);
this._sender.removeListener(SenderEvents.modified, onModified);
}
};

onAborted = () => {
removeListeners();
rejectOnAbort();
};
onAccepted = (context: EventContext) => {
// Since we will be adding listener for accepted and rejected event every time
// we send a message, we need to remove listener for both the events.
// This will ensure duplicate listeners are not added for the same event.
removeListeners();
log.sender(
"[%s] Sender '%s', got event accepted.",
this._context.connectionId,
this.name
);
resolve();
};
onRejected = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event rejected.",
this._context.connectionId,
this.name
);
const err = translate(context!.delivery!.remote_state!.error);
log.error(err);
reject(err);
};
onReleased = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event released.",
this._context.connectionId,
this.name
);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender '${this.name}', ` +
`received a release disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};
onModified = (context: EventContext) => {
removeListeners();
log.error(
"[%s] Sender '%s', got event modified.",
this._context.connectionId,
this.name
);
let err: Error;
if (context!.delivery!.remote_state!.error) {
err = translate(context!.delivery!.remote_state!.error);
} else {
err = new Error(
`[${this._context.connectionId}] Sender "${this.name}", ` +
`received a modified disposition.Hence we are rejecting the promise.`
);
}
log.error(err);
reject(err);
};

const actionAfterTimeout = () => {
removeListeners();
const desc: string =
`[${this._context.connectionId}] Sender "${this.name}" with ` +
`address "${this.address}", was not able to send the message right now, due ` +
`to operation timeout.`;
log.error(desc);
const e: AmqpError = {
condition: ErrorNameConditionMapper.ServiceUnavailableError,
description: desc
};
return reject(translate(e));
};

if (abortSignal) {
abortSignal.addEventListener("abort", onAborted);
Expand All @@ -647,7 +654,6 @@ export class EventHubSender extends LinkEntity {
this._sender!.on(SenderEvents.rejected, onRejected);
this._sender!.on(SenderEvents.modified, onModified);
this._sender!.on(SenderEvents.released, onReleased);
waitTimer = setTimeout(actionAfterTimeout, getRetryAttemptTimeoutInMs(options.retryOptions));
const delivery = this._sender!.send(message, undefined, 0x80013700);
log.sender(
"[%s] Sender '%s', sent message with delivery id: %d",
Expand All @@ -669,7 +675,6 @@ export class EventHubSender extends LinkEntity {
}
});

const jitterInSeconds = randomNumberFromInterval(1, 4);
const maxRetries = options.retryOptions && options.retryOptions.maxRetries;
const delayInSeconds =
options.retryOptions &&
Expand All @@ -682,7 +687,7 @@ export class EventHubSender extends LinkEntity {
connectionId: this._context.connectionId,
operationType: RetryOperationType.sendMessage,
maxRetries: maxRetries,
delayInSeconds: delayInSeconds + jitterInSeconds
delayInSeconds: delayInSeconds
};
return retry<void>(config);
}
Expand Down