Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Enable parallel block download #4014

Merged
merged 1 commit into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 13 additions & 36 deletions core/network/src/protocol/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ impl<B: BlockT> ChainSync<B> {
state: PeerSyncState::Available,
recently_announced: Default::default(),
});
return Ok(self.select_new_blocks(who).map(|(_, req)| req))
self.is_idle = false;
return Ok(None)
}

let common_best = std::cmp::min(self.best_queued_number, info.best_number);
Expand Down Expand Up @@ -567,6 +568,7 @@ impl<B: BlockT> ChainSync<B> {
trace!(target: "sync", "Too many blocks in the queue.");
return Either::Left(std::iter::empty())
}
let major_sync = self.status().state == SyncState::Downloading;
let blocks = &mut self.blocks;
let attrs = &self.required_block_attributes;
let fork_targets = &self.fork_targets;
Expand Down Expand Up @@ -596,7 +598,7 @@ impl<B: BlockT> ChainSync<B> {
peer.state = PeerSyncState::DownloadingStale(hash);
have_requests = true;
Some((id.clone(), req))
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs) {
} else if let Some((range, req)) = peer_block_request(id, peer, blocks, attrs, major_sync) {
peer.state = PeerSyncState::DownloadingNew(range.start);
trace!(target: "sync", "New block request for {}", id);
have_requests = true;
Expand Down Expand Up @@ -1123,39 +1125,6 @@ impl<B: BlockT> ChainSync<B> {
})
}

/// Select a range of new blocks to download from the given peer.
fn select_new_blocks(&mut self, who: PeerId) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
// when there are too many blocks in the queue => do not try to download new blocks
if self.queue_blocks.len() > MAX_IMPORTING_BLOCKS {
trace!(target: "sync", "Too many blocks in the queue.");
return None
}

let peer = self.peers.get_mut(&who)?;

if !peer.state.is_available() {
trace!(target: "sync", "Peer {} is busy", who);
return None
}

trace!(
target: "sync",
"Considering new block download from {}, common block is {}, best is {:?}",
who,
peer.common_number,
peer.best_number
);

if let Some((range, req)) = peer_block_request(&who, peer, &mut self.blocks, &self.required_block_attributes) {
trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end);
peer.state = PeerSyncState::DownloadingNew(range.start);
Some((range, req))
} else {
trace!(target: "sync", "Nothing to request from {}", who);
None
}
}

