Skip to content

Commit

Permalink
Security: Spawn a separate task for each initial handshake
Browse files Browse the repository at this point in the history
This fix prevents hangs and deadlocks during initialization, particularly
when there are a small number of valid peers in the initial peer config
(or from the DNS seeders).
  • Loading branch information
teor2345 committed May 21, 2021
1 parent 2685fc7 commit 344d118
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 59 deletions.
1 change: 1 addition & 0 deletions zebra-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ impl Config {
return HashSet::new();
}

info!(config_peers_len = peers.len(), config_peers = ?peers, "resolving configured peers");
loop {
// We retry each peer individually, as well as retrying if there are
// no peers in the combined list. DNS failures are correlated, so all
Expand Down
14 changes: 14 additions & 0 deletions zebra-network/src/meta_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,20 @@ pub struct MetaAddr {
}

impl MetaAddr {
/// Create a new seed [`MetaAddr`, based on the configured seed addresses.
pub fn new_seed_meta_addr(addr: SocketAddr) -> MetaAddr {
MetaAddr {
addr,
// TODO: stop guessing this field
services: PeerServices::NODE_NETWORK,
// TODO: remove this time, it's far too optimistic. Using `now` can
// keep invalid peers in the consensus peer set for a long time.
last_seen: Utc::now(),
// TODO: replace with NeverAttemptedSeed
last_connection_state: NeverAttemptedGossiped,
}
}

/// Create a new gossiped [`MetaAddr`], based on the deserialized fields from
/// a gossiped peer [`Addr`][crate::protocol::external::Message::Addr] message.
pub fn new_gossiped_meta_addr(
Expand Down
193 changes: 134 additions & 59 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ use tracing::Span;
use tracing_futures::Instrument;

use crate::{
constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector, AddressBook,
BoxError, Config, Request, Response,
constants, meta_addr::MetaAddr, peer, timestamp_collector::TimestampCollector,
types::PeerServices, AddressBook, BoxError, Config, Request, Response,
};

use zebra_chain::parameters::Network;

use super::CandidateSet;
use super::PeerSet;
use peer::Client;

type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;

Expand Down Expand Up @@ -74,7 +73,6 @@ where
let (listen_handshaker, outbound_connector) = {
use tower::timeout::TimeoutLayer;
let hs_timeout = TimeoutLayer::new(constants::HANDSHAKE_TIMEOUT);
use crate::protocol::external::types::PeerServices;
let hs = peer::Handshake::builder()
.with_config(config.clone())
.with_inbound_service(inbound_service)
Expand Down Expand Up @@ -114,6 +112,8 @@ where
);
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

// Connect the tx end to the 3 peer sources:
//
// 1. Incoming peer connections, via a listener.

// Warn if we're configured using the wrong network port.
Expand Down Expand Up @@ -151,7 +151,7 @@ where
.boxed()
};

let add_guard = tokio::spawn(initial_peers_fut.instrument(Span::current()));
let initial_peers_guard = tokio::spawn(initial_peers_fut.instrument(Span::current()));

// 3. Outgoing peers we connect to in response to load.
let mut candidates = CandidateSet::new(address_book.clone(), peer_set.clone());
Expand Down Expand Up @@ -182,47 +182,114 @@ where
);

handle_tx
.send(vec![add_guard, listen_guard, crawl_guard])
.send(vec![initial_peers_guard, listen_guard, crawl_guard])
.unwrap();

(peer_set, address_book)
}

