Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a deadlock between the crawler and dialer, and other hangs #1950

Merged
merged 7 commits into from
Apr 7, 2021
125 changes: 76 additions & 49 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use futures::{
use tokio::{
net::{TcpListener, TcpStream},
sync::broadcast,
time::Instant,
};
use tower::{
buffer::Buffer, discover::Change, layer::Layer, load::peak_ewma::PeakEwmaDiscover,
Expand All @@ -26,14 +27,15 @@ use tracing::Span;
use tracing_futures::Instrument;

use crate::{
constants, peer, timestamp_collector::TimestampCollector, AddressBook, BoxError, Config,
Request, Response,
constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, AddressBook,
BoxError, Config, Request, Response,
};

use zebra_chain::parameters::Network;

use super::CandidateSet;
use super::PeerSet;
use peer::Client;

type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;

Expand Down Expand Up @@ -267,6 +269,21 @@ where
}
}

/// An action that the peer crawler can take.
///
/// Every event source polled by the `crawl_and_dial` loop (the demand
/// channel, the crawl timer, and the pool of in-flight handshakes) is mapped
/// into one of these variants, so the loop can handle them in a single `match`.
// NOTE(review): every variant appears to be both constructed and matched in
// `crawl_and_dial`, so it is not clear from here why `dead_code` is allowed.
// Confirm whether the attribute is still needed, and document the reason if so.
#[allow(dead_code)]
enum CrawlerAction {
/// Handle a demand signal.
///
/// Produced for each `()` message received on the demand channel.
Demand,
/// Crawl existing peers for more peers in response to a timer `tick`.
///
/// `tick` is the instant yielded by the crawl interval timer; the crawler
/// only uses it for logging.
TimerCrawl { tick: Instant },
/// Handle a successfully connected handshake `peer_set_change`.
///
/// `peer_set_change` is the `Ok` result of the connector call. The crawler
/// expects it to be a `Change::Insert` and forwards it on `success_tx`.
HandshakeConnected {
peer_set_change: Change<SocketAddr, Client>,
},
/// Handle a handshake failure to `failed_addr`.
///
/// `failed_addr` is the candidate whose connection attempt returned an
/// error; the crawler reports it to the candidate set as failed.
HandshakeFailed { failed_addr: MetaAddr },
}

