-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(ws server): close all subscription when the connection is closed #725
fix(ws server): close all subscription when the connection is closed #725
Conversation
let mut module = RpcModule::new(tx); | ||
|
||
module | ||
.register_subscription("subscribe_never_produce", "n", "unsubscribe_never_produce", |_, sink, mut tx| { | ||
// create stream that doesn't produce items. | ||
let stream = futures::stream::empty::<usize>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test was flaky I don't know what I was thinking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Further, this could complete for notify_one
successfully if the receiver
from the connection is closed once the a new item is produced on the stream and is tried to be sent
to the subscriber.
so really tricky test to but I added sleep for one hour so should be "okeyisch" really tricky to test this.
// when the connection closes to be on safe side. | ||
close_notify_server_stop.notify_one(); | ||
// Notify all listeners and close down associated tasks. | ||
close_notify_server_stop.notify_waiters(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh okay makes sense. So listeners were being launched from pipe_from_try_stream
, and in this case because it was notify_one
only a single task was stopped.
Good catch 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, basically every connection can have n number of subscriptions and we want to cancel them all when the connection is killed.
the subscriptions will/may be canceled as soon some message is sent via them but that might take a while so we want to do it as soon as the connection is closed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, basically every connection can have n number of subscriptions and we want to cancel them all when the connection is killed.
When I wrote this code I went back and forth a bunch on this and in the end I landed on "connections have a single subscription", but that seems a bit dumb now, not sure what I was thinking. :/ Good catch!
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); | ||
assert_eq!(Some(()), rx.next().await, "subscription stream should be terminated after the client was dropped"); | ||
|
||
let rx_len = rx.take(10).fold(0, |acc, _| async move { acc + 1 }).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a chance that running drop(client)
; right after the subscribing might mean that we don't get all 10 items back from it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it could a few milliseconds to send the WS close message but so not really is my understanding rx
here is another channel where tx
is encapsulated in the RpcModule
so it's not the subscription stream
Then after connection is closed the actual subscriptions should be terminated using Notify::notify_waiters
after that this messages and sent to channel above...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM from what I understand (and well done for finding this!!)! I assume we need notify_waiters because there is more than one thing that might be waiting on that to resolve in order to properly clean up (and it def seems like the "safe" option regardless)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, good catch on the notifications.
In terms of the integration tests, I see how it's difficult to test. Will keep this one in the back on my mind.
No description provided.