Skip to content

Commit

Permalink
Fix buggy future::select function usage
Browse files Browse the repository at this point in the history
And document the correctness of the new code.
  • Loading branch information
teor2345 committed Apr 7, 2021
1 parent e6fa9bc commit b51070e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 14 deletions.
37 changes: 30 additions & 7 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,22 @@ where
match self.state {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// The peer can starve client requests if it sends an
// uninterrupted series of messages. But this is unlikely in
// practice, due to network delays.
//
// If both futures are ready, there's no particular reason
// to prefer one over the other.
//
// TODO: use `futures::select!`, which chooses a ready future
// at random, avoiding starvation
// (To use `select!`, we'll need to map the different
// results to a new enum types.)
match future::select(peer_rx.next(), self.client_rx.next()).await {
Either::Left((None, _)) => {
self.fail_with(PeerError::ConnectionClosed);
Expand Down Expand Up @@ -404,14 +420,21 @@ where
.request_timer
.as_mut()
.expect("timeout must be set while awaiting response");
let cancel = future::select(timer_ref, tx.cancellation());
match future::select(peer_rx.next(), cancel)
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// If multiple futures are ready, we want the cancellation
// to take priority, then the timeout, then peer responses.
let cancel = future::select(tx.cancellation(), timer_ref);
match future::select(cancel, peer_rx.next())
.instrument(span.clone())
.await
{
Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Ok(peer_msg)), _cancel)) => {
Either::Right((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Right((Some(Err(e)), _)) => self.fail_with(e),
Either::Right((Some(Ok(peer_msg)), _cancel)) => {
// Try to process the message using the handler.
// This extremely awkward construction avoids
// keeping a live reference to handler across the
Expand Down Expand Up @@ -455,7 +478,7 @@ where
};
}
}
Either::Right((Either::Left(_), _peer_fut)) => {
Either::Left((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
self.state = match self.state {
Expand All @@ -478,7 +501,7 @@ where
),
};
}
Either::Right((Either::Right(_), _peer_fut)) => {
Either::Left((Either::Left(_), _peer_fut)) => {
trace!(parent: &span, "client request was cancelled");
self.state = State::AwaitingRequest;
}
Expand Down
19 changes: 12 additions & 7 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,12 +504,17 @@ where
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
let mut send_addr_err = false;

// Currently, select prefers the first future.
// There is no starvation risk here, because
// interval has a limited rate, and shutdown
// is a oneshot.
match future::select(interval_stream.next(), shutdown_rx_ref).await {
Either::Left(_) => {
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// Starvation is impossible here, because interval has a
// slow rate, and shutdown is a oneshot. If both futures
// are ready, we want the shutdown to take priority over
// sending a useless heartbeat.
match future::select(shutdown_rx_ref, interval_stream.next()).await {
Either::Right(_) => {
let (tx, rx) = oneshot::channel();
let request = Request::Ping(Nonce::default());
tracing::trace!(?request, "queueing heartbeat request");
Expand Down Expand Up @@ -588,7 +593,7 @@ where
}
}
}
Either::Right(_) => {
Either::Left(_) => {
tracing::trace!("shutting down due to Client shut down");
// awaiting a local task won't hang
let _ = timestamp_collector
Expand Down

0 comments on commit b51070e

Please sign in to comment.