/// What is the status of the block corresponding to the given hash?
fn block_status(&self, hash: &B::Hash) -> Result<BlockStatus, ClientError> {
if self.queue_blocks.contains(hash) {
Expand Down Expand Up @@ -1254,8 +1223,16 @@ fn peer_block_request<B: BlockT>(
peer: &PeerSync<B>,
blocks: &mut BlockCollection<B>,
attrs: &message::BlockAttributes,
major_sync: bool,
) -> Option<(Range<NumberFor<B>>, BlockRequest<B>)> {
if let Some(range) = blocks.needed_blocks(id.clone(), MAX_BLOCKS_TO_REQUEST, peer.best_number, peer.common_number) {
let max_parallel = if major_sync { 1 } else { 3 };
if let Some(range) = blocks.needed_blocks(
id.clone(),
MAX_BLOCKS_TO_REQUEST,
peer.best_number,
peer.common_number,
max_parallel,
) {
let request = message::generic::BlockRequest {
id: 0,
fields: attrs.clone(),
Expand Down
36 changes: 19 additions & 17 deletions core/network/src/protocol/sync/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ use libp2p::PeerId;
use sr_primitives::traits::{Block as BlockT, NumberFor, One};
use crate::message;

const MAX_PARALLEL_DOWNLOADS: u32 = 1;

/// Block data with origin.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockData<B: BlockT> {
Expand Down Expand Up @@ -84,9 +82,7 @@ impl<B: BlockT> BlockCollection<B> {

match self.blocks.get(&start) {
Some(&BlockRangeState::Downloading { .. }) => {
trace!(target: "sync", "Ignored block data still marked as being downloaded: {}", start);
debug_assert!(false);
return;
trace!(target: "sync", "Inserting block data still marked as being downloaded: {}", start);
},
Some(&BlockRangeState::Complete(ref existing)) if existing.len() >= blocks.len() => {
trace!(target: "sync", "Ignored block data already downloaded: {}", start);
Expand All @@ -100,8 +96,15 @@ impl<B: BlockT> BlockCollection<B> {
}

/// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded.
pub fn needed_blocks(&mut self, who: PeerId, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>)
-> Option<Range<NumberFor<B>>> {
pub fn needed_blocks(
&mut self,
who: PeerId,
count: usize,
peer_best: NumberFor<B>,
common: NumberFor<B>,
max_parallel: u32,
) -> Option<Range<NumberFor<B>>>
{
// First block number that we need to download
let first_different = common + <NumberFor<B>>::one();
let count = (count as u32).into();
Expand All @@ -112,7 +115,7 @@ impl<B: BlockT> BlockCollection<B> {
let next = downloading_iter.next();
break match &(prev, next) {
&(Some((start, &BlockRangeState::Downloading { ref len, downloading })), _)
if downloading < MAX_PARALLEL_DOWNLOADS =>
if downloading < max_parallel =>
(*start .. *start + *len, downloading),
&(Some((start, r)), Some((next_start, _))) if *start + r.len() < *next_start =>
(*start + r.len() .. cmp::min(*next_start, *start + r.len() + count), 0), // gap
Expand Down Expand Up @@ -185,7 +188,6 @@ impl<B: BlockT> BlockCollection<B> {
true
},
_ => {
debug_assert!(false);
false
}
};
Expand Down Expand Up @@ -242,18 +244,18 @@ mod test {
let peer2 = PeerId::random();

let blocks = generate_blocks(150);
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(1 .. 41));
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(41 .. 81));
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0), Some(81 .. 121));
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1), Some(1 .. 41));
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1), Some(41 .. 81));
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 0, 1), Some(81 .. 121));

bc.clear_peer_download(&peer1);
bc.insert(41, blocks[41..81].to_vec(), peer1.clone());
assert_eq!(bc.drain(1), vec![]);
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0), Some(121 .. 151));
assert_eq!(bc.needed_blocks(peer1.clone(), 40, 150, 0, 1), Some(121 .. 151));
bc.clear_peer_download(&peer0);
bc.insert(1, blocks[1..11].to_vec(), peer0.clone());

assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0), Some(11 .. 41));
assert_eq!(bc.needed_blocks(peer0.clone(), 40, 150, 0, 1), Some(11 .. 41));
assert_eq!(bc.drain(1), blocks[1..11].iter()
.map(|b| BlockData { block: b.clone(), origin: Some(peer0.clone()) }).collect::<Vec<_>>());

Expand All @@ -267,7 +269,7 @@ mod test {
.map(|b| BlockData { block: b.clone(), origin: Some(peer1.clone()) }).collect::<Vec<_>>()[..]);

bc.clear_peer_download(&peer2);
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80), Some(81 .. 121));
assert_eq!(bc.needed_blocks(peer2.clone(), 40, 150, 80, 1), Some(81 .. 121));
bc.clear_peer_download(&peer2);
bc.insert(81, blocks[81..121].to_vec(), peer2.clone());
bc.clear_peer_download(&peer1);
Expand All @@ -292,7 +294,7 @@ mod test {
bc.blocks.insert(114305, BlockRangeState::Complete(blocks));

let peer0 = PeerId::random();
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000), Some(1 .. 100));
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600), Some(100 + 128 .. 100 + 128 + 128));
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 000, 1), Some(1 .. 100));
assert_eq!(bc.needed_blocks(peer0.clone(), 128, 10000, 600, 1), Some(100 + 128 .. 100 + 128 + 128));
}
}