Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a deadlock between the crawler and dialer, and other hangs #1950

Merged
merged 7 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,22 @@ where
match self.state {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// The peer can starve client requests if it sends an
// uninterrupted series of messages. But this is unlikely in
// practice, due to network delays.
//
// If both futures are ready, there's no particular reason
// to prefer one over the other.
//
// TODO: use `futures::select!`, which chooses a ready future
// at random, avoiding starvation
// (To use `select!`, we'll need to map the different
// results to a new enum type.)
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
match future::select(peer_rx.next(), self.client_rx.next()).await {
Either::Left((None, _)) => {
self.fail_with(PeerError::ConnectionClosed);
Expand Down Expand Up @@ -404,14 +420,21 @@ where
.request_timer
.as_mut()
.expect("timeout must be set while awaiting response");
let cancel = future::select(timer_ref, tx.cancellation());
match future::select(peer_rx.next(), cancel)
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// If multiple futures are ready, we want the cancellation
// to take priority, then the timeout, then peer responses.
let cancel = future::select(tx.cancellation(), timer_ref);
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
match future::select(cancel, peer_rx.next())
.instrument(span.clone())
.await
{
Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Left((Some(Err(e)), _)) => self.fail_with(e),
Either::Left((Some(Ok(peer_msg)), _cancel)) => {
Either::Right((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Right((Some(Err(e)), _)) => self.fail_with(e),
Either::Right((Some(Ok(peer_msg)), _cancel)) => {
// Try to process the message using the handler.
// This extremely awkward construction avoids
// keeping a live reference to handler across the
Expand Down Expand Up @@ -455,7 +478,7 @@ where
};
}
}
Either::Right((Either::Left(_), _peer_fut)) => {
Either::Left((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
self.state = match self.state {
Expand All @@ -478,7 +501,7 @@ where
),
};
}
Either::Right((Either::Right(_), _peer_fut)) => {
Either::Left((Either::Left(_), _peer_fut)) => {
trace!(parent: &span, "client request was cancelled");
self.state = State::AwaitingRequest;
}
Expand Down
19 changes: 12 additions & 7 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,12 +504,17 @@ where
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
let mut send_addr_err = false;

// Currently, select prefers the first future.
// There is no starvation risk here, because
// interval has a limited rate, and shutdown
// is a oneshot.
match future::select(interval_stream.next(), shutdown_rx_ref).await {
Either::Left(_) => {
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// Starvation is impossible here, because interval has a
// slow rate, and shutdown is a oneshot. If both futures
// are ready, we want the shutdown to take priority over
// sending a useless heartbeat.
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
match future::select(shutdown_rx_ref, interval_stream.next()).await {
Either::Right(_) => {
let (tx, rx) = oneshot::channel();
let request = Request::Ping(Nonce::default());
tracing::trace!(?request, "queueing heartbeat request");
Expand Down Expand Up @@ -588,7 +593,7 @@ where
}
}
}
Either::Right(_) => {
Either::Left(_) => {
tracing::trace!("shutting down due to Client shut down");
// awaiting a local task won't hang
let _ = timestamp_collector
Expand Down