Skip to content

Commit

Permalink
Keep track of the pending response for each peer individually (parity…
Browse files Browse the repository at this point in the history
…tech#13941)

* Keep track of the pending response for each peer individually

When peer disconnects or the syncing is restarted, remove the pending
response so syncing won't start sending duplicate requests/receive stale
responses from disconnected peers.

Before this commit pending responses where stored in `FuturesUnordered`
which made it hard to keep track of pending responses for each individual
peer.

* Update client/network/sync/src/lib.rs

Co-authored-by: Bastian Köcher <git@kchr.de>

* ".git/.scripts/commands/fmt/fmt.sh"

* Apply suggestions from code review

Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>

* Update client/network/sync/src/lib.rs

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: command-bot <>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
  • Loading branch information
4 people authored and ukint-vs committed Apr 23, 2023
1 parent 63dc463 commit 10b40d2
Showing 1 changed file with 108 additions and 8 deletions.
116 changes: 108 additions & 8 deletions client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub struct ChainSync<B: BlockT, Client> {
/// Protocol name used to send out warp sync requests
warp_sync_protocol_name: Option<ProtocolName>,
/// Pending responses
pending_responses: FuturesUnordered<PendingResponse<B>>,
pending_responses: HashMap<PeerId, PendingResponse<B>>,
/// Handle to import queue.
import_queue: Box<dyn ImportQueueService<B>>,
/// Metrics.
Expand Down Expand Up @@ -1236,6 +1236,7 @@ where
gap_sync.blocks.clear_peer_download(who)
}
self.peers.remove(who);
self.pending_responses.remove(who);
self.extra_justifications.peer_disconnected(who);
self.allowed_requests.set_all();
self.fork_targets.retain(|_, target| {
Expand Down Expand Up @@ -1358,7 +1359,7 @@ where

if self.peers.contains_key(&who) {
self.pending_responses
.push(Box::pin(async move { (who, PeerRequest::Block(request), rx.await) }));
.insert(who, Box::pin(async move { (who, PeerRequest::Block(request), rx.await) }));
}

match self.encode_block_request(&opaque_req) {
Expand Down Expand Up @@ -1453,7 +1454,7 @@ where
.notifications_protocol
.clone()
.into(),
pending_responses: Default::default(),
pending_responses: HashMap::new(),
import_queue,
metrics: if let Some(r) = &metrics_registry {
match SyncingMetrics::register(r) {
Expand Down Expand Up @@ -1806,6 +1807,9 @@ where
return None
}

// since the request is not a justification, remove it from pending responses
self.pending_responses.remove(&id);

// handle peers that were in other states.
match self.new_peer(id, p.best_hash, p.best_number) {
Ok(None) => None,
Expand Down Expand Up @@ -2005,7 +2009,7 @@ where

if self.peers.contains_key(&who) {
self.pending_responses
.push(Box::pin(async move { (who, PeerRequest::State, rx.await) }));
.insert(who, Box::pin(async move { (who, PeerRequest::State, rx.await) }));
}

match self.encode_state_request(&request) {
Expand Down Expand Up @@ -2033,7 +2037,7 @@ where

if self.peers.contains_key(&who) {
self.pending_responses
.push(Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) }));
.insert(who, Box::pin(async move { (who, PeerRequest::WarpProof, rx.await) }));
}

match &self.warp_sync_protocol_name {
Expand Down Expand Up @@ -2171,9 +2175,20 @@ where
}

fn poll_pending_responses(&mut self, cx: &mut std::task::Context) -> Poll<ImportResult<B>> {
while let Poll::Ready(Some((id, request, response))) =
self.pending_responses.poll_next_unpin(cx)
{
let ready_responses = self
.pending_responses
.values_mut()
.filter_map(|future| match future.poll_unpin(cx) {
Poll::Pending => None,
Poll::Ready(result) => Some(result),
})
.collect::<Vec<_>>();

for (id, request, response) in ready_responses {
self.pending_responses
.remove(&id)
.expect("Logic error: peer id from pending response is missing in the map.");

match response {
Ok(Ok(resp)) => match request {
PeerRequest::Block(req) => {
Expand Down Expand Up @@ -4092,4 +4107,89 @@ mod test {
let state = AncestorSearchState::<Block>::BinarySearch(1, 3);
assert!(handle_ancestor_search_state(&state, 2, true).is_none());
}

#[test]
fn sync_restart_removes_block_but_not_justification_requests() {
let mut client = Arc::new(TestClientBuilder::new().build());
let block_announce_validator = Box::new(DefaultBlockAnnounceValidator);
let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let (mut sync, _) = ChainSync::new(
SyncMode::Full,
client.clone(),
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
block_announce_validator,
1,
64,
None,
None,
chain_sync_network_handle,
import_queue,
ProtocolName::from("block-request"),
ProtocolName::from("state-request"),
None,
)
.unwrap();

let peers = vec![PeerId::random(), PeerId::random()];

let mut new_blocks = |n| {
for _ in 0..n {
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
}

let info = client.info();
(info.best_hash, info.best_number)
};

let (b1_hash, b1_number) = new_blocks(50);

// add new peer and request blocks from them
sync.new_peer(peers[0], Hash::random(), 42).unwrap();

// we wil send block requests to these peers
// for these blocks we don't know about
for (peer, request) in sync.block_requests() {
sync.send_block_request(peer, request);
}

// add a new peer at a known block
sync.new_peer(peers[1], b1_hash, b1_number).unwrap();

// we request a justification for a block we have locally
sync.request_justification(&b1_hash, b1_number);

// the justification request should be scheduled to the
// new peer which is at the given block
let mut requests = sync.justification_requests().collect::<Vec<_>>();
assert_eq!(requests.len(), 1);
let (peer, request) = requests.remove(0);
sync.send_block_request(peer, request);

assert!(!std::matches!(
sync.peers.get(&peers[0]).unwrap().state,
PeerSyncState::DownloadingJustification(_),
));
assert_eq!(
sync.peers.get(&peers[1]).unwrap().state,
PeerSyncState::DownloadingJustification(b1_hash),
);
assert_eq!(sync.pending_responses.len(), 2);

let requests = sync.restart().collect::<Vec<_>>();
assert!(requests.iter().any(|res| res.as_ref().unwrap().0 == peers[0]));

assert_eq!(sync.pending_responses.len(), 1);
assert!(sync.pending_responses.get(&peers[1]).is_some());
assert_eq!(
sync.peers.get(&peers[1]).unwrap().state,
PeerSyncState::DownloadingJustification(b1_hash),
);
sync.peer_disconnected(&peers[1]);
assert_eq!(sync.pending_responses.len(), 0);
}
}

0 comments on commit 10b40d2

Please sign in to comment.