-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): add TowerService::on_session_close
#1284
Conversation
TowerServiceBuilder::build_and_notify_on_session_close
TowerServiceBuilder::build_and_notify_on_session_close
TowerService::on_session_close
pub(crate) fn session_close() -> (SessionClose, SessionClosedFuture) { | ||
// SessionClosedFuture is closed after one message has been recevied | ||
// and max one message is handled then it's closed. | ||
let (tx, rx) = tokio::sync::broadcast::channel(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tokio's API is a bit strange I feel here; you can only create a pair, but then you can also call subscribe()
on a tx to get another rx!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yepp, I think the implementation is quite complicated/clever as the Receiver is not clone.
I think it just clone the message(s) to other receivers which is probably quite nice to avoid having a separate state for each receiver
n.closed() | ||
} else { | ||
let (session_close, fut) = session_close(); | ||
self.rpc_middleware.on_session_close = Some(session_close); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any likelihood of a race where the session is closing already or something, and only then you cann on_session_closed
and then get back a future that's never called or something?
I don't think it really matters though because why would you subscribe to this so late on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some extra docs on it, it's possible.
Also it's a little bit weird that TowerService::call
can be used several times, in practice I don't think that's the case but a footgun.
That on_session_close
on works the subsequent TowerService::call
as it does Option::take
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM I think! Is there a way to test this?
server/src/future.rs
Outdated
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
match self.0.poll_next_unpin(cx) { | ||
Poll::Pending => Poll::Pending, | ||
// A message is only sent when |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: when the connection is closed
|
||
impl SessionClose { | ||
pub(crate) fn close(self) { | ||
let _ = self.0.send(()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could this be implemented also on Drop
? And if we already called close()
we'd do nothing on drop, otherwise, we'll call self.0.send()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I skipped it because I'm scared of that as the tower stuff requires Clone and I'm not sure whether something is dropped at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, thanks!
This PR adds another builder option to the
TowerServiceBuilder
get subscribe to when the connection/session has closed because whenTowerService::call
resolves it means that HTTP call has been finished but doesn't mean that WebSocket connection has closed.Thus, to know that we need another future sadly but worth the effort because otherwise the only way for folks it use the low-level API.
Resolves #1264