From 77065a8f082949c8135d49518c7059f0a3a0e209 Mon Sep 17 00:00:00 2001
From: Phoebe Goldman
Date: Fri, 6 Oct 2023 15:18:29 -0400
Subject: [PATCH] Add some comments to `FutureQueue` and the `subscribe` route (#378)

---
 crates/client-api/src/routes/subscribe.rs | 30 +++++++-
 crates/core/src/util/future_queue.rs      | 87 ++++++++++++++++-------
 2 files changed, 89 insertions(+), 28 deletions(-)

diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs
index 9d614b8481..ffc4a28d7f 100644
--- a/crates/client-api/src/routes/subscribe.rs
+++ b/crates/client-api/src/routes/subscribe.rs
@@ -169,8 +169,12 @@ const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60);
 async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut sendrx: mpsc::Receiver) {
     let mut liveness_check_interval = tokio::time::interval(LIVELINESS_TIMEOUT);
     let mut got_pong = true;
+
+    // Build a queue of incoming messages to handle,
+    // to be processed one at a time, in the order they're received.
     // TODO: do we want this to have a fixed capacity? or should it be unbounded
     let mut handle_queue = pin!(future_queue(|message| client.handle_message(message)));
+
     let mut closed = false;
     loop {
         enum Item {
@@ -180,7 +184,13 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
         let message = tokio::select! {
             // NOTE: all of the futures for these branches **must** be cancel safe. do not
             // change this if you don't know what that means.
+
+            // If we have a result from handling a past message to report,
+            // grab it to handle in the next `match`.
             Some(res) = handle_queue.next() => Item::HandleResult(res),
+
+            // If we've received an incoming message,
+            // grab it to handle in the next `match`.
             message = ws.next() => match message {
                 Some(Ok(m)) => Item::Message(ClientMessage::from_message(m)),
                 Some(Err(error)) => {
@@ -190,6 +200,9 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
                 // the client sent us a close frame
                 None => break,
             },
+
+            // If we have an outgoing message to send, send it off.
+            // No incoming `message` to handle, so `continue`.
             Some(message) = sendrx.recv() => {
                 if closed {
                     // TODO: this isn't great. when we receive a close request from the peer,
@@ -204,6 +217,8 @@
                 }
                 continue;
             }
+
+            // If the module has exited, close the websocket.
             () = client.module.exited(), if !closed => {
                 if let Err(e) = ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })).await {
                     log::warn!("error closing: {e:#}")
@@ -211,7 +226,10 @@
                 closed = true;
                 continue;
             }
+
+            // If it's time to send a ping...
             _ = liveness_check_interval.tick() => {
+                // If we received a pong since the last ping, send a fresh one.
                 if mem::take(&mut got_pong) {
                     if let Err(e) = ws.send(WsMessage::Ping(Vec::new())).await {
                         log::warn!("error sending ping: {e:#}");
@@ -224,6 +242,13 @@
                 }
             }
         };
+
+        // Handle the incoming message we grabbed in the previous `select!`.
+
+        // TODO: Data flow appears not to require `enum Item` or this distinct `match`,
+        // since `Item::HandleResult` comes from exactly one `select!` branch,
+        // and `Item::Message` comes from exactly one distinct `select!` branch.
+        // Consider merging this `match` with the previous `select!`.
         match message {
             Item::Message(ClientMessage::Message(message)) => handle_queue.as_mut().push(message),
             Item::HandleResult(res) => {
@@ -250,6 +275,7 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
             }
             Item::Message(ClientMessage::Ping(_message)) => {
                 log::trace!("Received ping from client {}", client.id);
+                // TODO: should we respond with a `Pong`?
             }
             Item::Message(ClientMessage::Pong(_message)) => {
                 log::trace!("Received heartbeat from client {}", client.id);
@@ -274,7 +300,9 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
 
     log::debug!("Client connection ended");
     sendrx.close();
-    // clear the queue before we go on to clean up, or else we can get a situation like:
+    // Clear the incoming message queue before we go on to clean up.
+    // Otherwise, we can be left with a stale future which never gets awaited,
+    // which can lead to bugs like:
     // https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/aws_engineer/solving_a_deadlock.html
     handle_queue.clear();
 
diff --git a/crates/core/src/util/future_queue.rs b/crates/core/src/util/future_queue.rs
index 247b72a53b..908673be21 100644
--- a/crates/core/src/util/future_queue.rs
+++ b/crates/core/src/util/future_queue.rs
@@ -8,47 +8,80 @@ use std::pin::Pin;
 use std::task::{self, Poll};
 
 pin_project! {
-    pub struct FutureQueue<T, F, Fut> {
-        queue: VecDeque<T>,
-        f: F,
+    /// A FIFO queue into which `Job`s can be pushed, which maintains at most one running `Fut` at a time.
+    ///
+    /// Each subscribed/connected WebSocket maintains a `FutureQueue` of incoming messages to handle.
+    ///
+    /// `Fut` should implement `Future`.
+    /// `StartFn` should implement `FnMut(Job) -> Fut`.
+    pub struct FutureQueue<Job, StartFn, Fut> {
+        job_queue: VecDeque<Job>,
+        start_fn: StartFn,
         #[pin]
-        fut: Fuse<Fut>,
+        running_job: Fuse<Fut>,
     }
 }
 
-pub fn future_queue<T, F, Fut>(f: F) -> FutureQueue<T, F, Fut>
+/// Construct a `FutureQueue` which uses `start_fn` to run its frontmost job.
+pub fn future_queue<Job, StartFn, Fut>(start_fn: StartFn) -> FutureQueue<Job, StartFn, Fut>
 where
-    F: FnMut(T) -> Fut,
+    StartFn: FnMut(Job) -> Fut,
     Fut: Future,
 {
     FutureQueue {
-        queue: VecDeque::new(),
-        f,
-        fut: Fuse::terminated(),
+        job_queue: VecDeque::new(),
+        start_fn,
+        running_job: Fuse::terminated(),
     }
 }
 
-impl<T, F, Fut> FutureQueue<T, F, Fut>
+impl<Job, StartFn, Fut> FutureQueue<Job, StartFn, Fut>
 where
-    F: FnMut(T) -> Fut,
+    StartFn: FnMut(Job) -> Fut,
     Fut: Future,
 {
-    pub fn push(self: Pin<&mut Self>, item: T) {
-        self.project().queue.push_back(item)
+    /// Insert a job into the FIFO queue.
+    ///
+    /// When the job reaches the front of the queue and this queue is awaited,
+    /// `self.start_fn` will be applied to `job` to start it,
+    /// and awaiting this queue will await that future.
+    ///
+    /// As with all futures, the job will not run unless awaited.
+    /// In addition, `FutureQueue` will not start a new job until the previous job has finished,
+    /// so `self.start_fn` will not be called until `self` is polled
+    /// enough times to consume all earlier entries in the queue.
+    pub fn push(self: Pin<&mut Self>, job: Job) {
+        self.project().job_queue.push_back(job)
     }
-    pub fn push_unpin(&mut self, item: T) {
-        self.queue.push_back(item)
+
+    /// Insert a job into the FIFO queue.
+    ///
+    /// When the job reaches the front of the queue and this queue is awaited,
+    /// `self.start_fn` will be applied to `job` to start it,
+    /// and awaiting this queue will await that future.
+    ///
+    /// As with all futures, the job will not run unless awaited.
+    /// In addition, `FutureQueue` will not start a new job until the previous job has finished,
+    /// so `self.start_fn` will not be called until `self` is polled
+    /// enough times to consume all earlier entries in the queue.
+    pub fn push_unpin(&mut self, job: Job) {
+        self.job_queue.push_back(job)
     }
+
+    /// Remove all jobs from the queue without running them, and cancel the current job if one is running.
+    ///
+    /// Subscriptions clear their queue upon disconnecting,
+    /// to avoid leaving stale jobs that will never be started or awaited.
     pub fn clear(self: Pin<&mut Self>) {
         let mut me = self.project();
-        me.queue.clear();
-        me.fut.set(Fuse::terminated());
+        me.job_queue.clear();
+        me.running_job.set(Fuse::terminated());
     }
 }
 
-impl<T, F, Fut> Stream for FutureQueue<T, F, Fut>
+impl<Job, StartFn, Fut> Stream for FutureQueue<Job, StartFn, Fut>
 where
-    F: FnMut(T) -> Fut,
+    StartFn: FnMut(Job) -> Fut,
     Fut: Future,
 {
     type Item = Fut::Output;
@@ -56,24 +89,24 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
         let mut me = self.project();
         loop {
-            if !me.fut.is_terminated() {
-                return me.fut.poll(cx).map(Some);
+            if !me.running_job.is_terminated() {
+                return me.running_job.poll(cx).map(Some);
             }
-            let Some(item) = me.queue.pop_front() else {
+            let Some(item) = me.job_queue.pop_front() else {
                 return Poll::Ready(None);
             };
-            let fut = (me.f)(item);
-            me.fut.as_mut().set(fut.fuse());
+            let fut = (me.start_fn)(item);
+            me.running_job.as_mut().set(fut.fuse());
         }
     }
 }
 
-impl<T, F, Fut> FusedStream for FutureQueue<T, F, Fut>
+impl<Job, StartFn, Fut> FusedStream for FutureQueue<Job, StartFn, Fut>
 where
-    F: FnMut(T) -> Fut,
+    StartFn: FnMut(Job) -> Fut,
     Fut: Future,
 {
     fn is_terminated(&self) -> bool {
-        self.fut.is_terminated() && self.queue.is_empty()
+        self.running_job.is_terminated() && self.job_queue.is_empty()
     }
 }
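For reviewers, here is a minimal usage sketch of `FutureQueue` as documented above. It is not part of the patch: the import path, the `String` job type, and the closure passed as `start_fn` are made up for illustration, and it assumes the `futures` crate is available for `block_on` and `StreamExt`.

```rust
use std::pin::pin;

use futures::executor::block_on;
use futures::StreamExt;

// Hypothetical import path; `future_queue` lives in
// `crates/core/src/util/future_queue.rs`, so adjust for your crate layout.
use crate::util::future_queue::future_queue;

fn main() {
    block_on(async {
        // `start_fn` turns each queued job (here, a String) into a future.
        let mut queue = pin!(future_queue(|msg: String| async move { msg.len() }));

        // Pushing only enqueues; nothing runs until the queue is polled.
        queue.as_mut().push("hello".to_string());
        queue.as_mut().push("world".to_string());

        // Driving the queue as a `Stream` starts jobs one at a time, in FIFO order,
        // and yields each job's output. It returns `None` once the queue is empty
        // and no job is running.
        while let Some(len) = queue.next().await {
            println!("handled a message of length {len}");
        }
    });
}
```

This mirrors how `ws_client_actor` uses the queue: incoming client messages are pushed as they arrive, and polling the queue from the `select!` loop starts and finishes at most one `client.handle_message` future at a time, preserving per-client message order.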