From 10b40d2531e14244caeeb9d98c3df46faf005c25 Mon Sep 17 00:00:00 2001 From: Aaro Altonen <48052676+altonen@users.noreply.github.com> Date: Thu, 20 Apr 2023 16:27:05 +0300 Subject: [PATCH] Keep track of the pending response for each peer individually (#13941) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Keep track of the pending response for each peer individually When peer disconnects or the syncing is restarted, remove the pending response so syncing won't start sending duplicate requests/receive stale responses from disconnected peers. Before this commit pending responses where stored in `FuturesUnordered` which made it hard to keep track of pending responses for each individual peer. * Update client/network/sync/src/lib.rs Co-authored-by: Bastian Köcher * ".git/.scripts/commands/fmt/fmt.sh" * Apply suggestions from code review Co-authored-by: Dmitry Markin Co-authored-by: Sebastian Kunert * Update client/network/sync/src/lib.rs --------- Co-authored-by: Bastian Köcher Co-authored-by: command-bot <> Co-authored-by: Dmitry Markin Co-authored-by: Sebastian Kunert --- client/network/sync/src/lib.rs | 116 ++++++++++++++++++++++++++++++--- 1 file changed, 108 insertions(+), 8 deletions(-) diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 28959e7f9c886..61223d496ad00 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -342,7 +342,7 @@ pub struct ChainSync { /// Protocol name used to send out warp sync requests warp_sync_protocol_name: Option, /// Pending responses - pending_responses: FuturesUnordered>, + pending_responses: HashMap>, /// Handle to import queue. import_queue: Box>, /// Metrics. @@ -1236,6 +1236,7 @@ where gap_sync.blocks.clear_peer_download(who) } self.peers.remove(who); + self.pending_responses.remove(who); self.extra_justifications.peer_disconnected(who); self.allowed_requests.set_all(); self.fork_targets.retain(|_, target| { @@ -1358,7 +1359,7 @@ where if self.peers.contains_key(&who) { self.pending_responses - .push(Box::pin(async move { (who, PeerRequest::Block(request), rx.await) })); + .insert(who, Box::pin(async move { (who, PeerRequest::Block(request), rx.await) })); } match self.encode_block_request(&opaque_req) { @@ -1453,7 +1454,7 @@ where .notifications_protocol .clone() .into(), - pending_responses: Default::default(), + pending_responses: HashMap::new(), import_queue, metrics: if let Some(r) = &metrics_registry { match SyncingMetrics::register(r) { @@ -1806,6 +1807,9 @@ where return None } + // since the request is not a justification, remove it from pending responses + self.pending_responses.remove(&id); + // handle peers that were in other states. match self.new_peer(id, p.best_hash, p.best_number) { Ok(None) => None, @@ -2005,7 +2009,7 @@ where if self.peers.contains_key(&who) { self.pending_responses - .push(Box::pin(async move { (who, PeerRequest::State, rx.await) })); + .insert(who, Box::pin(async move { (who, PeerRequest::State, rx.await) })); } match self.encode_state_request(&request) { @@ -2033,7 +2037,7 @@ where if self.peers.contains_key(&who) { self.pending_responses - .push(Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) })); + .insert(who, Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) })); } match &self.warp_sync_protocol_name { @@ -2171,9 +2175,20 @@ where } fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll> { - while let Poll::Ready(Some((id, request, response))) = - self.pending_responses.poll_next_unpin(cx) - { + let ready_responses = self + .pending_responses + .values_mut() + .filter_map(|future| match future.poll_unpin(cx) { + Poll::Pending => None, + Poll::Ready(result) => Some(result), + }) + .collect::>(); + + for (id, request, response) in ready_responses { + self.pending_responses + .remove(&id) + .expect("Logic error: peer id from pending response is missing in the map."); + match response { Ok(Ok(resp)) => match request { PeerRequest::Block(req) => { @@ -4092,4 +4107,89 @@ mod test { let state = AncestorSearchState::::BinarySearch(1, 3); assert!(handle_ancestor_search_state(&state, 2, true).is_none()); } + + #[test] + fn sync_restart_removes_block_but_not_justification_requests() { + let mut client = Arc::new(TestClientBuilder::new().build()); + let block_announce_validator = Box::new(DefaultBlockAnnounceValidator); + let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new()); + let (_chain_sync_network_provider, chain_sync_network_handle) = + NetworkServiceProvider::new(); + let (mut sync, _) = ChainSync::new( + SyncMode::Full, + client.clone(), + ProtocolId::from("test-protocol-name"), + &Some(String::from("test-fork-id")), + Roles::from(&Role::Full), + block_announce_validator, + 1, + 64, + None, + None, + chain_sync_network_handle, + import_queue, + ProtocolName::from("block-request"), + ProtocolName::from("state-request"), + None, + ) + .unwrap(); + + let peers = vec![PeerId::random(), PeerId::random()]; + + let mut new_blocks = |n| { + for _ in 0..n { + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + block_on(client.import(BlockOrigin::Own, block.clone())).unwrap(); + } + + let info = client.info(); + (info.best_hash, info.best_number) + }; + + let (b1_hash, b1_number) = new_blocks(50); + + // add new peer and request blocks from them + sync.new_peer(peers[0], Hash::random(), 42).unwrap(); + + // we wil send block requests to these peers + // for these blocks we don't know about + for (peer, request) in sync.block_requests() { + sync.send_block_request(peer, request); + } + + // add a new peer at a known block + sync.new_peer(peers[1], b1_hash, b1_number).unwrap(); + + // we request a justification for a block we have locally + sync.request_justification(&b1_hash, b1_number); + + // the justification request should be scheduled to the + // new peer which is at the given block + let mut requests = sync.justification_requests().collect::>(); + assert_eq!(requests.len(), 1); + let (peer, request) = requests.remove(0); + sync.send_block_request(peer, request); + + assert!(!std::matches!( + sync.peers.get(&peers[0]).unwrap().state, + PeerSyncState::DownloadingJustification(_), + )); + assert_eq!( + sync.peers.get(&peers[1]).unwrap().state, + PeerSyncState::DownloadingJustification(b1_hash), + ); + assert_eq!(sync.pending_responses.len(), 2); + + let requests = sync.restart().collect::>(); + assert!(requests.iter().any(|res| res.as_ref().unwrap().0 == peers[0])); + + assert_eq!(sync.pending_responses.len(), 1); + assert!(sync.pending_responses.get(&peers[1]).is_some()); + assert_eq!( + sync.peers.get(&peers[1]).unwrap().state, + PeerSyncState::DownloadingJustification(b1_hash), + ); + sync.peer_disconnected(&peers[1]); + assert_eq!(sync.pending_responses.len(), 0); + } }