Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Keep track of the pending response for each peer individually #13941

Merged
merged 5 commits into from
Apr 20, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 106 additions & 8 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub struct ChainSync<B: BlockT, Client> {
/// Protocol name used to send out warp sync requests
warp_sync_protocol_name: Option<ProtocolName>,
/// Pending responses
pending_responses: FuturesUnordered<PendingResponse<B>>,
pending_responses: HashMap<PeerId, PendingResponse<B>>,
/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,
/// Metrics.
Expand Down Expand Up @@ -1238,6 +1238,7 @@ where
gap_sync.blocks.clear_peer_download(who)
}
self.peers.remove(who);
self.pending_responses.remove(who);
self.extra_justifications.peer_disconnected(who);
self.allowed_requests.set_all();
self.fork_targets.retain(|_, target| {
Expand Down Expand Up @@ -1360,7 +1361,7 @@ where

if self.peers.contains_key(&who) {
self.pending_responses
.push(Box::pin(async move { (who, PeerRequest::Block(request), rx.await) }));
.insert(who, Box::pin(async move { (who, PeerRequest::Block(request), rx.await) }));
}

match self.encode_block_request(&opaque_req) {
Expand Down Expand Up @@ -1457,7 +1458,7 @@ where
.notifications_protocol
.clone()
.into(),
pending_responses: Default::default(),
pending_responses: HashMap::new(),
import_queue,
metrics: if let Some(r) = &metrics_registry {
match SyncingMetrics::register(r) {
Expand Down Expand Up @@ -1810,6 +1811,9 @@ where
return None
}

// since the request is not a justification, remove it from pending responsees
altonen marked this conversation as resolved.
Show resolved Hide resolved
self.pending_responses.remove(&id);

// handle peers that were in other states.
match self.new_peer(id, p.best_hash, p.best_number) {
Ok(None) => None,
Expand Down Expand Up @@ -2009,7 +2013,7 @@ where

if self.peers.contains_key(&who) {
self.pending_responses
.push(Box::pin(async move { (who, PeerRequest::State, rx.await) }));
.insert(who, Box::pin(async move { (who, PeerRequest::State, rx.await) }));
}

match self.encode_state_request(&request) {
Expand Down Expand Up @@ -2037,7 +2041,7 @@ where

if self.peers.contains_key(&who) {
self.pending_responses
.push(Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) }));
.insert(who, Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) }));
}

match &self.warp_sync_protocol_name {
Expand Down Expand Up @@ -2175,9 +2179,18 @@ where
}

fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll<ImportResult<B>> {
while let Poll::Ready(Some((id, request, response))) =
self.pending_responses.poll_next_unpin(cx)
{
let ready_responses = self
.pending_responses
altonen marked this conversation as resolved.
Show resolved Hide resolved
.iter_mut()
.filter_map(|(_, future)| match future.poll_unpin(cx) {
altonen marked this conversation as resolved.
Show resolved Hide resolved
Poll::Pending => None,
Poll::Ready(result) => Some(result),
})
.collect::<Vec<_>>();

for (id, request, response) in ready_responses {
self.pending_responses.remove(&id).expect("peer to exist");
altonen marked this conversation as resolved.
Show resolved Hide resolved

match response {
Ok(Ok(resp)) => match request {
PeerRequest::Block(req) => {
Expand Down Expand Up @@ -4116,4 +4129,89 @@ mod test {
let state = AncestorSearchState::<Block>::BinarySearch(1, 3);
assert!(handle_ancestor_search_state(&state, 2, true).is_none());
}

#[test]
fn sync_restart_removes_block_but_not_justification_requests() {
let mut client = Arc::new(TestClientBuilder::new().build());
let block_announce_validator = Box::new(DefaultBlockAnnounceValidator);
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
client.clone(),
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
block_announce_validator,
1,
64,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
)
.unwrap();

let peers = vec![PeerId::random(), PeerId::random()];

let mut new_blocks = |n| {
for _ in 0..n {
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
}

let info = client.info();
(info.best_hash, info.best_number)
};

let (b1_hash, b1_number) = new_blocks(50);

// add new peer and request blocks from them
sync.new_peer(peers[0], Hash::random(), 42).unwrap();

// we wil send block requests to these peers
// for these blocks we don't know about
for (peer, request) in sync.block_requests() {
sync.send_block_request(peer, request);
}

// add a new peer at a known block
sync.new_peer(peers[1], b1_hash, b1_number).unwrap();

// we request a justification for a block we have locally
sync.request_justification(&b1_hash, b1_number);

// the justification request should be scheduled to the
// new peer which is at the given block
let mut requests = sync.justification_requests().collect::<Vec<_>>();
assert_eq!(requests.len(), 1);
let (peer, request) = requests.remove(0);
sync.send_block_request(peer, request);

assert!(!std::matches!(
sync.peers.get(&peers[0]).unwrap().state,
PeerSyncState::DownloadingJustification(_),
));
assert_eq!(
sync.peers.get(&peers[1]).unwrap().state,
PeerSyncState::DownloadingJustification(b1_hash),
);
assert_eq!(sync.pending_responses.len(), 2);

let requests = sync.restart().collect::<Vec<_>>();
assert!(requests.iter().any(|res| res.as_ref().unwrap().0 == peers[0]));

assert_eq!(sync.pending_responses.len(), 1);
assert!(sync.pending_responses.get(&peers[1]).is_some());
assert_eq!(
sync.peers.get(&peers[1]).unwrap().state,
PeerSyncState::DownloadingJustification(b1_hash),
);
sync.peer_disconnected(&peers[1]);
assert_eq!(sync.pending_responses.len(), 0);
}
}