diff --git a/substrate/client/network/sync/src/chain_sync.rs b/substrate/client/network/sync/src/chain_sync.rs index 858125f93f1fd..2adc6d4234159 100644 --- a/substrate/client/network/sync/src/chain_sync.rs +++ b/substrate/client/network/sync/src/chain_sync.rs @@ -184,90 +184,26 @@ struct GapSync { target: NumberFor, } -/// Action that the parent of [`ChainSync`] should perform after reporting imported blocks with -/// [`ChainSync::on_blocks_processed`]. -pub enum BlockRequestAction { +/// Action that the parent of [`ChainSync`] should perform after reporting a network or block event. +#[derive(Debug)] +pub enum ChainSyncAction { /// Send block request to peer. Always implies dropping a stale block request to the same peer. - SendRequest { peer_id: PeerId, request: BlockRequest }, + SendBlockRequest { peer_id: PeerId, request: BlockRequest }, /// Drop stale block request. - RemoveStale { peer_id: PeerId }, -} - -/// Action that the parent of [`ChainSync`] should perform if we want to import blocks. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ImportBlocksAction { - pub origin: BlockOrigin, - pub blocks: Vec>, -} - -/// Action that the parent of [`ChainSync`] should perform if we want to import justifications. -pub struct ImportJustificationsAction { - pub peer_id: PeerId, - pub hash: B::Hash, - pub number: NumberFor, - pub justifications: Justifications, -} - -/// Result of [`ChainSync::on_block_data`]. -#[derive(Debug, Clone, PartialEq, Eq)] -enum OnBlockData { - /// The block should be imported. - Import(ImportBlocksAction), - /// A new block request needs to be made to the given peer. - Request(PeerId, BlockRequest), - /// Continue processing events. - Continue, -} - -/// Result of [`ChainSync::on_block_justification`]. -#[derive(Debug, Clone, PartialEq, Eq)] -enum OnBlockJustification { - /// The justification needs no further handling. - Nothing, - /// The justification should be imported. - Import { + CancelBlockRequest { peer_id: PeerId }, + /// Peer misbehaved. Disconnect, report it and cancel the block request to it. + DropPeer(BadPeer), + /// Import blocks. + ImportBlocks { origin: BlockOrigin, blocks: Vec> }, + /// Import justifications. + ImportJustifications { peer_id: PeerId, - hash: Block::Hash, - number: NumberFor, + hash: B::Hash, + number: NumberFor, justifications: Justifications, }, } -// Result of [`ChainSync::on_state_data`]. -#[derive(Debug)] -enum OnStateData { - /// The block and state that should be imported. - Import(BlockOrigin, IncomingBlock), - /// A new state request needs to be made to the given peer. - Continue, -} - -/// Action that the parent of [`ChainSync`] should perform after reporting block response with -/// [`ChainSync::on_block_response`]. -pub enum OnBlockResponse { - /// Nothing to do. - Nothing, - /// Perform block request. - SendBlockRequest { peer_id: PeerId, request: BlockRequest }, - /// Import blocks. - ImportBlocks(ImportBlocksAction), - /// Import justifications. - ImportJustifications(ImportJustificationsAction), - /// Invalid block response, the peer should be disconnected and reported. - DisconnectPeer(BadPeer), -} - -/// Action that the parent of [`ChainSync`] should perform after reporting state response with -/// [`ChainSync::on_state_response`]. -pub enum OnStateResponse { - /// Nothing to do. - Nothing, - /// Import blocks. - ImportBlocks(ImportBlocksAction), - /// Invalid state response, the peer should be disconnected and reported. - DisconnectPeer(BadPeer), -} - /// The main data structure which contains all the state for a chains /// active syncing strategy. pub struct ChainSync { @@ -313,6 +249,8 @@ pub struct ChainSync { import_existing: bool, /// Gap download process. gap_sync: Option>, + /// Pending actions. + actions: Vec>, } /// All the data we have about a Peer that we are trying to sync with @@ -427,6 +365,7 @@ where gap_sync: None, warp_sync_config, warp_sync_target_block_header: None, + actions: Vec::new(), }; sync.reset_sync_start_point()?; @@ -509,8 +448,17 @@ where } /// Notify syncing state machine that a new sync peer has connected. + pub fn new_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor) { + match self.new_peer_inner(peer_id, best_hash, best_number) { + Ok(Some(request)) => + self.actions.push(ChainSyncAction::SendBlockRequest { peer_id, request }), + Ok(None) => {}, + Err(bad_peer) => self.actions.push(ChainSyncAction::DropPeer(bad_peer)), + } + } + #[must_use] - pub fn new_peer( + fn new_peer_inner( &mut self, peer_id: PeerId, best_hash: B::Hash, @@ -727,7 +675,7 @@ where peer_id: &PeerId, request: Option>, response: BlockResponse, - ) -> Result, BadPeer> { + ) -> Result<(), BadPeer> { self.downloaded_blocks += response.blocks.len(); let mut gap = false; let new_blocks: Vec> = if let Some(peer) = self.peers.get_mut(peer_id) { @@ -892,10 +840,12 @@ where start: *start, state: next_state, }; - return Ok(OnBlockData::Request( - *peer_id, - ancestry_request::(next_num), - )) + let request = ancestry_request::(next_num); + self.actions.push(ChainSyncAction::SendBlockRequest { + peer_id: *peer_id, + request, + }); + return Ok(()) } else { // Ancestry search is complete. Check if peer is on a stale fork unknown // to us and add it to sync targets if necessary. @@ -929,7 +879,7 @@ where .insert(*peer_id); } peer.state = PeerSyncState::Available; - Vec::new() + return Ok(()) } }, PeerSyncState::DownloadingWarpTargetBlock => { @@ -940,8 +890,7 @@ where match warp_sync.import_target_block( blocks.pop().expect("`blocks` len checked above."), ) { - warp::TargetBlockImportResult::Success => - return Ok(OnBlockData::Continue), + warp::TargetBlockImportResult::Success => return Ok(()), warp::TargetBlockImportResult::BadResponse => return Err(BadPeer(*peer_id, rep::VERIFICATION_FAIL)), } @@ -963,7 +912,7 @@ where "Logic error: we think we are downloading warp target block from {}, but no warp sync is happening.", peer_id, ); - return Ok(OnBlockData::Continue) + return Ok(()) } }, PeerSyncState::Available | @@ -1000,7 +949,9 @@ where return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) }; - Ok(OnBlockData::Import(self.validate_and_queue_blocks(new_blocks, gap))) + self.validate_and_queue_blocks(new_blocks, gap); + + Ok(()) } /// Submit a justification response for processing. @@ -1009,7 +960,7 @@ where &mut self, peer_id: PeerId, response: BlockResponse, - ) -> Result, BadPeer> { + ) -> Result<(), BadPeer> { let peer = if let Some(peer) = self.peers.get_mut(&peer_id) { peer } else { @@ -1017,7 +968,7 @@ where target: LOG_TARGET, "💔 Called on_block_justification with a peer ID of an unknown peer", ); - return Ok(OnBlockJustification::Nothing) + return Ok(()) }; self.allowed_requests.add(&peer_id); @@ -1054,11 +1005,17 @@ where if let Some((peer_id, hash, number, justifications)) = self.extra_justifications.on_response(peer_id, justification) { - return Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) + self.actions.push(ChainSyncAction::ImportJustifications { + peer_id, + hash, + number, + justifications, + }); + return Ok(()) } } - Ok(OnBlockJustification::Nothing) + Ok(()) } /// Report a justification import (successful or not). @@ -1196,8 +1153,7 @@ where } /// Notify that a sync peer has disconnected. - #[must_use] - pub fn peer_disconnected(&mut self, peer_id: &PeerId) -> Option> { + pub fn peer_disconnected(&mut self, peer_id: &PeerId) { self.blocks.clear_peer_download(peer_id); if let Some(gap_sync) = &mut self.gap_sync { gap_sync.blocks.clear_peer_download(peer_id) @@ -1212,7 +1168,9 @@ where let blocks = self.ready_blocks(); - (!blocks.is_empty()).then(|| self.validate_and_queue_blocks(blocks, false)) + if !blocks.is_empty() { + self.validate_and_queue_blocks(blocks, false); + } } /// Get prometheus metrics. @@ -1259,11 +1217,7 @@ where } } - fn validate_and_queue_blocks( - &mut self, - mut new_blocks: Vec>, - gap: bool, - ) -> ImportBlocksAction { + fn validate_and_queue_blocks(&mut self, mut new_blocks: Vec>, gap: bool) { let orig_len = new_blocks.len(); new_blocks.retain(|b| !self.queue_blocks.contains(&b.hash)); if new_blocks.len() != orig_len { @@ -1295,7 +1249,7 @@ where } self.queue_blocks.extend(new_blocks.iter().map(|b| b.hash)); - ImportBlocksAction { origin, blocks: new_blocks } + self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: new_blocks }) } fn update_peer_common_number(&mut self, peer_id: &PeerId, new_common: NumberFor) { @@ -1346,7 +1300,7 @@ where /// Restart the sync process. This will reset all pending block requests and return an iterator /// of new block requests to make to peers. Peers that were downloading finality data (i.e. /// their state was `DownloadingJustification`) are unaffected and will stay in the same state. - fn restart(&mut self) -> impl Iterator, BadPeer>> + '_ { + fn restart(&mut self) { self.blocks.clear(); if let Err(e) = self.reset_sync_start_point() { warn!(target: LOG_TARGET, "💔 Unable to restart sync: {e}"); @@ -1360,7 +1314,7 @@ where ); let old_peers = std::mem::take(&mut self.peers); - old_peers.into_iter().filter_map(move |(peer_id, mut p)| { + old_peers.into_iter().for_each(|(peer_id, mut p)| { // peers that were downloading justifications // should be kept in that state. if let PeerSyncState::DownloadingJustification(_) = p.state { @@ -1374,19 +1328,21 @@ where ); p.common_number = self.best_queued_number; self.peers.insert(peer_id, p); - return None + return } // handle peers that were in other states. - match self.new_peer(peer_id, p.best_hash, p.best_number) { + let action = match self.new_peer_inner(peer_id, p.best_hash, p.best_number) { // since the request is not a justification, remove it from pending responses - Ok(None) => Some(Ok(BlockRequestAction::RemoveStale { peer_id })), + Ok(None) => ChainSyncAction::CancelBlockRequest { peer_id }, // update the request if the new one is available - Ok(Some(request)) => Some(Ok(BlockRequestAction::SendRequest { peer_id, request })), + Ok(Some(request)) => ChainSyncAction::SendBlockRequest { peer_id, request }, // this implies that we need to drop pending response from the peer - Err(e) => Some(Err(e)), - } - }) + Err(bad_peer) => ChainSyncAction::DropPeer(bad_peer), + }; + + self.actions.push(action); + }); } /// Find a block to start sync from. If we sync with state, that's the latest block we have @@ -1534,13 +1490,12 @@ where } /// Submit blocks received in a response. - #[must_use] pub fn on_block_response( &mut self, peer_id: PeerId, request: BlockRequest, blocks: Vec>, - ) -> OnBlockResponse { + ) { let block_response = BlockResponse:: { id: request.id, blocks }; let blocks_range = || match ( @@ -1563,41 +1518,21 @@ where blocks_range(), ); - if request.fields == BlockAttributes::JUSTIFICATION { - match self.on_block_justification(peer_id, block_response) { - Ok(OnBlockJustification::Nothing) => OnBlockResponse::Nothing, - Ok(OnBlockJustification::Import { peer_id, hash, number, justifications }) => - OnBlockResponse::ImportJustifications(ImportJustificationsAction { - peer_id, - hash, - number, - justifications, - }), - Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer), - } + let res = if request.fields == BlockAttributes::JUSTIFICATION { + self.on_block_justification(peer_id, block_response) } else { - match self.on_block_data(&peer_id, Some(request), block_response) { - Ok(OnBlockData::Import(action)) => OnBlockResponse::ImportBlocks(action), - Ok(OnBlockData::Request(peer_id, request)) => - OnBlockResponse::SendBlockRequest { peer_id, request }, - Ok(OnBlockData::Continue) => OnBlockResponse::Nothing, - Err(bad_peer) => OnBlockResponse::DisconnectPeer(bad_peer), - } + self.on_block_data(&peer_id, Some(request), block_response) + }; + + if let Err(bad_peer) = res { + self.actions.push(ChainSyncAction::DropPeer(bad_peer)); } } /// Submit a state received in a response. - #[must_use] - pub fn on_state_response( - &mut self, - peer_id: PeerId, - response: OpaqueStateResponse, - ) -> OnStateResponse { - match self.on_state_data(&peer_id, response) { - Ok(OnStateData::Import(origin, block)) => - OnStateResponse::ImportBlocks(ImportBlocksAction { origin, blocks: vec![block] }), - Ok(OnStateData::Continue) => OnStateResponse::Nothing, - Err(bad_peer) => OnStateResponse::DisconnectPeer(bad_peer), + pub fn on_state_response(&mut self, peer_id: PeerId, response: OpaqueStateResponse) { + if let Err(bad_peer) = self.on_state_data(&peer_id, response) { + self.actions.push(ChainSyncAction::DropPeer(bad_peer)); } } @@ -1833,11 +1768,12 @@ where None } + #[must_use] fn on_state_data( &mut self, peer_id: &PeerId, response: OpaqueStateResponse, - ) -> Result, BadPeer> { + ) -> Result<(), BadPeer> { let response: Box = response.0.downcast().map_err(|_error| { error!( target: LOG_TARGET, @@ -1892,9 +1828,10 @@ where state: Some(state), }; debug!(target: LOG_TARGET, "State download is complete. Import is queued"); - Ok(OnStateData::Import(origin, block)) + self.actions.push(ChainSyncAction::ImportBlocks { origin, blocks: vec![block] }); + Ok(()) }, - ImportResult::Continue => Ok(OnStateData::Continue), + ImportResult::Continue => Ok(()), ImportResult::BadResponse => { debug!(target: LOG_TARGET, "Bad state data received from {peer_id}"); Err(BadPeer(*peer_id, rep::BAD_BLOCK)) @@ -1903,12 +1840,7 @@ where } /// Submit a warp proof response received. - #[must_use] - pub fn on_warp_sync_response( - &mut self, - peer_id: &PeerId, - response: EncodedProof, - ) -> Result<(), BadPeer> { + pub fn on_warp_sync_response(&mut self, peer_id: &PeerId, response: EncodedProof) { if let Some(peer) = self.peers.get_mut(peer_id) { if let PeerSyncState::DownloadingWarpProof = peer.state { peer.state = PeerSyncState::Available; @@ -1925,14 +1857,16 @@ where sync.import_warp_proof(response) } else { debug!(target: LOG_TARGET, "Ignored obsolete warp sync response from {peer_id}"); - return Err(BadPeer(*peer_id, rep::NOT_REQUESTED)) + self.actions + .push(ChainSyncAction::DropPeer(BadPeer(*peer_id, rep::NOT_REQUESTED))); + return }; match import_result { - WarpProofImportResult::Success => Ok(()), + WarpProofImportResult::Success => {}, WarpProofImportResult::BadResponse => { debug!(target: LOG_TARGET, "Bad proof data received from {peer_id}"); - Err(BadPeer(*peer_id, rep::BAD_BLOCK)) + self.actions.push(ChainSyncAction::DropPeer(BadPeer(*peer_id, rep::BAD_BLOCK))); }, } } @@ -1942,17 +1876,14 @@ where /// Call this when a batch of blocks have been processed by the import /// queue, with or without errors. If an error is returned, the pending response /// from the peer must be dropped. - #[must_use] pub fn on_blocks_processed( &mut self, imported: usize, count: usize, results: Vec<(Result>, BlockImportError>, B::Hash)>, - ) -> Box, BadPeer>>> { + ) { trace!(target: LOG_TARGET, "Imported {imported} of {count}"); - let mut output = Vec::new(); - let mut has_error = false; for (_, hash) in &results { self.queue_blocks.remove(hash); @@ -1993,7 +1924,10 @@ where if aux.bad_justification { if let Some(ref peer) = peer_id { warn!("💔 Sent block with bad justification to import"); - output.push(Err(BadPeer(*peer, rep::BAD_JUSTIFICATION))); + self.actions.push(ChainSyncAction::DropPeer(BadPeer( + *peer, + rep::BAD_JUSTIFICATION, + ))); } } @@ -2010,7 +1944,7 @@ where ); self.state_sync = None; self.mode = SyncMode::Full; - output.extend(self.restart()); + self.restart(); } let warp_sync_complete = self .warp_sync @@ -2024,7 +1958,7 @@ where ); self.warp_sync = None; self.mode = SyncMode::Full; - output.extend(self.restart()); + self.restart(); } let gap_sync_complete = self.gap_sync.as_ref().map_or(false, |s| s.target == number); @@ -2042,8 +1976,9 @@ where target: LOG_TARGET, "💔 Peer sent block with incomplete header to import", ); - output.push(Err(BadPeer(peer, rep::INCOMPLETE_HEADER))); - output.extend(self.restart()); + self.actions + .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::INCOMPLETE_HEADER))); + self.restart(); }, Err(BlockImportError::VerificationFailed(peer_id, e)) => { let extra_message = peer_id @@ -2055,10 +1990,11 @@ where ); if let Some(peer) = peer_id { - output.push(Err(BadPeer(peer, rep::VERIFICATION_FAIL))); + self.actions + .push(ChainSyncAction::DropPeer(BadPeer(peer, rep::VERIFICATION_FAIL))); } - output.extend(self.restart()); + self.restart(); }, Err(BlockImportError::BadBlock(peer_id)) => if let Some(peer) = peer_id { @@ -2066,7 +2002,7 @@ where target: LOG_TARGET, "💔 Block {hash:?} received from peer {peer} has been blacklisted", ); - output.push(Err(BadPeer(peer, rep::BAD_BLOCK))); + self.actions.push(ChainSyncAction::DropPeer(BadPeer(peer, rep::BAD_BLOCK))); }, Err(BlockImportError::MissingState) => { // This may happen if the chain we were requesting upon has been discarded @@ -2078,14 +2014,19 @@ where warn!(target: LOG_TARGET, "💔 Error importing block {hash:?}: {}", e.unwrap_err()); self.state_sync = None; self.warp_sync = None; - output.extend(self.restart()); + self.restart(); }, Err(BlockImportError::Cancelled) => {}, }; } self.allowed_requests.set_all(); - Box::new(output.into_iter()) + } + + /// Get pending actions to perform. + #[must_use] + pub fn take_actions(&mut self) -> impl Iterator> { + std::mem::take(&mut self.actions).into_iter() } } diff --git a/substrate/client/network/sync/src/chain_sync/test.rs b/substrate/client/network/sync/src/chain_sync/test.rs index 2eefd2ad13ef8..15b2a95a07c87 100644 --- a/substrate/client/network/sync/src/chain_sync/test.rs +++ b/substrate/client/network/sync/src/chain_sync/test.rs @@ -53,7 +53,7 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { }; // add a new peer with the same best block - sync.new_peer(peer_id, a1_hash, a1_number).unwrap(); + sync.new_peer(peer_id, a1_hash, a1_number); // and request a justification for the block sync.request_justification(&a1_hash, a1_number); @@ -74,10 +74,8 @@ fn processes_empty_response_on_justification_request_for_unknown_block() { // if the peer replies with an empty response (i.e. it doesn't know the block), // the active request should be cleared. - assert_eq!( - sync.on_block_justification(peer_id, BlockResponse:: { id: 0, blocks: vec![] }), - Ok(OnBlockJustification::Nothing), - ); + sync.on_block_justification(peer_id, BlockResponse:: { id: 0, blocks: vec![] }) + .unwrap(); // there should be no in-flight requests assert_eq!(sync.extra_justifications.active_requests().count(), 0); @@ -119,8 +117,8 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { let (b1_hash, b1_number) = new_blocks(50); // add 2 peers at blocks that we don't have locally - sync.new_peer(peer_id1, Hash::random(), 42).unwrap(); - sync.new_peer(peer_id2, Hash::random(), 10).unwrap(); + sync.new_peer(peer_id1, Hash::random(), 42); + sync.new_peer(peer_id2, Hash::random(), 10); // we wil send block requests to these peers // for these blocks we don't know about @@ -130,7 +128,7 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { .all(|(p, _)| { p == peer_id1 || p == peer_id2 })); // add a new peer at a known block - sync.new_peer(peer_id3, b1_hash, b1_number).unwrap(); + sync.new_peer(peer_id3, b1_hash, b1_number); // we request a justification for a block we have locally sync.request_justification(&b1_hash, b1_number); @@ -148,14 +146,19 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { PeerSyncState::DownloadingJustification(b1_hash), ); + // clear old actions + let _ = sync.take_actions(); + // we restart the sync state - let block_requests = sync.restart(); + sync.restart(); + let actions = sync.take_actions().collect::>(); // which should make us send out block requests to the first two peers - assert!(block_requests.map(|r| r.unwrap()).all(|event| match event { - BlockRequestAction::SendRequest { peer_id, .. } => - peer_id == peer_id1 || peer_id == peer_id2, - BlockRequestAction::RemoveStale { .. } => false, + assert_eq!(actions.len(), 2); + assert!(actions.iter().all(|action| match action { + ChainSyncAction::SendBlockRequest { peer_id, .. } => + peer_id == &peer_id1 || peer_id == &peer_id2, + _ => false, })); // peer 3 should be unaffected it was downloading finality data @@ -166,7 +169,7 @@ fn restart_doesnt_affect_peers_downloading_finality_data() { // Set common block to something that we don't have (e.g. failed import) sync.peers.get_mut(&peer_id3).unwrap().common_number = 100; - let _ = sync.restart().count(); + sync.restart(); assert_eq!(sync.peers.get(&peer_id3).unwrap().common_number, 50); } @@ -280,9 +283,8 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { let best_block = blocks.last().unwrap().clone(); let max_blocks_to_request = sync.max_blocks_per_request; // Connect the node we will sync from - sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()) - .unwrap(); - sync.new_peer(peer_id2, info.best_hash, 0).unwrap(); + sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()); + sync.new_peer(peer_id2, info.best_hash, 0); let mut best_block_num = 0; while best_block_num < MAX_DOWNLOAD_AHEAD { @@ -300,11 +302,17 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + // Clear old actions to not deal with them + let _ = sync.take_actions(); + + sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == max_blocks_to_request as usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize, + )); best_block_num += max_blocks_to_request as u32; @@ -356,11 +364,14 @@ fn do_ancestor_search_when_common_block_to_best_qeued_gap_is_to_big() { assert_eq!(FromBlock::Number(best_block_num as u64), peer2_req.from); let response = create_block_response(vec![blocks[(best_block_num - 1) as usize].clone()]); - let res = sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap(); - assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty() - ),); + + // Clear old actions to not deal with them + let _ = sync.take_actions(); + + sync.on_block_data(&peer_id2, Some(peer2_req), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + assert!(actions.is_empty()); let peer1_from = unwrap_from_block_number(peer1_req.unwrap().from); @@ -421,25 +432,34 @@ fn can_sync_huge_fork() { let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone(); // Connect the node we will sync from - sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()); send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); let mut request = get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); + // Discard old actions we are not interested in + let _ = sync.take_actions(); + // Do the ancestor search loop { let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; let response = create_block_response(vec![block.clone()]); - let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); - request = if let OnBlockData::Request(_peer, request) = on_block_data { - request - } else { + sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + + request = if actions.is_empty() { // We found the ancenstor break + } else { + assert_eq!(actions.len(), 1); + match &actions[0] { + ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + action @ _ => panic!("Unexpected action: {action:?}"), + } }; log::trace!(target: LOG_TARGET, "Request: {request:?}"); @@ -463,15 +483,18 @@ fn can_sync_huge_fork() { let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == sync.max_blocks_per_request as usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == sync.max_blocks_per_request as usize + )); best_block_num += sync.max_blocks_per_request as u32; - let _ = sync.on_blocks_processed( + sync.on_blocks_processed( max_blocks_to_request as usize, max_blocks_to_request as usize, resp_blocks @@ -490,6 +513,9 @@ fn can_sync_huge_fork() { .collect(), ); + // Discard pending actions + let _ = sync.take_actions(); + resp_blocks .into_iter() .rev() @@ -539,25 +565,34 @@ fn syncs_fork_without_duplicate_requests() { let common_block = blocks[MAX_BLOCKS_TO_LOOK_BACKWARDS as usize / 2].clone(); // Connect the node we will sync from - sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()); send_block_announce(fork_blocks.last().unwrap().header().clone(), peer_id1, &mut sync); let mut request = get_block_request(&mut sync, FromBlock::Number(info.best_number), 1, &peer_id1); + // Discard pending actions + let _ = sync.take_actions(); + // Do the ancestor search loop { let block = &fork_blocks[unwrap_from_block_number(request.from.clone()) as usize - 1]; let response = create_block_response(vec![block.clone()]); - let on_block_data = sync.on_block_data(&peer_id1, Some(request), response).unwrap(); - request = if let OnBlockData::Request(_peer, request) = on_block_data { - request - } else { + sync.on_block_data(&peer_id1, Some(request), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + + request = if actions.is_empty() { // We found the ancenstor break + } else { + assert_eq!(actions.len(), 1); + match &actions[0] { + ChainSyncAction::SendBlockRequest { peer_id: _, request } => request.clone(), + action @ _ => panic!("Unexpected action: {action:?}"), + } }; log::trace!(target: LOG_TARGET, "Request: {request:?}"); @@ -582,11 +617,17 @@ fn syncs_fork_without_duplicate_requests() { let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap(); + // Discard old actions + let _ = sync.take_actions(); + + sync.on_block_data(&peer_id1, Some(request.clone()), response).unwrap(); + + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == max_blocks_to_request as usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == max_blocks_to_request as usize + )); best_block_num += max_blocks_to_request as u32; @@ -653,8 +694,7 @@ fn removes_target_fork_on_disconnect() { let peer_id1 = PeerId::random(); let common_block = blocks[1].clone(); // Connect the node we will sync from - sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()); // Create a "new" header and announce it let mut header = blocks[0].header().clone(); @@ -678,8 +718,7 @@ fn can_import_response_with_missing_blocks() { let peer_id1 = PeerId::random(); let best_block = blocks[3].clone(); - sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, best_block.hash(), *best_block.header().number()); sync.peers.get_mut(&peer_id1).unwrap().state = PeerSyncState::Available; sync.peers.get_mut(&peer_id1).unwrap().common_number = 0; @@ -730,7 +769,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { let (b1_hash, b1_number) = new_blocks(50); // add new peer and request blocks from them - sync.new_peer(peers[0], Hash::random(), 42).unwrap(); + sync.new_peer(peers[0], Hash::random(), 42); // we don't actually perform any requests, just keep track of peers waiting for a response let mut pending_responses = HashSet::new(); @@ -743,7 +782,7 @@ fn sync_restart_removes_block_but_not_justification_requests() { } // add a new peer at a known block - sync.new_peer(peers[1], b1_hash, b1_number).unwrap(); + sync.new_peer(peers[1], b1_hash, b1_number); // we request a justification for a block we have locally sync.request_justification(&b1_hash, b1_number); @@ -766,24 +805,29 @@ fn sync_restart_removes_block_but_not_justification_requests() { ); assert_eq!(pending_responses.len(), 2); + // discard old actions + let _ = sync.take_actions(); + // restart sync - let request_events = sync.restart().collect::>(); - for event in request_events.iter() { - match event.as_ref().unwrap() { - BlockRequestAction::RemoveStale { peer_id } => { + sync.restart(); + let actions = sync.take_actions().collect::>(); + for action in actions.iter() { + match action { + ChainSyncAction::CancelBlockRequest { peer_id } => { pending_responses.remove(&peer_id); }, - BlockRequestAction::SendRequest { peer_id, .. } => { + ChainSyncAction::SendBlockRequest { peer_id, .. } => { // we drop obsolete response, but don't register a new request, it's checked in // the `assert!` below pending_responses.remove(&peer_id); }, + action @ _ => panic!("Unexpected action: {action:?}"), } } - assert!(request_events.iter().any(|event| { - match event.as_ref().unwrap() { - BlockRequestAction::RemoveStale { .. } => false, - BlockRequestAction::SendRequest { peer_id, .. } => peer_id == &peers[0], + assert!(actions.iter().any(|action| { + match action { + ChainSyncAction::SendBlockRequest { peer_id, .. } => peer_id == &peers[0], + _ => false, } })); @@ -848,11 +892,9 @@ fn request_across_forks() { // Add the peers, all at the common ancestor 100. let common_block = blocks.last().unwrap(); let peer_id1 = PeerId::random(); - sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id1, common_block.hash(), *common_block.header().number()); let peer_id2 = PeerId::random(); - sync.new_peer(peer_id2, common_block.hash(), *common_block.header().number()) - .unwrap(); + sync.new_peer(peer_id2, common_block.hash(), *common_block.header().number()); // Peer 1 announces 107 from fork 1, 100-107 get downloaded. { @@ -864,11 +906,17 @@ fn request_across_forks() { let mut resp_blocks = fork_a_blocks[100_usize..107_usize].to_vec(); resp_blocks.reverse(); let response = create_block_response(resp_blocks.clone()); - let res = sync.on_block_data(&peer, Some(request), response).unwrap(); + + // Drop old actions + let _ = sync.take_actions(); + + sync.on_block_data(&peer, Some(request), response).unwrap(); + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 7_usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 7_usize + )); assert_eq!(sync.best_queued_number, 107); assert_eq!(sync.best_queued_hash, block.hash()); assert!(sync.is_known(&block.header.parent_hash())); @@ -903,11 +951,17 @@ fn request_across_forks() { // block is announced. let request = get_block_request(&mut sync, FromBlock::Hash(block.hash()), 1, &peer); let response = create_block_response(vec![block.clone()]); - let res = sync.on_block_data(&peer, Some(request), response).unwrap(); + + // Drop old actions we are not going to check + let _ = sync.take_actions(); + + sync.on_block_data(&peer, Some(request), response).unwrap(); + let actions = sync.take_actions().collect::>(); + assert_eq!(actions.len(), 1); assert!(matches!( - res, - OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 1_usize - ),); + &actions[0], + ChainSyncAction::ImportBlocks{ origin: _, blocks } if blocks.len() == 1_usize + )); assert!(sync.is_known(&block.header.parent_hash())); } } diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 560887132e3a8..58a9fdc49f209 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -25,10 +25,7 @@ use crate::{ }, block_relay_protocol::{BlockDownloader, BlockResponseError}, block_request_handler::MAX_BLOCKS_IN_RESPONSE, - chain_sync::{ - BlockRequestAction, ChainSync, ImportBlocksAction, ImportJustificationsAction, - OnBlockResponse, OnStateResponse, - }, + chain_sync::{ChainSync, ChainSyncAction}, pending_responses::{PendingResponses, ResponseEvent}, schema::v1::{StateRequest, StateResponse}, service::{ @@ -58,7 +55,7 @@ use schnellru::{ByLength, LruMap}; use tokio::time::{Interval, MissedTickBehavior}; use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; -use sc_consensus::import_queue::ImportQueueService; +use sc_consensus::{import_queue::ImportQueueService, IncomingBlock}; use sc_network::{ config::{ FullNetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, NotificationHandshake, @@ -74,8 +71,11 @@ use sc_network_common::{ }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::{Error as ClientError, HeaderMetadata}; -use sp_consensus::block_validation::BlockAnnounceValidator; -use sp_runtime::traits::{Block as BlockT, Header, NumberFor, Zero}; +use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin}; +use sp_runtime::{ + traits::{Block as BlockT, Header, NumberFor, Zero}, + Justifications, +}; use std::{ collections::{HashMap, HashSet}, @@ -713,11 +713,67 @@ where self.is_major_syncing .store(self.chain_sync.status().state.is_major_syncing(), Ordering::Relaxed); + // Process actions requested by `ChainSync` during `select!`. + self.process_chain_sync_actions(); + // Send outbound requests on `ChanSync`'s behalf. self.send_chain_sync_requests(); } } + fn process_chain_sync_actions(&mut self) { + self.chain_sync.take_actions().for_each(|action| match action { + ChainSyncAction::SendBlockRequest { peer_id, request } => { + // Sending block request implies dropping obsolete pending response as we are not + // interested in it anymore (see [`ChainSyncAction::SendBlockRequest`]). + // Furthermore, only one request at a time is allowed to any peer. + let removed = self.pending_responses.remove(&peer_id); + self.send_block_request(peer_id, request.clone()); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::SendBlockRequest` to {} with {:?}, stale response removed: {}.", + peer_id, + request, + removed, + ) + }, + ChainSyncAction::CancelBlockRequest { peer_id } => { + let removed = self.pending_responses.remove(&peer_id); + + trace!(target: LOG_TARGET, "Processed {action:?}., response removed: {removed}."); + }, + ChainSyncAction::DropPeer(BadPeer(peer_id, rep)) => { + self.pending_responses.remove(&peer_id); + self.network_service + .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); + self.network_service.report_peer(peer_id, rep); + + trace!(target: LOG_TARGET, "Processed {action:?}."); + }, + ChainSyncAction::ImportBlocks { origin, blocks } => { + let count = blocks.len(); + self.import_blocks(origin, blocks); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::ImportBlocks` with {count} blocks.", + ); + }, + ChainSyncAction::ImportJustifications { peer_id, hash, number, justifications } => { + self.import_justifications(peer_id, hash, number, justifications); + + trace!( + target: LOG_TARGET, + "Processed `ChainSyncAction::ImportJustifications` from peer {} for block {} ({}).", + peer_id, + hash, + number, + ) + }, + }); + } + fn perform_periodic_actions(&mut self) { self.report_metrics(); @@ -766,28 +822,7 @@ where ToServiceCommand::ClearJustificationRequests => self.chain_sync.clear_justification_requests(), ToServiceCommand::BlocksProcessed(imported, count, results) => { - for result in self.chain_sync.on_blocks_processed(imported, count, results) { - match result { - Ok(action) => match action { - BlockRequestAction::SendRequest { peer_id, request } => { - // drop obsolete pending response first - self.pending_responses.remove(&peer_id); - self.send_block_request(peer_id, request); - }, - BlockRequestAction::RemoveStale { peer_id } => { - self.pending_responses.remove(&peer_id); - }, - }, - Err(BadPeer(peer_id, repu)) => { - self.pending_responses.remove(&peer_id); - self.network_service.disconnect_peer( - peer_id, - self.block_announce_protocol_name.clone(), - ); - self.network_service.report_peer(peer_id, repu) - }, - } - } + self.chain_sync.on_blocks_processed(imported, count, results); }, ToServiceCommand::JustificationImported(peer_id, hash, number, success) => { self.chain_sync.on_justification_import(hash, number, success); @@ -940,9 +975,7 @@ where } } - if let Some(import_blocks_action) = self.chain_sync.peer_disconnected(&peer_id) { - self.import_blocks(import_blocks_action) - } + self.chain_sync.peer_disconnected(&peer_id); self.pending_responses.remove(&peer_id); self.event_streams.retain(|stream| { @@ -1053,17 +1086,7 @@ where inbound, }; - let req = if peer.info.roles.is_full() { - match self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number) { - Ok(req) => req, - Err(BadPeer(id, repu)) => { - self.network_service.report_peer(id, repu); - return Err(()) - }, - } - } else { - None - }; + self.chain_sync.new_peer(peer_id, peer.info.best_hash, peer.info.best_number); log::debug!(target: LOG_TARGET, "Connected {peer_id}"); @@ -1075,10 +1098,6 @@ where self.num_in_peers += 1; } - if let Some(req) = req { - self.send_block_request(peer_id, req); - } - self.event_streams .retain(|stream| stream.unbounded_send(SyncEvent::PeerConnected(peer_id)).is_ok()); @@ -1202,22 +1221,7 @@ where PeerRequest::Block(req) => { match self.block_downloader.block_response_into_blocks(&req, resp) { Ok(blocks) => { - match self.chain_sync.on_block_response(peer_id, req, blocks) { - OnBlockResponse::SendBlockRequest { peer_id, request } => - self.send_block_request(peer_id, request), - OnBlockResponse::ImportBlocks(import_blocks_action) => - self.import_blocks(import_blocks_action), - OnBlockResponse::ImportJustifications(action) => - self.import_justifications(action), - OnBlockResponse::Nothing => {}, - OnBlockResponse::DisconnectPeer(BadPeer(peer_id, rep)) => { - self.network_service.disconnect_peer( - peer_id, - self.block_announce_protocol_name.clone(), - ); - self.network_service.report_peer(peer_id, rep); - }, - } + self.chain_sync.on_block_response(peer_id, req, blocks); }, Err(BlockResponseError::DecodeFailed(e)) => { debug!( @@ -1262,27 +1266,10 @@ where }, }; - match self.chain_sync.on_state_response(peer_id, response) { - OnStateResponse::ImportBlocks(import_blocks_action) => - self.import_blocks(import_blocks_action), - OnStateResponse::DisconnectPeer(BadPeer(peer_id, rep)) => { - self.network_service.disconnect_peer( - peer_id, - self.block_announce_protocol_name.clone(), - ); - self.network_service.report_peer(peer_id, rep); - }, - OnStateResponse::Nothing => {}, - } + self.chain_sync.on_state_response(peer_id, response); }, PeerRequest::WarpProof => { - if let Err(BadPeer(peer_id, rep)) = - self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp)) - { - self.network_service - .disconnect_peer(peer_id, self.block_announce_protocol_name.clone()); - self.network_service.report_peer(peer_id, rep); - } + self.chain_sync.on_warp_sync_response(&peer_id, EncodedProof(resp)); }, }, Ok(Err(e)) => { @@ -1388,7 +1375,7 @@ where } /// Import blocks. - fn import_blocks(&mut self, ImportBlocksAction { origin, blocks }: ImportBlocksAction) { + fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { if let Some(metrics) = &self.metrics { metrics.import_queue_blocks_submitted.inc(); } @@ -1397,13 +1384,17 @@ where } /// Import justifications. - fn import_justifications(&mut self, action: ImportJustificationsAction) { + fn import_justifications( + &mut self, + peer_id: PeerId, + hash: B::Hash, + number: NumberFor, + justifications: Justifications, + ) { if let Some(metrics) = &self.metrics { metrics.import_queue_justifications_submitted.inc(); } - let ImportJustificationsAction { peer_id, hash, number, justifications } = action; - self.import_queue.import_justifications(peer_id, hash, number, justifications); } }