Skip to content

Commit

Permalink
When there are no peers, wake on newly ready peers, or new peers, in …
Browse files Browse the repository at this point in the history
…that order
  • Loading branch information
teor2345 committed Oct 27, 2023
1 parent 378703c commit 327e4a7
Showing 1 changed file with 47 additions and 17 deletions.
64 changes: 47 additions & 17 deletions zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ use std::{
collections::{HashMap, HashSet},
convert,
fmt::Debug,
future::Future,
marker::PhantomData,
net::IpAddr,
pin::Pin,
Expand All @@ -107,7 +106,7 @@ use std::{

use futures::{
channel::{mpsc, oneshot},
future::{FutureExt, TryFutureExt},
future::{Future, FutureExt, TryFutureExt},
prelude::*,
stream::FuturesUnordered,
task::noop_waker,
Expand Down Expand Up @@ -329,7 +328,7 @@ where
/// Check background task handles to make sure they're still running.
///
/// If any background task exits, shuts down all other background tasks,
/// and returns an error. Otherwise, returns `Poll::Pending`, and registers a wakeup for
/// and returns an error. Otherwise, returns `Pending`, and registers a wakeup for
/// receiving the background tasks, or the background tasks exiting.
fn poll_background_errors(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
futures::ready!(self.receive_tasks_if_needed(cx))?;
Expand Down Expand Up @@ -426,20 +425,23 @@ where
/// Move newly ready services to the ready list if they are for peers with supported protocol
/// versions, otherwise they are dropped. Also drop failed services.
///
/// Never returns an error. If there are no unready tasks, returns `Ok(())`
/// Never returns an error. If there are no unready peers, returns `Ok` immediately.
/// Otherwise, returns `Poll::Pending`, and registers a wakeup for the next task that becomes
/// ready.
fn poll_unready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
if self.unready_services.is_empty() {
// Don't replace the previous waker, because we have no peers to wake the task.
return Poll::Ready(Ok(()));
}

loop {
// Return Pending if we've finished processing the unready service changes,
// but there are still some unready services.
match futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx)) {
// There are no unready services, so the task won't be woken by this waker until
// some are added by code woken by other wakers.
None => return Poll::Ready(Ok(())),

match futures::ready!(Pin::new(&mut self.unready_services).poll_next(cx))
.expect("already checked for an empty list")
{
// Unready -> Ready
Some(Ok((key, svc))) => {
Ok((key, svc)) => {
trace!(?key, "service became ready");
let cancel = self.cancel_handles.remove(&key);
assert!(cancel.is_some(), "missing cancel handle");
Expand All @@ -450,7 +452,7 @@ where
}

// Unready -> Canceled
Some(Err((key, UnreadyError::Canceled))) => {
Err((key, UnreadyError::Canceled)) => {
// A service be canceled because we've connected to the same service twice.
// In that case, there is a cancel handle for the peer address,
// but it belongs to the service for the newer connection.
Expand All @@ -460,7 +462,7 @@ where
"service was canceled, dropping service"
);
}
Some(Err((key, UnreadyError::CancelHandleDropped(_)))) => {
Err((key, UnreadyError::CancelHandleDropped(_))) => {
// Similarly, services with dropped cancel handes can have duplicates.
trace!(
?key,
Expand All @@ -470,7 +472,7 @@ where
}

// Unready -> Errored
Some(Err((key, UnreadyError::Inner(error)))) => {
Err((key, UnreadyError::Inner(error))) => {
debug!(%error, "service failed while unready, dropping service");

let cancel = self.cancel_handles.remove(&key);
Expand Down Expand Up @@ -996,10 +998,38 @@ where

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Update service and peer statuses.
self.poll_background_errors(cx)?;
self.poll_discover(cx)?;
self.inventory_registry.poll_inventory(cx)?;
self.poll_unready(cx);
//
// # Correctness
//
// If we poll these futures in series, then only the last polled future will wake
// the task. This can cause hangs when there are no new peers, no newly ready peers,
// or no peers in the peer set at all.
//
// As a temporary fix, we try to pick the most likely waker, and skip wakers that will
// never wake.

// We can tolerate not being woken for these tasks, because we only return `Pending`
// when we have no ready peers. Delays in background error checking are acceptable,
// and we can't do anything useful with inventory until we have peers. (The oldest
// inventory will get automatically dropped.)
let _ = self.poll_background_errors(cx)?;
let _ = self.inventory_registry.poll_inventory(cx)?;

// We must be woken if there are new peers, but new peers can be infrequent if our
// connection slots are full, or we're connected to all available/useful peers.
//
// TODO: wake waiting tasks if there are new peers or unready peers become ready (#7858)
let _ = self.poll_discover(cx)?;
// This waker skips itself if it is empty and will never wake.
// Otherwise, each connected peer should become ready or timeout within a few minutes.
//
// TODO: drop peers that overload us with inbound messages and never become ready (#7822)
let _ = self.poll_unready(cx)?;

// Correctness: the waker in the context is waiting on peers. If it is replaced later in
// the function, the network could hang. We drop the reference here so it can't be used.
#[allow(dropping_references)]
std::mem::drop(cx);

// Cleanup and metrics
self.disconnect_from_outdated_peers();
Expand Down

0 comments on commit 327e4a7

Please sign in to comment.