-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add cancellation support to all async operations #66
Conversation
1216742
to
d53dab8
Compare
a39d6b7
to
a01480c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an exciting change!
Just have some comments, assume we're waiting on a final review from @chradek.
@@ -187,12 +192,48 @@ export class AwaitableSender extends BaseSender { | |||
return reject(new OperationTimeoutError(message)); | |||
}, this.sendTimeoutInSeconds * 1000); | |||
|
|||
const onAbort = () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we put this into a function rather than inlining it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you thinking of a function that accepts delivery
as an argument, and then still in-lining, something like:
const onAbort = () => {
this.abort(delivery);
}
@@ -540,24 +628,27 @@ export class Connection extends Entity { | |||
* | |||
* @return Promise<AwaitableSender>. | |||
*/ | |||
async createAwaitableSender(options?: AwaitableSenderOptionsWithSession): Promise<AwaitableSender> { | |||
async createAwaitableSender(options?: AwaitableSenderOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<AwaitableSender> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't abortSignal
just in the AwaitableSenderOptionsWithSession
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AwaitableSenderOptionsWithSession
extends AwaitableSenderOptions
which extends BaseSenderOptions
SenderOptionsWithSession
extends SenderOptions
which extends BaseSenderOptions
.
ReceiverOptionsWithSession
extends ReceiverOptions
So, the right place to add the abortSignal would have been BaseSenderOptions
and ReceiverOptions
But then, createRequestResponseLink()
takes both SenderOptions
and ReceiverOptions
. Both options supporting abortSignal
would be not be desirable for this method. So, I could either do the Pick
/Omit
magic, or keep abortSignal
separate as done here. I chose the latter.
* @param {ReceiverOptionsWithSession} options Optional parameters to create a receiver link. | ||
* @return {Promise<Receiver>} Promise<Receiver>. | ||
*/ | ||
async createReceiver(options?: ReceiverOptionsWithSession): Promise<Receiver> { | ||
async createReceiver(options?: ReceiverOptionsWithSession & { abortSignal?: AbortSignalLike; }): Promise<Receiver> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't abortSignal
just in the ReceiverOptionsWithSession
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding cancellation support! My biggest question is if we're properly handling cancellation when the service responds with an error when we open/close.
It seems like if we remove the Error
handler from whatever we're cancelling and the service does respond with an error (e.g. ConnectionError is response to ConnectionOpen or ConnectionClose), we could end up with an unhandled exception unless rhea
allows the Error
events to be fired without any listeners attached.
@@ -187,12 +192,48 @@ export class AwaitableSender extends BaseSender { | |||
return reject(new OperationTimeoutError(message)); | |||
}, this.sendTimeoutInSeconds * 1000); | |||
|
|||
const onAbort = () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you thinking of a function that accepts delivery
as an argument, and then still in-lining, something like:
const onAbort = () => {
this.abort(delivery);
}
@@ -366,6 +413,14 @@ export class Connection extends Entity { | |||
log.error("[%s] Connection got disconnected while closing itself: %O.", this.id, error); | |||
}; | |||
|
|||
onAbort = () => { | |||
removeListeners(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious here as well what happens if the server does close the connection with an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the way rhea is set up is to fire events and not throw errors. If there are listeners, they respond, else the fired event gets ignored. There is no unhandled exceptions
I looked into why the CI is failing for this PR. It looks like the new tests don't set I didn't dig into exactly which tests were still allowing reconnects to occur, maybe there are some that don't explicitly close the connection so rhea attempts to reconnect. |
yay! Thanks for the hint @chradek! |
lib/connection.ts
Outdated
return new Promise((resolve, reject) => { | ||
if (!this.isOpen()) { | ||
|
||
let onOpen: Func<RheaEventContext, void>; | ||
let onClose: Func<RheaEventContext, void>; | ||
let onAbort: Func<void, void>; | ||
const abortSignal = options?.abortSignal; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose this must work or else CI wouldn't run but the package.json says we're using TS 3.5.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's because 3.5.1 is the minimum version 😄 But that does remind me that before a new version is published, we should evaluate if down-leveled types are needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, right! :)
It's a dev dependency anyways - so should we bump it up?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed optional chaining in c12aeed to avoid rocking the boat regarding TS versions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concerns were addressed, looks good to me!
Please make sure to look at #67 before publishing a new release.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. Someday we'll upgrade our TS version. :)
* @returns {Promise<Delivery>} Promise<Delivery> The delivery information about the sent message. | ||
*/ | ||
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, timeoutInSeconds?: number): Promise<Delivery> { | ||
send(msg: Message | Buffer, tag?: Buffer | string, format?: number, options?: AwaitableSendOptions): Promise<Delivery> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not considered a breaking change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeoutInSeconds was introduced in #53 which has not been released yet. So, no breaking changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Go go go!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
Please add changelog |
98b84fb
to
5f0266e
Compare
Description
Brief description of the changes made in the PR. This helps in making better changelog
Few design discussions:
AbortSignalLike
interface is redefined in this PR instead of relying on an Azure scoped package like@azure/abort-controller
which is used in the testsAbortError
class due to potential conflicts with upstream usage. Instead, using same oldError
class with custom name. This is definitely open for debate as to how we should approach this.Reference to any github issues