Fix a deadlock between the crawler and dialer, and other hangs #1950

Merged · 7 commits · Apr 7, 2021
8 changes: 8 additions & 0 deletions zebra-network/src/constants.rs
@@ -49,6 +49,14 @@ pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(60 + 20 + 20 + 20);
/// connected peer.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);

/// The number of GetAddr requests sent when crawling for new peers.
///
/// ## SECURITY
///
/// The fanout should be greater than 1, to ensure that Zebra's address book is
/// not dominated by a single peer.
pub const GET_ADDR_FANOUT: usize = 2;

/// Truncate timestamps in outbound address messages to this time interval.
///
/// This is intended to prevent a peer from learning exactly when we received
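The `GET_ADDR_FANOUT` constant above caps how much any one peer can influence the address book. A minimal sketch of how a crawler might use such a fanout; `ask_one_peer_for_addrs` and the address format are hypothetical stand-ins, not Zebra's real crawler API:

```rust
use std::time::Duration;

const GET_ADDR_FANOUT: usize = 2;

/// Hypothetical stand-in for one GetAddr round trip to a single peer.
async fn ask_one_peer_for_addrs(peer_id: usize) -> Vec<String> {
    // Simulate a network round trip.
    tokio::time::sleep(Duration::from_millis(10)).await;
    vec![format!("203.0.113.{}:8233", peer_id)]
}

/// Send GetAddr to `GET_ADDR_FANOUT` different peers, so a single peer
/// can't dominate what ends up in the address book.
async fn crawl_for_addrs() -> Vec<String> {
    let mut new_addrs = Vec::new();
    for peer_id in 0..GET_ADDR_FANOUT {
        new_addrs.extend(ask_one_peer_for_addrs(peer_id).await);
    }
    new_addrs
}
```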
37 changes: 30 additions & 7 deletions zebra-network/src/peer/connection.rs
@@ -372,6 +372,22 @@ where
match self.state {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// The peer can starve client requests if it sends an
// uninterrupted series of messages. But this is unlikely in
// practice, due to network delays.
//
// If both futures are ready, there's no particular reason
// to prefer one over the other.
//
// TODO: use `futures::select!`, which chooses a ready future
// at random, avoiding starvation
// (To use `select!`, we'll need to map the different
// results to new enum types.)
match future::select(peer_rx.next(), self.client_rx.next()).await {
Either::Left((None, _)) => {
self.fail_with(PeerError::ConnectionClosed);
@@ -404,14 +420,21 @@
.request_timer
.as_mut()
.expect("timeout must be set while awaiting response");
- let cancel = future::select(timer_ref, tx.cancellation());
- match future::select(peer_rx.next(), cancel)
// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// If multiple futures are ready, we want the cancellation
// to take priority, then the timeout, then peer responses.
let cancel = future::select(tx.cancellation(), timer_ref);
match future::select(cancel, peer_rx.next())
.instrument(span.clone())
.await
{
- Either::Left((None, _)) => self.fail_with(PeerError::ConnectionClosed),
- Either::Left((Some(Err(e)), _)) => self.fail_with(e),
- Either::Left((Some(Ok(peer_msg)), _cancel)) => {
Either::Right((None, _)) => self.fail_with(PeerError::ConnectionClosed),
Either::Right((Some(Err(e)), _)) => self.fail_with(e),
Either::Right((Some(Ok(peer_msg)), _cancel)) => {
// Try to process the message using the handler.
// This extremely awkward construction avoids
// keeping a live reference to handler across the
@@ -455,7 +478,7 @@ where
};
}
}
- Either::Right((Either::Left(_), _peer_fut)) => {
Either::Left((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
self.state = match self.state {
@@ -478,7 +501,7 @@
),
};
}
- Either::Right((Either::Right(_), _peer_fut)) => {
Either::Left((Either::Left(_), _peer_fut)) => {
trace!(parent: &span, "client request was cancelled");
self.state = State::AwaitingRequest;
}
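The `Either::Left`/`Either::Right` swaps above all follow from one fact: `future::select` polls its first argument first, so when both futures are ready on the same poll, the first one wins. The PR uses that bias to give cancellation priority over timeouts and peer messages. A self-contained demonstration of the bias, and of the `futures::select!` alternative mentioned in the TODO (illustrative code, not part of the diff):

```rust
use futures::future::{self, Either};

#[tokio::main]
async fn main() {
    // Both futures are ready immediately.
    let cancel = future::ready("cancel");
    let peer_msg = future::ready("peer message");

    // `future::select` is biased: the first argument is polled first,
    // so putting the higher-priority event first makes it win ties.
    match future::select(cancel, peer_msg).await {
        Either::Left((winner, _unpolled)) => assert_eq!(winner, "cancel"),
        Either::Right(_) => unreachable!("biased select always picks the first ready future"),
    }

    // `futures::select!` starts polling from a pseudo-random branch,
    // which avoids starving one side when fairness matters.
    let mut a = future::ready(1u8);
    let mut b = future::ready(2u8);
    let first = futures::select! {
        x = a => x,
        x = b => x,
    };
    assert!(first == 1 || first == 2);
}
```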
9 changes: 9 additions & 0 deletions zebra-network/src/peer/error.rs
@@ -98,4 +98,13 @@ pub enum HandshakeError {
/// The remote peer offered a version older than our minimum version.
#[error("Peer offered obsolete version: {0:?}")]
ObsoleteVersion(crate::protocol::external::types::Version),
/// Sending or receiving a message timed out.
#[error("Timeout when sending or receiving a message to peer")]
Timeout,
}

impl From<tokio::time::error::Elapsed> for HandshakeError {
fn from(_source: tokio::time::error::Elapsed) -> Self {
HandshakeError::Timeout
}
}
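This `From<Elapsed>` impl is what makes the double `?` in the handshake changes below compile: `tokio::time::timeout` wraps the inner future's `Result` in an outer `Result<_, Elapsed>`, and each `?` peels one layer, with the outer error converting into `HandshakeError::Timeout`. A self-contained sketch of the same pattern, using a toy error type in place of `HandshakeError`:

```rust
use std::time::Duration;
use tokio::time::timeout;

#[derive(Debug)]
enum ToyError {
    Timeout,
}

impl From<tokio::time::error::Elapsed> for ToyError {
    fn from(_source: tokio::time::error::Elapsed) -> Self {
        ToyError::Timeout
    }
}

/// Stands in for a network send that might never complete.
async fn send_version() -> Result<(), ToyError> {
    tokio::time::sleep(Duration::from_secs(60)).await;
    Ok(())
}

async fn handshake_step() -> Result<(), ToyError> {
    // Outer `?`: converts `Elapsed` to `ToyError::Timeout` via `From`.
    // Inner `?`: propagates the send's own error unchanged.
    timeout(Duration::from_secs(2), send_version()).await??;
    Ok(())
}
```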
168 changes: 128 additions & 40 deletions zebra-network/src/peer/handshake.rs
@@ -12,7 +12,7 @@ use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
- use tokio::{net::TcpStream, sync::broadcast};
use tokio::{net::TcpStream, sync::broadcast, time::timeout};
use tokio_util::codec::Framed;
use tower::Service;
use tracing::{span, Level, Span};
@@ -34,6 +34,12 @@ use super::{Client, Connection, ErrorSlot, HandshakeError, PeerError};

/// A [`Service`] that handshakes with a remote peer and constructs a
/// client/server pair.
///
/// CORRECTNESS
///
/// To avoid hangs, each handshake (or its connector) should be:
/// - launched in a separate task, and
/// - wrapped in a timeout.
#[derive(Clone)]
pub struct Handshake<S> {
config: Config,
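A minimal sketch of the calling pattern the CORRECTNESS note above asks for: each handshake gets its own task and an overall timeout, so one unresponsive peer can neither block the dialer nor leak a task forever. `perform_handshake` and `HANDSHAKE_TIMEOUT` are hypothetical stand-ins, not Zebra's real connector API:

```rust
use std::{net::SocketAddr, time::Duration};
use tokio::time::timeout;

const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(4);

/// Hypothetical stand-in for driving one handshake to completion.
async fn perform_handshake(_addr: SocketAddr) -> Result<(), String> {
    // ... connect, exchange version/verack, spawn connection tasks ...
    Ok(())
}

fn spawn_handshake(addr: SocketAddr) {
    // Separate task: a stalled handshake can't wedge the caller.
    // Timeout: the task itself is guaranteed to terminate.
    tokio::spawn(async move {
        match timeout(HANDSHAKE_TIMEOUT, perform_handshake(addr)).await {
            Ok(Ok(())) => tracing::debug!(?addr, "handshake complete"),
            Ok(Err(e)) => tracing::debug!(?addr, ?e, "handshake failed"),
            Err(_elapsed) => tracing::debug!(?addr, "handshake timed out"),
        }
    });
}
```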
@@ -211,6 +217,10 @@ where
let fut = async move {
debug!("connecting to remote peer");

// CORRECTNESS
//
// As a defence-in-depth against hangs, every `send` or `next` on this
// stream should be wrapped in a timeout.
let mut stream = Framed::new(
tcp_stream,
Codec::builder()
@@ -260,11 +270,10 @@
};

debug!(?version, "sending initial version message");
- stream.send(version).await?;
timeout(constants::REQUEST_TIMEOUT, stream.send(version)).await??;

- let remote_msg = stream
-     .next()
-     .await
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;

// Check that we got a Version and destructure its fields into the local scope.
@@ -293,11 +302,10 @@
return Err(HandshakeError::NonceReuse);
}

- stream.send(Message::Verack).await?;
timeout(constants::REQUEST_TIMEOUT, stream.send(Message::Verack)).await??;

- let remote_msg = stream
-     .next()
-     .await
let remote_msg = timeout(constants::REQUEST_TIMEOUT, stream.next())
.await?
.ok_or(HandshakeError::ConnectionClosed)??;
if let Message::Verack = remote_msg {
debug!("got verack from remote peer");
@@ -376,22 +384,42 @@
future::ready(Ok(msg))
});

// CORRECTNESS
//
// Every message and error must update the peer address state via
// the inbound_ts_collector.
let inbound_ts_collector = timestamp_collector.clone();
let peer_rx = peer_rx
.then(move |msg| {
- // Add a metric for inbound messages and fire a timestamp event.
- let mut timestamp_collector = timestamp_collector.clone();
// Add a metric for inbound messages and errors.
// Fire a timestamp or failure event.
let mut inbound_ts_collector = inbound_ts_collector.clone();
async move {
if let Ok(msg) = &msg {
metrics::counter!(
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
);
use futures::sink::SinkExt;
let _ = timestamp_collector
.send(MetaAddr::new_responded(&addr, &remote_services))
.await;
match &msg {
Ok(msg) => {
metrics::counter!(
"zcash.net.in.messages",
1,
"command" => msg.to_string(),
"addr" => addr.to_string(),
);
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = inbound_ts_collector
.send(MetaAddr::new_responded(&addr, &remote_services))
.await;
}
Err(err) => {
metrics::counter!(
"zebra.net.in.errors",
1,
"error" => err.to_string(),
"addr" => addr.to_string(),
);
let _ = inbound_ts_collector
.send(MetaAddr::new_errored(&addr, &remote_services))
.await;
}
}
msg
}
@@ -452,6 +480,16 @@
.boxed(),
);

// CORRECTNESS
//
// To prevent hangs:
// - every await that depends on the network must have a timeout (or interval)
// - every error/shutdown must update the address book state and return
//
// The address book state can be updated via `ClientRequest.tx`, or the
// timestamp_collector.
//
// Returning from the spawned closure terminates the connection's heartbeat task.
let heartbeat_span = tracing::debug_span!(parent: connection_span, "heartbeat");
tokio::spawn(
async move {
@@ -460,11 +498,23 @@

let mut shutdown_rx = shutdown_rx;
let mut server_tx = server_tx;
let mut timestamp_collector = timestamp_collector.clone();
let mut interval_stream = tokio::time::interval(constants::HEARTBEAT_INTERVAL);
loop {
let shutdown_rx_ref = Pin::new(&mut shutdown_rx);
- match future::select(interval_stream.next(), shutdown_rx_ref).await {
-     Either::Left(_) => {
let mut send_addr_err = false;

// CORRECTNESS
//
// Currently, select prefers the first future if multiple
// futures are ready.
//
// Starvation is impossible here, because interval has a
// slow rate, and shutdown is a oneshot. If both futures
// are ready, we want the shutdown to take priority over
// sending a useless heartbeat.
match future::select(shutdown_rx_ref, interval_stream.next()).await {
Either::Right(_) => {
let (tx, rx) = oneshot::channel();
let request = Request::Ping(Nonce::default());
tracing::trace!(?request, "queueing heartbeat request");
Expand All @@ -474,19 +524,28 @@ where
span: tracing::Span::current(),
}) {
Ok(()) => {
- match server_tx.flush().await {
-     Ok(()) => {}
// TODO: also wait on the shutdown_rx here
match timeout(
constants::HEARTBEAT_INTERVAL,
server_tx.flush(),
)
.await
{
Ok(Ok(())) => {
}
Ok(Err(e)) => {
tracing::warn!(
?e,
"flushing client request failed, shutting down"
);
send_addr_err = true;
}
Err(e) => {
- // We can't get the client request for this failure,
- // so we can't send an error back here. But that's ok,
- // because:
- // - this error never happens (or it's very rare)
- // - if the flush() fails, the server hasn't
- //   received the request
tracing::warn!(
"flushing client request failed: {:?}",
e
?e,
"flushing client request timed out, shutting down"
);
send_addr_err = true;
}
}
}
@@ -514,17 +573,46 @@
// Heartbeats are checked internally to the
// connection logic, but we need to wait on the
// response to avoid canceling the request.
- match rx.await {
-     Ok(_) => tracing::trace!("got heartbeat response"),
-     Err(_) => {
-         tracing::trace!(
//
// TODO: also wait on the shutdown_rx here
match timeout(constants::HEARTBEAT_INTERVAL, rx).await {
Ok(Ok(_)) => tracing::trace!("got heartbeat response"),
Ok(Err(e)) => {
tracing::warn!(
?e,
"error awaiting heartbeat response, shutting down"
);
- return;
send_addr_err = true;
}
Err(e) => {
tracing::warn!(
?e,
"heartbeat response timed out, shutting down"
);
send_addr_err = true;
}
}
}
- Either::Right(_) => return, // got shutdown signal
Either::Left(_) => {
tracing::trace!("shutting down due to Client shut down");
// awaiting a local task won't hang
let _ = timestamp_collector
.send(MetaAddr::new_shutdown(&addr, &remote_services))
.await;
return;
}
}
if send_addr_err {
// We can't get the client request for this failure,
// so we can't send an error back on `tx`. So
// we just update the address book with a failure.
let _ = timestamp_collector
.send(MetaAddr::new_errored(
&addr,
&remote_services,
))
.await;
return;
}
}
}
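Putting the heartbeat rules together, here is a condensed, self-contained skeleton of the pattern this diff establishes: shutdown takes priority over the interval tick, the network-bound await is capped by a timeout, and every exit path records an outcome before returning. All names are illustrative, not Zebra's:

```rust
use std::time::Duration;
use futures::future::{self, Either};
use tokio::{sync::oneshot, time::timeout};

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);

/// Hypothetical ping round trip over the network.
async fn send_ping() -> Result<(), ()> {
    Ok(())
}

/// Hypothetical address book update; purely local, so it can't hang.
async fn record_outcome(outcome: &str) {
    println!("peer state: {}", outcome);
}

async fn heartbeat_task(mut shutdown_rx: oneshot::Receiver<()>) {
    loop {
        let tick = Box::pin(tokio::time::sleep(HEARTBEAT_INTERVAL));
        // Shutdown goes first: `select` prefers its first argument, so a
        // pending shutdown beats a tick that becomes ready simultaneously.
        match future::select(&mut shutdown_rx, tick).await {
            Either::Left(_) => {
                record_outcome("clean shutdown").await;
                return;
            }
            Either::Right(_) => {
                // The only network-bound await, capped by a timeout.
                match timeout(HEARTBEAT_INTERVAL, send_ping()).await {
                    Ok(Ok(())) => {} // heartbeat succeeded, keep looping
                    Ok(Err(_)) | Err(_) => {
                        record_outcome("heartbeat failed or timed out").await;
                        return;
                    }
                }
            }
        }
    }
}
```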