Add some comments to FutureQueue and the subscribe route (#378)
gefjon authored Oct 6, 2023
1 parent eb621fd · commit 77065a8
Showing 2 changed files with 89 additions and 28 deletions.
30 changes: 29 additions & 1 deletion crates/client-api/src/routes/subscribe.rs
@@ -169,8 +169,12 @@ const LIVELINESS_TIMEOUT: Duration = Duration::from_secs(60);
async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut sendrx: mpsc::Receiver<DataMessage>) {
let mut liveness_check_interval = tokio::time::interval(LIVELINESS_TIMEOUT);
let mut got_pong = true;

// Build a queue of incoming messages to handle,
// to be processed one at a time, in the order they're received.
// TODO: do we want this to have a fixed capacity? or should it be unbounded
let mut handle_queue = pin!(future_queue(|message| client.handle_message(message)));

let mut closed = false;
loop {
enum Item {
@@ -180,7 +184,13 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
let message = tokio::select! {
// NOTE: all of the futures for these branches **must** be cancel safe. do not
// change this if you don't know what that means.

// If we have a result from handling a past message to report,
// grab it to handle in the next `match`.
Some(res) = handle_queue.next() => Item::HandleResult(res),

// If we've received an incoming message,
// grab it to handle in the next `match`.
message = ws.next() => match message {
Some(Ok(m)) => Item::Message(ClientMessage::from_message(m)),
Some(Err(error)) => {
@@ -190,6 +200,9 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
// the client sent us a close frame
None => break,
},

// If we have an outgoing message to send, send it off.
// No incoming `message` to handle, so `continue`.
Some(message) = sendrx.recv() => {
if closed {
// TODO: this isn't great. when we receive a close request from the peer,
@@ -204,14 +217,19 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
}
continue;
}

// If the module has exited, close the websocket.
() = client.module.exited(), if !closed => {
if let Err(e) = ws.close(Some(CloseFrame { code: CloseCode::Away, reason: "module exited".into() })).await {
log::warn!("error closing: {e:#}")
}
closed = true;
continue;
}

// If it's time to send a ping...
_ = liveness_check_interval.tick() => {
// If we received a pong at some point, send a fresh ping.
if mem::take(&mut got_pong) {
if let Err(e) = ws.send(WsMessage::Ping(Vec::new())).await {
log::warn!("error sending ping: {e:#}");
@@ -224,6 +242,13 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
}
}
};

// Handle the incoming message we grabbed in the previous `select!`.

// TODO: Data flow appears to not require `enum Item` or this distinct `match`,
// since `Item::HandleResult` comes from exactly one `select!` branch,
// and `Item::Message` comes from exactly one distinct `select!` branch.
// Consider merging this `match` with the previous `select!`.
match message {
Item::Message(ClientMessage::Message(message)) => handle_queue.as_mut().push(message),
Item::HandleResult(res) => {
@@ -250,6 +275,7 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
}
Item::Message(ClientMessage::Ping(_message)) => {
log::trace!("Received ping from client {}", client.id);
// TODO: should we respond with a `Pong`?
}
Item::Message(ClientMessage::Pong(_message)) => {
log::trace!("Received heartbeat from client {}", client.id);
@@ -274,7 +300,9 @@ async fn ws_client_actor(client: ClientConnection, mut ws: WebSocketStream, mut
log::debug!("Client connection ended");
sendrx.close();

// clear the queue before we go on to clean up, or else we can get a situation like:
// Clear the incoming message queue before we go to clean up.
// Otherwise, we can be left with a stale future which never gets awaited,
// which can lead to bugs like:
// https://rust-lang.github.io/wg-async/vision/submitted_stories/status_quo/aws_engineer/solving_a_deadlock.html
handle_queue.clear();

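The shape of the loop above, boiled down, is roughly the following. This is a condensed sketch rather than the route's real code: the message type, the job body, and the single mpsc channel standing in for the WebSocket and the other select branches are illustrative assumptions; only the `future_queue` API comes from this commit.

```rust
// Condensed sketch of the pattern in ws_client_actor: a FutureQueue of
// message-handling jobs is polled alongside the incoming stream, so messages
// are handled one at a time, in arrival order, without blocking the loop.
use std::pin::pin;

use futures::StreamExt;
use tokio::sync::mpsc;

// Assumes `future_queue` from crates/core/src/util/future_queue.rs is in scope.
async fn actor_sketch(mut incoming: mpsc::Receiver<String>) {
    // Each pushed message becomes a queued job; at most one job runs at a time.
    let mut handle_queue = pin!(future_queue(|msg: String| async move {
        // Stand-in for `client.handle_message(message)`.
        msg.len()
    }));

    loop {
        tokio::select! {
            // A previously queued message finished being handled;
            // the real route inspects this result for errors.
            Some(_res) = handle_queue.next() => {}

            // A new message arrived; queue it to be handled in FIFO order.
            msg = incoming.recv() => match msg {
                Some(msg) => handle_queue.as_mut().push(msg),
                None => break,
            },
        }
    }

    // Clear the queue before cleanup, as the route above does, so no stale,
    // never-awaited job future is left behind.
    handle_queue.clear();
}
```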
87 changes: 60 additions & 27 deletions crates/core/src/util/future_queue.rs
@@ -8,72 +8,105 @@ use std::pin::Pin;
use std::task::{self, Poll};

pin_project! {
pub struct FutureQueue<T, F, Fut> {
queue: VecDeque<T>,
f: F,
/// A FIFO queue into which `Job`s can be pushed, which maintains at most one running `Fut` at a time.
///
/// Each subscribed/connected WebSocket maintains a `FutureQueue` of incoming messages to handle.
///
/// `Fut` should implement `Future`.
/// `StartFn` should implement `FnMut(Job) -> Fut`.
pub struct FutureQueue<Job, StartFn, Fut> {
job_queue: VecDeque<Job>,
start_fn: StartFn,
#[pin]
fut: Fuse<Fut>,
running_job: Fuse<Fut>,
}
}

pub fn future_queue<T, F, Fut>(f: F) -> FutureQueue<T, F, Fut>
/// Construct a `FutureQueue` which uses `start_fn` to run its frontmost job.
pub fn future_queue<Job, StartFn, Fut>(start_fn: StartFn) -> FutureQueue<Job, StartFn, Fut>
where
F: FnMut(T) -> Fut,
StartFn: FnMut(Job) -> Fut,
Fut: Future,
{
FutureQueue {
queue: VecDeque::new(),
f,
fut: Fuse::terminated(),
job_queue: VecDeque::new(),
start_fn,
running_job: Fuse::terminated(),
}
}

impl<T, F, Fut> FutureQueue<T, F, Fut>
impl<Job, StartFn, Fut> FutureQueue<Job, StartFn, Fut>
where
F: FnMut(T) -> Fut,
StartFn: FnMut(Job) -> Fut,
Fut: Future,
{
pub fn push(self: Pin<&mut Self>, item: T) {
self.project().queue.push_back(item)
/// Insert a job into the FIFO queue.
///
/// When the job reaches the front of the queue and this queue is awaited,
/// `self.start_fn` will be applied to `job` to start it,
/// and awaiting this queue will await that future.
///
/// As with all futures, the job will not run unless awaited.
/// In addition, `FutureQueue` will not start a new job until the previous job has finished,
/// so `self.start_fn` will not be called until `self` is polled
/// enough times to consume all earlier entries in the queue.
pub fn push(self: Pin<&mut Self>, job: Job) {
self.project().job_queue.push_back(job)
}
pub fn push_unpin(&mut self, item: T) {
self.queue.push_back(item)

/// Insert a job into the FIFO queue.
///
/// When the job reaches the front of the queue and this queue is awaited,
/// `self.start_fn` will be applied to `job` to start it,
/// and awaiting this queue will await that future.
///
/// As with all futures, the job will not run unless awaited.
/// In addition, `FutureQueue` will not start a new job until the previous job has finished,
/// so `self.start_fn` will not be called until `self` is polled
/// enough times to consume all earlier entries in the queue.
pub fn push_unpin(&mut self, job: Job) {
self.job_queue.push_back(job)
}

/// Remove all jobs from the queue without running them, and cancel the current job if one is running.
///
/// Subscriptions clear their queue upon disconnecting,
/// to avoid leaving stale jobs that will never be started or awaited.
pub fn clear(self: Pin<&mut Self>) {
let mut me = self.project();
me.queue.clear();
me.fut.set(Fuse::terminated());
me.job_queue.clear();
me.running_job.set(Fuse::terminated());
}
}

impl<T, F, Fut> Stream for FutureQueue<T, F, Fut>
impl<Job, StartFn, Fut> Stream for FutureQueue<Job, StartFn, Fut>
where
F: FnMut(T) -> Fut,
StartFn: FnMut(Job) -> Fut,
Fut: Future,
{
type Item = Fut::Output;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();
loop {
if !me.fut.is_terminated() {
return me.fut.poll(cx).map(Some);
if !me.running_job.is_terminated() {
return me.running_job.poll(cx).map(Some);
}
let Some(item) = me.queue.pop_front() else {
let Some(item) = me.job_queue.pop_front() else {
return Poll::Ready(None);
};
let fut = (me.f)(item);
me.fut.as_mut().set(fut.fuse());
let fut = (me.start_fn)(item);
me.running_job.as_mut().set(fut.fuse());
}
}
}

impl<T, F, Fut> FusedStream for FutureQueue<T, F, Fut>
impl<Job, StartFn, Fut> FusedStream for FutureQueue<Job, StartFn, Fut>
where
F: FnMut(T) -> Fut,
StartFn: FnMut(Job) -> Fut,
Fut: Future,
{
fn is_terminated(&self) -> bool {
self.fut.is_terminated() && self.queue.is_empty()
self.running_job.is_terminated() && self.job_queue.is_empty()
}
}
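As a quick illustration of the contract documented above: the following is a minimal sketch, not part of the commit, and assumes the `futures` crate plus the `future_queue` constructor from this file.

```rust
// Jobs are queued in FIFO order, nothing runs until the queue is polled,
// and at most one job future exists at any moment.
use std::pin::pin;

use futures::StreamExt;

async fn demo() {
    // `start_fn` turns each queued number into a future that doubles it.
    let mut queue = pin!(future_queue(|n: u32| async move { n * 2 }));

    // Pushing alone never starts a job.
    queue.as_mut().push(1);
    queue.as_mut().push(2);

    // Each `next().await` starts the frontmost job and drives it to completion.
    assert_eq!(queue.next().await, Some(2));
    assert_eq!(queue.next().await, Some(4));

    // With the queue empty and no job running, the stream is exhausted.
    assert_eq!(queue.next().await, None);
}
```

Driven from any async runtime (for example a `#[tokio::test]`), this simply asserts the FIFO, one-job-at-a-time behavior.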