/// Use the provided `handshaker` to connect to `initial_peers`, then send
/// the results over `tx`.
#[instrument(skip(initial_peers, outbound_connector, tx))]
async fn add_initial_peers<S>(
/// Use the provided `outbound_connector` to connect to `initial_peers`, then
/// send the results over `peerset_tx`.
#[instrument(skip(initial_peers, outbound_connector, peerset_tx))]
async fn add_initial_peers<C>(
initial_peers: std::collections::HashSet<SocketAddr>,
outbound_connector: S,
mut tx: mpsc::Sender<PeerChange>,
outbound_connector: C,
mut peerset_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
S: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError> + Clone,
S::Future: Send + 'static,
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
+ Clone
+ Send
+ 'static,
C::Future: Send + 'static,
{
info!(?initial_peers, "connecting to initial peer set");
// ## Correctness:
let initial_peers_len = initial_peers.len();
info!(
?initial_peers_len,
?initial_peers,
"connecting to initial peer set"
);

let initial_meta_addr = initial_peers.into_iter().map(MetaAddr::new_seed_meta_addr);

// # Correctness
//
// ## Concurrency
//
// We spawn each handshake in a separate task. This avoids:
// - sequentially waiting on each handshake's timeout:
// `4 seconds * initial peer count` maximum delay
// - dependencies between the first successful initial peer and other tasks
//
// ## Buffer Reservations
//
// Each `CallAll` can hold one `Buffer` or `Batch` reservation for
// an indefinite period. We can use `CallAllUnordered` without filling
// the underlying `Inbound` buffer, because we immediately drive this
// single `CallAll` to completion, and handshakes have a short timeout.
let mut handshakes: FuturesUnordered<_> = initial_peers
.into_iter()
.map(|addr| {
outbound_connector
.clone()
.oneshot(addr)
.map_err(move |e| (addr, e))
})
.collect();

while let Some(handshake_result) = handshakes.next().await {
// this is verbose, but it's better than just hanging with no output
if let Err((addr, ref e)) = handshake_result {
info!(?addr, ?e, "an initial peer connection failed");
// Each `FuturesUnordered` can hold one `Buffer` reservation for an
// indefinite period. We can use `FuturesUnordered` without filling the
// underlying `Handshake` buffer, because we immediately drive this single
// `FuturesUnordered` to completion, and handshakes have a short timeout.
let mut handshakes = FuturesUnordered::new();
for candidate in initial_meta_addr {
let hs_join = tokio::spawn(dial(candidate, outbound_connector.clone()))
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!(
"panic during initial handshake with {:?}: {:?} ",
candidate, e
);
}
})
.instrument(Span::current());
handshakes.push(Box::pin(hs_join));
}

// TODO: replace with *success_count_tx.borrow() in Tokio 1.6
let mut success_count = 0;
while let Some(handshake_action) = handshakes.next().await {
use CrawlerAction::*;
match handshake_action {
HandshakeConnected { peer_set_change } => {
success_count += 1;
if let Change::Insert(ref addr, _) = peer_set_change {
debug!(?addr, ?success_count, "successfully dialed initial peer");
} else {
unreachable!("unexpected handshake result: all changes should be Insert");
}
// the peer set is handled by an independent task, so this send
// shouldn't hang
peerset_tx.send(Ok(peer_set_change)).await?;
}
HandshakeFailed { failed_addr, error } => {
if success_count <= constants::GET_ADDR_FANOUT {
// this creates verbose logs, but it's better than just hanging on
// startup with no output
info!(addr = ?failed_addr.addr,
?error,
?success_count,
"an initial peer connection failed");
} else {
// switch to debug when we have enough peers
debug!(addr = ?failed_addr.addr,
?error,
?success_count,
"an initial peer connection failed");
}
continue;
}
DemandCrawl | DemandDrop | DemandHandshake { .. } | TimerCrawl { .. } => {
unreachable!("unexpected CrawlerAction: should be handshake result")
}
}
tx.send(handshake_result.map_err(|(_addr, e)| e)).await?;
}

if success_count > 0 {
info!(
?success_count,
?initial_peers_len,
"finished connection attempts to initial peer set"
);
} else {
warn!(?initial_peers_len, "no successful initial peer connections");
}

Ok(())
Expand All @@ -232,12 +299,12 @@ where
/// Zcash peer.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the `Client` result over `tx`.
#[instrument(skip(tx, handshaker))]
/// the [`peer::Client`] result over `peerset_tx`.
#[instrument(skip(peerset_tx, handshaker))]
async fn listen<S>(
addr: SocketAddr,
mut handshaker: S,
tx: mpsc::Sender<PeerChange>,
peerset_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
Expand Down Expand Up @@ -271,11 +338,11 @@ where
// Construct a handshake future but do not drive it yet....
let handshake = handshaker.call((tcp_stream, connected_addr));
// ... instead, spawn a new task to handle this connection
let mut tx2 = tx.clone();
let mut peerset_tx2 = peerset_tx.clone();
tokio::spawn(
async move {
if let Ok(client) = handshake.await {
let _ = tx2.send(Ok(Change::Insert(addr, client))).await;
let _ = peerset_tx2.send(Ok(Change::Insert(addr, client))).await;
}
}
.instrument(handshaker_span),
Expand All @@ -285,7 +352,6 @@ where
}

/// An action that the peer crawler can take.
#[allow(dead_code)]
enum CrawlerAction {
/// Drop the demand signal because there are too many pending handshakes.
DemandDrop,
Expand All @@ -298,15 +364,18 @@ enum CrawlerAction {
TimerCrawl { tick: Instant },
/// Handle a successfully connected handshake `peer_set_change`.
HandshakeConnected {
peer_set_change: Change<SocketAddr, Client>,
peer_set_change: Change<SocketAddr, peer::Client>,
},
/// Handle a handshake failure to `failed_addr`.
HandshakeFailed { failed_addr: MetaAddr },
HandshakeFailed {
failed_addr: MetaAddr,
error: BoxError,
},
}

/// Given a channel `demand_rx` that signals a need for new peers, try to find
/// and connect to new peers, and send the resulting `peer::Client`s through the
/// `success_tx` channel.
/// and connect to new peers, and send the resulting [`peer::Client`]s through the
/// `peerset_tx` channel.
///
/// Crawl for new peers every `crawl_new_peer_interval`, and whenever there is
/// demand, but no new peers in `candidates`. After crawling, try to connect to
Expand All @@ -315,17 +384,17 @@ enum CrawlerAction {
/// If a handshake fails, restore the unused demand signal by sending it to
/// `demand_tx`.
///
/// The crawler terminates when `candidates.update()` or `success_tx` returns a
/// The crawler terminates when [`CandidateSet.update`] or `peerset_tx` returns a
/// permanent internal error. Transient errors and individual peer errors should
/// be handled within the crawler.
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, success_tx))]
#[instrument(skip(demand_tx, demand_rx, candidates, outbound_connector, peerset_tx))]
async fn crawl_and_dial<C, S>(
crawl_new_peer_interval: std::time::Duration,
mut demand_tx: mpsc::Sender<()>,
mut demand_rx: mpsc::Receiver<()>,
mut candidates: CandidateSet<S>,
outbound_connector: C,
mut success_tx: mpsc::Sender<PeerChange>,
mut peerset_tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
Expand All @@ -346,6 +415,8 @@ where
// - use the `select!` macro for all actions, because the `select` function
// is biased towards the first ready future

info!("starting the peer crawler");

let mut handshakes = FuturesUnordered::new();
// <FuturesUnordered as Stream> returns None when empty.
// Keeping an unresolved future in the pool means the stream
Expand Down Expand Up @@ -430,16 +501,16 @@ where
}
HandshakeConnected { peer_set_change } => {
if let Change::Insert(ref addr, _) = peer_set_change {
debug!(candidate.addr = ?addr, "successfully dialed new peer");
debug!(?addr, "successfully dialed new peer");
} else {
unreachable!("unexpected handshake result: all changes should be Insert");
}
// successes are handled by an independent task, so they
// the peer set is handled by an independent task, so this send
// shouldn't hang
success_tx.send(Ok(peer_set_change)).await?;
peerset_tx.send(Ok(peer_set_change)).await?;
}
HandshakeFailed { failed_addr } => {
debug!(?failed_addr.addr, "marking candidate as failed");
HandshakeFailed { failed_addr, error } => {
debug!(addr = ?failed_addr.addr, ?error, "marking candidate as failed");
candidates.report_failed(&failed_addr);
// The demand signal that was taken out of the queue
// to attempt to connect to the failed candidate never
Expand All @@ -452,8 +523,8 @@ where

/// Try to connect to `candidate` using `outbound_connector`.
///
/// Returns a `HandshakeConnected` action on success, and a
/// `HandshakeFailed` action on error.
/// Returns a [`HandshakeConnected`] action on success, and a
/// [`HandshakeFailed`] action on error.
#[instrument(skip(outbound_connector,))]
async fn dial<C>(candidate: MetaAddr, mut outbound_connector: C) -> CrawlerAction
where
Expand Down Expand Up @@ -485,15 +556,19 @@ where
.await
}

impl From<Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>> for CrawlerAction {
fn from(dial_result: Result<Change<SocketAddr, Client>, (MetaAddr, BoxError)>) -> Self {
/// Convert from a connector result to a Crawler action
impl From<Result<Change<SocketAddr, peer::Client>, (MetaAddr, BoxError)>> for CrawlerAction {
fn from(
connector_result: Result<Change<SocketAddr, peer::Client>, (MetaAddr, BoxError)>,
) -> Self {
use CrawlerAction::*;
match dial_result {
match connector_result {
Ok(peer_set_change) => HandshakeConnected { peer_set_change },
Err((candidate, e)) => {
debug!(?candidate.addr, ?e, "failed to connect to candidate");
debug!(addr = ?candidate.addr, ?e, "failed to connect to candidate");
HandshakeFailed {
failed_addr: candidate,
error: e,
}
}
}
Expand Down

0 comments on commit 344d118

Please sign in to comment.