diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index cb1ec52fa5a..fd974f06f54 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -372,6 +372,22 @@ where match self.state { State::AwaitingRequest => { trace!("awaiting client request or peer message"); + // CORRECTNESS + // + // Currently, select prefers the first future if multiple + // futures are ready. + // + // The peer can starve client requests if it sends an + // uninterrupted series of messages. But this is unlikely in + // practice, due to network delays. + // + // If both futures are ready, there's no particular reason + // to prefer one over the other. + // + // TODO: use `futures::select!`, which chooses a ready future + // at random, avoiding starvation + // (To use `select!`, we'll need to map the different + // results to a new enum type.) match future::select(peer_rx.next(), self.client_rx.next()).await { Either::Left((None, _)) => { self.fail_with(PeerError::ConnectionClosed); @@ -404,14 +420,21 @@ where .request_timer .as_mut() .expect("timeout must be set while awaiting response"); - let cancel = future::select(timer_ref, tx.cancellation()); - match future::select(peer_rx.next(), cancel) + // CORRECTNESS + // + // Currently, select prefers the first future if multiple + // futures are ready. + // + // If multiple futures are ready, we want the cancellation + // to take priority, then the timeout, then peer responses. 
+ let cancel = future::select(tx.cancellation(), timer_ref); + match future::select(cancel, peer_rx.next()) .instrument(span.clone()) .await { - Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed), - Either::Left((Some(Err(e)), _)) => self.fail_with(e), - Either::Left((Some(Ok(peer_msg)), _cancel)) => { + Either::Right((None, _)) => self.fail_with(PeerError::ConnectionClosed), + Either::Right((Some(Err(e)), _)) => self.fail_with(e), + Either::Right((Some(Ok(peer_msg)), _cancel)) => { // Try to process the message using the handler. // This extremely awkward construction avoids // keeping a live reference to handler across the @@ -455,7 +478,7 @@ where }; } } - Either::Right((Either::Left(_), _peer_fut)) => { + Either::Left((Either::Right(_), _peer_fut)) => { trace!(parent: &span, "client request timed out"); let e = PeerError::ClientRequestTimeout; self.state = match self.state { @@ -478,7 +501,7 @@ where ), }; } - Either::Right((Either::Right(_), _peer_fut)) => { + Either::Left((Either::Left(_), _peer_fut)) => { trace!(parent: &span, "client request was cancelled"); self.state = State::AwaitingRequest; } diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index f04bbee1224..c35bfbae47f 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -504,12 +504,17 @@ where let shutdown_rx_ref = Pin::new(&mut shutdown_rx); let mut send_addr_err = false; - // Currently, select prefers the first future. - // There is no starvation risk here, because - // interval has a limited rate, and shutdown - // is a oneshot. - match future::select(interval_stream.next(), shutdown_rx_ref).await { - Either::Left(_) => { + // CORRECTNESS + // + // Currently, select prefers the first future if multiple + // futures are ready. + // + // Starvation is impossible here, because interval has a + // slow rate, and shutdown is a oneshot. 
If both futures + // are ready, we want the shutdown to take priority over + // sending a useless heartbeat. + match future::select(shutdown_rx_ref, interval_stream.next()).await { + Either::Right(_) => { let (tx, rx) = oneshot::channel(); let request = Request::Ping(Nonce::default()); tracing::trace!(?request, "queueing heartbeat request"); @@ -588,7 +593,7 @@ where } } } - Either::Right(_) => { + Either::Left(_) => { tracing::trace!("shutting down due to Client shut down"); // awaiting a local task won't hang let _ = timestamp_collector