Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(net): Add outer timeouts for critical network operations to avoid hangs #7869

Merged
merged 16 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,11 +1289,15 @@ where
// <https://docs.rs/tower/latest/tower/buffer/struct.Buffer.html#a-note-on-choosing-a-bound>
//
// The inbound service must be called immediately after a buffer slot is reserved.
//
// The inbound service never times out in readiness, because the load shed layer is always
// ready, and returns an error in response to the request instead.
if self.svc.ready().await.is_err() {
self.fail_with(PeerError::ServiceShutdown).await;
return;
}

// Inbound service request timeouts are handled by the timeout layer in `start::start()`.
let rsp = match self.svc.call(req.clone()).await {
Err(e) => {
if e.is::<tower::load_shed::error::Overloaded>() {
Expand Down
1 change: 1 addition & 0 deletions zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ toml = "0.8.3"
futures = "0.3.29"
rayon = "1.7.0"
tokio = { version = "1.33.0", features = ["time", "rt-multi-thread", "macros", "tracing", "signal"] }
tokio-stream = { version = "0.1.14", features = ["time"] }
upbqdn marked this conversation as resolved.
Show resolved Hide resolved
tower = { version = "0.4.13", features = ["hedge", "limit"] }
pin-project = "1.1.3"

Expand Down
58 changes: 47 additions & 11 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::Arc,
sync::{Arc, TryLockError},
task::{Context, Poll},
time::Duration,
};
Expand Down Expand Up @@ -278,7 +278,11 @@ impl Service<zn::Request> for Inbound {
}
}
Err(TryRecvError::Empty) => {
// There's no setup data yet, so keep waiting for it
// There's no setup data yet, so keep waiting for it.
//
// We could use Future::poll() to get a waker and return Poll::Pending here.
// But we want to drop excess requests during startup instead. Otherwise,
// the inbound service gets overloaded, and starts disconnecting peers.
result = Ok(());
Setup::Pending {
full_verify_concurrency_limit,
Expand Down Expand Up @@ -307,6 +311,11 @@ impl Service<zn::Request> for Inbound {
mempool,
state,
} => {
// # Correctness
//
// Clear the stream but ignore the final Pending return value.
// If we returned Pending here, and there were no waiting block downloads,
// then inbound requests would wait for the next block download, and hang forever.
while let Poll::Ready(Some(_)) = block_downloads.as_mut().poll_next(cx) {}

result = Ok(());
Expand Down Expand Up @@ -366,29 +375,56 @@ impl Service<zn::Request> for Inbound {
//
// # Correctness
//
// Briefly hold the address book threaded mutex while
// cloning the address book. Then sanitize in the future,
// after releasing the lock.
let peers = address_book.lock().unwrap().clone();
// If the address book is busy, try again inside the future. If it can't be locked
// twice, ignore the request.
let address_book = address_book.clone();

let get_peers = move || match address_book.try_lock() {
Ok(address_book) => Some(address_book.clone()),
Err(TryLockError::WouldBlock) => None,
Err(TryLockError::Poisoned(_)) => panic!("previous thread panicked while holding the address book lock"),
};

let peers = get_peers();

async move {
// Correctness: get the current time after acquiring the address book lock.
// Correctness: get the current time inside the future.
//
// This time is used to filter outdated peers, so it doesn't really matter
// This time is used to filter outdated peers, so it doesn't matter much
// if we get it when the future is created, or when it starts running.
let now = Utc::now();

// If we didn't get the peers when the future was created, wait for other tasks
// to run, then try again when the future first runs.
if peers.is_none() {
tokio::task::yield_now().await;
}
let peers = peers.or_else(get_peers);
let is_busy = peers.is_none();

// Send a sanitized response
let mut peers = peers.sanitized(now);
let mut peers = peers.map_or_else(Vec::new, |peers| peers.sanitized(now));

// Truncate the list
let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR);
let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit);
peers.truncate(address_limit);

if peers.is_empty() {
// We don't know if the peer response will be empty until we've sanitized them.
debug!("ignoring `Peers` request from remote peer because our address book is empty");
// Sometimes we don't know if the peer response will be empty until we've
// sanitized them.
if is_busy {
debug!(
"ignoring `Peers` request from remote peer because our address \
book has no available peers"
);
} else {
info!(
"ignoring `Peers` request from remote peer because our address \
book is busy"
);
}
teor2345 marked this conversation as resolved.
Show resolved Hide resolved

Ok(zn::Response::Nil)
} else {
Ok(zn::Response::Peers(peers))
Expand Down
21 changes: 16 additions & 5 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ use std::{
collections::HashSet,
future::Future,
iter,
pin::Pin,
pin::{pin, Pin},
task::{Context, Poll},
};

use futures::{future::FutureExt, stream::Stream};
use tokio::sync::broadcast;
use tokio_stream::StreamExt;
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service};

use zebra_chain::{
Expand All @@ -42,7 +43,7 @@ use zebra_node_services::mempool::{Gossip, Request, Response};
use zebra_state as zs;
use zebra_state::{ChainTipChange, TipAction};

use crate::components::sync::SyncStatus;
use crate::components::{mempool::crawler::RATE_LIMIT_DELAY, sync::SyncStatus};

pub mod config;
mod crawler;
Expand Down Expand Up @@ -580,9 +581,11 @@ impl Service<Request> for Mempool {
let best_tip_height = self.latest_chain_tip.best_tip_height();

// Clean up completed download tasks and add to mempool if successful.
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
while let Poll::Ready(Some(r)) =
pin!(tx_downloads.timeout(RATE_LIMIT_DELAY)).poll_next(cx)
{
match r {
Ok((tx, expected_tip_height)) => {
Ok(Ok((tx, expected_tip_height))) => {
// # Correctness:
//
// It's okay to use tip height here instead of the tip hash since
Expand All @@ -609,12 +612,20 @@ impl Service<Request> for Mempool {
tx_downloads.download_if_needed_and_verify(tx.transaction.into());
}
}
Err((txid, error)) => {
Ok(Err((txid, error))) => {
tracing::debug!(?txid, ?error, "mempool transaction failed to verify");

metrics::counter!("mempool.failed.verify.tasks.total", 1, "reason" => error.to_string());
storage.reject_if_needed(txid, error);
}
Err(_elapsed) => {
// A timeout happens when the stream hangs waiting for another service,
// so there is no specific transaction ID.

tracing::info!("mempool transaction failed to verify due to timeout");

metrics::counter!("mempool.failed.verify.tasks.total", 1, "reason" => "timeout");
}
};
}

Expand Down
17 changes: 14 additions & 3 deletions zebrad/src/components/mempool/crawler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@
use std::{collections::HashSet, time::Duration};

use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
use tokio::{sync::watch, task::JoinHandle, time::sleep};
use tokio::{
sync::watch,
task::JoinHandle,
time::{sleep, timeout},
};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
use tracing_futures::Instrument;

Expand All @@ -77,7 +81,7 @@ const FANOUT: usize = 3;
///
/// Using a prime number makes sure that mempool crawler fanouts
/// don't synchronise with other crawls.
const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);
pub const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);

/// The time to wait for a peer response.
///
Expand Down Expand Up @@ -191,7 +195,14 @@ where

loop {
self.wait_until_enabled().await?;
self.crawl_transactions().await?;
// Avoid hangs when the peer service is not ready, or due to bugs in async code.
timeout(RATE_LIMIT_DELAY, self.crawl_transactions())
.await
.unwrap_or_else(|timeout| {
// Temporary errors just get logged and ignored.
info!("mempool crawl timed out: {timeout:?}");
Ok(())
})?;
sleep(RATE_LIMIT_DELAY).await;
}
}
Expand Down
Loading
Loading