diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 25f000c89ac..c023cbdfbef 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1205,7 +1205,7 @@ impl BlockChainClient for Client { } fn import_block(&self, bytes: Bytes) -> Result { - use verification::queue::kind::HasHash; + use verification::queue::kind::BlockLike; use verification::queue::kind::blocks::Unverified; // create unverified block here so the `sha3` calculation can be cached. @@ -1245,7 +1245,9 @@ impl BlockChainClient for Client { } fn chain_info(&self) -> BlockChainInfo { - self.chain.read().chain_info() + let mut chain_info = self.chain.read().chain_info(); + chain_info.pending_total_difficulty = chain_info.total_difficulty + self.block_queue.total_difficulty(); + chain_info } fn additional_params(&self) -> BTreeMap { @@ -1369,6 +1371,7 @@ impl BlockChainClient for Client { PruningInfo { earliest_chain: self.chain.read().first_block_number().unwrap_or(1), earliest_state: self.state_db.lock().journal_db().earliest_era().unwrap_or(0), + state_history_size: Some(self.history), } } diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index a384f12272b..b9a51babdd0 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -92,6 +92,8 @@ pub struct TestBlockChainClient { pub first_block: RwLock>, /// Traces to return pub traces: RwLock>>, + /// Pruning history size to report. + pub history: RwLock>, } /// Used for generating test client blocks. @@ -154,6 +156,7 @@ impl TestBlockChainClient { ancient_block: RwLock::new(None), first_block: RwLock::new(None), traces: RwLock::new(None), + history: RwLock::new(None), }; client.add_blocks(1, EachBlockWith::Nothing); // add genesis block client.genesis_hash = client.last_hash.read().clone(); @@ -314,6 +317,11 @@ impl TestBlockChainClient { let res = res.into_iter().next().unwrap().expect("Successful import"); assert_eq!(res, TransactionImportResult::Current); } + + /// Set reported history size. + pub fn set_history(&self, h: Option) { + *self.history.write() = h; + } } pub fn get_temp_state_db() -> GuardedTempResult { @@ -704,6 +712,7 @@ impl BlockChainClient for TestBlockChainClient { PruningInfo { earliest_chain: 1, earliest_state: 1, + state_history_size: *self.history.read(), } } diff --git a/ethcore/src/types/pruning_info.rs b/ethcore/src/types/pruning_info.rs index 80c3e0ce22d..44331686525 100644 --- a/ethcore/src/types/pruning_info.rs +++ b/ethcore/src/types/pruning_info.rs @@ -28,4 +28,6 @@ pub struct PruningInfo { pub earliest_chain: u64, /// The first block where state requests may be served. pub earliest_state: u64, + /// State pruning history size. + pub state_history_size: Option, } diff --git a/ethcore/src/verification/queue/kind.rs b/ethcore/src/verification/queue/kind.rs index 5d4bb745118..3ce86ad4793 100644 --- a/ethcore/src/verification/queue/kind.rs +++ b/ethcore/src/verification/queue/kind.rs @@ -19,18 +19,21 @@ use engines::Engine; use error::Error; -use util::{HeapSizeOf, H256}; +use util::{HeapSizeOf, H256, U256}; pub use self::blocks::Blocks; pub use self::headers::Headers; /// Something which can produce a hash and a parent hash. -pub trait HasHash { +pub trait BlockLike { /// Get the hash of this item. fn hash(&self) -> H256; /// Get the hash of this item's parent. fn parent_hash(&self) -> H256; + + /// Get the difficulty of this item. + fn difficulty(&self) -> U256; } /// Defines transitions between stages of verification. @@ -45,13 +48,13 @@ pub trait HasHash { /// consistent. pub trait Kind: 'static + Sized + Send + Sync { /// The first stage: completely unverified. - type Input: Sized + Send + HasHash + HeapSizeOf; + type Input: Sized + Send + BlockLike + HeapSizeOf; /// The second stage: partially verified. - type Unverified: Sized + Send + HasHash + HeapSizeOf; + type Unverified: Sized + Send + BlockLike + HeapSizeOf; /// The third stage: completely verified. - type Verified: Sized + Send + HasHash + HeapSizeOf; + type Verified: Sized + Send + BlockLike + HeapSizeOf; /// Attempt to create the `Unverified` item from the input. fn create(input: Self::Input, engine: &Engine) -> Result; @@ -62,14 +65,14 @@ pub trait Kind: 'static + Sized + Send + Sync { /// The blocks verification module. pub mod blocks { - use super::{Kind, HasHash}; + use super::{Kind, BlockLike}; use engines::Engine; use error::Error; use header::Header; use verification::{PreverifiedBlock, verify_block_basic, verify_block_unordered}; - use util::{Bytes, HeapSizeOf, H256}; + use util::{Bytes, HeapSizeOf, H256, U256}; /// A mode for verifying blocks. pub struct Blocks; @@ -126,7 +129,7 @@ pub mod blocks { } } - impl HasHash for Unverified { + impl BlockLike for Unverified { fn hash(&self) -> H256 { self.header.hash() } @@ -134,9 +137,13 @@ pub mod blocks { fn parent_hash(&self) -> H256 { self.header.parent_hash().clone() } + + fn difficulty(&self) -> U256 { + self.header.difficulty().clone() + } } - impl HasHash for PreverifiedBlock { + impl BlockLike for PreverifiedBlock { fn hash(&self) -> H256 { self.header.hash() } @@ -144,12 +151,16 @@ pub mod blocks { fn parent_hash(&self) -> H256 { self.header.parent_hash().clone() } + + fn difficulty(&self) -> U256 { + self.header.difficulty().clone() + } } } /// Verification for headers. pub mod headers { - use super::{Kind, HasHash}; + use super::{Kind, BlockLike}; use engines::Engine; use error::Error; @@ -157,10 +168,12 @@ pub mod headers { use verification::verify_header_params; use util::hash::H256; + use util::U256; - impl HasHash for Header { + impl BlockLike for Header { fn hash(&self) -> H256 { self.hash() } fn parent_hash(&self) -> H256 { self.parent_hash().clone() } + fn difficulty(&self) -> U256 { self.difficulty().clone() } } /// A mode for verifying headers. diff --git a/ethcore/src/verification/queue/mod.rs b/ethcore/src/verification/queue/mod.rs index 673275102a6..774370b4db4 100644 --- a/ethcore/src/verification/queue/mod.rs +++ b/ethcore/src/verification/queue/mod.rs @@ -26,7 +26,7 @@ use error::*; use engines::Engine; use service::*; -use self::kind::{HasHash, Kind}; +use self::kind::{BlockLike, Kind}; pub use types::verification_queue_info::VerificationQueueInfo as QueueInfo; @@ -132,13 +132,14 @@ pub struct VerificationQueue { deleting: Arc, ready_signal: Arc, empty: Arc, - processing: RwLock>, + processing: RwLock>, // hash to difficulty ticks_since_adjustment: AtomicUsize, max_queue_size: usize, max_mem_use: usize, scale_verifiers: bool, verifier_handles: Vec>, state: Arc<(Mutex, Condvar)>, + total_difficulty: RwLock, } struct QueueSignal { @@ -269,7 +270,7 @@ impl VerificationQueue { more_to_verify: more_to_verify, verification: verification, deleting: deleting, - processing: RwLock::new(HashSet::new()), + processing: RwLock::new(HashMap::new()), empty: empty, ticks_since_adjustment: AtomicUsize::new(0), max_queue_size: max(config.max_queue_size, MIN_QUEUE_LIMIT), @@ -277,6 +278,7 @@ impl VerificationQueue { scale_verifiers: scale_verifiers, verifier_handles: verifier_handles, state: state, + total_difficulty: RwLock::new(0.into()), } } @@ -434,6 +436,7 @@ impl VerificationQueue { sizes.unverified.store(0, AtomicOrdering::Release); sizes.verifying.store(0, AtomicOrdering::Release); sizes.verified.store(0, AtomicOrdering::Release); + *self.total_difficulty.write() = 0.into(); self.processing.write().clear(); } @@ -448,7 +451,7 @@ impl VerificationQueue { /// Check if the item is currently in the queue pub fn status(&self, hash: &H256) -> Status { - if self.processing.read().contains(hash) { + if self.processing.read().contains_key(hash) { return Status::Queued; } if self.verification.bad.lock().contains(hash) { @@ -461,7 +464,7 @@ impl VerificationQueue { pub fn import(&self, input: K::Input) -> ImportResult { let h = input.hash(); { - if self.processing.read().contains(&h) { + if self.processing.read().contains_key(&h) { return Err(ImportError::AlreadyQueued.into()); } @@ -480,7 +483,11 @@ impl VerificationQueue { Ok(item) => { self.verification.sizes.unverified.fetch_add(item.heap_size_of_children(), AtomicOrdering::SeqCst); - self.processing.write().insert(h.clone()); + self.processing.write().insert(h.clone(), item.difficulty()); + { + let mut td = self.total_difficulty.write(); + *td = *td + item.difficulty(); + } self.verification.unverified.lock().push_back(item); self.more_to_verify.notify_all(); Ok(h) @@ -511,7 +518,10 @@ impl VerificationQueue { bad.reserve(hashes.len()); for hash in hashes { bad.insert(hash.clone()); - processing.remove(hash); + if let Some(difficulty) = processing.remove(hash) { + let mut td = self.total_difficulty.write(); + *td = *td - difficulty; + } } let mut new_verified = VecDeque::new(); @@ -520,7 +530,10 @@ impl VerificationQueue { if bad.contains(&output.parent_hash()) { removed_size += output.heap_size_of_children(); bad.insert(output.hash()); - processing.remove(&output.hash()); + if let Some(difficulty) = processing.remove(&output.hash()) { + let mut td = self.total_difficulty.write(); + *td = *td - difficulty; + } } else { new_verified.push_back(output); } @@ -538,7 +551,10 @@ impl VerificationQueue { } let mut processing = self.processing.write(); for hash in hashes { - processing.remove(hash); + if let Some(difficulty) = processing.remove(hash) { + let mut td = self.total_difficulty.write(); + *td = *td - difficulty; + } } processing.is_empty() } @@ -592,6 +608,11 @@ impl VerificationQueue { } } + /// Get the total difficulty of all the blocks in the queue. + pub fn total_difficulty(&self) -> U256 { + self.total_difficulty.read().clone() + } + /// Get the current number of working verifiers. pub fn num_verifiers(&self) -> usize { match *self.state.0.lock() { @@ -760,6 +781,22 @@ mod tests { } } + #[test] + fn returns_total_difficulty() { + let queue = get_test_queue(false); + let block = get_good_dummy_block(); + let hash = BlockView::new(&block).header().hash().clone(); + if let Err(e) = queue.import(Unverified::new(block)) { + panic!("error importing block that is valid by definition({:?})", e); + } + queue.flush(); + assert_eq!(queue.total_difficulty(), 131072.into()); + queue.drain(10); + assert_eq!(queue.total_difficulty(), 131072.into()); + queue.mark_as_good(&[ hash ]); + assert_eq!(queue.total_difficulty(), 0.into()); + } + #[test] fn returns_ok_for_drained_duplicates() { let queue = get_test_queue(false); diff --git a/parity/cli/config.full.toml b/parity/cli/config.full.toml index fc55e5294ae..c2456dfcd6d 100644 --- a/parity/cli/config.full.toml +++ b/parity/cli/config.full.toml @@ -93,7 +93,7 @@ notify_work = ["http://localhost:3001"] [footprint] tracing = "auto" pruning = "auto" -pruning_history = 64 +pruning_history = 1200 cache_size_db = 64 cache_size_blocks = 8 cache_size_queue = 50 diff --git a/parity/cli/mod.rs b/parity/cli/mod.rs index 158ae95ec9a..47a0af0bb3d 100644 --- a/parity/cli/mod.rs +++ b/parity/cli/mod.rs @@ -237,7 +237,7 @@ usage! { or |c: &Config| otry!(c.footprint).tracing.clone(), flag_pruning: String = "auto", or |c: &Config| otry!(c.footprint).pruning.clone(), - flag_pruning_history: u64 = 64u64, + flag_pruning_history: u64 = 1200u64, or |c: &Config| otry!(c.footprint).pruning_history.clone(), flag_cache_size_db: u32 = 64u32, or |c: &Config| otry!(c.footprint).cache_size_db.clone(), @@ -629,7 +629,7 @@ mod tests { // -- Footprint Options flag_tracing: "auto".into(), flag_pruning: "auto".into(), - flag_pruning_history: 64u64, + flag_pruning_history: 1200u64, flag_cache_size_db: 64u32, flag_cache_size_blocks: 8u32, flag_cache_size_queue: 50u32, diff --git a/parity/configuration.rs b/parity/configuration.rs index e4e4da08e10..bb6556808bb 100644 --- a/parity/configuration.rs +++ b/parity/configuration.rs @@ -898,7 +898,7 @@ mod tests { file_path: Some("blockchain.json".into()), format: Default::default(), pruning: Default::default(), - pruning_history: 64, + pruning_history: 1200, compaction: Default::default(), wal: true, tracing: Default::default(), @@ -920,7 +920,7 @@ mod tests { dirs: Default::default(), file_path: Some("blockchain.json".into()), pruning: Default::default(), - pruning_history: 64, + pruning_history: 1200, format: Default::default(), compaction: Default::default(), wal: true, @@ -942,7 +942,7 @@ mod tests { dirs: Default::default(), file_path: Some("state.json".into()), pruning: Default::default(), - pruning_history: 64, + pruning_history: 1200, format: Default::default(), compaction: Default::default(), wal: true, @@ -966,7 +966,7 @@ mod tests { dirs: Default::default(), file_path: Some("blockchain.json".into()), pruning: Default::default(), - pruning_history: 64, + pruning_history: 1200, format: Some(DataFormat::Hex), compaction: Default::default(), wal: true, @@ -1001,7 +1001,7 @@ mod tests { dirs: Default::default(), spec: Default::default(), pruning: Default::default(), - pruning_history: 64, + pruning_history: 1200, daemon: None, logger_config: Default::default(), miner_options: Default::default(), diff --git a/sync/src/block_sync.rs b/sync/src/block_sync.rs index c2ff1245792..e5b13835be7 100644 --- a/sync/src/block_sync.rs +++ b/sync/src/block_sync.rs @@ -32,9 +32,8 @@ const MAX_HEADERS_TO_REQUEST: usize = 128; const MAX_BODIES_TO_REQUEST: usize = 64; const MAX_RECEPITS_TO_REQUEST: usize = 128; const SUBCHAIN_SIZE: u64 = 256; -const MAX_ROUND_PARENTS: usize = 32; +const MAX_ROUND_PARENTS: usize = 16; const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5; -const MAX_REORG_BLOCKS: u64 = 20; #[derive(Copy, Clone, Eq, PartialEq, Debug)] /// Downloader state @@ -95,27 +94,38 @@ pub struct BlockDownloader { last_imported_hash: H256, /// Number of blocks imported this round imported_this_round: Option, + /// Block number the last round started with. + last_round_start: BlockNumber, + last_round_start_hash: H256, /// Block parents imported this round (hash, parent) round_parents: VecDeque<(H256, H256)>, /// Do we need to download block recetips. download_receipts: bool, /// Sync up to the block with this hash. target_hash: Option, + /// Reorganize up to this many blocks. Up to genesis if `None`, + max_reorg_blocks: Option, + /// Probing range for seeking common best block. + retract_step: u64, } impl BlockDownloader { /// Create a new instance of syncing strategy. - pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> BlockDownloader { + pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber, max_reorg: Option) -> BlockDownloader { BlockDownloader { state: State::Idle, highest_block: None, last_imported_block: start_number, last_imported_hash: start_hash.clone(), + last_round_start: start_number, + last_round_start_hash: start_hash.clone(), blocks: BlockCollection::new(sync_receipts), imported_this_round: None, round_parents: VecDeque::new(), download_receipts: sync_receipts, target_hash: None, + max_reorg_blocks: max_reorg, + retract_step: 1, } } @@ -127,9 +137,12 @@ impl BlockDownloader { /// Mark a block as known in the chain pub fn mark_as_known(&mut self, hash: &H256, number: BlockNumber) { - if number == self.last_imported_block + 1 { + if number >= self.last_imported_block + 1 { self.last_imported_block = number; self.last_imported_hash = hash.clone(); + self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + 1); + self.last_round_start = number; + self.last_round_start_hash = hash.clone(); } } @@ -148,12 +161,6 @@ impl BlockDownloader { self.target_hash = Some(hash.clone()); } - /// Set starting sync block - pub fn _set_start(&mut self, hash: &H256, number: BlockNumber) { - self.last_imported_hash = hash.clone(); - self.last_imported_block = number; - } - /// Unmark header as being downloaded. pub fn clear_header_download(&mut self, hash: &H256) { self.blocks.clear_header_download(hash) @@ -172,6 +179,7 @@ impl BlockDownloader { pub fn reset_to(&mut self, hashes: Vec) { self.reset(); self.blocks.reset_to(hashes); + self.state = State::Blocks; } /// Returns used heap memory size. @@ -260,7 +268,7 @@ impl BlockDownloader { return Ok(DownloadAction::Reset); } else { let best = io.chain().chain_info().best_block_number; - if best > self.last_imported_block && best - self.last_imported_block > MAX_REORG_BLOCKS { + if best > self.last_imported_block && (self.last_imported_block == 0 || best - self.last_imported_block > self.max_reorg_blocks.unwrap_or(u64::max_value())) { trace!(target: "sync", "No common block, disabling peer"); return Err(BlockDownloaderImportError::Invalid); } @@ -336,39 +344,47 @@ impl BlockDownloader { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = State::ChainHead; - trace!(target: "sync", "Starting round (last imported count = {:?}, block = {:?}", self.imported_this_round, self.last_imported_block); + trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. + let start = self.last_round_start; + let start_hash = self.last_round_start_hash; match self.imported_this_round { - Some(n) if n == 0 && self.last_imported_block > 0 => { + Some(n) if n == 0 && start > 0 => { // nothing was imported last round, step back to a previous block // search parent in last round known parents first - if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == self.last_imported_hash) { - self.last_imported_block -= 1; + if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) { + self.last_imported_block = start - 1; self.last_imported_hash = p.clone(); trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash); } else { let best = io.chain().chain_info().best_block_number; - if best > self.last_imported_block && best - self.last_imported_block > MAX_REORG_BLOCKS { - debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", self.last_imported_block, self.last_imported_hash); + if best > start && (start == 0 || best - start > self.max_reorg_blocks.unwrap_or(u64::max_value())) { + debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash); self.reset(); } else { - match io.chain().block_hash(BlockId::Number(self.last_imported_block - 1)) { + let n = start - min(self.retract_step, start); + self.retract_step *= 2; + match io.chain().block_hash(BlockId::Number(n)) { Some(h) => { - self.last_imported_block -= 1; + self.last_imported_block = n; self.last_imported_hash = h; - trace!(target: "sync", "Searching common header in the blockchain {} ({})", self.last_imported_block, self.last_imported_hash); + trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash); } None => { - debug!(target: "sync", "Could not revert to previous block, last: {} ({})", self.last_imported_block, self.last_imported_hash); + debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash); self.reset(); } } } } }, - _ => (), + _ => { + self.retract_step = 1; + }, } + self.last_round_start = self.last_imported_block; + self.last_round_start_hash = self.last_imported_hash; self.imported_this_round = None; } @@ -474,6 +490,9 @@ impl BlockDownloader { self.block_imported(&h, number, &parent); }, Err(BlockImportError::Block(BlockError::UnknownParent(_))) if allow_out_of_order => { + break; + }, + Err(BlockImportError::Block(BlockError::UnknownParent(_))) => { trace!(target: "sync", "Unknown new block parent, restarting sync"); break; }, diff --git a/sync/src/chain.rs b/sync/src/chain.rs index 968107ba5c3..72b1cf5687a 100644 --- a/sync/src/chain.rs +++ b/sync/src/chain.rs @@ -372,6 +372,7 @@ impl ChainSync { /// Create a new instance of syncing strategy. pub fn new(config: SyncConfig, chain: &BlockChainClient) -> ChainSync { let chain_info = chain.chain_info(); + let pruning = chain.pruning_info(); let mut sync = ChainSync { state: if config.warp_sync { SyncState::WaitingPeers } else { SyncState::Idle }, starting_block: chain.chain_info().best_block_number, @@ -379,7 +380,7 @@ impl ChainSync { peers: HashMap::new(), handshaking_peers: HashMap::new(), active_peers: HashSet::new(), - new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number), + new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number, pruning.state_history_size), old_blocks: None, last_sent_block_number: 0, network_id: config.network_id, @@ -459,6 +460,7 @@ impl ChainSync { fn reset(&mut self, io: &mut SyncIo) { self.new_blocks.reset(); self.snapshot.clear(); + let chain_info = io.chain().chain_info(); if self.state == SyncState::SnapshotData { debug!(target:"sync", "Aborting snapshot restore"); io.snapshot_service().abort_restore(); @@ -466,6 +468,10 @@ impl ChainSync { for (_, ref mut p) in &mut self.peers { if p.block_set != Some(BlockSet::OldBlocks) { p.reset_asking(); + if p.difficulty.is_none() { + // assume peer has up to date difficulty + p.difficulty = Some(chain_info.pending_total_difficulty); + } } } self.state = SyncState::Idle; @@ -557,14 +563,15 @@ impl ChainSync { /// Update sync after the blockchain has been changed externally. pub fn update_targets(&mut self, chain: &BlockChainClient) { // Do not assume that the block queue/chain still has our last_imported_block + let pruning = chain.pruning_info(); let chain = chain.chain_info(); - self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number); + self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number, pruning.state_history_size); self.old_blocks = None; if self.download_old_blocks { if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) { trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number); - let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number); + let mut downloader = BlockDownloader::new(true, &ancient_block_hash, ancient_block_number, pruning.state_history_size); if let Some(hash) = chain.first_block_hash { trace!(target: "sync", "Downloader target set to {:?}", hash); downloader.set_target(&hash); @@ -860,6 +867,12 @@ impl ChainSync { trace!(target: "sync", "Ignoring new block from unconfirmed peer {}", peer_id); return Ok(()); } + let difficulty: U256 = try!(r.val_at(1)); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if peer.difficulty.map_or(true, |pd| difficulty > pd) { + peer.difficulty = Some(difficulty); + } + } let block_rlp = try!(r.at(0)); let header_rlp = try!(block_rlp.at(0)); let h = header_rlp.as_raw().sha3(); @@ -888,6 +901,8 @@ impl ChainSync { trace!(target: "sync", "New block already queued {:?}", h); }, Ok(_) => { + // abort current download of the same block + self.complete_sync(io); self.new_blocks.mark_as_known(&header.hash(), header.number()); trace!(target: "sync", "New block queued {:?} ({})", h, header.number()); }, @@ -906,16 +921,10 @@ impl ChainSync { } else { trace!(target: "sync", "New unknown block {:?}", h); //TODO: handle too many unknown blocks - let difficulty: U256 = try!(r.val_at(1)); - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { - if peer.difficulty.map_or(true, |pd| difficulty > pd) { - peer.difficulty = Some(difficulty); - trace!(target: "sync", "Received block {:?} with no known parent. Peer needs syncing...", h); - } - } self.sync_peer(io, peer_id, true); } } + self.continue_sync(io); Ok(()) } @@ -925,16 +934,24 @@ impl ChainSync { trace!(target: "sync", "Ignoring new hashes from unconfirmed peer {}", peer_id); return Ok(()); } + let hashes: Vec<_> = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::(0), item.val_at::(1))).collect(); + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + // Peer has new blocks with unknown difficulty + peer.difficulty = None; + if let Some(&(Ok(ref h), _)) = hashes.last() { + peer.latest_hash = h.clone(); + } + } if self.state != SyncState::Idle { trace!(target: "sync", "Ignoring new hashes since we're already downloading."); let max = r.iter().take(MAX_NEW_HASHES).map(|item| item.val_at::(1).unwrap_or(0)).fold(0u64, max); if max > self.highest_block.unwrap_or(0) { self.highest_block = Some(max); } + self.continue_sync(io); return Ok(()); } trace!(target: "sync", "{} -> NewHashes ({} entries)", peer_id, r.item_count()); - let hashes = r.iter().take(MAX_NEW_HASHES).map(|item| (item.val_at::(0), item.val_at::(1))); let mut max_height: BlockNumber = 0; let mut new_hashes = Vec::new(); let last_imported_number = self.new_blocks.last_imported_block_number(); @@ -982,6 +999,7 @@ impl ChainSync { self.state = SyncState::NewBlocks; self.sync_peer(io, peer_id, true); } + self.continue_sync(io); Ok(()) } @@ -1106,7 +1124,7 @@ impl ChainSync { thread_rng().shuffle(&mut peers); //TODO: sort by rating // prefer peers with higher protocol version peers.sort_by(|&(_, _, ref v1), &(_, _, ref v2)| v1.cmp(v2)); - trace!(target: "sync", "Syncing with {}/{} peers", self.active_peers.len(), peers.len()); + trace!(target: "sync", "Syncing with peers: {} active, {} confirmed, {} total", self.active_peers.len(), peers.len(), self.peers.len()); for (p, _, _) in peers { if self.active_peers.contains(&p) { self.sync_peer(io, p, false); @@ -1135,12 +1153,13 @@ impl ChainSync { /// Find something to do for a peer. Called for a new peer or when a peer is done with its task. fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) { if !self.active_peers.contains(&peer_id) { - trace!(target: "sync", "Skipping deactivated peer"); + trace!(target: "sync", "Skipping deactivated peer {}", peer_id); return; } let (peer_latest, peer_difficulty, peer_snapshot_number, peer_snapshot_hash) = { if let Some(peer) = self.peers.get_mut(&peer_id) { if peer.asking != PeerAsking::Nothing || !peer.can_sync() { + trace!(target: "sync", "Skipping busy peer {}", peer_id); return; } if self.state == SyncState::Waiting { @@ -1161,7 +1180,7 @@ impl ChainSync { let num_active_peers = self.peers.values().filter(|p| p.asking != PeerAsking::Nothing).count(); let higher_difficulty = peer_difficulty.map_or(true, |pd| pd > syncing_difficulty); - if force || self.state == SyncState::NewBlocks || higher_difficulty || self.old_blocks.is_some() { + if force || higher_difficulty || self.old_blocks.is_some() { match self.state { SyncState::WaitingPeers => { trace!(target: "sync", "Checking snapshot sync: {} vs {}", peer_snapshot_number, chain_info.best_block_number); @@ -1174,9 +1193,10 @@ impl ChainSync { } let have_latest = io.chain().block_status(BlockId::Hash(peer_latest)) != BlockStatus::Unknown; + trace!(target: "sync", "Considering peer {}, force={}, td={:?}, our td={}, latest={}, have_latest={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, peer_latest, have_latest, self.state); if !have_latest && (higher_difficulty || force || self.state == SyncState::NewBlocks) { // check if got new blocks to download - trace!(target: "sync", "Syncing with {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state); + trace!(target: "sync", "Syncing with peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state); if let Some(request) = self.new_blocks.request_blocks(io, num_active_peers) { self.request_blocks(io, peer_id, request, BlockSet::NewBlocks); if self.state == SyncState::Idle { @@ -1206,6 +1226,8 @@ impl ChainSync { SyncState::SnapshotManifest | //already downloading from other peer SyncState::Waiting | SyncState::SnapshotWaiting => () } + } else { + trace!(target: "sync", "Skipping peer {}, force={}, td={:?}, our td={}, state={:?}", peer_id, force, peer_difficulty, syncing_difficulty, self.state); } } @@ -2035,7 +2057,9 @@ impl ChainSync { /// called when block is imported to chain - propagates the blocks and updates transactions sent to peers pub fn chain_new_blocks(&mut self, io: &mut SyncIo, _imported: &[H256], invalid: &[H256], _enacted: &[H256], _retracted: &[H256], sealed: &[H256], proposed: &[Bytes]) { - if io.is_chain_queue_empty() { + let queue_info = io.chain().queue_info(); + if !self.status().is_syncing(queue_info) || !sealed.is_empty() { + trace!(target: "sync", "Propagating blocks, state={:?}", self.state); self.propagate_latest_blocks(io, sealed); self.propagate_proposed_blocks(io, proposed); } diff --git a/sync/src/tests/chain.rs b/sync/src/tests/chain.rs index 6d1ffaf83eb..3a598ba62f7 100644 --- a/sync/src/tests/chain.rs +++ b/sync/src/tests/chain.rs @@ -255,8 +255,12 @@ fn high_td_attach() { fn disconnect_on_unrelated_chain() { ::env_logger::init().ok(); let mut net = TestNet::new(2); - net.peer(0).chain.add_blocks(200, EachBlockWith::Uncle); - net.peer(1).chain.add_blocks(100, EachBlockWith::Nothing); + net.peer(0).chain.set_history(Some(20)); + net.peer(1).chain.set_history(Some(20)); + net.restart_peer(0); + net.restart_peer(1); + net.peer(0).chain.add_blocks(500, EachBlockWith::Uncle); + net.peer(1).chain.add_blocks(300, EachBlockWith::Nothing); net.sync(); assert_eq!(net.disconnect_events, vec![(0, 0)]); }