From f5dc1a3eb1cea408f8e061802f0ba8c95f9a3ff3 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 1 May 2024 17:14:50 +0900 Subject: [PATCH] Expect RPCError::Disconnect to fail ongoing requests --- .../network/src/sync/backfill_sync/mod.rs | 38 +------------------ .../network/src/sync/block_lookups/mod.rs | 13 ++----- .../sync/block_lookups/single_block_lookup.rs | 15 +------- .../network/src/sync/block_lookups/tests.rs | 21 +++++++++- beacon_node/network/src/sync/manager.rs | 4 +- .../network/src/sync/range_sync/chain.rs | 26 +------------ .../network/src/sync/range_sync/range.rs | 5 +-- 7 files changed, 31 insertions(+), 91 deletions(-) diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 9075bb15f08..4be92d59a4b 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -307,11 +307,7 @@ impl BackFillSync { /// A peer has disconnected. /// If the peer has active batches, those are considered failed and re-requested. #[must_use = "A failure here indicates the backfill sync has failed and the global sync state should be updated"] - pub fn peer_disconnected( - &mut self, - peer_id: &PeerId, - network: &mut SyncNetworkContext, - ) -> Result<(), BackFillError> { + pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Result<(), BackFillError> { if matches!( self.state(), BackFillState::Failed | BackFillState::NotRequired @@ -319,37 +315,7 @@ impl BackFillSync { return Ok(()); } - if let Some(batch_ids) = self.active_requests.remove(peer_id) { - // fail the batches - for id in batch_ids { - if let Some(batch) = self.batches.get_mut(&id) { - match batch.download_failed(false) { - Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { - self.fail_sync(BackFillError::BatchDownloadFailed(id))?; - } - Ok(BatchOperationOutcome::Continue) => {} - Err(e) => { - self.fail_sync(BackFillError::BatchInvalidState(id, e.0))?; - } - } - // If we have run out of peers in which to retry this batch, the backfill state - // transitions to a paused state. - // We still need to reset the state for all the affected batches, so we should not - // short circuit early - if self.retry_batch_download(network, id).is_err() { - debug!( - self.log, - "Batch could not be retried"; - "batch_id" => id, - "error" => "no synced peers" - ); - } - } else { - debug!(self.log, "Batch not found while removing peer"; - "peer" => %peer_id, "batch" => id) - } - } - } + self.active_requests.remove(peer_id); // Remove the peer from the participation list self.participating_peers.remove(peer_id); diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a0c7c33bb0f..5faaeeea9b1 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -368,16 +368,9 @@ impl BlockLookups { pub fn peer_disconnected(&mut self, peer_id: &PeerId) { /* Check disconnection for single lookups */ - self.single_block_lookups.retain(|_, req| { - let should_drop_lookup = - req.should_drop_lookup_on_disconnected_peer(peer_id ); - - if should_drop_lookup { - debug!(self.log, "Dropping single lookup after peer disconnection"; "block_root" => %req.block_root()); - } - - !should_drop_lookup - }); + for (_, lookup) in self.single_block_lookups.iter_mut() { + lookup.remove_peer(peer_id); + } } /* Processing responses */ diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 76deb236742..9f3bf1989b1 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -162,21 +162,10 @@ impl SingleBlockLookup { && self.blob_request_state.state.is_processed() } - /// Checks both the block and blob request states to see if the peer is disconnected. - /// - /// Returns true if the lookup should be dropped. - pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool { + /// Remove peer from all request states + pub fn remove_peer(&mut self, peer_id: &PeerId) { self.block_request_state.state.remove_peer(peer_id); self.blob_request_state.state.remove_peer(peer_id); - - if self.all_available_peers().count() == 0 { - return true; - } - - // Note: if the peer disconnected happens to have an on-going request associated with this - // lookup we will receive an RPCError and the lookup will fail. No need to manually retry - // now. - false } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index 302a0489c3b..75e0fc524f1 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -450,8 +450,25 @@ impl TestRig { }) } - fn peer_disconnected(&mut self, peer_id: PeerId) { - self.send_sync_message(SyncMessage::Disconnect(peer_id)); + fn peer_disconnected(&mut self, disconnected_peer_id: PeerId) { + self.send_sync_message(SyncMessage::Disconnect(disconnected_peer_id)); + + // Return RPCErrors for all active requests of peer + self.drain_network_rx(); + while let Ok(request_id) = self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request_id: RequestId::Sync(id), + .. + } if *peer_id == disconnected_peer_id => Some(*id), + _ => None, + }) { + self.send_sync_message(SyncMessage::RpcError { + peer_id: disconnected_peer_id, + request_id, + error: RPCError::Disconnected, + }); + } } fn drain_network_rx(&mut self) { diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 0836d97c49f..2b72e3db125 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -366,9 +366,7 @@ impl SyncManager { self.range_sync.peer_disconnect(&mut self.network, peer_id); self.block_lookups.peer_disconnected(peer_id); // Regardless of the outcome, we update the sync status. - let _ = self - .backfill_sync - .peer_disconnected(peer_id, &mut self.network); + let _ = self.backfill_sync.peer_disconnected(peer_id); self.update_sync_state(); } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index c60cdb2cc9f..9a6c99ebf6c 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -174,30 +174,8 @@ impl SyncingChain { /// Removes a peer from the chain. /// If the peer has active batches, those are considered failed and re-requested. - pub fn remove_peer( - &mut self, - peer_id: &PeerId, - network: &mut SyncNetworkContext, - ) -> ProcessingResult { - if let Some(batch_ids) = self.peers.remove(peer_id) { - // fail the batches - for id in batch_ids { - if let Some(batch) = self.batches.get_mut(&id) { - if let BatchOperationOutcome::Failed { blacklist } = - batch.download_failed(true)? - { - return Err(RemoveChain::ChainFailed { - blacklist, - failing_batch: id, - }); - } - self.retry_batch_download(network, id)?; - } else { - debug!(self.log, "Batch not found while removing peer"; - "peer" => %peer_id, "batch" => id) - } - } - } + pub fn remove_peer(&mut self, peer_id: &PeerId) -> ProcessingResult { + self.peers.remove(peer_id); if self.peers.is_empty() { Err(RemoveChain::EmptyPeerPool) diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index c8e82666840..fe48db35b45 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -278,9 +278,8 @@ where /// for this peer. If so we mark the batch as failed. The batch may then hit it's maximum /// retries. In this case, we need to remove the chain. fn remove_peer(&mut self, network: &mut SyncNetworkContext, peer_id: &PeerId) { - for (removed_chain, sync_type, remove_reason) in self - .chains - .call_all(|chain| chain.remove_peer(peer_id, network)) + for (removed_chain, sync_type, remove_reason) in + self.chains.call_all(|chain| chain.remove_peer(peer_id)) { self.on_chain_removed( removed_chain,