diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index ad541391d6b..83a96837f56 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -1092,20 +1092,6 @@ impl MinerService for Miner { fn chain_new_blocks(&self, chain: &MiningBlockChainClient, _imported: &[H256], _invalid: &[H256], enacted: &[H256], retracted: &[H256]) { trace!(target: "miner", "chain_new_blocks"); - fn fetch_transactions(chain: &MiningBlockChainClient, hash: &H256) -> Vec { - let block = chain - .block(BlockId::Hash(*hash)) - // Client should send message after commit to db and inserting to chain. - .expect("Expected in-chain blocks."); - let block = BlockView::new(&block); - let txs = block.transactions(); - // populate sender - for tx in &txs { - let _sender = tx.sender(); - } - txs - } - // 1. We ignore blocks that were `imported` (because it means that they are not in canon-chain, and transactions // should be still available in the queue. // 2. We ignore blocks that are `invalid` because it doesn't have any meaning in terms of the transactions that @@ -1116,35 +1102,29 @@ impl MinerService for Miner { // Then import all transactions... { - let out_of_chain = retracted - .par_iter() - .map(|h| fetch_transactions(chain, h)); - out_of_chain.for_each(|txs| { - let mut transaction_queue = self.transaction_queue.lock(); - let _ = self.add_transactions_to_queue( - chain, txs, TransactionOrigin::RetractedBlock, &mut transaction_queue - ); - }); + retracted.par_iter() + .map(|hash| { + let block = chain.block(BlockId::Hash(*hash)) + .expect("Client is sending message after commit to db and inserting to chain; the block is available; qed"); + let block = BlockView::new(&block); + let txs = block.transactions(); + // populate sender + for tx in &txs { + let _sender = tx.sender(); + } + txs + }).for_each(|txs| { + let mut transaction_queue = self.transaction_queue.lock(); + let _ = self.add_transactions_to_queue( + chain, txs, TransactionOrigin::RetractedBlock, &mut transaction_queue + ); + }); } - // ...and at the end remove old ones + // ...and at the end remove the old ones { - let in_chain = enacted - .par_iter() - .map(|h: &H256| fetch_transactions(chain, h)); - - in_chain.for_each(|mut txs| { - let mut transaction_queue = self.transaction_queue.lock(); - - let to_remove = txs.drain(..) - .map(|tx| { - tx.sender().expect("Transaction is in block, so sender has to be defined.") - }) - .collect::>(); - for sender in to_remove { - transaction_queue.remove_all(sender, chain.latest_nonce(&sender)); - } - }); + let mut transaction_queue = self.transaction_queue.lock(); + transaction_queue.remove_old(|sender| chain.latest_nonce(sender)); } if enacted.len() > 0 { diff --git a/ethcore/src/miner/transaction_queue.rs b/ethcore/src/miner/transaction_queue.rs index cd2d3ba470f..b4cc93a9cfc 100644 --- a/ethcore/src/miner/transaction_queue.rs +++ b/ethcore/src/miner/transaction_queue.rs @@ -79,8 +79,10 @@ //! we check if the transactions should go to `current` (comparing state nonce) //! - When it's removed from `current` - all transactions from this sender (`current` & `future`) are recalculated. //! 3. `remove_all` is used to inform the queue about client (state) nonce changes. -//! - It removes all transactions (either from `current` or `future`) with nonce < client nonce -//! - It moves matching `future` transactions to `current` +//! - It removes all transactions (either from `current` or `future`) with nonce < client nonce +//! - It moves matching `future` transactions to `current` +//! 4. `remove_old` is used as convenient method to update the state nonce for all senders in the queue. +//! - Invokes `remove_all` with latest state nonce for all senders. use std::ops::Deref; use std::cmp::Ordering; @@ -752,6 +754,26 @@ impl TransactionQueue { /// Removes all transactions from particular sender up to (excluding) given client (state) nonce. /// Client (State) Nonce = next valid nonce for this sender. pub fn remove_all(&mut self, sender: Address, client_nonce: U256) { + // Check if there is anything in current... + let should_check_in_current = self.current.by_address.row(&sender) + // If nonce == client_nonce nothing is changed + .and_then(|by_nonce| by_nonce.keys().find(|nonce| *nonce < &client_nonce)) + .map(|_| ()); + // ... or future + let should_check_in_future = self.future.by_address.row(&sender) + // if nonce == client_nonce we need to promote to current + .and_then(|by_nonce| by_nonce.keys().find(|nonce| *nonce <= &client_nonce)) + .map(|_| ()); + + if should_check_in_current.or(should_check_in_future).is_none() { + return; + } + + self.remove_all_internal(sender, client_nonce); + } + + /// Always updates future and moves transactions from current to future. + fn remove_all_internal(&mut self, sender: Address, client_nonce: U256) { // We will either move transaction to future or remove it completely // so there will be no transactions from this sender in current self.last_nonces.remove(&sender); @@ -765,6 +787,20 @@ impl TransactionQueue { assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len()); } + /// Checks the current nonce for all transactions' senders in the queue and removes the old transactions. + pub fn remove_old(&mut self, fetch_nonce: F) where + F: Fn(&Address) -> U256, + { + let senders = self.current.by_address.keys() + .chain(self.future.by_address.keys()) + .cloned() + .collect::>(); + + for sender in senders { + self.remove_all(sender, fetch_nonce(&sender)); + } + } + /// Penalize transactions from sender of transaction with given hash. /// I.e. it should change the priority of the transaction in the queue. /// @@ -847,7 +883,7 @@ impl TransactionQueue { if order.is_some() { // This will keep consistency in queue // Moves all to future and then promotes a batch from current: - self.remove_all(sender, current_nonce); + self.remove_all_internal(sender, current_nonce); assert_eq!(self.future.by_priority.len() + self.current.by_priority.len(), self.by_hash.len()); return; } @@ -2438,7 +2474,7 @@ mod test { } #[test] - fn should_reject_transactions_below_bas_gas() { + fn should_reject_transactions_below_base_gas() { // given let mut txq = TransactionQueue::default(); let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into()); @@ -2457,4 +2493,26 @@ mod test { } + #[test] + fn should_clear_all_old_transactions() { + // given + let mut txq = TransactionQueue::default(); + let (tx1, tx2) = new_tx_pair_default(1.into(), 0.into()); + let (tx3, tx4) = new_tx_pair_default(1.into(), 0.into()); + let nonce1 = tx1.nonce; + + // Insert all transactions + txq.add(tx1, TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap(); + txq.add(tx2, TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap(); + txq.add(tx3, TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap(); + txq.add(tx4, TransactionOrigin::External, &default_account_details, &gas_estimator).unwrap(); + assert_eq!(txq.top_transactions().len(), 4); + + // when + txq.remove_old(|_| nonce1 + U256::one()); + + // then + assert_eq!(txq.top_transactions().len(), 2); + } + } diff --git a/util/table/src/lib.rs b/util/table/src/lib.rs index a13beab11de..f65c1e1712d 100644 --- a/util/table/src/lib.rs +++ b/util/table/src/lib.rs @@ -18,6 +18,7 @@ use std::hash::Hash; use std::collections::HashMap; +use std::collections::hash_map::Keys; /// Structure to hold double-indexed values /// @@ -41,6 +42,11 @@ impl Table } } + /// Returns keys iterator for this Table. + pub fn keys(&self) -> Keys> { + self.map.keys() + } + /// Removes all elements from this Table pub fn clear(&mut self) { self.map.clear();