Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: Introduce a limit to transaction pool size #8970

Merged
merged 6 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions chain/chunks/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;

use actix::Message;

use near_pool::{PoolIteratorWrapper, TransactionPool};
use near_pool::{InsertTransactionResult, PoolIteratorWrapper, TransactionPool};
use near_primitives::{
epoch_manager::RngSeed,
sharding::{EncodedShardChunk, PartialEncodedChunk, ShardChunk, ShardChunkHeader},
Expand Down Expand Up @@ -36,20 +36,28 @@ pub struct ShardedTransactionPool {
/// Useful to make tests deterministic and reproducible,
/// while keeping the security of randomization of transactions in pool
rng_seed: RngSeed,

/// If set, new transactions that bring the size of the pool over this limit will be rejected.
/// The size is tracked and enforced separately for each shard.
pool_size_limit: Option<u64>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know what is the largest valid transaction?

Copy link
Contributor Author

@aborg-dev aborg-dev May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen the number "4MiB" mentioned a few times, e.g. in #8936 and

max_transaction_size 4_194_304
and
if transaction_size > max_transaction_size {

}

impl ShardedTransactionPool {
pub fn new(rng_seed: RngSeed) -> Self {
pub fn new(rng_seed: RngSeed, pool_size_limit: Option<u64>) -> Self {
TransactionPool::init_metrics();
Self { tx_pools: HashMap::new(), rng_seed }
Self { tx_pools: HashMap::new(), rng_seed, pool_size_limit }
}

pub fn get_pool_iterator(&mut self, shard_id: ShardId) -> Option<PoolIteratorWrapper<'_>> {
self.tx_pools.get_mut(&shard_id).map(|pool| pool.pool_iterator())
}

/// Returns true if transaction is not in the pool before call
pub fn insert_transaction(&mut self, shard_id: ShardId, tx: SignedTransaction) -> bool {
/// Tries to insert the transaction into the pool for a given shard.
pub fn insert_transaction(
&mut self,
shard_id: ShardId,
tx: SignedTransaction,
) -> InsertTransactionResult {
self.pool_for_shard(shard_id).insert_transaction(tx)
}

Expand All @@ -71,17 +79,27 @@ impl ShardedTransactionPool {
}

fn pool_for_shard(&mut self, shard_id: ShardId) -> &mut TransactionPool {
self.tx_pools
.entry(shard_id)
.or_insert_with(|| TransactionPool::new(Self::random_seed(&self.rng_seed, shard_id)))
self.tx_pools.entry(shard_id).or_insert_with(|| {
TransactionPool::new(Self::random_seed(&self.rng_seed, shard_id), self.pool_size_limit)
})
}

/// Reintroduces transactions back during the chain reorg. Returns the number of transactions
/// that were added or are already present in the pool.
pub fn reintroduce_transactions(
&mut self,
shard_id: ShardId,
transactions: &[SignedTransaction],
) {
self.pool_for_shard(shard_id).reintroduce_transactions(transactions.to_vec());
) -> usize {
let mut reintroduced_count = 0;
let pool = self.pool_for_shard(shard_id);
for tx in transactions {
reintroduced_count += match pool.insert_transaction(tx.clone()) {
InsertTransactionResult::Success | InsertTransactionResult::Duplicate => 1,
InsertTransactionResult::NoSpaceLeft => 0,
}
}
reintroduced_count
}
}

Expand Down
42 changes: 33 additions & 9 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use near_network::types::{
HighestHeightPeerInfo, NetworkRequests, PeerManagerAdapter, ReasonForBan,
};
use near_o11y::log_assert;
use near_pool::InsertTransactionResult;
use near_primitives::block::{Approval, ApprovalInner, ApprovalMessage, Block, BlockHeader, Tip};
use near_primitives::block_header::ApprovalType;
use near_primitives::challenge::{Challenge, ChallengeBody};
Expand Down Expand Up @@ -217,7 +218,8 @@ impl Client {
chain.store(),
chain_config.background_migration_threads,
)?;
let sharded_tx_pool = ShardedTransactionPool::new(rng_seed);
let sharded_tx_pool =
ShardedTransactionPool::new(rng_seed, config.transaction_pool_size_limit);
let sync_status = SyncStatus::AwaitingPeers;
let genesis_block = chain.genesis_block();
let epoch_sync = EpochSync::new(
Expand Down Expand Up @@ -361,11 +363,15 @@ impl Client {
false,
&self.shard_tracker,
) {
self.sharded_tx_pool.reintroduce_transactions(
shard_id,
// By now the chunk must be in store, otherwise the block would have been orphaned
self.chain.get_chunk(&chunk_header.chunk_hash()).unwrap().transactions(),
);
// By now the chunk must be in store, otherwise the block would have been orphaned
let chunk = self.chain.get_chunk(&chunk_header.chunk_hash()).unwrap();
let reintroduced_count = self
.sharded_tx_pool
.reintroduce_transactions(shard_id, &chunk.transactions());
if reintroduced_count < chunk.transactions().len() {
debug!(target: "client", "Reintroduced {} transactions out of {}",
reintroduced_count, chunk.transactions().len());
}
}
}
}
Expand Down Expand Up @@ -914,7 +920,11 @@ impl Client {
};
// Reintroduce valid transactions back to the pool. They will be removed when the chunk is
// included into the block.
sharded_tx_pool.reintroduce_transactions(shard_id, &transactions);
let reintroduced_count = sharded_tx_pool.reintroduce_transactions(shard_id, &transactions);
if reintroduced_count < transactions.len() {
debug!(target: "client", "Reintroduced {} transactions out of {}",
reintroduced_count, transactions.len());
}
Ok(transactions)
}

Expand Down Expand Up @@ -2010,8 +2020,22 @@ impl Client {
} else {
// Transactions only need to be recorded if the node is a validator.
if me.is_some() {
self.sharded_tx_pool.insert_transaction(shard_id, tx.clone());
trace!(target: "client", shard_id, "Recorded a transaction.");
match self.sharded_tx_pool.insert_transaction(shard_id, tx.clone()) {
InsertTransactionResult::Success => {
trace!(target: "client", shard_id, "Recorded a transaction.");
}
InsertTransactionResult::Duplicate => {
trace!(target: "client", shard_id, "Duplicate transaction, not forwarding it.");
return Ok(ProcessTxResponse::ValidTx);
}
InsertTransactionResult::NoSpaceLeft => {
if is_forwarded {
trace!(target: "client", shard_id, "Transaction pool is full, dropping the transaction.");
} else {
trace!(target: "client", shard_id, "Transaction pool is full, trying to forward the transaction.");
}
}
}
}

// Active validator:
Expand Down
88 changes: 64 additions & 24 deletions chain/pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ use std::ops::Bound;
mod metrics;
pub mod types;

#[derive(Debug, PartialEq)]
pub enum InsertTransactionResult {
/// Transaction was successfully inserted.
Success,
/// Transaction is already in the pool.
Duplicate,
/// Not enough space to fit the transaction.
NoSpaceLeft,
}

/// Transaction pool: keeps track of transactions that were not yet accepted into the block chain.
pub struct TransactionPool {
/// Transactions are grouped by a pair of (account ID, signer public key).
Expand All @@ -25,17 +35,20 @@ pub struct TransactionPool {
key_seed: RngSeed,
/// The key after which the pool iterator starts. Doesn't have to be present in the pool.
last_used_key: PoolKey,
/// If set, new transactions that bring the size of the pool over this limit will be rejected.
total_transaction_size_limit: Option<u64>,
/// Total size of transactions in the pool measured in bytes.
total_transaction_size: u64,
}

impl TransactionPool {
pub fn new(key_seed: RngSeed) -> Self {
pub fn new(key_seed: RngSeed, total_transaction_size_limit: Option<u64>) -> Self {
Self {
key_seed,
transactions: BTreeMap::new(),
unique_transactions: HashSet::new(),
last_used_key: CryptoHash::default(),
total_transaction_size_limit,
total_transaction_size: 0,
}
}
Expand All @@ -53,27 +66,39 @@ impl TransactionPool {
}

/// Inserts a signed transaction that passed validation into the pool.
pub fn insert_transaction(&mut self, signed_transaction: SignedTransaction) -> bool {
#[must_use]
pub fn insert_transaction(
&mut self,
signed_transaction: SignedTransaction,
) -> InsertTransactionResult {
if !self.unique_transactions.insert(signed_transaction.get_hash()) {
// The hash of this transaction was already seen, skip it.
return false;
return InsertTransactionResult::Duplicate;
}
metrics::TRANSACTION_POOL_TOTAL.inc();

let signer_id = &signed_transaction.transaction.signer_id;
let signer_public_key = &signed_transaction.transaction.public_key;
// We never expect the total size to go over `u64` during real operation as that would
// be more than 10^9 GiB of RAM consumed for transaction pool, so panicing here is intended
// to catch a logic error in estimation of transaction size.
self.total_transaction_size = self
let new_total_transaction_size = self
.total_transaction_size
.checked_add(signed_transaction.get_size())
.expect("Total transaction size is too large");
if let Some(limit) = self.total_transaction_size_limit {
if new_total_transaction_size > limit {
return InsertTransactionResult::NoSpaceLeft;
}
}

// At this point transaction is accepted to the pool.
metrics::TRANSACTION_POOL_TOTAL.inc();
self.total_transaction_size = new_total_transaction_size;

let signer_id = &signed_transaction.transaction.signer_id;
let signer_public_key = &signed_transaction.transaction.public_key;
self.transactions
.entry(self.key(signer_id, signer_public_key))
.or_insert_with(Vec::new)
.push(signed_transaction);
true
InsertTransactionResult::Success
}

/// Returns a pool iterator wrapper that implements an iterator-like trait to iterate over
Expand Down Expand Up @@ -124,13 +149,6 @@ impl TransactionPool {
}
}

/// Reintroduces transactions back during the chain reorg.
pub fn reintroduce_transactions(&mut self, transactions: Vec<SignedTransaction>) {
for tx in transactions {
self.insert_transaction(tx);
}
}

/// Returns the number of unique transactions in the pool.
pub fn len(&self) -> usize {
self.unique_transactions.len()
Expand Down Expand Up @@ -280,11 +298,11 @@ mod tests {
mut transactions: Vec<SignedTransaction>,
expected_weight: u32,
) -> (Vec<u64>, TransactionPool) {
let mut pool = TransactionPool::new(TEST_SEED);
let mut pool = TransactionPool::new(TEST_SEED, None);
let mut rng = thread_rng();
transactions.shuffle(&mut rng);
for tx in transactions {
pool.insert_transaction(tx);
assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success);
}
(
prepare_transactions(&mut pool, expected_weight)
Expand Down Expand Up @@ -391,12 +409,12 @@ mod tests {
})
.collect::<Vec<_>>();

let mut pool = TransactionPool::new(TEST_SEED);
let mut pool = TransactionPool::new(TEST_SEED, None);
let mut rng = thread_rng();
transactions.shuffle(&mut rng);
for tx in transactions.clone() {
println!("{:?}", tx);
pool.insert_transaction(tx);
assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success);
}
assert_eq!(pool.len(), n as usize);

Expand Down Expand Up @@ -448,7 +466,10 @@ mod tests {
assert_eq!(pool.len(), 5);

for tx in transactions {
pool.insert_transaction(tx);
assert!(matches!(
pool.insert_transaction(tx),
InsertTransactionResult::Success | InsertTransactionResult::Duplicate
));
}
assert_eq!(pool.len(), 10);
let txs = prepare_transactions(&mut pool, 10);
Expand Down Expand Up @@ -482,7 +503,10 @@ mod tests {
assert_eq!(pool.len(), 5);

for tx in transactions {
pool.insert_transaction(tx);
assert!(matches!(
pool.insert_transaction(tx),
InsertTransactionResult::Success | InsertTransactionResult::Duplicate
));
}
assert_eq!(pool.len(), 10);
let txs = prepare_transactions(&mut pool, 5);
Expand All @@ -495,13 +519,13 @@ mod tests {

#[test]
fn test_transaction_pool_size() {
let mut pool = TransactionPool::new(TEST_SEED);
let mut pool = TransactionPool::new(TEST_SEED, None);
let transactions = generate_transactions("alice.near", "alice.near", 1, 100);
let mut total_transaction_size = 0;
// Adding transactions increases the size.
for tx in transactions.clone() {
total_transaction_size += tx.get_size();
pool.insert_transaction(tx);
assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success);
assert_eq!(pool.transaction_size(), total_transaction_size);
}
// Removing transactions decreases the size.
Expand All @@ -512,4 +536,20 @@ mod tests {
}
assert_eq!(pool.transaction_size(), 0);
}

#[test]
fn test_transaction_pool_size_limit() {
let transactions = generate_transactions("alice.near", "alice.near", 1, 100);
// Each transaction is at least 1 byte in size, so the last transaction will not fit.
let pool_size_limit =
transactions.iter().map(|tx| tx.get_size()).sum::<u64>().checked_sub(1).unwrap();
let mut pool = TransactionPool::new(TEST_SEED, Some(pool_size_limit));
for (i, tx) in transactions.iter().cloned().enumerate() {
if i + 1 < transactions.len() {
assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::Success);
} else {
assert_eq!(pool.insert_transaction(tx), InsertTransactionResult::NoSpaceLeft);
}
}
}
}
4 changes: 4 additions & 0 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ pub struct ClientConfig {
pub state_sync_enabled: bool,
/// Options for syncing state.
pub state_sync: StateSyncConfig,
/// Limit of the size of per-shard transaction pool measured in bytes. If not set, the size
/// will be unbounded.
pub transaction_pool_size_limit: Option<u64>,
}

impl ClientConfig {
Expand Down Expand Up @@ -323,6 +326,7 @@ impl ClientConfig {
flat_storage_creation_period: Duration::from_secs(1),
state_sync_enabled: false,
state_sync: StateSyncConfig::default(),
transaction_pool_size_limit: None,
}
}
}
1 change: 1 addition & 0 deletions nearcore/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ impl NearConfig {
flat_storage_creation_period: config.store.flat_storage_creation_period,
state_sync_enabled: config.state_sync_enabled.unwrap_or(false),
state_sync: config.state_sync.unwrap_or_default(),
transaction_pool_size_limit: None,
},
network_config: NetworkConfig::new(
config.network,
Expand Down