Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a deadlock between the crawler and dialer, and other hangs #1950

Merged
merged 7 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(60 + 20 + 20 + 20);
/// connected peer.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(60);

/// The number of GetAddr requests sent when crawling for new peers.
///
/// ## SECURITY
///
/// The fanout should be greater than 1, to ensure that Zebra's address book is
/// not dominated by a single peer.
pub const GET_ADDR_FANOUT: usize = 2;

/// Truncate timestamps in outbound address messages to this time interval.
///
/// This is intended to prevent a peer from learning exactly when we received
Expand Down
88 changes: 63 additions & 25 deletions zebra-network/src/peer_set/candidate_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ use std::{
};

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::time::{sleep, sleep_until, Sleep};
use tokio::time::{sleep, sleep_until, timeout, Sleep};
use tower::{Service, ServiceExt};

use crate::{types::MetaAddr, AddressBook, BoxError, Request, Response};
use crate::{constants, types::MetaAddr, AddressBook, BoxError, Request, Response};

/// The `CandidateSet` manages the `PeerSet`'s peer reconnection attempts.
///
Expand Down Expand Up @@ -140,6 +140,9 @@ where
///
/// ## Correctness
///
/// The crawler exits when update returns an error, so it must only return
/// errors on permanent failures.
///
/// The handshaker sets up the peer message receiver so it also sends a
/// `Responded` peer address update.
///
Expand All @@ -150,37 +153,62 @@ where
// Opportunistically crawl the network on every update call to ensure
// we're actively fetching peers. Continue independently of whether we
// actually receive any peers, but always ask the network for more.
//
// Because requests are load-balanced across existing peers, we can make
// multiple requests concurrently, which will be randomly assigned to
// existing peers, but we don't make too many because update may be
// called while the peer set is already loaded.
let mut responses = FuturesUnordered::new();
trace!("sending GetPeers requests");
// Yes this loops only once (for now), until we add fanout back.
for _ in 0..1usize {
self.peer_service.ready_and().await?;
responses.push(self.peer_service.call(Request::Peers));
for _ in 0..constants::GET_ADDR_FANOUT {
// CORRECTNESS
//
// avoid deadlocks when there are no connected peers, and:
// - we're waiting on a handshake to complete so there are peers, or
// - another task that handles or adds peers is waiting on this task to complete.
let peer_service =
match timeout(constants::REQUEST_TIMEOUT, self.peer_service.ready_and()).await {
// update must only return an error for permanent failures
Err(temporary_error) => {
info!(
?temporary_error,
"timeout waiting for the peer service to become ready"
);
return Ok(());
}
Ok(Err(permanent_error)) => Err(permanent_error)?,
Ok(Ok(peer_service)) => peer_service,
};
responses.push(peer_service.call(Request::Peers));
}
while let Some(rsp) = responses.next().await {
if let Ok(Response::Peers(rsp_addrs)) = rsp {
// Filter new addresses to ensure that gossiped addresses are actually new
let peer_set = &self.peer_set;
let new_addrs = rsp_addrs
.iter()
.filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr))
.collect::<Vec<_>>();
trace!(
?rsp_addrs,
new_addr_count = ?new_addrs.len(),
"got response to GetPeers"
);
// New addresses are deserialized in the `NeverAttempted` state
peer_set
.lock()
.unwrap()
.extend(new_addrs.into_iter().cloned());
} else {
trace!("got error in GetPeers request");
match rsp {
Ok(Response::Peers(rsp_addrs)) => {
// Filter new addresses to ensure that gossiped addresses are actually new
let peer_set = &self.peer_set;
// TODO: reduce mutex contention by moving the filtering into
// the address book itself
let new_addrs = rsp_addrs
.iter()
.filter(|meta| !peer_set.lock().unwrap().contains_addr(&meta.addr))
.collect::<Vec<_>>();
trace!(
?rsp_addrs,
new_addr_count = ?new_addrs.len(),
"got response to GetPeers"
);
// New addresses are deserialized in the `NeverAttempted` state
peer_set
.lock()
.unwrap()
.extend(new_addrs.into_iter().cloned());
}
Err(e) => {
// since we do a fanout, and new updates are triggered by
// each demand, we can ignore errors in individual responses
trace!(?e, "got error in GetPeers request");
}
Ok(_) => unreachable!("Peers requests always return Peers responses"),
}
}

Expand Down Expand Up @@ -214,6 +242,16 @@ where
let mut sleep = sleep_until(current_deadline + Self::MIN_PEER_CONNECTION_INTERVAL);
mem::swap(&mut self.next_peer_min_wait, &mut sleep);

// CORRECTNESS
//
// In this critical section, we hold the address mutex.
//
// To avoid deadlocks, the critical section:
// - must not acquire any other locks
// - must not await any futures
//
// To avoid hangs, any computation in the critical section should
// be kept to a minimum.
let reconnect = {
let mut peer_set_guard = self.peer_set.lock().unwrap();
// It's okay to early return here because we're returning None
Expand Down