From e39e52484417ae3f10bc3baa705140c8a3114f2e Mon Sep 17 00:00:00 2001 From: Andrei Kashin Date: Tue, 25 Apr 2023 11:56:03 +0100 Subject: [PATCH 1/6] feature: Introduce a limit to transaction pool size - Add enum for different transaction outcomes - Introduce transaction pool size accounting --- chain/chunks/src/client.rs | 39 +++++++++++++++++++++---- chain/client/src/client.rs | 18 ++++++++++-- core/chain-configs/src/client_config.rs | 4 +++ nearcore/src/config.rs | 1 + 4 files changed, 54 insertions(+), 8 deletions(-) diff --git a/chain/chunks/src/client.rs b/chain/chunks/src/client.rs index 5f6b811d488..782e7c8da0a 100644 --- a/chain/chunks/src/client.rs +++ b/chain/chunks/src/client.rs @@ -30,27 +30,55 @@ pub enum ShardsManagerResponse { ChunkHeaderReadyForInclusion { chunk_header: ShardChunkHeader, chunk_producer: AccountId }, } +#[derive(Debug)] +pub enum InsertTransactionResult { + /// Transaction was successfully inserted. + Success, + /// Transaction is already in the pool. + Duplicate, + /// Not enough space to fit the transaction. + NoSpaceLeft, +} + pub struct ShardedTransactionPool { tx_pools: HashMap, /// Useful to make tests deterministic and reproducible, /// while keeping the security of randomization of transactions in pool rng_seed: RngSeed, + + /// If set, new transactions that bring the size of the pool over this limit will be rejected. + /// The size is tracked and enforced separately for each shard. + pool_size_limit: Option, } impl ShardedTransactionPool { - pub fn new(rng_seed: RngSeed) -> Self { + pub fn new(rng_seed: RngSeed, pool_size_limit: Option) -> Self { TransactionPool::init_metrics(); - Self { tx_pools: HashMap::new(), rng_seed } + Self { tx_pools: HashMap::new(), rng_seed, pool_size_limit } } pub fn get_pool_iterator(&mut self, shard_id: ShardId) -> Option> { self.tx_pools.get_mut(&shard_id).map(|pool| pool.pool_iterator()) } - /// Returns true if transaction is not in the pool before call - pub fn insert_transaction(&mut self, shard_id: ShardId, tx: SignedTransaction) -> bool { - self.pool_for_shard(shard_id).insert_transaction(tx) + /// Tries to insert the transaction into the pool for a given shard. + pub fn insert_transaction( + &mut self, + shard_id: ShardId, + tx: SignedTransaction, + ) -> InsertTransactionResult { + if let Some(limit) = self.pool_size_limit { + if self.pool_for_shard(shard_id).transaction_size() > limit { + return InsertTransactionResult::NoSpaceLeft; + } + } + + if !self.pool_for_shard(shard_id).insert_transaction(tx) { + return InsertTransactionResult::Duplicate; + } + + InsertTransactionResult::Success } pub fn remove_transactions(&mut self, shard_id: ShardId, transactions: &[SignedTransaction]) { @@ -81,6 +109,7 @@ impl ShardedTransactionPool { shard_id: ShardId, transactions: &[SignedTransaction], ) { + // TODO(akashin): Enforce size limit here as well. self.pool_for_shard(shard_id).reintroduce_transactions(transactions.to_vec()); } } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index b5fcf37c19b..c9ab10c2bd5 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -217,7 +217,8 @@ impl Client { chain.store(), chain_config.background_migration_threads, )?; - let sharded_tx_pool = ShardedTransactionPool::new(rng_seed); + let sharded_tx_pool = + ShardedTransactionPool::new(rng_seed, config.transaction_pool_size_limit); let sync_status = SyncStatus::AwaitingPeers; let genesis_block = chain.genesis_block(); let epoch_sync = EpochSync::new( @@ -2010,8 +2011,19 @@ impl Client { } else { // Transactions only need to be recorded if the node is a validator. if me.is_some() { - self.sharded_tx_pool.insert_transaction(shard_id, tx.clone()); - trace!(target: "client", shard_id, "Recorded a transaction."); + match self.sharded_tx_pool.insert_transaction(shard_id, tx.clone()) { + near_chunks::client::InsertTransactionResult::Success => { + trace!(target: "client", shard_id, "Recorded a transaction."); + } + near_chunks::client::InsertTransactionResult::Duplicate => { + // TODO(akashin): Decide what to do in this case. + return Err(Error::Other("Duplicate transaction inserted".to_string())); + } + near_chunks::client::InsertTransactionResult::NoSpaceLeft => { + // TODO(akashin): Introduce a dedicated error code. + return Err(Error::Other("No space left".to_string())); + } + } } // Active validator: diff --git a/core/chain-configs/src/client_config.rs b/core/chain-configs/src/client_config.rs index fe67ead2893..e7b6ed0ce78 100644 --- a/core/chain-configs/src/client_config.rs +++ b/core/chain-configs/src/client_config.rs @@ -251,6 +251,9 @@ pub struct ClientConfig { pub state_sync_enabled: bool, /// Options for syncing state. pub state_sync: StateSyncConfig, + /// Limit of the size of per-shard transaction pool measured in bytes. If not set, the size + /// will be unbounded. + pub transaction_pool_size_limit: Option, } impl ClientConfig { @@ -323,6 +326,7 @@ impl ClientConfig { flat_storage_creation_period: Duration::from_secs(1), state_sync_enabled: false, state_sync: StateSyncConfig::default(), + transaction_pool_size_limit: None, } } } diff --git a/nearcore/src/config.rs b/nearcore/src/config.rs index f34367dae1b..b122d576ab2 100644 --- a/nearcore/src/config.rs +++ b/nearcore/src/config.rs @@ -680,6 +680,7 @@ impl NearConfig { flat_storage_creation_period: config.store.flat_storage_creation_period, state_sync_enabled: config.state_sync_enabled.unwrap_or(false), state_sync: config.state_sync.unwrap_or_default(), + transaction_pool_size_limit: None, }, network_config: NetworkConfig::new( config.network, From 37edfbecbcb6d68287b7b632acbedc19e02b2367 Mon Sep 17 00:00:00 2001 From: Andrei Kashin Date: Thu, 11 May 2023 11:00:28 +0100 Subject: [PATCH 2/6] Move transaction size limiting deeper --- chain/chunks/src/client.rs | 30 +++++---------------------- chain/client/src/client.rs | 18 +++++++++------- chain/pool/src/lib.rs | 42 ++++++++++++++++++++++++++++++-------- 3 files changed, 49 insertions(+), 41 deletions(-) diff --git a/chain/chunks/src/client.rs b/chain/chunks/src/client.rs index 782e7c8da0a..2a76ee07ba1 100644 --- a/chain/chunks/src/client.rs +++ b/chain/chunks/src/client.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use actix::Message; -use near_pool::{PoolIteratorWrapper, TransactionPool}; +use near_pool::{InsertTransactionResult, PoolIteratorWrapper, TransactionPool}; use near_primitives::{ epoch_manager::RngSeed, sharding::{EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader}, @@ -30,16 +30,6 @@ pub enum ShardsManagerResponse { ChunkHeaderReadyForInclusion { chunk_header: ShardChunkHeader, chunk_producer: AccountId }, } -#[derive(Debug)] -pub enum InsertTransactionResult { - /// Transaction was successfully inserted. - Success, - /// Transaction is already in the pool. - Duplicate, - /// Not enough space to fit the transaction. - NoSpaceLeft, -} - pub struct ShardedTransactionPool { tx_pools: HashMap, @@ -68,17 +58,7 @@ impl ShardedTransactionPool { shard_id: ShardId, tx: SignedTransaction, ) -> InsertTransactionResult { - if let Some(limit) = self.pool_size_limit { - if self.pool_for_shard(shard_id).transaction_size() > limit { - return InsertTransactionResult::NoSpaceLeft; - } - } - - if !self.pool_for_shard(shard_id).insert_transaction(tx) { - return InsertTransactionResult::Duplicate; - } - - InsertTransactionResult::Success + self.pool_for_shard(shard_id).insert_transaction(tx) } pub fn remove_transactions(&mut self, shard_id: ShardId, transactions: &[SignedTransaction]) { @@ -99,9 +79,9 @@ impl ShardedTransactionPool { } fn pool_for_shard(&mut self, shard_id: ShardId) -> &mut TransactionPool { - self.tx_pools - .entry(shard_id) - .or_insert_with(|| TransactionPool::new(Self::random_seed(&self.rng_seed, shard_id))) + self.tx_pools.entry(shard_id).or_insert_with(|| { + TransactionPool::new(Self::random_seed(&self.rng_seed, shard_id), self.pool_size_limit) + }) } pub fn reintroduce_transactions( diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index c9ab10c2bd5..b0c72d4b450 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -38,6 +38,7 @@ use near_network::types::{ HighestHeightPeerInfo, NetworkRequests, PeerManagerAdapter, ReasonForBan, }; use near_o11y::log_assert; +use near_pool::InsertTransactionResult; use near_primitives::block::{Approval, ApprovalInner, ApprovalMessage, Block, BlockHeader, Tip}; use near_primitives::block_header::ApprovalType; use near_primitives::challenge::{Challenge, ChallengeBody}; @@ -2012,16 +2013,19 @@ impl Client { // Transactions only need to be recorded if the node is a validator. if me.is_some() { match self.sharded_tx_pool.insert_transaction(shard_id, tx.clone()) { - near_chunks::client::InsertTransactionResult::Success => { + InsertTransactionResult::Success => { trace!(target: "client", shard_id, "Recorded a transaction."); } - near_chunks::client::InsertTransactionResult::Duplicate => { - // TODO(akashin): Decide what to do in this case. - return Err(Error::Other("Duplicate transaction inserted".to_string())); + InsertTransactionResult::Duplicate => { + trace!(target: "client", shard_id, "Duplicate transaction, not forwarding it."); + return Ok(ProcessTxResponse::ValidTx); } - near_chunks::client::InsertTransactionResult::NoSpaceLeft => { - // TODO(akashin): Introduce a dedicated error code. - return Err(Error::Other("No space left".to_string())); + InsertTransactionResult::NoSpaceLeft => { + if is_forwarded { + trace!(target: "client", shard_id, "No space left for transaction, dropping it."); + } else { + trace!(target: "client", shard_id, "No space left for transaction, trying to forward it."); + } } } } diff --git a/chain/pool/src/lib.rs b/chain/pool/src/lib.rs index d750b6c1c6a..3e456a99f5f 100644 --- a/chain/pool/src/lib.rs +++ b/chain/pool/src/lib.rs @@ -13,6 +13,16 @@ use std::ops::Bound; mod metrics; pub mod types; +#[derive(Debug)] +pub enum InsertTransactionResult { + /// Transaction was successfully inserted. + Success, + /// Transaction is already in the pool. + Duplicate, + /// Not enough space to fit the transaction. + NoSpaceLeft, +} + /// Transaction pool: keeps track of transactions that were not yet accepted into the block chain. pub struct TransactionPool { /// Transactions are grouped by a pair of (account ID, signer public key). @@ -25,17 +35,20 @@ pub struct TransactionPool { key_seed: RngSeed, /// The key after which the pool iterator starts. Doesn't have to be present in the pool. last_used_key: PoolKey, + /// If set, new transactions that bring the size of the pool over this limit will be rejected. + total_transaction_size_limit: Option, /// Total size of transactions in the pool measured in bytes. total_transaction_size: u64, } impl TransactionPool { - pub fn new(key_seed: RngSeed) -> Self { + pub fn new(key_seed: RngSeed, total_transaction_size_limit: Option) -> Self { Self { key_seed, transactions: BTreeMap::new(), unique_transactions: HashSet::new(), last_used_key: CryptoHash::default(), + total_transaction_size_limit, total_transaction_size: 0, } } @@ -53,27 +66,38 @@ impl TransactionPool { } /// Inserts a signed transaction that passed validation into the pool. - pub fn insert_transaction(&mut self, signed_transaction: SignedTransaction) -> bool { + pub fn insert_transaction( + &mut self, + signed_transaction: SignedTransaction, + ) -> InsertTransactionResult { if !self.unique_transactions.insert(signed_transaction.get_hash()) { // The hash of this transaction was already seen, skip it. - return false; + return InsertTransactionResult::Duplicate; } - metrics::TRANSACTION_POOL_TOTAL.inc(); - - let signer_id = &signed_transaction.transaction.signer_id; - let signer_public_key = &signed_transaction.transaction.public_key; // We never expect the total size to go over `u64` during real operation as that would // be more than 10^9 GiB of RAM consumed for transaction pool, so panicing here is intended // to catch a logic error in estimation of transaction size. - self.total_transaction_size = self + let new_total_transaction_size = self .total_transaction_size .checked_add(signed_transaction.get_size()) .expect("Total transaction size is too large"); + if let Some(limit) = self.total_transaction_size_limit { + if new_total_transaction_size > limit { + return InsertTransactionResult::NoSpaceLeft; + } + } + + // At this point transaction is accepted to the pool. + metrics::TRANSACTION_POOL_TOTAL.inc(); + self.total_transaction_size = new_total_transaction_size; + + let signer_id = &signed_transaction.transaction.signer_id; + let signer_public_key = &signed_transaction.transaction.public_key; self.transactions .entry(self.key(signer_id, signer_public_key)) .or_insert_with(Vec::new) .push(signed_transaction); - true + InsertTransactionResult::Success } /// Returns a pool iterator wrapper that implements an iterator-like trait to iterate over From 5ea6fc96812d189b1871bebfc5030b92c15c9d93 Mon Sep 17 00:00:00 2001 From: Andrei Kashin Date: Thu, 11 May 2023 11:23:39 +0100 Subject: [PATCH 3/6] Add a test for transaction pool capacity --- chain/pool/src/lib.rs | 46 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 9 deletions(-) diff --git a/chain/pool/src/lib.rs b/chain/pool/src/lib.rs index 3e456a99f5f..710b168dd00 100644 --- a/chain/pool/src/lib.rs +++ b/chain/pool/src/lib.rs @@ -13,7 +13,7 @@ use std::ops::Bound; mod metrics; pub mod types; -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub enum InsertTransactionResult { /// Transaction was successfully inserted. Success, @@ -66,6 +66,7 @@ impl TransactionPool { } /// Inserts a signed transaction that passed validation into the pool. + #[must_use] pub fn insert_transaction( &mut self, signed_transaction: SignedTransaction, @@ -304,11 +305,11 @@ mod tests { mut transactions: Vec, expected_weight: u32, ) -> (Vec, TransactionPool) { - let mut pool = TransactionPool::new(TEST_SEED); + let mut pool = TransactionPool::new(TEST_SEED, None); let mut rng = thread_rng(); transactions.shuffle(&mut rng); for tx in transactions { - pool.insert_transaction(tx); + assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success); } ( prepare_transactions(&mut pool, expected_weight) @@ -415,12 +416,12 @@ mod tests { }) .collect::>(); - let mut pool = TransactionPool::new(TEST_SEED); + let mut pool = TransactionPool::new(TEST_SEED, None); let mut rng = thread_rng(); transactions.shuffle(&mut rng); for tx in transactions.clone() { println!("{:?}", tx); - pool.insert_transaction(tx); + assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success); } assert_eq!(pool.len(), n as usize); @@ -472,7 +473,10 @@ mod tests { assert_eq!(pool.len(), 5); for tx in transactions { - pool.insert_transaction(tx); + assert!(matches!( + pool.insert_transaction(tx), + InsertTransactionResult::Success | InsertTransactionResult::Duplicate + )); } assert_eq!(pool.len(), 10); let txs = prepare_transactions(&mut pool, 10); @@ -506,7 +510,10 @@ mod tests { assert_eq!(pool.len(), 5); for tx in transactions { - pool.insert_transaction(tx); + assert!(matches!( + pool.insert_transaction(tx), + InsertTransactionResult::Success | InsertTransactionResult::Duplicate + )); } assert_eq!(pool.len(), 10); let txs = prepare_transactions(&mut pool, 5); @@ -519,13 +526,13 @@ mod tests { #[test] fn test_transaction_pool_size() { - let mut pool = TransactionPool::new(TEST_SEED); + let mut pool = TransactionPool::new(TEST_SEED, None); let transactions = generate_transactions("alice.near", "alice.near", 1, 100); let mut total_transaction_size = 0; // Adding transactions increases the size. for tx in transactions.clone() { total_transaction_size += tx.get_size(); - pool.insert_transaction(tx); + assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success); assert_eq!(pool.transaction_size(), total_transaction_size); } // Removing transactions decreases the size. @@ -536,4 +543,25 @@ mod tests { } assert_eq!(pool.transaction_size(), 0); } + + #[test] + fn test_transaction_pool_size_limit() { + let pool_size_limit = 1000; + let mut pool = TransactionPool::new(TEST_SEED, Some(pool_size_limit)); + let transactions = generate_transactions("alice.near", "alice.near", 1, 100); + let mut total_transaction_size = 0; + // Repeatedly fills the pool to capacity and then removes all transactions and starts + // again. + for tx in transactions.clone() { + if total_transaction_size + tx.get_size() > pool_size_limit { + assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::NoSpaceLeft); + pool.remove_transactions(&transactions); + total_transaction_size = 0; + } else { + total_transaction_size += tx.get_size(); + assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success); + } + assert_eq!(pool.transaction_size(), total_transaction_size); + } + } } From ba5faabcc39d9e05b3aa02ec640015cc31639e3b Mon Sep 17 00:00:00 2001 From: Andrei Kashin Date: Thu, 11 May 2023 11:42:45 +0100 Subject: [PATCH 4/6] Add debug info on reintroducing transactions --- chain/chunks/src/client.rs | 15 ++++++++++++--- chain/client/src/client.rs | 20 ++++++++++++++------ chain/pool/src/lib.rs | 7 ------- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/chain/chunks/src/client.rs b/chain/chunks/src/client.rs index 2a76ee07ba1..048a081f1f2 100644 --- a/chain/chunks/src/client.rs +++ b/chain/chunks/src/client.rs @@ -84,13 +84,22 @@ impl ShardedTransactionPool { }) } + /// Reintroduces transactions back during the chain reorg. Returns the number of transactions + /// that were added or are already present in the pool. pub fn reintroduce_transactions( &mut self, shard_id: ShardId, transactions: &[SignedTransaction], - ) { - // TODO(akashin): Enforce size limit here as well. - self.pool_for_shard(shard_id).reintroduce_transactions(transactions.to_vec()); + ) -> usize { + let mut reintroduced_count = 0; + let pool = self.pool_for_shard(shard_id); + for tx in transactions { + reintroduced_count += match pool.insert_transaction(tx.clone()) { + InsertTransactionResult::Success | InsertTransactionResult::Duplicate => 1, + InsertTransactionResult::NoSpaceLeft => 0, + } + } + reintroduced_count } } diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index b0c72d4b450..3a9dce71e5d 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -363,11 +363,15 @@ impl Client { false, &self.shard_tracker, ) { - self.sharded_tx_pool.reintroduce_transactions( - shard_id, - // By now the chunk must be in store, otherwise the block would have been orphaned - self.chain.get_chunk(&chunk_header.chunk_hash()).unwrap().transactions(), - ); + // By now the chunk must be in store, otherwise the block would have been orphaned + let chunk = self.chain.get_chunk(&chunk_header.chunk_hash()).unwrap(); + let reintroduced_count = self + .sharded_tx_pool + .reintroduce_transactions(shard_id, &chunk.transactions()); + if reintroduced_count < chunk.transactions().len() { + debug!(target: "client", "Reintroduced {} transactions out of {}", + reintroduced_count, chunk.transactions().len()); + } } } } @@ -916,7 +920,11 @@ impl Client { }; // Reintroduce valid transactions back to the pool. They will be removed when the chunk is // included into the block. - sharded_tx_pool.reintroduce_transactions(shard_id, &transactions); + let reintroduced_count = sharded_tx_pool.reintroduce_transactions(shard_id, &transactions); + if reintroduced_count < transactions.len() { + debug!(target: "client", "Reintroduced {} transactions out of {}", + reintroduced_count, transactions.len()); + } Ok(transactions) } diff --git a/chain/pool/src/lib.rs b/chain/pool/src/lib.rs index 710b168dd00..398460b4322 100644 --- a/chain/pool/src/lib.rs +++ b/chain/pool/src/lib.rs @@ -149,13 +149,6 @@ impl TransactionPool { } } - /// Reintroduces transactions back during the chain reorg. - pub fn reintroduce_transactions(&mut self, transactions: Vec) { - for tx in transactions { - self.insert_transaction(tx); - } - } - /// Returns the number of unique transactions in the pool. pub fn len(&self) -> usize { self.unique_transactions.len() From dbedf50defcbc2f1eece8c16b0377af8a6d64d3b Mon Sep 17 00:00:00 2001 From: Andrei Kashin Date: Thu, 11 May 2023 14:06:16 +0100 Subject: [PATCH 5/6] Change wording in trace call --- chain/client/src/client.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 3a9dce71e5d..c3d0903368b 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -2030,9 +2030,9 @@ impl Client { } InsertTransactionResult::NoSpaceLeft => { if is_forwarded { - trace!(target: "client", shard_id, "No space left for transaction, dropping it."); + trace!(target: "client", shard_id, "Transaction pool is full, dropping the transaction."); } else { - trace!(target: "client", shard_id, "No space left for transaction, trying to forward it."); + trace!(target: "client", shard_id, "Transaction pool is full, trying to forward the transaction."); } } } From 8a61a3ed908a40b330c76d0faf2c6fcc13fcc2aa Mon Sep 17 00:00:00 2001 From: Andrei Kashin Date: Thu, 11 May 2023 14:28:52 +0100 Subject: [PATCH 6/6] Simplify test for transaction size limit --- chain/pool/src/lib.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/chain/pool/src/lib.rs b/chain/pool/src/lib.rs index 398460b4322..a5f6c3582bb 100644 --- a/chain/pool/src/lib.rs +++ b/chain/pool/src/lib.rs @@ -539,22 +539,17 @@ mod tests { #[test] fn test_transaction_pool_size_limit() { - let pool_size_limit = 1000; - let mut pool = TransactionPool::new(TEST_SEED, Some(pool_size_limit)); let transactions = generate_transactions("alice.near", "alice.near", 1, 100); - let mut total_transaction_size = 0; - // Repeatedly fills the pool to capacity and then removes all transactions and starts - // again. - for tx in transactions.clone() { - if total_transaction_size + tx.get_size() > pool_size_limit { - assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::NoSpaceLeft); - pool.remove_transactions(&transactions); - total_transaction_size = 0; - } else { - total_transaction_size += tx.get_size(); + // Each transaction is at least 1 byte in size, so the last transaction will not fit. + let pool_size_limit = + transactions.iter().map(|tx| tx.get_size()).sum::().checked_sub(1).unwrap(); + let mut pool = TransactionPool::new(TEST_SEED, Some(pool_size_limit)); + for (i, tx) in transactions.iter().cloned().enumerate() { + if i + 1 < transactions.len() { assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success); + } else { + assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::NoSpaceLeft); } - assert_eq!(pool.transaction_size(), total_transaction_size); } } }