/// Given a channel that signals a need for new peers, try to connect to a peer
/// and send the resulting `peer::Client` through a channel.
#[instrument(skip(
Expand All @@ -280,7 +297,7 @@ where
async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration,
mut demand_tx: mpsc::Sender<()>,
mut demand_rx: mpsc::Receiver<()>,
demand_rx: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>,
mut connector: C,
mut success_tx: mpsc::Sender<PeerChange>,
Expand All @@ -291,21 +308,27 @@ where
S: Service<Request, Response = Response, Error = BoxError>,
S::Future: Send + 'static,
{
use futures::{
future::{
select,
Either::{Left, Right},
},
TryFutureExt,
};
use CrawlerAction::*;

// CORRECTNESS
//
// To avoid hangs and starvation, the crawler must:
// - spawn a separate task for each crawl and handshake, so they can make
// progress independently (and avoid deadlocking each other)
// - use the `select!` macro for all actions, because the `select` function
// is biased towards the first ready future

let mut handshakes = FuturesUnordered::new();
// <FuturesUnordered as Stream> returns None when empty.
// Keeping an unresolved future in the pool means the stream
// never terminates.
// We could use StreamExt::select_next_some and StreamExt::fuse, but `fuse`
// prevents us from adding items to the stream and checking its length.
handshakes.push(future::pending().boxed());

let mut crawl_timer = tokio::time::interval(crawl_new_peer_interval);
let mut crawl_timer =
tokio::time::interval(crawl_new_peer_interval).map(|tick| TimerCrawl { tick });
let mut demand_rx = demand_rx.map(|()| Demand);

loop {
metrics::gauge!(
Expand All @@ -315,74 +338,78 @@ where
.checked_sub(1)
.expect("the pool always contains an unresolved future") as f64
);
// This is a little awkward because there's no select3.
match select(
select(demand_rx.next(), crawl_timer.next()),
handshakes.next(),
)
.await
{
Left((Left((Some(_demand), _)), _)) => {

let crawler_action = tokio::select! {
a = handshakes.next() => a.expect("handshakes never terminates, because it contains a future that never resolves"),
a = crawl_timer.next() => a.expect("crawl_timer never terminates"),
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
a = demand_rx.next() => a.expect("demand_rx never fails, because we hold demand_tx"),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename `a` to a name that describes the value actually being returned

};

match crawler_action {
Demand => {
if handshakes.len() > 50 {
// This is set to trace level because when the peerset is
// congested it can generate a lot of demand signal very rapidly.
// congested it can generate a lot of demand signal very
// rapidly.
trace!("too many in-flight handshakes, dropping demand signal");
continue;
}
// TODO: spawn independent task to avoid deadlocks
// candidates has a short delay, and briefly holds the address
// book lock, so it shouldn't hang
if let Some(candidate) = candidates.next().await {
debug!(?candidate.addr, "attempting outbound connection in response to demand");
// the connector is always ready, so this can't hang
connector.ready_and().await?;
handshakes.push(
connector
.call(candidate.addr)
.map_err(move |e| {
// the handshake has timeouts, so it shouldn't hang
handshakes.push(Box::pin(connector.call(candidate.addr).map(
move |res| match res {
Ok(peer_set_change) => HandshakeConnected { peer_set_change },
Err(e) => {
debug!(?candidate.addr, ?e, "failed to connect to candidate");
candidate
})
.boxed(),
);
HandshakeFailed {
failed_addr: candidate,
}
}
},
)));
} else {
debug!("demand for peers but no available candidates");
// update has timeouts, and briefly holds the address book
// lock, so it shouldn't hang
candidates.update().await?;
// Try to connect to a new peer.
let _ = demand_tx.try_send(());
}
}
// did a drill sergeant write this? no there's just no Either3
Left((Right((Some(_timer), _)), _)) => {
debug!("crawling for more peers");
TimerCrawl { tick } => {
debug!(
?tick,
"crawling for more peers in response to the crawl timer"
);
// TODO: spawn independent task to avoid deadlocks
candidates.update().await?;
// Try to connect to a new peer.
let _ = demand_tx.try_send(());
}
Right((Some(Ok(change)), _)) => {
if let Change::Insert(ref addr, _) = change {
HandshakeConnected { peer_set_change } => {
if let Change::Insert(ref addr, _) = peer_set_change {
debug!(candidate.addr = ?addr, "successfully dialed new peer");
} else {
unreachable!("unexpected handshake result: all changes should be Insert");
}
success_tx.send(Ok(change)).await?;
// successes are handled by an independent task, so they
// shouldn't hang
success_tx.send(Ok(peer_set_change)).await?;
}
Right((Some(Err(candidate)), _)) => {
debug!(?candidate.addr, "marking candidate as failed");
candidates.report_failed(&candidate);
HandshakeFailed { failed_addr } => {
debug!(?failed_addr.addr, "marking candidate as failed");
candidates.report_failed(&failed_addr);
// The demand signal that was taken out of the queue
// to attempt to connect to the failed candidate never
// turned into a connection, so add it back:
let _ = demand_tx.try_send(());
}
// We don't expect to see these patterns during normal operation
Left((Left((None, _)), _)) => {
unreachable!("demand_rx never fails, because we hold demand_tx");
}
Left((Right((None, _)), _)) => {
unreachable!("crawl_timer never terminates");
}
Right((None, _)) => {
unreachable!(
"handshakes never terminates, because it contains a future that never resolves"
);
}
}
}
}