Report RPC Errors to the application on peer disconnections #5680

Merged · 6 commits · May 6, 2024
25 changes: 25 additions & 0 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -352,6 +352,31 @@ where
!matches!(self.state, HandlerState::Deactivated)
}

// NOTE: This function gets polled to completion upon a connection close.
fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<Option<Self::ToBehaviour>> {
// Inform the network behaviour of any failed requests

while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() {
let outbound_info = self
.outbound_substreams
.remove(&substream_id)
.expect("The value must exist for a key");
// If the state of the connection is closing, we do not need to report this case to
// the behaviour, as the connection has just closed non-gracefully
if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) {
continue;
}

// Register this request as an RPC Error
return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: outbound_info.proto,
id: outbound_info.req_id,
})));
}
Poll::Ready(None)
}

fn poll(
&mut self,
cx: &mut Context<'_>,
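The `poll_close` hook above drains every pending outbound substream when the connection closes, reporting one failed request per poll until the map is empty. A minimal, self-contained sketch of that drain-on-close pattern (hypothetical simplified types, not the Lighthouse handler itself) might look like this:

```rust
use std::collections::HashMap;
use std::task::Poll;

enum SubstreamState {
    Active,
    Closing,
}

struct PendingRequest {
    req_id: u64,
    state: SubstreamState,
}

struct Handler {
    outbound: HashMap<u64, PendingRequest>,
}

impl Handler {
    /// Called repeatedly on connection close; yields one failed request per call
    /// and `None` once everything has been drained.
    fn poll_close(&mut self) -> Poll<Option<u64>> {
        while let Some(key) = self.outbound.keys().next().cloned() {
            let info = self.outbound.remove(&key).expect("key was just observed");
            // Substreams that were already closing gracefully need no error report.
            if matches!(info.state, SubstreamState::Closing) {
                continue;
            }
            // Surface this request as failed; the behaviour would map it to an
            // `RPCError::Disconnected` for the application.
            return Poll::Ready(Some(info.req_id));
        }
        Poll::Ready(None)
    }
}

fn main() {
    let mut handler = Handler {
        outbound: HashMap::from([
            (1, PendingRequest { req_id: 10, state: SubstreamState::Active }),
            (2, PendingRequest { req_id: 20, state: SubstreamState::Closing }),
        ]),
    };
    while let Poll::Ready(Some(req_id)) = handler.poll_close() {
        println!("request {req_id} failed: peer disconnected");
    }
}
```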
24 changes: 18 additions & 6 deletions beacon_node/lighthouse_network/src/service/mod.rs
@@ -972,6 +972,12 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
.goodbye_peer(peer_id, reason, source);
}

/// Hard (ungraceful) disconnect for testing purposes only.
/// Use goodbye_peer for disconnections; do not use this function.
pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) {
let _ = self.swarm.disconnect_peer_id(peer_id);
}

/// Returns an iterator over all enr entries in the DHT.
pub fn enr_entries(&self) -> Vec<Enr> {
self.discovery().table_entries_enr()
@@ -1373,12 +1379,18 @@ impl<AppReqId: ReqId, E: EthSpec> Network<AppReqId, E> {
let peer_id = event.peer_id;

if !self.peer_manager().is_connected(&peer_id) {
debug!(
self.log,
"Ignoring rpc message of disconnecting peer";
event
);
return None;
// Sync expects an RPCError::Disconnected to drop the lookups associated with this peer.
// Silencing this event breaks the API contract with RPC where every request ends with
// - A stream termination event, or
// - An RPCError event
if !matches!(event.event, HandlerEvent::Err(HandlerErr::Outbound { .. })) {
debug!(
self.log,
"Ignoring rpc message of disconnecting peer";
event
);
return None;
}
}

let handler_id = event.conn_id;
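The comment above relies on the RPC layer's contract that every outbound request terminates with exactly one of a stream-termination event or an RPCError. A hedged, standalone sketch (hypothetical event names, not Lighthouse's actual types) of how a consumer depends on that contract:

```rust
use std::collections::HashSet;

enum RpcEvent {
    // May arrive several times for a chunked response.
    Response { id: u64, bytes: usize },
    // Graceful end of the response stream.
    StreamTerminated { id: u64 },
    // Terminal failure, e.g. the peer disconnected mid-request.
    Error { id: u64, reason: &'static str },
}

fn handle(in_flight: &mut HashSet<u64>, event: RpcEvent) {
    match event {
        RpcEvent::Response { id, bytes } => {
            println!("request {id}: received {bytes} bytes");
        }
        // Both terminal events must clear the request, otherwise it would
        // remain "in flight" forever after a silent disconnect.
        RpcEvent::StreamTerminated { id } => {
            in_flight.remove(&id);
        }
        RpcEvent::Error { id, reason } => {
            println!("request {id} failed: {reason}");
            in_flight.remove(&id);
        }
    }
}

fn main() {
    let mut in_flight: HashSet<u64> = HashSet::from([42]);
    handle(&mut in_flight, RpcEvent::Error { id: 42, reason: "disconnected" });
    assert!(in_flight.is_empty());
}
```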
92 changes: 91 additions & 1 deletion beacon_node/lighthouse_network/tests/rpc_tests.rs
@@ -3,7 +3,7 @@
mod common;

use common::Protocol;
use lighthouse_network::rpc::methods::*;
use lighthouse_network::rpc::{methods::*, RPCError};
use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response};
use slog::{debug, warn, Level};
use ssz::Encode;
@@ -996,6 +996,96 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() {
})
}

#[test]
fn test_disconnect_triggers_rpc_error() {
// Set up logging: the log level and whether logging is enabled.
let log_level = Level::Debug;
let enable_logging = false;

let log = common::build_log(log_level, enable_logging);
let spec = E::default_spec();

let rt = Arc::new(Runtime::new().unwrap());
// get sender/receiver
rt.block_on(async {
let (mut sender, mut receiver) = common::build_node_pair(
Arc::downgrade(&rt),
&log,
ForkName::Base,
&spec,
Protocol::Tcp,
)
.await;

// BlocksByRoot Request
let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new(
// Must have at least one root for the request to create a stream
vec![Hash256::from_low_u64_be(0)],
&spec,
));

// build the sender future
let sender_future = async {
loop {
match sender.next_event().await {
NetworkEvent::PeerConnectedOutgoing(peer_id) => {
// Send a STATUS message
debug!(log, "Sending RPC");
sender.send_request(peer_id, 42, rpc_request.clone());
}
NetworkEvent::RPCFailed { error, id: 42, .. } => match error {
RPCError::Disconnected => return,
Member: the test should make sure we only get to this branch

Collaborator (Author): Since this branch is the only way to break out of the loop, not hitting this branch will time out the test. I considered adding something explicit, but it feels redundant.

Member: right.
other => panic!("received unexpected error {:?}", other),
},
other => {
warn!(log, "Ignoring other event {:?}", other);
}
}
}
};

// Holds the peer to disconnect. If Some, the receiver has seen a request and still needs
// to perform the disconnect.
let mut sending_peer = None;
let receiver_future = async {
loop {
// This future either drives the receiving or times out, allowing the disconnect to be
// performed after the delay.
match futures::future::select(
Box::pin(receiver.next_event()),
Box::pin(tokio::time::sleep(Duration::from_secs(1))),
)
.await
{
futures::future::Either::Left((ev, _)) => match ev {
NetworkEvent::RequestReceived { peer_id, .. } => {
sending_peer = Some(peer_id);
}
other => {
warn!(log, "Ignoring other event {:?}", other);
}
},
futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required
}

// if we need to send messages send them here. This will happen after a delay
if let Some(peer_id) = sending_peer.take() {
warn!(log, "Receiver got request, disconnecting peer");
receiver.__hard_disconnect_testing_only(peer_id);
}
}
};

tokio::select! {
_ = sender_future => {}
_ = receiver_future => {}
_ = sleep(Duration::from_secs(30)) => {
panic!("Future timed out");
}
}
})
}

/// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC
/// Goodbye message.
fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {
38 changes: 2 additions & 36 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
@@ -307,49 +307,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// A peer has disconnected.
/// If the peer has active batches, those are considered failed and re-requested.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn peer_disconnected(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> {
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
if matches!(
self.state(),
BackFillState::Failed | BackFillState::NotRequired
) {
return Ok(());
}

if let Some(batch_ids) = self.active_requests.remove(peer_id) {
Member: This is a great simplification. The repeated logic was a source of many seen/unseen bugs earlier. Kudos for having the big picture and spotting that this can be removed 🙌

My only concern with this is that in inject_error, we call batch.download_failed(true) instead of false, which we maybe shouldn't be doing for disconnections? Repeated peer disconnections for the same batch might end up marking the entire chain as invalid and redoing a bunch of stuff.

Member: Good point. It should also be noted that these disconnects are ungraceful.

i.e. when Lighthouse disconnects from a peer, it will wait to try and fulfill all its requests; it won't just drop the connection. In fact, a stream timeout will occur before the disconnection in a graceful disconnect.

The peer-disconnect error should only happen when a peer drops the connection without fulfilling a request (which Lighthouse doesn't do unless there is a network error).

Collaborator (Author): Right, current stable assumes that peer disconnection == failed download. But this RPCError::Disconnected error will only fire if there is an active outgoing request that gets terminated ungracefully.

> Repeated peer disconnections for the same batch might end up marking the entire chain as invalid and redoing a bunch of stuff.

Should only apply if:

  • we initiate a request to peer A
  • peer A disconnects ungracefully before completing the request
  • we initiate a retry request to peer B
  • peer B disconnects ungracefully before completing the request

This should not happen frequently.

Member: fair enough

// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
match batch.download_failed(false) {
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
}
Ok(BatchOperationOutcome::Continue) => {}
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
}
}
// If we have run out of peers in which to retry this batch, the backfill state
// transitions to a paused state.
// We still need to reset the state for all the affected batches, so we should not
// short circuit early
if self.retry_batch_download(network, id).is_err() {
debug!(
self.log,
"Batch could not be retried";
"batch_id" => id,
"error" => "no synced peers"
);
}
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
self.active_requests.remove(peer_id);

// Remove the peer from the participation list
self.participating_peers.remove(peer_id);
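To make the retry-budget concern in the thread above concrete, here is a hedged sketch (hypothetical constant and simplified types, not the real BackFillSync internals): every download_failed call consumes one attempt, so counting ungraceful disconnections against the same budget as ordinary failures exhausts a batch, and potentially the whole sync, faster.

```rust
const MAX_DOWNLOAD_ATTEMPTS: u8 = 5;

enum BatchOperationOutcome {
    Continue,
    Failed { blacklist: bool },
}

struct Batch {
    failed_attempts: u8,
}

impl Batch {
    /// `mark_peers_faulty` mirrors the `download_failed(true/false)` flag discussed above;
    /// the review suggests passing `false` for plain disconnections.
    fn download_failed(&mut self, mark_peers_faulty: bool) -> BatchOperationOutcome {
        self.failed_attempts += 1;
        if self.failed_attempts >= MAX_DOWNLOAD_ATTEMPTS {
            BatchOperationOutcome::Failed { blacklist: mark_peers_faulty }
        } else {
            BatchOperationOutcome::Continue
        }
    }
}

fn main() {
    let mut batch = Batch { failed_attempts: 0 };
    for attempt in 1..=MAX_DOWNLOAD_ATTEMPTS {
        match batch.download_failed(false) {
            BatchOperationOutcome::Continue => println!("attempt {attempt} failed, retrying"),
            BatchOperationOutcome::Failed { blacklist } => {
                println!("batch failed after {attempt} attempts (blacklist peers: {blacklist})")
            }
        }
    }
}
```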
15 changes: 6 additions & 9 deletions beacon_node/network/src/sync/block_lookups/mod.rs
@@ -382,16 +382,13 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
/* Error responses */

pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
/* Check disconnection for single lookups */
self.single_block_lookups.retain(|_, req| {
let should_drop_lookup =
req.should_drop_lookup_on_disconnected_peer(peer_id );

if should_drop_lookup {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?req.block_root());
self.single_block_lookups.retain(|_, lookup| {
if lookup.remove_peer(peer_id) {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => ?lookup.block_root());
false
} else {
true
}

!should_drop_lookup
});
}

26 changes: 9 additions & 17 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
@@ -186,21 +186,11 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
&& self.blob_request_state.state.is_processed()
}

/// Checks both the block and blob request states to see if the peer is disconnected.
///
/// Returns true if the lookup should be dropped.
pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id);
self.blob_request_state.state.remove_peer(peer_id);

if self.all_available_peers().count() == 0 {
return true;
}

// Note: if the peer disconnected happens to have an on-going request associated with this
// lookup we will receive an RPCError and the lookup will fail. No need to manually retry
// now.
false
/// Remove peer from available peers. Return true if there are no more available peers and all
/// requests are not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id)
&& self.blob_request_state.state.remove_peer(peer_id)
}
}

@@ -465,9 +455,11 @@ impl<T: Clone> SingleLookupRequestState<T> {
self.available_peers.insert(*peer_id)
}

/// If a peer disconnects, this request could be failed. If so, an error is returned
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) {
/// Remove peer from available peers. Return true if there are no more available peers and the
/// request is not expecting any future event (AwaitingDownload).
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) -> bool {
self.available_peers.remove(disconnected_peer_id);
self.available_peers.is_empty() && self.is_awaiting_download()
}

pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
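The new remove_peer contract above only signals a drop when a request has no peers left and is still waiting to start a download. A small hedged sketch of that condition (simplified, hypothetical types rather than the actual lookup state machine):

```rust
use std::collections::HashSet;

#[derive(PartialEq)]
enum State {
    AwaitingDownload,
    Downloading,
}

struct RequestState {
    available_peers: HashSet<u64>,
    state: State,
}

impl RequestState {
    /// Returns true if this request can make no further progress without new peers.
    fn remove_peer(&mut self, peer: &u64) -> bool {
        self.available_peers.remove(peer);
        self.available_peers.is_empty() && self.state == State::AwaitingDownload
    }
}

struct Lookup {
    block: RequestState,
    blob: RequestState,
}

impl Lookup {
    /// The lookup is dropped only if *both* component requests are stuck. A request that is
    /// already `Downloading` keeps the lookup alive: its eventual response or RPC error
    /// (now guaranteed even on disconnection) drives the next state transition.
    fn remove_peer(&mut self, peer: &u64) -> bool {
        let block_stuck = self.block.remove_peer(peer);
        let blob_stuck = self.blob.remove_peer(peer);
        block_stuck && blob_stuck
    }
}

fn main() {
    let mut lookup = Lookup {
        block: RequestState { available_peers: HashSet::from([7]), state: State::AwaitingDownload },
        blob: RequestState { available_peers: HashSet::from([7]), state: State::Downloading },
    };
    // The blob request is in flight, so the lookup survives losing its last peer.
    assert!(!lookup.remove_peer(&7));
}
```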
21 changes: 19 additions & 2 deletions beacon_node/network/src/sync/block_lookups/tests.rs
@@ -450,8 +450,25 @@ impl TestRig {
})
}

fn peer_disconnected(&mut self, peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(peer_id));
fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));

// Return RPCErrors for all active requests of peer
self.drain_network_rx();
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Sync(id),
..
} if *peer_id == disconnected_peer_id => Some(*id),
_ => None,
}) {
self.send_sync_message(SyncMessage::RpcError {
peer_id: disconnected_peer_id,
request_id,
error: RPCError::Disconnected,
});
}
}

fn drain_network_rx(&mut self) {
4 changes: 1 addition & 3 deletions beacon_node/network/src/sync/manager.rs
@@ -373,9 +373,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.range_sync.peer_disconnect(&mut self.network, peer_id);
self.block_lookups.peer_disconnected(peer_id);
// Regardless of the outcome, we update the sync status.
let _ = self
.backfill_sync
.peer_disconnected(peer_id, &mut self.network);
let _ = self.backfill_sync.peer_disconnected(peer_id);
self.update_sync_state();
}

26 changes: 2 additions & 24 deletions beacon_node/network/src/sync/range_sync/chain.rs
@@ -174,30 +174,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {

/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> ProcessingResult {
if let Some(batch_ids) = self.peers.remove(peer_id) {
// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
if let BatchOperationOutcome::Failed { blacklist } =
batch.download_failed(true)?
Member: Similar comment as above regarding marking the batch as failed / not failed.

In forward sync, this might mean potentially not retrying a valid chain because the peers on the good chain are disconnecting.

{
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: id,
});
}
self.retry_batch_download(network, id)?;
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
self.peers.remove(peer_id);

if self.peers.is_empty() {
Err(RemoveChain::EmptyPeerPool)
5 changes: 2 additions & 3 deletions beacon_node/network/src/sync/range_sync/range.rs
@@ -278,9 +278,8 @@ where
/// for this peer. If so we mark the batch as failed. The batch may then hit its maximum
/// retries. In this case, we need to remove the chain.
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
for (removed_chain, sync_type, remove_reason) in self
.chains
.call_all(|chain| chain.remove_peer(peer_id, network))
for (removed_chain, sync_type, remove_reason) in
self.chains.call_all(|chain| chain.remove_peer(peer_id))
{
self.on_chain_removed(
removed_chain,
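For context on the call site above, a hedged sketch of the call_all pattern (hypothetical simplified types, not the real chain collection): the closure runs over every syncing chain, and any chain that returns an error, such as an empty peer pool after remove_peer, is drained so the caller can handle its removal.

```rust
#[derive(Debug)]
enum RemoveChain {
    EmptyPeerPool,
}

struct Chain {
    id: u64,
    peers: Vec<u64>,
}

impl Chain {
    fn remove_peer(&mut self, peer: &u64) -> Result<(), RemoveChain> {
        self.peers.retain(|p| p != peer);
        if self.peers.is_empty() {
            Err(RemoveChain::EmptyPeerPool)
        } else {
            Ok(())
        }
    }
}

/// Calls `f` on every chain and returns the chains whose call asked for removal.
fn call_all<F>(chains: &mut Vec<Chain>, mut f: F) -> Vec<(Chain, RemoveChain)>
where
    F: FnMut(&mut Chain) -> Result<(), RemoveChain>,
{
    let mut removed = Vec::new();
    let mut i = 0;
    while i < chains.len() {
        match f(&mut chains[i]) {
            Err(reason) => removed.push((chains.remove(i), reason)),
            Ok(()) => i += 1,
        }
    }
    removed
}

fn main() {
    let mut chains = vec![
        Chain { id: 1, peers: vec![7] },
        Chain { id: 2, peers: vec![7, 8] },
    ];
    for (chain, reason) in call_all(&mut chains, |c| c.remove_peer(&7)) {
        println!("removing chain {} ({reason:?})", chain.id);
    }
    assert_eq!(chains.len(), 1);
}
```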