
Merge branch 'main' into prepared-groth-keys
dconnolly authored Nov 23, 2021
2 parents ae47bce + b39f4ca commit 9d1853e
Showing 1 changed file with 142 additions and 27 deletions.
169 changes: 142 additions & 27 deletions zebra-network/src/peer_set/set.rs
@@ -108,37 +108,80 @@ pub struct CancelClientWork;
/// Otherwise, malicious peers could interfere with other peers' `PeerSet` state.
pub struct PeerSet<D>
where
D: Discover<Key = SocketAddr> + Unpin,
D::Service: Service<Request, Response = Response> + Load,
D::Error: Into<BoxError>,
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static,
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
/// Provides new and deleted peer [`Change`]s to the peer set,
/// via the [`Discover`] trait implementation.
discover: D,

/// Connected peers that are ready to receive requests from Zebra,
/// or send requests to Zebra.
ready_services: IndexMap<D::Key, D::Service>,

/// A preselected index for a ready service.
/// INVARIANT: If this is `Some(i)`, `i` must be a valid index for `ready_services`.
/// This means that every change to `ready_services` must invalidate or correct it.
preselected_p2c_index: Option<usize>,

/// Stores gossiped inventory from connected peers.
/// Used to route inventory requests to peers that are likely to have it.
inventory_registry: InventoryRegistry,

/// Connected peers that are handling a Zebra request,
/// or Zebra is handling one of their requests.
unready_services: FuturesUnordered<UnreadyService<D::Key, D::Service, Request>>,

/// Channels used to cancel the request that an unready service is doing.
cancel_handles: HashMap<D::Key, oneshot::Sender<CancelClientWork>>,

/// A channel that asks the peer crawler task to connect to more peers.
demand_signal: mpsc::Sender<MorePeers>,

/// Channel for passing ownership of tokio JoinHandles from PeerSet's background tasks
///
/// The join handles passed into the PeerSet are used to populate the `guards` member.
handle_rx: tokio::sync::oneshot::Receiver<Vec<JoinHandle<Result<(), BoxError>>>>,

/// Unordered set of handles to background tasks associated with the `PeerSet`
///
/// These guards are checked for errors as part of `poll_ready`, which lets
/// the `PeerSet` propagate errors from background tasks back to the user.
guards: futures::stream::FuturesUnordered<JoinHandle<Result<(), BoxError>>>,

/// A shared list of peer addresses.
///
/// Used for logging diagnostics.
address_book: Arc<std::sync::Mutex<AddressBook>>,

/// The last time we logged a message about the peer set size
last_peer_log: Option<Instant>,

/// The configured limit for inbound and outbound connections.
///
/// The peer set panics if this size is exceeded.
/// If that happens, our connection limit code has a bug.
peerset_total_connection_limit: usize,
}
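
The `preselected_p2c_index` invariant above exists because `IndexMap` swap-removals reorder entries: removing one entry moves the last entry into the vacated slot, so a stored index can silently point at a different peer or past the end of the map. A minimal standalone sketch of that hazard (the addresses and values are illustrative, not Zebra's; this is not part of set.rs):

    use indexmap::IndexMap;

    fn main() {
        let mut ready_services: IndexMap<&str, &str> = IndexMap::new();
        ready_services.insert("203.0.113.1:8233", "peer A");
        ready_services.insert("203.0.113.2:8233", "peer B");
        ready_services.insert("203.0.113.3:8233", "peer C");

        // Preselect the last peer, at index 2.
        let preselected_p2c_index = Some(2);

        // `swap_remove` moves the last entry into the vacated slot,
        // so index 2 no longer exists after this call.
        ready_services.swap_remove("203.0.113.1:8233");

        // The stored index is now out of bounds and must be cleared or corrected.
        assert_eq!(ready_services.len(), 2);
        assert!(preselected_p2c_index.unwrap() >= ready_services.len());
    }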

impl<D> Drop for PeerSet<D>
where
D: Discover<Key = SocketAddr> + Unpin,
D::Service: Service<Request, Response = Response> + Load,
D::Error: Into<BoxError>,
<D::Service as Service<Request>>::Error: Into<BoxError> + 'static,
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
fn drop(&mut self) {
self.shut_down_tasks_and_channels()
}
}

impl<D> PeerSet<D>
where
D: Discover<Key = SocketAddr> + Unpin,
@@ -169,15 +212,22 @@ where
address_book: Arc<std::sync::Mutex<AddressBook>>,
) -> Self {
Self {
// Ready peers
discover,
ready_services: IndexMap::new(),
preselected_p2c_index: None,
inventory_registry: InventoryRegistry::new(inv_stream),

// Unready peers
unready_services: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
demand_signal,

// Background tasks
handle_rx,
guards: futures::stream::FuturesUnordered::new(),

// Metrics
last_peer_log: None,
address_book,
peerset_total_connection_limit: config.peerset_total_connection_limit(),
@@ -189,42 +239,107 @@ where
/// If any background task exits, shuts down all other background tasks,
/// and returns an error.
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
if let Some(result) = self.receive_tasks_if_needed() {
return result;
}

match Pin::new(&mut self.guards).poll_next(cx) {
// All background tasks are still running.
Poll::Pending => Ok(()),

Poll::Ready(Some(res)) => {
info!(
background_tasks = %self.guards.len(),
"a peer set background task exited, shutting down other peer set tasks"
);

self.shut_down_tasks_and_channels();

// Flatten the join result and inner result,
// then turn Ok() task exits into errors.
res.map_err(Into::into)
// TODO: replace with Result::flatten when it stabilises (#70142)
.and_then(convert::identity)
.and(Err("a peer set background task exited".into()))
}

Poll::Ready(None) => {
self.shut_down_tasks_and_channels();
Err("all peer set background tasks have exited".into())
}
}
}

/// Receive background tasks, if they've been sent on the channel,
/// but not consumed yet.
///
/// Returns a result representing the current task state,
/// or `None` if the background tasks should be polled to check their state.
fn receive_tasks_if_needed(&mut self) -> Option<Result<(), BoxError>> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
// The tasks haven't been sent yet.
Err(TryRecvError::Empty) => Some(Ok(())),

// The tasks have been sent, but not consumed.
Ok(handles) => {
// Currently, the peer set treats an empty background task set as an error.
//
// TODO: refactor `handle_rx` and `guards` into an enum
// for the background task state: Waiting/Running/Shutdown.
assert!(
!handles.is_empty(),
"the peer set requires at least one background task"
);

for handle in handles {
self.guards.push(handle);
}

None
}

// The tasks have been sent and consumed, but then they exited.
//
// Correctness: the peer set must receive at least one task.
//
// TODO: refactor `handle_rx` and `guards` into an enum
// for the background task state: Waiting/Running/Shutdown.
Err(TryRecvError::Closed) => {
Some(Err("all peer set background tasks have exited".into()))
}
}
} else {
None
}
}
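
For context, the `handle_rx`/`guards` handoff that `poll_background_errors` and `receive_tasks_if_needed` manage can be reduced to plain tokio: a oneshot channel delivers the spawned `JoinHandle`s exactly once, they are drained into a `FuturesUnordered`, and polling that set flattens each join result and turns any task exit into an error. A simplified, self-contained sketch of that pattern (task bodies and error strings are made up; this is not Zebra's code):

    use futures::stream::{FuturesUnordered, StreamExt};
    use tokio::{sync::oneshot, task::JoinHandle};

    type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

    #[tokio::main]
    async fn main() {
        let (handle_tx, handle_rx) =
            oneshot::channel::<Vec<JoinHandle<Result<(), BoxError>>>>();

        // The constructor side spawns background tasks, then sends their
        // join handles through the oneshot channel.
        let tasks: Vec<JoinHandle<Result<(), BoxError>>> = vec![
            tokio::spawn(async { Ok(()) }),
            tokio::spawn(async { Err("crawler exited".into()) }),
        ];
        handle_tx
            .send(tasks)
            .unwrap_or_else(|_| panic!("handle receiver dropped"));

        // The poll side receives the handles once, keeps them in a
        // FuturesUnordered, and treats any task exit as an error.
        let mut guards: FuturesUnordered<JoinHandle<Result<(), BoxError>>> =
            handle_rx.await.expect("handle sender dropped").into_iter().collect();

        while let Some(join_result) = guards.next().await {
            // Flatten the JoinError and the task's own Result,
            // then turn even a clean exit into an error.
            let exit: Result<(), BoxError> = join_result
                .map_err(Into::into)
                .and_then(std::convert::identity)
                .and(Err("a background task exited".into()));
            println!("background task finished: {:?}", exit);
        }
    }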

/// Shut down:
/// - services by dropping the service lists
/// - background tasks via their join handles or cancel handles
/// - channels by closing the channel
fn shut_down_tasks_and_channels(&mut self) {
// Drop services and cancel their background tasks.
self.preselected_p2c_index = None;
self.ready_services = IndexMap::new();

for (_peer_key, handle) in self.cancel_handles.drain() {
let _ = handle.send(CancelClientWork);
}
self.unready_services = FuturesUnordered::new();

// Close the MorePeers channel for all senders,
// so we don't add more peers to a shut down peer set.
self.demand_signal.close_channel();

// Shut down background tasks.
self.handle_rx.close();
self.receive_tasks_if_needed();
for guard in self.guards.iter() {
guard.abort();
}

// TODO: implement graceful shutdown for InventoryRegistry (#1678)
}
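
The `cancel_handles` drain above works because each in-flight request races its work against a oneshot cancel signal. A rough sketch of that pattern with plain tokio (`CancelClientWork` here is a local stand-in for the marker type defined in this file; the request body and timings are made up, and this is not Zebra's connection code):

    use tokio::sync::oneshot;

    struct CancelClientWork;

    async fn handle_request(cancel_rx: oneshot::Receiver<CancelClientWork>) -> &'static str {
        tokio::select! {
            // The real work: here a long sleep stands in for a slow peer.
            _ = tokio::time::sleep(std::time::Duration::from_secs(60)) => "finished",
            // Shutdown drains `cancel_handles` and sends the marker,
            // so the request stops without waiting for the peer.
            _ = cancel_rx => "cancelled",
        }
    }

    #[tokio::main]
    async fn main() {
        let (cancel_tx, cancel_rx) = oneshot::channel();
        let request = tokio::spawn(handle_request(cancel_rx));

        // Shutdown path: send the cancel marker, ignoring requests
        // that already finished and dropped their receiver.
        let _ = cancel_tx.send(CancelClientWork);

        assert_eq!(request.await.expect("request task panicked"), "cancelled");
    }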

fn poll_unready(&mut self, cx: &mut Context<'_>) {
