diff --git a/Cargo.lock b/Cargo.lock index 5133a53f22c..b18c09e7bfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1955,7 +1955,7 @@ dependencies = [ [[package]] name = "parity" -version = "1.11.1" +version = "1.11.2" dependencies = [ "ansi_term 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "atty 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2006,7 +2006,7 @@ dependencies = [ "parity-rpc 1.11.0", "parity-rpc-client 1.4.0", "parity-updater 1.11.0", - "parity-version 1.11.1", + "parity-version 1.11.2", "parity-whisper 0.1.0", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "path 0.1.0", @@ -2054,7 +2054,7 @@ dependencies = [ "parity-reactor 0.1.0", "parity-ui 1.11.0", "parity-ui-deprecation 1.10.0", - "parity-version 1.11.1", + "parity-version 1.11.2", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "registrar 0.0.1", @@ -2210,7 +2210,7 @@ dependencies = [ "order-stat 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "parity-reactor 0.1.0", "parity-updater 1.11.0", - "parity-version 1.11.1", + "parity-version 1.11.2", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "patricia-trie 0.1.0", "pretty_assertions 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2331,7 +2331,7 @@ dependencies = [ "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-hash-fetch 1.11.0", - "parity-version 1.11.1", + "parity-version 1.11.2", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "path 0.1.0", "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2342,7 +2342,7 @@ dependencies = [ [[package]] name = "parity-version" -version = "1.11.1" +version = "1.11.2" dependencies = [ "ethcore-bytes 0.1.0", "rlp 0.2.1", diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 175f3d5c8c6..39bbf092cbf 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -18,7 +18,7 @@ use std::collections::{HashSet, HashMap, BTreeMap, BTreeSet, VecDeque}; use std::fmt; use std::str::FromStr; use std::sync::{Arc, Weak}; -use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; +use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use std::time::{Instant}; // util @@ -208,6 +208,8 @@ pub struct Client { queue_transactions: IoChannelQueue, /// Ancient blocks import queue queue_ancient_blocks: IoChannelQueue, + /// Hashes of pending ancient block wainting to be included + pending_ancient_blocks: RwLock>, /// Consensus messages import queue queue_consensus_message: IoChannelQueue, @@ -461,6 +463,7 @@ impl Importer { let hash = header.hash(); let _import_lock = self.import_lock.lock(); + trace!(target: "client", "Trying to import old block #{}", header.number()); { trace_time!("import_old_block"); // verify the block, passing the chain for updating the epoch verifier. @@ -741,6 +744,7 @@ impl Client { notify: RwLock::new(Vec::new()), queue_transactions: IoChannelQueue::new(MAX_TX_QUEUE_SIZE), queue_ancient_blocks: IoChannelQueue::new(MAX_ANCIENT_BLOCKS_QUEUE_SIZE), + pending_ancient_blocks: RwLock::new(HashSet::new()), queue_consensus_message: IoChannelQueue::new(usize::max_value()), last_hashes: RwLock::new(VecDeque::new()), factories: factories, @@ -1972,7 +1976,7 @@ impl BlockChainClient for Client { impl IoClient for Client { fn queue_transactions(&self, transactions: Vec, peer_id: usize) { let len = transactions.len(); - self.queue_transactions.queue(&mut self.io_channel.lock(), len, move |client| { + self.queue_transactions.queue(&mut self.io_channel.lock(), move |client| { trace_time!("import_queued_transactions"); let txs: Vec = transactions @@ -1996,23 +2000,32 @@ impl IoClient for Client { { // check block order - if self.chain.read().is_known(&header.hash()) { + if self.chain.read().is_known(&hash) { bail!(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain)); } - let status = self.block_status(BlockId::Hash(*header.parent_hash())); - if status == BlockStatus::Unknown || status == BlockStatus::Pending { - bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(*header.parent_hash()))); + + let parent_hash = *header.parent_hash(); + let parent_pending = self.pending_ancient_blocks.read().contains(&parent_hash); + let status = self.block_status(BlockId::Hash(parent_hash)); + if !parent_pending && (status == BlockStatus::Unknown || status == BlockStatus::Pending) { + bail!(BlockImportErrorKind::Block(BlockError::UnknownParent(parent_hash))); } } - match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), 1, move |client| { - client.importer.import_old_block( + self.pending_ancient_blocks.write().insert(hash); + + trace!(target: "client", "Queuing old block #{}", header.number()); + match self.queue_ancient_blocks.queue(&mut self.io_channel.lock(), move |client| { + let result = client.importer.import_old_block( &header, &block_bytes, &receipts_bytes, &**client.db.read(), &*client.chain.read() - ).map(|_| ()).unwrap_or_else(|e| { + ); + + client.pending_ancient_blocks.write().remove(&hash); + result.map(|_| ()).unwrap_or_else(|e| { error!(target: "client", "Error importing ancient block: {}", e); }); }) { @@ -2022,7 +2035,7 @@ impl IoClient for Client { } fn queue_consensus_message(&self, message: Bytes) { - match self.queue_consensus_message.queue(&mut self.io_channel.lock(), 1, move |client| { + match self.queue_consensus_message.queue(&mut self.io_channel.lock(), move |client| { if let Err(e) = client.engine().handle_message(&message) { debug!(target: "poa", "Invalid message received: {}", e); } @@ -2433,35 +2446,38 @@ impl fmt::Display for QueueError { /// Queue some items to be processed by IO client. struct IoChannelQueue { - currently_queued: Arc, + queue: Arc>>>, limit: usize, } impl IoChannelQueue { pub fn new(limit: usize) -> Self { IoChannelQueue { - currently_queued: Default::default(), + queue: Default::default(), limit, } } - pub fn queue(&self, channel: &mut IoChannel, count: usize, fun: F) -> Result<(), QueueError> where - F: Fn(&Client) + Send + Sync + 'static, + pub fn queue(&self, channel: &mut IoChannel, fun: F) -> Result<(), QueueError> + where F: Fn(&Client) + Send + Sync + 'static { - let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); - ensure!(queue_size < self.limit, QueueError::Full(self.limit)); + { + let mut queue = self.queue.lock(); + let queue_size = queue.len(); + ensure!(queue_size < self.limit, QueueError::Full(self.limit)); - let currently_queued = self.currently_queued.clone(); + queue.push_back(Box::new(fun)); + } + + let queue = self.queue.clone(); let result = channel.send(ClientIoMessage::execute(move |client| { - currently_queued.fetch_sub(count, AtomicOrdering::SeqCst); - fun(client); + while let Some(fun) = queue.lock().pop_front() { + fun(client); + } })); match result { - Ok(_) => { - self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst); - Ok(()) - }, + Ok(_) => Ok(()), Err(e) => Err(QueueError::Channel(e)), } } diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs index 321c783b4db..283f4ed610d 100644 --- a/ethcore/sync/src/blocks.rs +++ b/ethcore/sync/src/blocks.rs @@ -266,7 +266,7 @@ impl BlockCollection { } } - /// Get a valid chain of blocks ordered in descending order and ready for importing into blockchain. + /// Get a valid chain of blocks ordered in ascending order and ready for importing into blockchain. pub fn drain(&mut self) -> Vec { if self.blocks.is_empty() || self.head.is_none() { return Vec::new();