Skip to content

Commit

Permalink
Expect RPCError::Disconnect to fail ongoing requests
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 1, 2024
1 parent 888f129 commit f5dc1a3
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 91 deletions.
38 changes: 2 additions & 36 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,49 +307,15 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
/// A peer has disconnected.
/// If the peer has active batches, those are considered failed and re-requested.
#[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"]
pub fn peer_disconnected(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> Result<(), BackFillError> {
pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> {
if matches!(
self.state(),
BackFillState::Failed | BackFillState::NotRequired
) {
return Ok(());
}

if let Some(batch_ids) = self.active_requests.remove(peer_id) {
// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
match batch.download_failed(false) {
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(id))?;
}
Ok(BatchOperationOutcome::Continue) => {}
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?;
}
}
// If we have run out of peers in which to retry this batch, the backfill state
// transitions to a paused state.
// We still need to reset the state for all the affected batches, so we should not
// short circuit early
if self.retry_batch_download(network, id).is_err() {
debug!(
self.log,
"Batch could not be retried";
"batch_id" => id,
"error" => "no synced peers"
);
}
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
self.active_requests.remove(peer_id);

// Remove the peer from the participation list
self.participating_peers.remove(peer_id);
Expand Down
13 changes: 3 additions & 10 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,16 +368,9 @@ impl<T: BeaconChainTypes> BlockLookups<T> {

pub fn peer_disconnected(&mut self, peer_id: &PeerId) {
/* Check disconnection for single lookups */
self.single_block_lookups.retain(|_, req| {
let should_drop_lookup =
req.should_drop_lookup_on_disconnected_peer(peer_id );

if should_drop_lookup {
debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root());
}

!should_drop_lookup
});
for (_, lookup) in self.single_block_lookups.iter_mut() {
lookup.remove_peer(peer_id);
}
}

/* Processing responses */
Expand Down
15 changes: 2 additions & 13 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,10 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
&& self.blob_request_state.state.is_processed()
}

/// Checks both the block and blob request states to see if the peer is disconnected.
///
/// Returns true if the lookup should be dropped.
pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool {
/// Remove peer from all request states
pub fn remove_peer(&mut self, peer_id: &PeerId) {
self.block_request_state.state.remove_peer(peer_id);
self.blob_request_state.state.remove_peer(peer_id);

if self.all_available_peers().count() == 0 {
return true;
}

// Note: if the peer disconnected happens to have an on-going request associated with this
// lookup we will receive an RPCError and the lookup will fail. No need to manually retry
// now.
false
}
}

Expand Down
21 changes: 19 additions & 2 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,25 @@ impl TestRig {
})
}

fn peer_disconnected(&mut self, peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(peer_id));
fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) {
self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id));

// Return RPCErrors for all active requests of peer
self.drain_network_rx();
while let Ok(request_id) = self.pop_received_network_event(|ev| match ev {
NetworkMessage::SendRequest {
peer_id,
request_id: RequestId::Sync(id),
..
} if *peer_id == disconnected_peer_id => Some(*id),
_ => None,
}) {
self.send_sync_message(SyncMessage::RpcError {
peer_id: disconnected_peer_id,
request_id,
error: RPCError::Disconnected,
});
}
}

fn drain_network_rx(&mut self) {
Expand Down
4 changes: 1 addition & 3 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,7 @@ impl<T: BeaconChainTypes> SyncManager<T> {
self.range_sync.peer_disconnect(&mut self.network, peer_id);
self.block_lookups.peer_disconnected(peer_id);
// Regardless of the outcome, we update the sync status.
let _ = self
.backfill_sync
.peer_disconnected(peer_id, &mut self.network);
let _ = self.backfill_sync.peer_disconnected(peer_id);
self.update_sync_state();
}

Expand Down
26 changes: 2 additions & 24 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,30 +174,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {

/// Removes a peer from the chain.
/// If the peer has active batches, those are considered failed and re-requested.
pub fn remove_peer(
&mut self,
peer_id: &PeerId,
network: &mut SyncNetworkContext<T>,
) -> ProcessingResult {
if let Some(batch_ids) = self.peers.remove(peer_id) {
// fail the batches
for id in batch_ids {
if let Some(batch) = self.batches.get_mut(&id) {
if let BatchOperationOutcome::Failed { blacklist } =
batch.download_failed(true)?
{
return Err(RemoveChain::ChainFailed {
blacklist,
failing_batch: id,
});
}
self.retry_batch_download(network, id)?;
} else {
debug!(self.log, "Batch not found while removing peer";
"peer" => %peer_id, "batch" => id)
}
}
}
pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult {
self.peers.remove(peer_id);

if self.peers.is_empty() {
Err(RemoveChain::EmptyPeerPool)
Expand Down
5 changes: 2 additions & 3 deletions beacon_node/network/src/sync/range_sync/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,8 @@ where
/// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum
/// retries. In this case, we need to remove the chain.
fn remove_peer(&mut self, network: &mut SyncNetworkContext<T>, peer_id: &PeerId) {
for (removed_chain, sync_type, remove_reason) in self
.chains
.call_all(|chain| chain.remove_peer(peer_id, network))
for (removed_chain, sync_type, remove_reason) in
self.chains.call_all(|chain| chain.remove_peer(peer_id))
{
self.on_chain_removed(
removed_chain,
Expand Down

0 comments on commit f5dc1a3

Please sign in to comment.