Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix a deadlock between the crawler and dialer, and other hangs #1950

Merged: 7 commits were merged on Apr 7, 2021.
77 changes: 47 additions & 30 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,39 +378,15 @@ where
continue;
}
DemandHandshake { candidate } => {
let mut hs_connector = connector.clone();
// spawn each handshake into an independent task, so it can make
// progress independently of the crawls
let hs_join = tokio::spawn(async move {
debug!(?candidate.addr, "attempting outbound connection in response to demand");
// the connector is always ready, so this can't hang
let hs_connector = hs_connector
.ready_and()
.await
.expect("connector never errors");
// the handshake has timeouts, so it shouldn't hang
hs_connector
.call(candidate.addr)
.map(move |res| match res {
Ok(peer_set_change) => HandshakeConnected { peer_set_change },
Err(e) => {
debug!(?candidate.addr, ?e, "failed to connect to candidate");
HandshakeFailed {
failed_addr: candidate,
}
}
})
.await
})
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
debug!(?candidate.addr, ?e, "failed to connect to candidate");
HandshakeFailed {
failed_addr: candidate,
let hs_join =
tokio::spawn(dial(candidate, connector.clone())).map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e) => {
panic!("panic during handshaking with {:?}: {:?} ", candidate, e);
}
}
});
});
handshakes.push(Box::pin(hs_join));
}
DemandCrawl => {
Expand Down Expand Up @@ -455,3 +431,44 @@ where
}
}
}

/// Try to connect to `candidate` using `connector`.
///
/// Waits for `connector` to report readiness, then calls it with
/// `candidate.addr` and awaits the result.
///
/// Returns a `HandshakeConnected` action carrying the resulting peer set
/// change on success, and a `HandshakeFailed` action carrying `candidate`
/// on error.
///
/// # Panics
///
/// Panics if `connector` returns an error while becoming ready.
#[instrument(skip(connector,))]
async fn dial<C>(candidate: MetaAddr, mut connector: C) -> CrawlerAction
where
    C: Service<SocketAddr, Response = Change<SocketAddr, peer::Client>, Error = BoxError>
        + Clone
        + Send
        + 'static,
    C::Future: Send + 'static,
{
    use CrawlerAction::*;

    // CORRECTNESS
    //
    // To avoid hangs, the dialer must only await:
    // - functions that return immediately, or
    // - functions that have a reasonable timeout

    debug!(?candidate.addr, "attempting outbound connection in response to demand");

    // the connector is always ready, so this can't hang
    let connector = connector.ready_and().await.expect("connector never errors");

    // the handshake has timeouts, so it shouldn't hang
    connector
        .call(candidate.addr)
        // Translate the connector's result into the action the crawler consumes.
        .map(move |res| match res {
            Ok(peer_set_change) => HandshakeConnected { peer_set_change },
            Err(e) => {
                debug!(?candidate.addr, ?e, "failed to connect to candidate");
                // Hand the candidate back so the caller knows which address failed.
                HandshakeFailed {
                    failed_addr: candidate,
                }
            }
        })
        .await
}