From 157788dfc430d86c26f8e90f412fbb78a4083787 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 20 Aug 2024 14:10:59 +0300 Subject: [PATCH 01/14] Some mempool tweaks --- mining/src/mempool/config.rs | 6 ++---- mining/src/mempool/model/transactions_pool.rs | 6 +++++- mining/src/mempool/model/tx.rs | 4 ++++ 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/mining/src/mempool/config.rs b/mining/src/mempool/config.rs index 419a4362a..1cf4174e2 100644 --- a/mining/src/mempool/config.rs +++ b/mining/src/mempool/config.rs @@ -3,7 +3,7 @@ use kaspa_consensus_core::constants::TX_VERSION; pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: u32 = 1_000_000; pub(crate) const DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS: u64 = 5; -pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 60; +pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 24 * 60 * 60; pub(crate) const DEFAULT_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10; pub(crate) const DEFAULT_ACCEPTED_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 120; pub(crate) const DEFAULT_ACCEPTED_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10; @@ -11,9 +11,7 @@ pub(crate) const DEFAULT_ORPHAN_EXPIRE_INTERVAL_SECONDS: u64 = 60; pub(crate) const DEFAULT_ORPHAN_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10; pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_MASS: u64 = 100_000; - -// TODO: when rusty-kaspa nodes run most of the network, consider increasing this value -pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_COUNT: u64 = 50; +pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_COUNT: u64 = 500; /// DEFAULT_MINIMUM_RELAY_TRANSACTION_FEE specifies the minimum transaction fee for a transaction to be accepted to /// the mempool and relayed. It is specified in sompi per 1kg (or 1000 grams) of transaction mass. diff --git a/mining/src/mempool/model/transactions_pool.rs b/mining/src/mempool/model/transactions_pool.rs index bc3469409..67b9988b4 100644 --- a/mining/src/mempool/model/transactions_pool.rs +++ b/mining/src/mempool/model/transactions_pool.rs @@ -21,7 +21,7 @@ use kaspa_consensus_core::{ use kaspa_core::{time::unix_now, trace, warn}; use std::{ collections::{hash_map::Keys, hash_set::Iter}, - sync::Arc, + sync::{atomic::AtomicU64, Arc}, }; use super::frontier::Frontier; @@ -64,6 +64,8 @@ pub(crate) struct TransactionsPool { /// last expire scan time in milliseconds last_expire_scan_time: u64, + total_compute_mass: u64, + /// Store of UTXOs utxo_set: MempoolUtxoSet, } @@ -79,6 +81,7 @@ impl TransactionsPool { last_expire_scan_daa_score: 0, last_expire_scan_time: unix_now(), utxo_set: MempoolUtxoSet::new(), + total_compute_mass: 0, } } @@ -117,6 +120,7 @@ impl TransactionsPool { } self.utxo_set.add_transaction(&transaction.mtx); + self.total_compute_mass += transaction.calculated_compute_mass().expect("we expect this field to be already calculated"); self.all_transactions.insert(id, transaction); trace!("Added transaction {}", id); Ok(()) diff --git a/mining/src/mempool/model/tx.rs b/mining/src/mempool/model/tx.rs index 9b65faeb2..86a68087f 100644 --- a/mining/src/mempool/model/tx.rs +++ b/mining/src/mempool/model/tx.rs @@ -32,6 +32,10 @@ impl MempoolTransaction { let parent_id = self.id(); transaction.tx.inputs.iter().any(|x| x.previous_outpoint.transaction_id == parent_id) } + + pub(crate) fn calculated_compute_mass(&self) -> Option { + self.mtx.calculated_compute_mass + } } impl RbfPolicy { From 955a3e72b74e68a8bb053dec54b8a063f5840259 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Thu, 22 Aug 2024 19:24:32 +0300 Subject: [PATCH 02/14] Change mempool tx eviction policy --- mining/errors/src/mempool.rs | 6 +- mining/src/mempool/config.rs | 13 ++- mining/src/mempool/model/frontier.rs | 8 +- mining/src/mempool/model/transactions_pool.rs | 84 +++++++------------ .../validate_and_insert_transaction.rs | 16 +++- 5 files changed, 60 insertions(+), 67 deletions(-) diff --git a/mining/errors/src/mempool.rs b/mining/errors/src/mempool.rs index be8ff389a..319aaa484 100644 --- a/mining/errors/src/mempool.rs +++ b/mining/errors/src/mempool.rs @@ -33,9 +33,9 @@ pub enum RuleError { #[error("replace by fee found more than one double spending transaction in the mempool")] RejectRbfTooManyDoubleSpendingTransactions, - /// New behavior: a transaction is rejected if the mempool is full - #[error("number of high-priority transactions in mempool ({0}) has reached the maximum allowed ({1})")] - RejectMempoolIsFull(usize, u64), + /// a transaction is rejected if the mempool is full + #[error("transaction could not be added to the mempool because it's full with transactions with higher priority")] + RejectMempoolIsFull, /// An error emitted by mining\src\mempool\check_transaction_standard.rs #[error("transaction {0} is not standard: {1}")] diff --git a/mining/src/mempool/config.rs b/mining/src/mempool/config.rs index 1cf4174e2..cfd07f383 100644 --- a/mining/src/mempool/config.rs +++ b/mining/src/mempool/config.rs @@ -1,6 +1,7 @@ use kaspa_consensus_core::constants::TX_VERSION; -pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: u32 = 1_000_000; +pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: usize = 1_000_000; +pub(crate) const DEFAULT_MEMPOOL_COMPUTE_MASS_LIMIT: u64 = 500_000_000; // This limits the mempool to about 500 MB. pub(crate) const DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS: u64 = 5; pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 24 * 60 * 60; @@ -26,7 +27,8 @@ pub(crate) const DEFAULT_MAXIMUM_STANDARD_TRANSACTION_VERSION: u16 = TX_VERSION; #[derive(Clone, Debug)] pub struct Config { - pub maximum_transaction_count: u32, + pub maximum_transaction_count: usize, + pub mempool_compute_mass_limit: u64, pub maximum_build_block_template_attempts: u64, pub transaction_expire_interval_daa_score: u64, pub transaction_expire_scan_interval_daa_score: u64, @@ -49,7 +51,8 @@ pub struct Config { impl Config { #[allow(clippy::too_many_arguments)] pub fn new( - maximum_transaction_count: u32, + maximum_transaction_count: usize, + mempool_compute_mass_limit: u64, maximum_build_block_template_attempts: u64, transaction_expire_interval_daa_score: u64, transaction_expire_scan_interval_daa_score: u64, @@ -70,6 +73,7 @@ impl Config { ) -> Self { Self { maximum_transaction_count, + mempool_compute_mass_limit, maximum_build_block_template_attempts, transaction_expire_interval_daa_score, transaction_expire_scan_interval_daa_score, @@ -95,6 +99,7 @@ impl Config { pub const fn build_default(target_milliseconds_per_block: u64, relay_non_std_transactions: bool, max_block_mass: u64) -> Self { Self { maximum_transaction_count: DEFAULT_MAXIMUM_TRANSACTION_COUNT, + mempool_compute_mass_limit: DEFAULT_MEMPOOL_COMPUTE_MASS_LIMIT, maximum_build_block_template_attempts: DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS, transaction_expire_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS * 1000 / target_milliseconds_per_block, transaction_expire_scan_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS * 1000 @@ -119,7 +124,7 @@ impl Config { } pub fn apply_ram_scale(mut self, ram_scale: f64) -> Self { - self.maximum_transaction_count = (self.maximum_transaction_count as f64 * ram_scale.min(1.0)) as u32; // Allow only scaling down + self.maximum_transaction_count = (self.maximum_transaction_count as f64 * ram_scale.min(1.0)) as usize; // Allow only scaling down self } diff --git a/mining/src/mempool/model/frontier.rs b/mining/src/mempool/model/frontier.rs index e8d2b54ab..cd719feaa 100644 --- a/mining/src/mempool/model/frontier.rs +++ b/mining/src/mempool/model/frontier.rs @@ -5,12 +5,12 @@ use crate::{ }; use feerate_key::FeerateTransactionKey; -use kaspa_consensus_core::block::TemplateTransactionSelector; +use kaspa_consensus_core::{block::TemplateTransactionSelector, tx::Transaction}; use kaspa_core::trace; use rand::{distributions::Uniform, prelude::Distribution, Rng}; use search_tree::SearchTree; use selectors::{SequenceSelector, SequenceSelectorInput, TakeAllSelector}; -use std::collections::HashSet; +use std::{collections::HashSet, iter::FusedIterator, sync::Arc}; pub(crate) mod feerate_key; pub(crate) mod search_tree; @@ -254,6 +254,10 @@ impl Frontier { } estimator } + + pub fn ascending_iter(&self) -> impl DoubleEndedIterator> + ExactSizeIterator + FusedIterator { + self.search_tree.ascending_iter().map(|key| &key.tx) + } } #[cfg(test)] diff --git a/mining/src/mempool/model/transactions_pool.rs b/mining/src/mempool/model/transactions_pool.rs index 67b9988b4..f4c726157 100644 --- a/mining/src/mempool/model/transactions_pool.rs +++ b/mining/src/mempool/model/transactions_pool.rs @@ -21,7 +21,7 @@ use kaspa_consensus_core::{ use kaspa_core::{time::unix_now, trace, warn}; use std::{ collections::{hash_map::Keys, hash_set::Iter}, - sync::{atomic::AtomicU64, Arc}, + sync::Arc, }; use super::frontier::Frontier; @@ -185,70 +185,44 @@ impl TransactionsPool { self.ready_transactions.build_feerate_estimator(args) } - /// Is the mempool transaction identified by `transaction_id` unchained, thus having no successor? - pub(crate) fn transaction_is_unchained(&self, transaction_id: &TransactionId) -> bool { - if self.all_transactions.contains_key(transaction_id) { - if let Some(chains) = self.chained_transactions.get(transaction_id) { - return chains.is_empty(); - } - return true; - } - false - } - /// Returns the exceeding low-priority transactions having the lowest fee rates in order - /// to have room for at least `free_slots` new transactions. The returned transactions + /// to make room for `transaction`. The returned transactions /// are guaranteed to be unchained (no successor in mempool) and to not be parent of /// `transaction`. /// - /// An error is returned if the mempool is filled with high priority transactions. - pub(crate) fn limit_transaction_count( - &self, - free_slots: usize, - transaction: &MutableTransaction, - ) -> RuleResult> { - assert!(free_slots > 0); + /// An error is returned if the mempool is filled with high priority transactions, or + /// at least one candidate for removal has a higher fee rate than `transaction`. + pub(crate) fn limit_transaction_count(&self, transaction: &MutableTransaction) -> RuleResult> { // Returns a vector of transactions to be removed that the caller has to remove actually. // The caller is golang validateAndInsertTransaction equivalent. // This behavior differs from golang impl. - let trim_size = self.len() + free_slots - usize::min(self.len() + free_slots, self.config.maximum_transaction_count as usize); - let mut transactions_to_remove = Vec::with_capacity(trim_size); - if trim_size > 0 { - // TODO: consider introducing an index on all_transactions low-priority items instead. - // - // Sorting this vector here may be sub-optimal compared with maintaining a sorted - // index of all_transactions low-priority items if the proportion of low-priority txs - // in all_transactions is important. - let low_priority_txs = self - .all_transactions - .values() - .filter(|x| x.priority == Priority::Low && self.transaction_is_unchained(&x.id()) && !x.is_parent_of(transaction)); - - if trim_size == 1 { - // This is the most likely case. Here we just search the minimum, thus avoiding the need to sort altogether. - if let Some(tx) = low_priority_txs.min_by(|a, b| a.fee_rate().partial_cmp(&b.fee_rate()).unwrap()) { - transactions_to_remove.push(tx); - } - } else { - let mut low_priority_txs = low_priority_txs.collect::>(); - if low_priority_txs.len() > trim_size { - low_priority_txs.sort_by(|a, b| a.fee_rate().partial_cmp(&b.fee_rate()).unwrap()); - transactions_to_remove.extend_from_slice(&low_priority_txs[0..usize::min(trim_size, low_priority_txs.len())]); + self.ready_transactions + .ascending_iter() + .map(|tx| self.all_transactions.get(&tx.id()).unwrap()) + .filter(|mtx| mtx.priority == Priority::Low && !mtx.is_parent_of(transaction)) + .scan((self.len() + 1, self.total_compute_mass + transaction.calculated_compute_mass.unwrap()), |state, mtx| { + (*state).0 -= 1; + (*state).1 -= mtx.calculated_compute_mass().unwrap(); + Some((mtx, *state)) + }) + .take_while(|(_, state)| { + self.len() - state.0 > self.config.maximum_transaction_count + && self.total_compute_mass - state.1 > self.config.mempool_compute_mass_limit + }) + .map(|(mtx, _)| { + if mtx.fee_rate() <= transaction.calculated_feerate().unwrap() { + Ok(mtx.id()) } else { - transactions_to_remove = low_priority_txs; + let err = RuleError::RejectMempoolIsFull; + warn!("{}", err); + Err(err) } - } - } - - // An error is returned if the mempool is filled with high priority and other unremovable transactions. - let tx_count = self.len() + free_slots - transactions_to_remove.len(); - if tx_count as u64 > self.config.maximum_transaction_count as u64 { - let err = RuleError::RejectMempoolIsFull(tx_count - free_slots, self.config.maximum_transaction_count as u64); - warn!("{}", err.to_string()); - return Err(err); - } + }) + .collect() + } - Ok(transactions_to_remove.iter().map(|x| x.id()).collect()) + pub(crate) fn get_total_compute_mass(&self) -> u64 { + self.total_compute_mass } pub(crate) fn all_transaction_ids_with_priority(&self, priority: Priority) -> Vec { diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs index bcfedc2db..9eddaa0c0 100644 --- a/mining/src/mempool/validate_and_insert_transaction.rs +++ b/mining/src/mempool/validate_and_insert_transaction.rs @@ -73,9 +73,19 @@ impl Mempool { self.validate_transaction_in_context(&transaction)?; // Before adding the transaction, check if there is room in the pool - self.transaction_pool.limit_transaction_count(1, &transaction)?.iter().try_for_each(|x| { - self.remove_transaction(x, true, TxRemovalReason::MakingRoom, format!(" for {}", transaction_id).as_str()) - })?; + for x in self.transaction_pool.limit_transaction_count(&transaction)?.iter() { + self.remove_transaction(x, true, TxRemovalReason::MakingRoom, format!(" for {}", transaction_id).as_str())?; + // self.transaction_pool.limit_transaction_count(&transaction) returns the + // smallest prefix of `ready_transactions` (sorted by ascending fee-rate) + // that makes enough room for `transaction`, but since each call to `self.remove_transaction` + // also removes all transactions dependant on `x` we might already have enough room. + if self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count + && self.transaction_pool.get_total_compute_mass() + transaction.calculated_compute_mass.unwrap() + <= self.config.mempool_compute_mass_limit + { + break; + } + } // Add the transaction to the mempool as a MempoolTransaction and return a clone of the embedded Arc let accepted_transaction = From b4171c64d55cbd1677b0809c49ae8d2c02d9f2e9 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Thu, 22 Aug 2024 20:56:15 +0300 Subject: [PATCH 03/14] Add test_evict --- mining/src/manager_tests.rs | 47 +++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index 0a2f04faf..a5252c163 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -10,7 +10,7 @@ mod tests { model::frontier::selectors::TakeAllSelector, tx::{Orphan, Priority, RbfPolicy}, }, - model::tx_query::TransactionQuery, + model::{tx_insert::TransactionInsertion, tx_query::TransactionQuery}, testutils::consensus_mock::ConsensusMock, MiningCounters, }; @@ -26,7 +26,7 @@ mod tests { subnets::SUBNETWORK_ID_NATIVE, tx::{ scriptvec, MutableTransaction, ScriptPublicKey, Transaction, TransactionId, TransactionInput, TransactionOutpoint, - TransactionOutput, UtxoEntry, + TransactionOutput, UtxoEntry, VerifiableTransaction, }, }; use kaspa_hashes::Hash; @@ -1116,6 +1116,49 @@ mod tests { // TODO: extend the test according to the golang scenario } + #[test] + fn test_evict() { + let consensus = Arc::new(ConsensusMock::new()); + let counters = Arc::new(MiningCounters::default()); + const TX_COUNT: u32 = 10; + const MASS_LIMIT: u64 = 1_000_000; + let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MASS_LIMIT); + config.mempool_compute_mass_limit = TX_COUNT as u64; + let mining_manager = MiningManager::with_config(config, None, counters); + let txs = (0..TX_COUNT) + .map(|i| { + let tx = create_transaction_with_utxo_entry(i, 0).as_verifiable().tx().clone(); + tx + }) + .collect_vec(); + + for tx in txs { + validate_and_insert_transaction(&mining_manager, consensus.as_ref(), tx.clone()).unwrap(); + } + + let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT, 0).as_verifiable().tx().clone(); + heavy_tx.payload = vec![0u8; 5000]; + validate_and_insert_transaction(&mining_manager, consensus.as_ref(), heavy_tx.clone()).unwrap(); + assert!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len() != 0); + assert!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len() < TX_COUNT as usize); + } + + fn validate_and_insert_transaction( + mining_manager: &MiningManager, + consensus: &dyn ConsensusApi, + tx: Transaction, + ) -> Result { + mining_manager.validate_and_insert_transaction(consensus, tx, Priority::Low, Orphan::Allowed, RbfPolicy::Forbidden) + } + + // fn validate_and_insert_mutable_transaction( + // mining_manager: &MiningManager, + // consensus: &dyn ConsensusApi, + // tx: MutableTransaction, + // ) -> Result { + // mining_manager.validate_and_insert_mutable_transaction(consensus, tx, Priority::Low, Orphan::Allowed, RbfPolicy::Forbidden) + // } + fn sweep_compare_modified_template_to_built( consensus: &dyn ConsensusApi, address_prefix: Prefix, From 0f116d012a17da6f97f26063c5b4fd92b0687e26 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Sun, 25 Aug 2024 22:47:29 +0300 Subject: [PATCH 04/14] Some fixes --- mining/src/manager.rs | 5 ++ mining/src/manager_tests.rs | 44 +++++++++--------- mining/src/mempool/mod.rs | 5 ++ mining/src/mempool/model/transactions_pool.rs | 46 +++++++++++-------- .../validate_and_insert_transaction.rs | 14 +++++- mining/src/testutils/consensus_mock.rs | 7 ++- 6 files changed, 75 insertions(+), 46 deletions(-) diff --git a/mining/src/manager.rs b/mining/src/manager.rs index 081854698..725108880 100644 --- a/mining/src/manager.rs +++ b/mining/src/manager.rs @@ -836,6 +836,11 @@ impl MiningManager { pub fn unknown_transactions(&self, transactions: Vec) -> Vec { self.mempool.read().unknown_transactions(transactions) } + + // For tests + pub(crate) fn get_total_compute_mass(&self) -> u64 { + self.mempool.read().get_total_compute_mass() + } } /// Async proxy for the mining manager diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index a5252c163..816ddbcee 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -26,7 +26,7 @@ mod tests { subnets::SUBNETWORK_ID_NATIVE, tx::{ scriptvec, MutableTransaction, ScriptPublicKey, Transaction, TransactionId, TransactionInput, TransactionOutpoint, - TransactionOutput, UtxoEntry, VerifiableTransaction, + TransactionOutput, UtxoEntry, }, }; use kaspa_hashes::Hash; @@ -1121,44 +1121,44 @@ mod tests { let consensus = Arc::new(ConsensusMock::new()); let counters = Arc::new(MiningCounters::default()); const TX_COUNT: u32 = 10; - const MASS_LIMIT: u64 = 1_000_000; + const MASS_LIMIT: u64 = 10_000; let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MASS_LIMIT); - config.mempool_compute_mass_limit = TX_COUNT as u64; + config.mempool_compute_mass_limit = MASS_LIMIT; let mining_manager = MiningManager::with_config(config, None, counters); let txs = (0..TX_COUNT) .map(|i| { - let tx = create_transaction_with_utxo_entry(i, 0).as_verifiable().tx().clone(); + let mut tx = create_transaction_with_utxo_entry(i, 0); + tx.calculated_compute_mass = Some(1000); tx + // let tx = create_transaction_with_utxo_entry(i, 0).as_verifiable().tx().clone(); + // tx }) .collect_vec(); for tx in txs { - validate_and_insert_transaction(&mining_manager, consensus.as_ref(), tx.clone()).unwrap(); + validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), tx).unwrap(); + // validate_and_insert_transaction(&mining_manager, consensus.as_ref(), tx.clone()).unwrap(); + assert!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len() != 0); } - - let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT, 0).as_verifiable().tx().clone(); - heavy_tx.payload = vec![0u8; 5000]; - validate_and_insert_transaction(&mining_manager, consensus.as_ref(), heavy_tx.clone()).unwrap(); - assert!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len() != 0); - assert!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len() < TX_COUNT as usize); + assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); + + let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT, 0); + heavy_tx.calculated_compute_mass = Some(5000); + heavy_tx.calculated_fee = Some(5000); + assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); + validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx.clone()).unwrap(); + assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize - 4); + assert!(mining_manager.get_total_compute_mass() <= MASS_LIMIT); } - fn validate_and_insert_transaction( + fn validate_and_insert_mutable_transaction( mining_manager: &MiningManager, consensus: &dyn ConsensusApi, - tx: Transaction, + tx: MutableTransaction, ) -> Result { - mining_manager.validate_and_insert_transaction(consensus, tx, Priority::Low, Orphan::Allowed, RbfPolicy::Forbidden) + mining_manager.validate_and_insert_mutable_transaction(consensus, tx, Priority::Low, Orphan::Allowed, RbfPolicy::Forbidden) } - // fn validate_and_insert_mutable_transaction( - // mining_manager: &MiningManager, - // consensus: &dyn ConsensusApi, - // tx: MutableTransaction, - // ) -> Result { - // mining_manager.validate_and_insert_mutable_transaction(consensus, tx, Priority::Low, Orphan::Allowed, RbfPolicy::Forbidden) - // } - fn sweep_compare_modified_template_to_built( consensus: &dyn ConsensusApi, address_prefix: Prefix, diff --git a/mining/src/mempool/mod.rs b/mining/src/mempool/mod.rs index e5cd7dbeb..1355169bf 100644 --- a/mining/src/mempool/mod.rs +++ b/mining/src/mempool/mod.rs @@ -162,6 +162,11 @@ impl Mempool { .filter(|transaction_id| !(self.transaction_pool.has(transaction_id) || self.orphan_pool.has(transaction_id))); self.accepted_transactions.unaccepted(&mut not_in_pools_txs) } + + // For tests + pub(crate) fn get_total_compute_mass(&self) -> u64 { + self.transaction_pool.get_total_compute_mass() + } } pub mod tx { diff --git a/mining/src/mempool/model/transactions_pool.rs b/mining/src/mempool/model/transactions_pool.rs index f4c726157..fd52db7a4 100644 --- a/mining/src/mempool/model/transactions_pool.rs +++ b/mining/src/mempool/model/transactions_pool.rs @@ -163,6 +163,7 @@ impl TransactionsPool { // Remove the transaction from the mempool UTXO set self.utxo_set.remove_transaction(&removed_tx.mtx, &parent_ids); + self.total_compute_mass -= removed_tx.calculated_compute_mass().unwrap(); Ok(removed_tx) } @@ -196,29 +197,34 @@ impl TransactionsPool { // Returns a vector of transactions to be removed that the caller has to remove actually. // The caller is golang validateAndInsertTransaction equivalent. // This behavior differs from golang impl. - self.ready_transactions + let mut txs_to_remove = Vec::with_capacity(1); + let mut selected_mass = 0; + let mut num_selected = 0; + for tx in self + .ready_transactions .ascending_iter() .map(|tx| self.all_transactions.get(&tx.id()).unwrap()) .filter(|mtx| mtx.priority == Priority::Low && !mtx.is_parent_of(transaction)) - .scan((self.len() + 1, self.total_compute_mass + transaction.calculated_compute_mass.unwrap()), |state, mtx| { - (*state).0 -= 1; - (*state).1 -= mtx.calculated_compute_mass().unwrap(); - Some((mtx, *state)) - }) - .take_while(|(_, state)| { - self.len() - state.0 > self.config.maximum_transaction_count - && self.total_compute_mass - state.1 > self.config.mempool_compute_mass_limit - }) - .map(|(mtx, _)| { - if mtx.fee_rate() <= transaction.calculated_feerate().unwrap() { - Ok(mtx.id()) - } else { - let err = RuleError::RejectMempoolIsFull; - warn!("{}", err); - Err(err) - } - }) - .collect() + { + if self.len() + 1 - num_selected <= self.config.maximum_transaction_count + && self.total_compute_mass + transaction.calculated_compute_mass.unwrap() - selected_mass + <= self.config.mempool_compute_mass_limit + { + break; + } + + if tx.fee_rate() > transaction.calculated_feerate().unwrap() { + let err = RuleError::RejectMempoolIsFull; + warn!("{}", err); + return Err(err); + } + + txs_to_remove.push(tx.id()); + selected_mass += tx.calculated_compute_mass().unwrap(); + num_selected += 1; + } + + Ok(txs_to_remove) } pub(crate) fn get_total_compute_mass(&self) -> u64 { diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs index 9eddaa0c0..d584a4a40 100644 --- a/mining/src/mempool/validate_and_insert_transaction.rs +++ b/mining/src/mempool/validate_and_insert_transaction.rs @@ -23,7 +23,10 @@ impl Mempool { ) -> RuleResult { self.validate_transaction_unacceptance(&transaction)?; // Populate mass in the beginning, it will be used in multiple places throughout the validation and insertion. - transaction.calculated_compute_mass = Some(consensus.calculate_transaction_compute_mass(&transaction.tx)); + // We only populate if it's `None` to allow tests set arbitrary values. + if transaction.calculated_compute_mass.is_none() { + transaction.calculated_compute_mass = Some(consensus.calculate_transaction_compute_mass(&transaction.tx)); + } self.validate_transaction_in_isolation(&transaction)?; let feerate_threshold = self.get_replace_by_fee_constraint(&transaction, rbf_policy)?; self.populate_mempool_entries(&mut transaction); @@ -73,7 +76,8 @@ impl Mempool { self.validate_transaction_in_context(&transaction)?; // Before adding the transaction, check if there is room in the pool - for x in self.transaction_pool.limit_transaction_count(&transaction)?.iter() { + let txs_to_remove = self.transaction_pool.limit_transaction_count(&transaction)?; + for x in txs_to_remove.iter() { self.remove_transaction(x, true, TxRemovalReason::MakingRoom, format!(" for {}", transaction_id).as_str())?; // self.transaction_pool.limit_transaction_count(&transaction) returns the // smallest prefix of `ready_transactions` (sorted by ascending fee-rate) @@ -87,6 +91,12 @@ impl Mempool { } } + assert!( + self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count + && self.transaction_pool.get_total_compute_mass() + transaction.calculated_compute_mass.unwrap() + <= self.config.mempool_compute_mass_limit + ); + // Add the transaction to the mempool as a MempoolTransaction and return a clone of the embedded Arc let accepted_transaction = self.transaction_pool.add_transaction(transaction, consensus.get_virtual_daa_score(), priority)?.mtx.tx.clone(); diff --git a/mining/src/testutils/consensus_mock.rs b/mining/src/testutils/consensus_mock.rs index d68e062a0..28d3f5897 100644 --- a/mining/src/testutils/consensus_mock.rs +++ b/mining/src/testutils/consensus_mock.rs @@ -133,11 +133,14 @@ impl ConsensusApi for ConsensusMock { // At this point we know all UTXO entries are populated, so we can safely calculate the fee let total_in: u64 = mutable_tx.entries.iter().map(|x| x.as_ref().unwrap().amount).sum(); let total_out: u64 = mutable_tx.tx.outputs.iter().map(|x| x.value).sum(); - let calculated_fee = total_in - total_out; mutable_tx .tx .set_mass(self.calculate_transaction_storage_mass(mutable_tx).unwrap() + mutable_tx.calculated_compute_mass.unwrap()); - mutable_tx.calculated_fee = Some(calculated_fee); + + if mutable_tx.calculated_fee.is_none() { + let calculated_fee = total_in - total_out; + mutable_tx.calculated_fee = Some(calculated_fee); + } Ok(()) } From 3f43b6391b6abe50489291c3257332f39eba34a4 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Sun, 25 Aug 2024 23:07:26 +0300 Subject: [PATCH 05/14] clippy --- mining/src/manager.rs | 2 +- mining/src/manager_tests.rs | 5 +---- mining/src/mempool/mod.rs | 2 +- mining/src/mempool/model/transactions_pool.rs | 1 + .../src/mempool/validate_and_insert_transaction.rs | 14 +++++++++----- 5 files changed, 13 insertions(+), 11 deletions(-) diff --git a/mining/src/manager.rs b/mining/src/manager.rs index 725108880..2c09fcef5 100644 --- a/mining/src/manager.rs +++ b/mining/src/manager.rs @@ -837,7 +837,7 @@ impl MiningManager { self.mempool.read().unknown_transactions(transactions) } - // For tests + #[cfg(test)] pub(crate) fn get_total_compute_mass(&self) -> u64 { self.mempool.read().get_total_compute_mass() } diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index 816ddbcee..3aab3671c 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -1130,15 +1130,12 @@ mod tests { let mut tx = create_transaction_with_utxo_entry(i, 0); tx.calculated_compute_mass = Some(1000); tx - // let tx = create_transaction_with_utxo_entry(i, 0).as_verifiable().tx().clone(); - // tx }) .collect_vec(); for tx in txs { validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), tx).unwrap(); - // validate_and_insert_transaction(&mining_manager, consensus.as_ref(), tx.clone()).unwrap(); - assert!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len() != 0); + assert!(!mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.is_empty()); } assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); diff --git a/mining/src/mempool/mod.rs b/mining/src/mempool/mod.rs index 1355169bf..48c1e81c3 100644 --- a/mining/src/mempool/mod.rs +++ b/mining/src/mempool/mod.rs @@ -163,7 +163,7 @@ impl Mempool { self.accepted_transactions.unaccepted(&mut not_in_pools_txs) } - // For tests + #[cfg(test)] pub(crate) fn get_total_compute_mass(&self) -> u64 { self.transaction_pool.get_total_compute_mass() } diff --git a/mining/src/mempool/model/transactions_pool.rs b/mining/src/mempool/model/transactions_pool.rs index fd52db7a4..2f75af68f 100644 --- a/mining/src/mempool/model/transactions_pool.rs +++ b/mining/src/mempool/model/transactions_pool.rs @@ -200,6 +200,7 @@ impl TransactionsPool { let mut txs_to_remove = Vec::with_capacity(1); let mut selected_mass = 0; let mut num_selected = 0; + #[allow(clippy::explicit_counter_loop)] for tx in self .ready_transactions .ascending_iter() diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs index d584a4a40..3a2ecdb01 100644 --- a/mining/src/mempool/validate_and_insert_transaction.rs +++ b/mining/src/mempool/validate_and_insert_transaction.rs @@ -83,6 +83,7 @@ impl Mempool { // smallest prefix of `ready_transactions` (sorted by ascending fee-rate) // that makes enough room for `transaction`, but since each call to `self.remove_transaction` // also removes all transactions dependant on `x` we might already have enough room. + #[allow(clippy::int_plus_one)] if self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count && self.transaction_pool.get_total_compute_mass() + transaction.calculated_compute_mass.unwrap() <= self.config.mempool_compute_mass_limit @@ -91,11 +92,14 @@ impl Mempool { } } - assert!( - self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count - && self.transaction_pool.get_total_compute_mass() + transaction.calculated_compute_mass.unwrap() - <= self.config.mempool_compute_mass_limit - ); + #[allow(clippy::int_plus_one)] + { + assert!( + self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count + && self.transaction_pool.get_total_compute_mass() + transaction.calculated_compute_mass.unwrap() + <= self.config.mempool_compute_mass_limit + ); + } // Add the transaction to the mempool as a MempoolTransaction and return a clone of the embedded Arc let accepted_transaction = From dba2e515e542d72610769282573835d5263ca43e Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Mon, 26 Aug 2024 11:41:55 +0300 Subject: [PATCH 06/14] move to estimated size --- consensus/core/src/tx.rs | 11 +++++++++-- mining/src/manager_tests.rs | 13 +++++++------ mining/src/mempool/config.rs | 10 +++++----- mining/src/mempool/mod.rs | 2 +- mining/src/mempool/model/transactions_pool.rs | 19 +++++++++---------- mining/src/mempool/model/tx.rs | 4 ++-- .../validate_and_insert_transaction.rs | 13 ++++++++----- 7 files changed, 41 insertions(+), 31 deletions(-) diff --git a/consensus/core/src/tx.rs b/consensus/core/src/tx.rs index 1d8dd8334..e24e7306c 100644 --- a/consensus/core/src/tx.rs +++ b/consensus/core/src/tx.rs @@ -375,12 +375,13 @@ pub struct MutableTransaction = std::sync::Arc, /// Populated compute mass (does not include the storage mass) pub calculated_compute_mass: Option, + pub estimated_size: Option, } impl> MutableTransaction { pub fn new(tx: T) -> Self { let num_inputs = tx.as_ref().inputs.len(); - Self { tx, entries: vec![None; num_inputs], calculated_fee: None, calculated_compute_mass: None } + Self { tx, entries: vec![None; num_inputs], calculated_fee: None, calculated_compute_mass: None, estimated_size: None } } pub fn id(&self) -> TransactionId { @@ -389,7 +390,13 @@ impl> MutableTransaction { pub fn with_entries(tx: T, entries: Vec) -> Self { assert_eq!(tx.as_ref().inputs.len(), entries.len()); - Self { tx, entries: entries.into_iter().map(Some).collect(), calculated_fee: None, calculated_compute_mass: None } + Self { + tx, + entries: entries.into_iter().map(Some).collect(), + calculated_fee: None, + calculated_compute_mass: None, + estimated_size: None, + } } /// Returns the tx wrapped as a [`VerifiableTransaction`]. Note that this function diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index 3aab3671c..6aab1dc58 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -1121,14 +1121,14 @@ mod tests { let consensus = Arc::new(ConsensusMock::new()); let counters = Arc::new(MiningCounters::default()); const TX_COUNT: u32 = 10; - const MASS_LIMIT: u64 = 10_000; - let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MASS_LIMIT); - config.mempool_compute_mass_limit = MASS_LIMIT; + const SIZE_LIMIT: u64 = 10_000; + let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS); + config.mempool_size_limit = SIZE_LIMIT; let mining_manager = MiningManager::with_config(config, None, counters); let txs = (0..TX_COUNT) .map(|i| { let mut tx = create_transaction_with_utxo_entry(i, 0); - tx.calculated_compute_mass = Some(1000); + tx.estimated_size = Some(1000); tx }) .collect_vec(); @@ -1140,12 +1140,12 @@ mod tests { assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT, 0); - heavy_tx.calculated_compute_mass = Some(5000); + heavy_tx.estimated_size = Some(5000); heavy_tx.calculated_fee = Some(5000); assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx.clone()).unwrap(); assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize - 4); - assert!(mining_manager.get_total_compute_mass() <= MASS_LIMIT); + assert!(mining_manager.get_total_compute_mass() <= SIZE_LIMIT); } fn validate_and_insert_mutable_transaction( @@ -1323,6 +1323,7 @@ mod tests { mutable_tx.calculated_fee = Some(DEFAULT_MINIMUM_RELAY_TRANSACTION_FEE); // Please note: this is the ConsensusMock version of the calculated_mass which differs from Consensus mutable_tx.calculated_compute_mass = Some(transaction_estimated_serialized_size(&mutable_tx.tx)); + mutable_tx.estimated_size = Some(transaction_estimated_serialized_size(&mutable_tx.tx)); mutable_tx.entries[0] = Some(entry); mutable_tx diff --git a/mining/src/mempool/config.rs b/mining/src/mempool/config.rs index cfd07f383..c3efa0f48 100644 --- a/mining/src/mempool/config.rs +++ b/mining/src/mempool/config.rs @@ -1,7 +1,7 @@ use kaspa_consensus_core::constants::TX_VERSION; pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: usize = 1_000_000; -pub(crate) const DEFAULT_MEMPOOL_COMPUTE_MASS_LIMIT: u64 = 500_000_000; // This limits the mempool to about 500 MB. +pub(crate) const DEFAULT_MEMPOOL_SIZE_LIMIT: u64 = 500_000_000; pub(crate) const DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS: u64 = 5; pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 24 * 60 * 60; @@ -28,7 +28,7 @@ pub(crate) const DEFAULT_MAXIMUM_STANDARD_TRANSACTION_VERSION: u16 = TX_VERSION; #[derive(Clone, Debug)] pub struct Config { pub maximum_transaction_count: usize, - pub mempool_compute_mass_limit: u64, + pub mempool_size_limit: u64, pub maximum_build_block_template_attempts: u64, pub transaction_expire_interval_daa_score: u64, pub transaction_expire_scan_interval_daa_score: u64, @@ -52,7 +52,7 @@ impl Config { #[allow(clippy::too_many_arguments)] pub fn new( maximum_transaction_count: usize, - mempool_compute_mass_limit: u64, + mempool_size_limit: u64, maximum_build_block_template_attempts: u64, transaction_expire_interval_daa_score: u64, transaction_expire_scan_interval_daa_score: u64, @@ -73,7 +73,7 @@ impl Config { ) -> Self { Self { maximum_transaction_count, - mempool_compute_mass_limit, + mempool_size_limit, maximum_build_block_template_attempts, transaction_expire_interval_daa_score, transaction_expire_scan_interval_daa_score, @@ -99,7 +99,7 @@ impl Config { pub const fn build_default(target_milliseconds_per_block: u64, relay_non_std_transactions: bool, max_block_mass: u64) -> Self { Self { maximum_transaction_count: DEFAULT_MAXIMUM_TRANSACTION_COUNT, - mempool_compute_mass_limit: DEFAULT_MEMPOOL_COMPUTE_MASS_LIMIT, + mempool_size_limit: DEFAULT_MEMPOOL_SIZE_LIMIT, maximum_build_block_template_attempts: DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS, transaction_expire_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS * 1000 / target_milliseconds_per_block, transaction_expire_scan_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS * 1000 diff --git a/mining/src/mempool/mod.rs b/mining/src/mempool/mod.rs index 48c1e81c3..1d9aaf35b 100644 --- a/mining/src/mempool/mod.rs +++ b/mining/src/mempool/mod.rs @@ -165,7 +165,7 @@ impl Mempool { #[cfg(test)] pub(crate) fn get_total_compute_mass(&self) -> u64 { - self.transaction_pool.get_total_compute_mass() + self.transaction_pool.get_estimated_size() } } diff --git a/mining/src/mempool/model/transactions_pool.rs b/mining/src/mempool/model/transactions_pool.rs index 2f75af68f..af30f5791 100644 --- a/mining/src/mempool/model/transactions_pool.rs +++ b/mining/src/mempool/model/transactions_pool.rs @@ -64,7 +64,7 @@ pub(crate) struct TransactionsPool { /// last expire scan time in milliseconds last_expire_scan_time: u64, - total_compute_mass: u64, + estimated_size: u64, /// Store of UTXOs utxo_set: MempoolUtxoSet, @@ -81,7 +81,7 @@ impl TransactionsPool { last_expire_scan_daa_score: 0, last_expire_scan_time: unix_now(), utxo_set: MempoolUtxoSet::new(), - total_compute_mass: 0, + estimated_size: 0, } } @@ -120,7 +120,7 @@ impl TransactionsPool { } self.utxo_set.add_transaction(&transaction.mtx); - self.total_compute_mass += transaction.calculated_compute_mass().expect("we expect this field to be already calculated"); + self.estimated_size += transaction.estimated_size().expect("we expect this field to be already calculated"); self.all_transactions.insert(id, transaction); trace!("Added transaction {}", id); Ok(()) @@ -163,7 +163,7 @@ impl TransactionsPool { // Remove the transaction from the mempool UTXO set self.utxo_set.remove_transaction(&removed_tx.mtx, &parent_ids); - self.total_compute_mass -= removed_tx.calculated_compute_mass().unwrap(); + self.estimated_size -= removed_tx.estimated_size().unwrap(); Ok(removed_tx) } @@ -198,7 +198,7 @@ impl TransactionsPool { // The caller is golang validateAndInsertTransaction equivalent. // This behavior differs from golang impl. let mut txs_to_remove = Vec::with_capacity(1); - let mut selected_mass = 0; + let mut selected_size = 0; let mut num_selected = 0; #[allow(clippy::explicit_counter_loop)] for tx in self @@ -208,8 +208,7 @@ impl TransactionsPool { .filter(|mtx| mtx.priority == Priority::Low && !mtx.is_parent_of(transaction)) { if self.len() + 1 - num_selected <= self.config.maximum_transaction_count - && self.total_compute_mass + transaction.calculated_compute_mass.unwrap() - selected_mass - <= self.config.mempool_compute_mass_limit + && self.estimated_size + transaction.estimated_size.unwrap() - selected_size <= self.config.mempool_size_limit { break; } @@ -221,15 +220,15 @@ impl TransactionsPool { } txs_to_remove.push(tx.id()); - selected_mass += tx.calculated_compute_mass().unwrap(); + selected_size += tx.estimated_size().unwrap(); num_selected += 1; } Ok(txs_to_remove) } - pub(crate) fn get_total_compute_mass(&self) -> u64 { - self.total_compute_mass + pub(crate) fn get_estimated_size(&self) -> u64 { + self.estimated_size } pub(crate) fn all_transaction_ids_with_priority(&self, priority: Priority) -> Vec { diff --git a/mining/src/mempool/model/tx.rs b/mining/src/mempool/model/tx.rs index 86a68087f..efb15788a 100644 --- a/mining/src/mempool/model/tx.rs +++ b/mining/src/mempool/model/tx.rs @@ -33,8 +33,8 @@ impl MempoolTransaction { transaction.tx.inputs.iter().any(|x| x.previous_outpoint.transaction_id == parent_id) } - pub(crate) fn calculated_compute_mass(&self) -> Option { - self.mtx.calculated_compute_mass + pub(crate) fn estimated_size(&self) -> Option { + self.mtx.estimated_size } } diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs index 3a2ecdb01..545d2a8eb 100644 --- a/mining/src/mempool/validate_and_insert_transaction.rs +++ b/mining/src/mempool/validate_and_insert_transaction.rs @@ -10,6 +10,7 @@ use crate::mempool::{ use kaspa_consensus_core::{ api::ConsensusApi, constants::UNACCEPTED_DAA_SCORE, + mass::transaction_estimated_serialized_size, tx::{MutableTransaction, Transaction, TransactionId, TransactionOutpoint, UtxoEntry}, }; use kaspa_core::{debug, info}; @@ -22,11 +23,14 @@ impl Mempool { rbf_policy: RbfPolicy, ) -> RuleResult { self.validate_transaction_unacceptance(&transaction)?; - // Populate mass in the beginning, it will be used in multiple places throughout the validation and insertion. + // Populate mass and estimated_size in the beginning, it will be used in multiple places throughout the validation and insertion. // We only populate if it's `None` to allow tests set arbitrary values. if transaction.calculated_compute_mass.is_none() { transaction.calculated_compute_mass = Some(consensus.calculate_transaction_compute_mass(&transaction.tx)); } + if transaction.estimated_size.is_none() { + transaction.estimated_size = Some(transaction_estimated_serialized_size(&transaction.tx)); + } self.validate_transaction_in_isolation(&transaction)?; let feerate_threshold = self.get_replace_by_fee_constraint(&transaction, rbf_policy)?; self.populate_mempool_entries(&mut transaction); @@ -85,8 +89,7 @@ impl Mempool { // also removes all transactions dependant on `x` we might already have enough room. #[allow(clippy::int_plus_one)] if self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count - && self.transaction_pool.get_total_compute_mass() + transaction.calculated_compute_mass.unwrap() - <= self.config.mempool_compute_mass_limit + && self.transaction_pool.get_estimated_size() + transaction.estimated_size.unwrap() <= self.config.mempool_size_limit { break; } @@ -96,8 +99,8 @@ impl Mempool { { assert!( self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count - && self.transaction_pool.get_total_compute_mass() + transaction.calculated_compute_mass.unwrap() - <= self.config.mempool_compute_mass_limit + && self.transaction_pool.get_estimated_size() + transaction.estimated_size.unwrap() + <= self.config.mempool_size_limit ); } From b7d9aeb433c8206d7267d4660ac5f966996315f5 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Mon, 26 Aug 2024 18:47:59 +0300 Subject: [PATCH 07/14] remove estimated_size field --- consensus/core/src/tx.rs | 11 ++----- mining/errors/src/mempool.rs | 3 ++ mining/src/manager.rs | 4 +-- mining/src/manager_tests.rs | 33 ++++++++++--------- mining/src/mempool/config.rs | 6 ++-- mining/src/mempool/mod.rs | 2 +- mining/src/mempool/model/transactions_pool.rs | 15 +++++---- mining/src/mempool/model/tx.rs | 4 --- .../validate_and_insert_transaction.rs | 27 ++++++++------- 9 files changed, 53 insertions(+), 52 deletions(-) diff --git a/consensus/core/src/tx.rs b/consensus/core/src/tx.rs index e24e7306c..1d8dd8334 100644 --- a/consensus/core/src/tx.rs +++ b/consensus/core/src/tx.rs @@ -375,13 +375,12 @@ pub struct MutableTransaction = std::sync::Arc, /// Populated compute mass (does not include the storage mass) pub calculated_compute_mass: Option, - pub estimated_size: Option, } impl> MutableTransaction { pub fn new(tx: T) -> Self { let num_inputs = tx.as_ref().inputs.len(); - Self { tx, entries: vec![None; num_inputs], calculated_fee: None, calculated_compute_mass: None, estimated_size: None } + Self { tx, entries: vec![None; num_inputs], calculated_fee: None, calculated_compute_mass: None } } pub fn id(&self) -> TransactionId { @@ -390,13 +389,7 @@ impl> MutableTransaction { pub fn with_entries(tx: T, entries: Vec) -> Self { assert_eq!(tx.as_ref().inputs.len(), entries.len()); - Self { - tx, - entries: entries.into_iter().map(Some).collect(), - calculated_fee: None, - calculated_compute_mass: None, - estimated_size: None, - } + Self { tx, entries: entries.into_iter().map(Some).collect(), calculated_fee: None, calculated_compute_mass: None } } /// Returns the tx wrapped as a [`VerifiableTransaction`]. Note that this function diff --git a/mining/errors/src/mempool.rs b/mining/errors/src/mempool.rs index 319aaa484..3ea31a1c2 100644 --- a/mining/errors/src/mempool.rs +++ b/mining/errors/src/mempool.rs @@ -81,6 +81,9 @@ pub enum RuleError { #[error("Rejected tx {0} from mempool due to incomputable storage mass")] RejectStorageMassIncomputable(TransactionId), + + #[error("Rejected tx {0} from mempool because its size ({1}) is greater than the mempool size limit ({2})")] + RejectTxTooBig(TransactionId, usize, usize), } impl From for RuleError { diff --git a/mining/src/manager.rs b/mining/src/manager.rs index 2c09fcef5..a8a51f3cc 100644 --- a/mining/src/manager.rs +++ b/mining/src/manager.rs @@ -838,8 +838,8 @@ impl MiningManager { } #[cfg(test)] - pub(crate) fn get_total_compute_mass(&self) -> u64 { - self.mempool.read().get_total_compute_mass() + pub(crate) fn get_estimated_size(&self) -> usize { + self.mempool.read().get_estimated_size() } } diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index 6aab1dc58..37378737f 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -35,6 +35,7 @@ mod tests { pay_to_address_script, pay_to_script_hash_signature_script, test_helpers::{create_transaction, create_transaction_with_change, op_true_script}, }; + use kaspa_utils::mem_size::MemSizeEstimator; use std::{iter::once, sync::Arc}; use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel}; @@ -1118,20 +1119,16 @@ mod tests { #[test] fn test_evict() { + const TX_COUNT: usize = 10; + let txs = (0..TX_COUNT).map(|i| create_transaction_with_utxo_entry(i as u32, 0)).collect_vec(); + let consensus = Arc::new(ConsensusMock::new()); let counters = Arc::new(MiningCounters::default()); - const TX_COUNT: u32 = 10; - const SIZE_LIMIT: u64 = 10_000; let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS); - config.mempool_size_limit = SIZE_LIMIT; + let tx_size = txs[0].tx.estimate_mem_bytes(); + let size_limit = TX_COUNT * tx_size; + config.mempool_size_limit = size_limit; let mining_manager = MiningManager::with_config(config, None, counters); - let txs = (0..TX_COUNT) - .map(|i| { - let mut tx = create_transaction_with_utxo_entry(i, 0); - tx.estimated_size = Some(1000); - tx - }) - .collect_vec(); for tx in txs { validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), tx).unwrap(); @@ -1139,13 +1136,18 @@ mod tests { } assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); - let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT, 0); - heavy_tx.estimated_size = Some(5000); - heavy_tx.calculated_fee = Some(5000); + let heavy_tx = { + let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32, 0); + let mut inner_tx = (*(heavy_tx.tx)).clone(); + inner_tx.payload = vec![0u8; TX_COUNT / 2 * tx_size - inner_tx.estimate_mem_bytes()]; + heavy_tx.tx = inner_tx.into(); + heavy_tx.calculated_fee = Some(500_000); + heavy_tx + }; assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx.clone()).unwrap(); - assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize - 4); - assert!(mining_manager.get_total_compute_mass() <= SIZE_LIMIT); + assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize - 5); + assert!(mining_manager.get_estimated_size() <= size_limit); } fn validate_and_insert_mutable_transaction( @@ -1323,7 +1325,6 @@ mod tests { mutable_tx.calculated_fee = Some(DEFAULT_MINIMUM_RELAY_TRANSACTION_FEE); // Please note: this is the ConsensusMock version of the calculated_mass which differs from Consensus mutable_tx.calculated_compute_mass = Some(transaction_estimated_serialized_size(&mutable_tx.tx)); - mutable_tx.estimated_size = Some(transaction_estimated_serialized_size(&mutable_tx.tx)); mutable_tx.entries[0] = Some(entry); mutable_tx diff --git a/mining/src/mempool/config.rs b/mining/src/mempool/config.rs index c3efa0f48..1ed3a376e 100644 --- a/mining/src/mempool/config.rs +++ b/mining/src/mempool/config.rs @@ -1,7 +1,7 @@ use kaspa_consensus_core::constants::TX_VERSION; pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: usize = 1_000_000; -pub(crate) const DEFAULT_MEMPOOL_SIZE_LIMIT: u64 = 500_000_000; +pub(crate) const DEFAULT_MEMPOOL_SIZE_LIMIT: usize = 500_000_000; pub(crate) const DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS: u64 = 5; pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 24 * 60 * 60; @@ -28,7 +28,7 @@ pub(crate) const DEFAULT_MAXIMUM_STANDARD_TRANSACTION_VERSION: u16 = TX_VERSION; #[derive(Clone, Debug)] pub struct Config { pub maximum_transaction_count: usize, - pub mempool_size_limit: u64, + pub mempool_size_limit: usize, pub maximum_build_block_template_attempts: u64, pub transaction_expire_interval_daa_score: u64, pub transaction_expire_scan_interval_daa_score: u64, @@ -52,7 +52,7 @@ impl Config { #[allow(clippy::too_many_arguments)] pub fn new( maximum_transaction_count: usize, - mempool_size_limit: u64, + mempool_size_limit: usize, maximum_build_block_template_attempts: u64, transaction_expire_interval_daa_score: u64, transaction_expire_scan_interval_daa_score: u64, diff --git a/mining/src/mempool/mod.rs b/mining/src/mempool/mod.rs index 1d9aaf35b..845b16aa6 100644 --- a/mining/src/mempool/mod.rs +++ b/mining/src/mempool/mod.rs @@ -164,7 +164,7 @@ impl Mempool { } #[cfg(test)] - pub(crate) fn get_total_compute_mass(&self) -> u64 { + pub(crate) fn get_estimated_size(&self) -> usize { self.transaction_pool.get_estimated_size() } } diff --git a/mining/src/mempool/model/transactions_pool.rs b/mining/src/mempool/model/transactions_pool.rs index af30f5791..c0b5bfa57 100644 --- a/mining/src/mempool/model/transactions_pool.rs +++ b/mining/src/mempool/model/transactions_pool.rs @@ -19,6 +19,7 @@ use kaspa_consensus_core::{ tx::{MutableTransaction, TransactionId, TransactionOutpoint}, }; use kaspa_core::{time::unix_now, trace, warn}; +use kaspa_utils::mem_size::MemSizeEstimator; use std::{ collections::{hash_map::Keys, hash_set::Iter}, sync::Arc, @@ -64,7 +65,7 @@ pub(crate) struct TransactionsPool { /// last expire scan time in milliseconds last_expire_scan_time: u64, - estimated_size: u64, + estimated_size: usize, /// Store of UTXOs utxo_set: MempoolUtxoSet, @@ -120,7 +121,7 @@ impl TransactionsPool { } self.utxo_set.add_transaction(&transaction.mtx); - self.estimated_size += transaction.estimated_size().expect("we expect this field to be already calculated"); + self.estimated_size += transaction.mtx.tx.estimate_mem_bytes(); self.all_transactions.insert(id, transaction); trace!("Added transaction {}", id); Ok(()) @@ -163,7 +164,7 @@ impl TransactionsPool { // Remove the transaction from the mempool UTXO set self.utxo_set.remove_transaction(&removed_tx.mtx, &parent_ids); - self.estimated_size -= removed_tx.estimated_size().unwrap(); + self.estimated_size -= removed_tx.mtx.tx.estimate_mem_bytes(); Ok(removed_tx) } @@ -197,6 +198,7 @@ impl TransactionsPool { // Returns a vector of transactions to be removed that the caller has to remove actually. // The caller is golang validateAndInsertTransaction equivalent. // This behavior differs from golang impl. + let transaction_size = transaction.tx.estimate_mem_bytes(); let mut txs_to_remove = Vec::with_capacity(1); let mut selected_size = 0; let mut num_selected = 0; @@ -208,26 +210,27 @@ impl TransactionsPool { .filter(|mtx| mtx.priority == Priority::Low && !mtx.is_parent_of(transaction)) { if self.len() + 1 - num_selected <= self.config.maximum_transaction_count - && self.estimated_size + transaction.estimated_size.unwrap() - selected_size <= self.config.mempool_size_limit + && self.estimated_size + transaction_size - selected_size <= self.config.mempool_size_limit { break; } if tx.fee_rate() > transaction.calculated_feerate().unwrap() { + // panic!("{} {}", tx.fee_rate(), transaction.calculated_feerate().unwrap()); let err = RuleError::RejectMempoolIsFull; warn!("{}", err); return Err(err); } txs_to_remove.push(tx.id()); - selected_size += tx.estimated_size().unwrap(); + selected_size += tx.mtx.tx.estimate_mem_bytes(); num_selected += 1; } Ok(txs_to_remove) } - pub(crate) fn get_estimated_size(&self) -> u64 { + pub(crate) fn get_estimated_size(&self) -> usize { self.estimated_size } diff --git a/mining/src/mempool/model/tx.rs b/mining/src/mempool/model/tx.rs index efb15788a..9b65faeb2 100644 --- a/mining/src/mempool/model/tx.rs +++ b/mining/src/mempool/model/tx.rs @@ -32,10 +32,6 @@ impl MempoolTransaction { let parent_id = self.id(); transaction.tx.inputs.iter().any(|x| x.previous_outpoint.transaction_id == parent_id) } - - pub(crate) fn estimated_size(&self) -> Option { - self.mtx.estimated_size - } } impl RbfPolicy { diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs index 545d2a8eb..0faa6747a 100644 --- a/mining/src/mempool/validate_and_insert_transaction.rs +++ b/mining/src/mempool/validate_and_insert_transaction.rs @@ -10,10 +10,10 @@ use crate::mempool::{ use kaspa_consensus_core::{ api::ConsensusApi, constants::UNACCEPTED_DAA_SCORE, - mass::transaction_estimated_serialized_size, tx::{MutableTransaction, Transaction, TransactionId, TransactionOutpoint, UtxoEntry}, }; use kaspa_core::{debug, info}; +use kaspa_utils::mem_size::MemSizeEstimator; impl Mempool { pub(crate) fn pre_validate_and_populate_transaction( @@ -24,13 +24,7 @@ impl Mempool { ) -> RuleResult { self.validate_transaction_unacceptance(&transaction)?; // Populate mass and estimated_size in the beginning, it will be used in multiple places throughout the validation and insertion. - // We only populate if it's `None` to allow tests set arbitrary values. - if transaction.calculated_compute_mass.is_none() { - transaction.calculated_compute_mass = Some(consensus.calculate_transaction_compute_mass(&transaction.tx)); - } - if transaction.estimated_size.is_none() { - transaction.estimated_size = Some(transaction_estimated_serialized_size(&transaction.tx)); - } + transaction.calculated_compute_mass = Some(consensus.calculate_transaction_compute_mass(&transaction.tx)); self.validate_transaction_in_isolation(&transaction)?; let feerate_threshold = self.get_replace_by_fee_constraint(&transaction, rbf_policy)?; self.populate_mempool_entries(&mut transaction); @@ -80,6 +74,7 @@ impl Mempool { self.validate_transaction_in_context(&transaction)?; // Before adding the transaction, check if there is room in the pool + let transaction_size = transaction.tx.estimate_mem_bytes(); let txs_to_remove = self.transaction_pool.limit_transaction_count(&transaction)?; for x in txs_to_remove.iter() { self.remove_transaction(x, true, TxRemovalReason::MakingRoom, format!(" for {}", transaction_id).as_str())?; @@ -89,7 +84,7 @@ impl Mempool { // also removes all transactions dependant on `x` we might already have enough room. #[allow(clippy::int_plus_one)] if self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count - && self.transaction_pool.get_estimated_size() + transaction.estimated_size.unwrap() <= self.config.mempool_size_limit + && self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit { break; } @@ -99,8 +94,12 @@ impl Mempool { { assert!( self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count - && self.transaction_pool.get_estimated_size() + transaction.estimated_size.unwrap() - <= self.config.mempool_size_limit + && self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit, + "Transactions in mempool: {}, max: {}, mempool size: {}, max: {}", + self.transaction_pool.len() + 1, + self.config.maximum_transaction_count, + self.transaction_pool.get_estimated_size() + transaction_size, + self.config.mempool_size_limit, ); } @@ -125,6 +124,12 @@ impl Mempool { if self.transaction_pool.has(&transaction_id) { return Err(RuleError::RejectDuplicate(transaction_id)); } + + let tx_size = transaction.tx.estimate_mem_bytes(); + if tx_size > self.config.mempool_size_limit { + return Err(RuleError::RejectTxTooBig(transaction_id, tx_size, self.config.mempool_size_limit)); + } + if !self.config.accept_non_standard { self.check_transaction_standard_in_isolation(transaction)?; } From 9066e6e4570a4efec564703b2edfb9d2b6c45280 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Mon, 26 Aug 2024 18:49:49 +0300 Subject: [PATCH 08/14] remove clippy allow --- mining/src/mempool/model/transactions_pool.rs | 6 ++--- .../validate_and_insert_transaction.rs | 24 ++++++++----------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/mining/src/mempool/model/transactions_pool.rs b/mining/src/mempool/model/transactions_pool.rs index c0b5bfa57..ecdf0a66f 100644 --- a/mining/src/mempool/model/transactions_pool.rs +++ b/mining/src/mempool/model/transactions_pool.rs @@ -201,13 +201,12 @@ impl TransactionsPool { let transaction_size = transaction.tx.estimate_mem_bytes(); let mut txs_to_remove = Vec::with_capacity(1); let mut selected_size = 0; - let mut num_selected = 0; - #[allow(clippy::explicit_counter_loop)] - for tx in self + for (num_selected, tx) in self .ready_transactions .ascending_iter() .map(|tx| self.all_transactions.get(&tx.id()).unwrap()) .filter(|mtx| mtx.priority == Priority::Low && !mtx.is_parent_of(transaction)) + .enumerate() { if self.len() + 1 - num_selected <= self.config.maximum_transaction_count && self.estimated_size + transaction_size - selected_size <= self.config.mempool_size_limit @@ -224,7 +223,6 @@ impl TransactionsPool { txs_to_remove.push(tx.id()); selected_size += tx.mtx.tx.estimate_mem_bytes(); - num_selected += 1; } Ok(txs_to_remove) diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs index 0faa6747a..7b7f0a98f 100644 --- a/mining/src/mempool/validate_and_insert_transaction.rs +++ b/mining/src/mempool/validate_and_insert_transaction.rs @@ -82,26 +82,22 @@ impl Mempool { // smallest prefix of `ready_transactions` (sorted by ascending fee-rate) // that makes enough room for `transaction`, but since each call to `self.remove_transaction` // also removes all transactions dependant on `x` we might already have enough room. - #[allow(clippy::int_plus_one)] - if self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count + if self.transaction_pool.len() < self.config.maximum_transaction_count && self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit { break; } } - #[allow(clippy::int_plus_one)] - { - assert!( - self.transaction_pool.len() + 1 <= self.config.maximum_transaction_count - && self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit, - "Transactions in mempool: {}, max: {}, mempool size: {}, max: {}", - self.transaction_pool.len() + 1, - self.config.maximum_transaction_count, - self.transaction_pool.get_estimated_size() + transaction_size, - self.config.mempool_size_limit, - ); - } + assert!( + self.transaction_pool.len() < self.config.maximum_transaction_count + && self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit, + "Transactions in mempool: {}, max: {}, mempool size: {}, max: {}", + self.transaction_pool.len() + 1, + self.config.maximum_transaction_count, + self.transaction_pool.get_estimated_size() + transaction_size, + self.config.mempool_size_limit, + ); // Add the transaction to the mempool as a MempoolTransaction and return a clone of the embedded Arc let accepted_transaction = From 4dfedc0984b5856de45b6db6f292219f62dca160 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Mon, 26 Aug 2024 20:08:55 +0300 Subject: [PATCH 09/14] Fix clippy warnings --- mining/src/manager_tests.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index 37378737f..c53829e90 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -1134,7 +1134,7 @@ mod tests { validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), tx).unwrap(); assert!(!mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.is_empty()); } - assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); + assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT); let heavy_tx = { let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32, 0); @@ -1144,9 +1144,9 @@ mod tests { heavy_tx.calculated_fee = Some(500_000); heavy_tx }; - assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize); + assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT); validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx.clone()).unwrap(); - assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT as usize - 5); + assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT - 5); assert!(mining_manager.get_estimated_size() <= size_limit); } From 51a709ad400571856968a3f8d32e24cf445ad9af Mon Sep 17 00:00:00 2001 From: Michael Sutton Date: Tue, 27 Aug 2024 16:36:43 +0300 Subject: [PATCH 10/14] Mempool evict policy -- a few fixes (#7) * comments and logs * validate in context before execute rbf + note * use a dedicated mempool_estimated_bytes fn * fix redeemers check * make sure we found enough space (not sure about this, might be better to remove the assert in the calling function and turn into if) * reorganize checks * include tx itself in redeemers --- consensus/core/src/tx.rs | 32 ++++++++ mining/src/manager_tests.rs | 2 +- mining/src/mempool/model/frontier.rs | 1 + mining/src/mempool/model/transactions_pool.rs | 79 ++++++++++++------- mining/src/mempool/model/tx.rs | 5 -- .../validate_and_insert_transaction.rs | 34 +++++--- 6 files changed, 111 insertions(+), 42 deletions(-) diff --git a/consensus/core/src/tx.rs b/consensus/core/src/tx.rs index 1d8dd8334..70f9da09e 100644 --- a/consensus/core/src/tx.rs +++ b/consensus/core/src/tx.rs @@ -8,6 +8,7 @@ pub use script_public_key::{ scriptvec, ScriptPublicKey, ScriptPublicKeyT, ScriptPublicKeyVersion, ScriptPublicKeys, ScriptVec, SCRIPT_VECTOR_SIZE, }; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::SeqCst; use std::{ @@ -437,6 +438,37 @@ impl> MutableTransaction { None } } + + /// A function for estimating the amount of memory bytes used by this transaction (dedicated to mempool usage). + /// We need consistency between estimation calls so only this function should be used for this purpose since + /// `estimate_mem_bytes` is sensitive to pointer wrappers such as Arc + pub fn mempool_estimated_bytes(&self) -> usize { + self.estimate_mem_bytes() + } + + pub fn has_parent(&self, possible_parent: TransactionId) -> bool { + self.tx.as_ref().inputs.iter().any(|x| x.previous_outpoint.transaction_id == possible_parent) + } + + pub fn has_parent_in_set(&self, possible_parents: &HashSet) -> bool { + self.tx.as_ref().inputs.iter().any(|x| possible_parents.contains(&x.previous_outpoint.transaction_id)) + } +} + +impl> MemSizeEstimator for MutableTransaction { + fn estimate_mem_bytes(&self) -> usize { + size_of::() + + self + .entries + .iter() + .map(|op| { + // size_of::>() already counts SCRIPT_VECTOR_SIZE bytes within, so we only add the delta + size_of::>() + + op.as_ref().map_or(0, |e| e.script_public_key.script().len().saturating_sub(SCRIPT_VECTOR_SIZE)) + }) + .sum::() + + self.tx.as_ref().estimate_mem_bytes() + } } impl> AsRef for MutableTransaction { diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index c53829e90..031aa6d9a 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -1125,7 +1125,7 @@ mod tests { let consensus = Arc::new(ConsensusMock::new()); let counters = Arc::new(MiningCounters::default()); let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS); - let tx_size = txs[0].tx.estimate_mem_bytes(); + let tx_size = txs[0].mempool_estimated_bytes(); let size_limit = TX_COUNT * tx_size; config.mempool_size_limit = size_limit; let mining_manager = MiningManager::with_config(config, None, counters); diff --git a/mining/src/mempool/model/frontier.rs b/mining/src/mempool/model/frontier.rs index cd719feaa..3a7a41a07 100644 --- a/mining/src/mempool/model/frontier.rs +++ b/mining/src/mempool/model/frontier.rs @@ -255,6 +255,7 @@ impl Frontier { estimator } + /// Returns an iterator to the transactions in the frontier in increasing feerate order pub fn ascending_iter(&self) -> impl DoubleEndedIterator> + ExactSizeIterator + FusedIterator { self.search_tree.ascending_iter().map(|key| &key.tx) } diff --git a/mining/src/mempool/model/transactions_pool.rs b/mining/src/mempool/model/transactions_pool.rs index ecdf0a66f..5449606c4 100644 --- a/mining/src/mempool/model/transactions_pool.rs +++ b/mining/src/mempool/model/transactions_pool.rs @@ -11,17 +11,17 @@ use crate::{ }, tx::Priority, }, - model::topological_index::TopologicalIndex, + model::{topological_index::TopologicalIndex, TransactionIdSet}, Policy, }; use kaspa_consensus_core::{ block::TemplateTransactionSelector, tx::{MutableTransaction, TransactionId, TransactionOutpoint}, }; -use kaspa_core::{time::unix_now, trace, warn}; -use kaspa_utils::mem_size::MemSizeEstimator; +use kaspa_core::{debug, time::unix_now, trace}; use std::{ collections::{hash_map::Keys, hash_set::Iter}, + iter::once, sync::Arc, }; @@ -92,15 +92,16 @@ impl TransactionsPool { transaction: MutableTransaction, virtual_daa_score: u64, priority: Priority, + transaction_size: usize, ) -> RuleResult<&MempoolTransaction> { let transaction = MempoolTransaction::new(transaction, priority, virtual_daa_score); let id = transaction.id(); - self.add_mempool_transaction(transaction)?; + self.add_mempool_transaction(transaction, transaction_size)?; Ok(self.get(&id).unwrap()) } /// Add a mempool transaction to the pool - pub(crate) fn add_mempool_transaction(&mut self, transaction: MempoolTransaction) -> RuleResult<()> { + pub(crate) fn add_mempool_transaction(&mut self, transaction: MempoolTransaction, transaction_size: usize) -> RuleResult<()> { let id = transaction.id(); assert!(!self.all_transactions.contains_key(&id), "transaction {id} to be added already exists in the transactions pool"); @@ -121,7 +122,7 @@ impl TransactionsPool { } self.utxo_set.add_transaction(&transaction.mtx); - self.estimated_size += transaction.mtx.tx.estimate_mem_bytes(); + self.estimated_size += transaction_size; self.all_transactions.insert(id, transaction); trace!("Added transaction {}", id); Ok(()) @@ -164,7 +165,7 @@ impl TransactionsPool { // Remove the transaction from the mempool UTXO set self.utxo_set.remove_transaction(&removed_tx.mtx, &parent_ids); - self.estimated_size -= removed_tx.mtx.tx.estimate_mem_bytes(); + self.estimated_size -= removed_tx.mtx.mempool_estimated_bytes(); Ok(removed_tx) } @@ -193,39 +194,63 @@ impl TransactionsPool { /// `transaction`. /// /// An error is returned if the mempool is filled with high priority transactions, or - /// at least one candidate for removal has a higher fee rate than `transaction`. - pub(crate) fn limit_transaction_count(&self, transaction: &MutableTransaction) -> RuleResult> { - // Returns a vector of transactions to be removed that the caller has to remove actually. - // The caller is golang validateAndInsertTransaction equivalent. - // This behavior differs from golang impl. - let transaction_size = transaction.tx.estimate_mem_bytes(); - let mut txs_to_remove = Vec::with_capacity(1); - let mut selected_size = 0; - for (num_selected, tx) in self + /// there are not enough lower feerate transactions that can be removed to accommodate `transaction` + pub(crate) fn limit_transaction_count( + &self, + transaction: &MutableTransaction, + transaction_size: usize, + ) -> RuleResult> { + // No eviction needed -- return + if self.len() < self.config.maximum_transaction_count + && self.estimated_size + transaction_size <= self.config.mempool_size_limit + { + return Ok(Default::default()); + } + + // Returns a vector of transactions to be removed (the caller has to actually remove) + let feerate_threshold = transaction.calculated_feerate().unwrap(); + let mut txs_to_remove = Vec::with_capacity(1); // Normally we expect a single removal + let mut selection_overall_size = 0; + for tx in self .ready_transactions .ascending_iter() .map(|tx| self.all_transactions.get(&tx.id()).unwrap()) - .filter(|mtx| mtx.priority == Priority::Low && !mtx.is_parent_of(transaction)) - .enumerate() + .filter(|mtx| mtx.priority == Priority::Low) { - if self.len() + 1 - num_selected <= self.config.maximum_transaction_count - && self.estimated_size + transaction_size - selected_size <= self.config.mempool_size_limit - { - break; + // TODO (optimization): inline the `has_parent_in_set` check within the redeemer traversal and exit early if possible + let redeemers = self.get_redeemer_ids_in_pool(&tx.id()).into_iter().chain(once(tx.id())).collect::(); + if transaction.has_parent_in_set(&redeemers) { + continue; } - if tx.fee_rate() > transaction.calculated_feerate().unwrap() { - // panic!("{} {}", tx.fee_rate(), transaction.calculated_feerate().unwrap()); + // We are iterating ready txs by ascending feerate so the pending tx has lower feerate than all remaining txs + if tx.fee_rate() > feerate_threshold { let err = RuleError::RejectMempoolIsFull; - warn!("{}", err); + debug!("Transaction {} with feerate {} has been rejected: {}", transaction.id(), feerate_threshold, err); return Err(err); } txs_to_remove.push(tx.id()); - selected_size += tx.mtx.tx.estimate_mem_bytes(); + selection_overall_size += tx.mtx.mempool_estimated_bytes(); + + if self.len() + 1 - txs_to_remove.len() <= self.config.maximum_transaction_count + && self.estimated_size + transaction_size - selection_overall_size <= self.config.mempool_size_limit + { + return Ok(txs_to_remove); + } } - Ok(txs_to_remove) + // We could not find sufficient space for the pending transaction + debug!( + "Mempool is filled with high-priority/ancestor txs (count: {}, bytes: {}). Transaction {} with feerate {} and size {} has been rejected: {}", + self.len(), + self.estimated_size, + transaction.id(), + feerate_threshold, + transaction_size, + RuleError::RejectMempoolIsFull + ); + Err(RuleError::RejectMempoolIsFull) } pub(crate) fn get_estimated_size(&self) -> usize { diff --git a/mining/src/mempool/model/tx.rs b/mining/src/mempool/model/tx.rs index 9b65faeb2..27bb87d09 100644 --- a/mining/src/mempool/model/tx.rs +++ b/mining/src/mempool/model/tx.rs @@ -27,11 +27,6 @@ impl MempoolTransaction { assert!(contextual_mass > 0, "expected to be called for validated txs only"); self.mtx.calculated_fee.unwrap() as f64 / contextual_mass as f64 } - - pub(crate) fn is_parent_of(&self, transaction: &MutableTransaction) -> bool { - let parent_id = self.id(); - transaction.tx.inputs.iter().any(|x| x.previous_outpoint.transaction_id == parent_id) - } } impl RbfPolicy { diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs index 7b7f0a98f..0eead6244 100644 --- a/mining/src/mempool/validate_and_insert_transaction.rs +++ b/mining/src/mempool/validate_and_insert_transaction.rs @@ -13,7 +13,6 @@ use kaspa_consensus_core::{ tx::{MutableTransaction, Transaction, TransactionId, TransactionOutpoint, UtxoEntry}, }; use kaspa_core::{debug, info}; -use kaspa_utils::mem_size::MemSizeEstimator; impl Mempool { pub(crate) fn pre_validate_and_populate_transaction( @@ -68,20 +67,33 @@ impl Mempool { } } + // Perform mempool in-context validations prior to possible RBF replacements + self.validate_transaction_in_context(&transaction)?; + // Check double spends and try to remove them if the RBF policy requires it let removed_transaction = self.execute_replace_by_fee(&transaction, rbf_policy)?; - self.validate_transaction_in_context(&transaction)?; + // + // Note: there exists a case below where `limit_transaction_count` returns an error signaling that + // this tx should be rejected due to mempool size limits (rather than evicting others). However, + // if this tx happened to be an RBF tx, it might have already caused an eviction in the line + // above. We choose to ignore this rare case for now, as it essentially means that even the increased + // feerate of the replacement tx is very low relative to the mempool overall. + // // Before adding the transaction, check if there is room in the pool - let transaction_size = transaction.tx.estimate_mem_bytes(); - let txs_to_remove = self.transaction_pool.limit_transaction_count(&transaction)?; + let transaction_size = transaction.mempool_estimated_bytes(); + let txs_to_remove = self.transaction_pool.limit_transaction_count(&transaction, transaction_size)?; for x in txs_to_remove.iter() { self.remove_transaction(x, true, TxRemovalReason::MakingRoom, format!(" for {}", transaction_id).as_str())?; // self.transaction_pool.limit_transaction_count(&transaction) returns the // smallest prefix of `ready_transactions` (sorted by ascending fee-rate) // that makes enough room for `transaction`, but since each call to `self.remove_transaction` - // also removes all transactions dependant on `x` we might already have enough room. + // also removes all transactions dependant on `x` we might already have sufficient space, so + // we constantly check the break condition. + // + // Note that self.transaction_pool.len() < self.config.maximum_transaction_count means we have + // at least one available slot in terms of the count limit if self.transaction_pool.len() < self.config.maximum_transaction_count && self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit { @@ -92,7 +104,7 @@ impl Mempool { assert!( self.transaction_pool.len() < self.config.maximum_transaction_count && self.transaction_pool.get_estimated_size() + transaction_size <= self.config.mempool_size_limit, - "Transactions in mempool: {}, max: {}, mempool size: {}, max: {}", + "Transactions in mempool: {}, max: {}, mempool bytes size: {}, max: {}", self.transaction_pool.len() + 1, self.config.maximum_transaction_count, self.transaction_pool.get_estimated_size() + transaction_size, @@ -100,8 +112,12 @@ impl Mempool { ); // Add the transaction to the mempool as a MempoolTransaction and return a clone of the embedded Arc - let accepted_transaction = - self.transaction_pool.add_transaction(transaction, consensus.get_virtual_daa_score(), priority)?.mtx.tx.clone(); + let accepted_transaction = self + .transaction_pool + .add_transaction(transaction, consensus.get_virtual_daa_score(), priority, transaction_size)? + .mtx + .tx + .clone(); Ok(TransactionPostValidation { removed: removed_transaction, accepted: Some(accepted_transaction) }) } @@ -121,7 +137,7 @@ impl Mempool { return Err(RuleError::RejectDuplicate(transaction_id)); } - let tx_size = transaction.tx.estimate_mem_bytes(); + let tx_size = transaction.mempool_estimated_bytes(); if tx_size > self.config.mempool_size_limit { return Err(RuleError::RejectTxTooBig(transaction_id, tx_size, self.config.mempool_size_limit)); } From bb8cfab246605eedd99d7b039a27389086c539ed Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 27 Aug 2024 16:43:49 +0300 Subject: [PATCH 11/14] Add comment to test_evict --- mining/src/manager_tests.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index 031aa6d9a..c6f6bf583 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -1117,6 +1117,11 @@ mod tests { // TODO: extend the test according to the golang scenario } + // This is a sanity test for the mempool eviction policy. We check that if the mempool reached to its maximum + // (in bytes) a high paying transaction will evict as much transactions as needed so it can enter the + // mempool. + // TODO: Add tests to the case where the transaction doesn't have enough fee rate. We need to check that the + // transaction is rejected and the mempool remains untouched. #[test] fn test_evict() { const TX_COUNT: usize = 10; From fd043b8aa688999ff84fcc0280d5e6c0e0183057 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 27 Aug 2024 16:56:12 +0300 Subject: [PATCH 12/14] Add case to test_evict --- mining/src/manager_tests.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index c6f6bf583..32fb327a6 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -1120,8 +1120,8 @@ mod tests { // This is a sanity test for the mempool eviction policy. We check that if the mempool reached to its maximum // (in bytes) a high paying transaction will evict as much transactions as needed so it can enter the // mempool. - // TODO: Add tests to the case where the transaction doesn't have enough fee rate. We need to check that the - // transaction is rejected and the mempool remains untouched. + // TODO: Add a test where we try to add a heavy transaction with fee rate that's higher than some of the mempool + // transactions, but not enough, so the transaction will be rejected nonetheless. #[test] fn test_evict() { const TX_COUNT: usize = 10; @@ -1137,20 +1137,29 @@ mod tests { for tx in txs { validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), tx).unwrap(); - assert!(!mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.is_empty()); } assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT); - let heavy_tx = { + let heavy_tx_low_fee = { let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32, 0); let mut inner_tx = (*(heavy_tx.tx)).clone(); inner_tx.payload = vec![0u8; TX_COUNT / 2 * tx_size - inner_tx.estimate_mem_bytes()]; heavy_tx.tx = inner_tx.into(); - heavy_tx.calculated_fee = Some(500_000); + heavy_tx.calculated_fee = Some(2081); heavy_tx }; + assert!(validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx_low_fee.clone()).is_err()); assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT); - validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx.clone()).unwrap(); + + let heavy_tx_high_fee = { + let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32, 0); + let mut inner_tx = (*(heavy_tx.tx)).clone(); + inner_tx.payload = vec![0u8; TX_COUNT / 2 * tx_size - inner_tx.estimate_mem_bytes()]; + heavy_tx.tx = inner_tx.into(); + heavy_tx.calculated_fee = Some(500_000); + heavy_tx + }; + validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx_high_fee.clone()).unwrap(); assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT - 5); assert!(mining_manager.get_estimated_size() <= size_limit); } From e2894403f70aa5a03433bc01034525f3c12acc25 Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 27 Aug 2024 17:36:38 +0300 Subject: [PATCH 13/14] Remove explicit check for too big transactions --- mining/errors/src/mempool.rs | 3 --- mining/src/manager_tests.rs | 12 +++++++++++- .../src/mempool/validate_and_insert_transaction.rs | 5 ----- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/mining/errors/src/mempool.rs b/mining/errors/src/mempool.rs index 3ea31a1c2..319aaa484 100644 --- a/mining/errors/src/mempool.rs +++ b/mining/errors/src/mempool.rs @@ -81,9 +81,6 @@ pub enum RuleError { #[error("Rejected tx {0} from mempool due to incomputable storage mass")] RejectStorageMassIncomputable(TransactionId), - - #[error("Rejected tx {0} from mempool because its size ({1}) is greater than the mempool size limit ({2})")] - RejectTxTooBig(TransactionId, usize, usize), } impl From for RuleError { diff --git a/mining/src/manager_tests.rs b/mining/src/manager_tests.rs index 32fb327a6..6ddc86e45 100644 --- a/mining/src/manager_tests.rs +++ b/mining/src/manager_tests.rs @@ -1152,7 +1152,7 @@ mod tests { assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT); let heavy_tx_high_fee = { - let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32, 0); + let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32 + 1, 0); let mut inner_tx = (*(heavy_tx.tx)).clone(); inner_tx.payload = vec![0u8; TX_COUNT / 2 * tx_size - inner_tx.estimate_mem_bytes()]; heavy_tx.tx = inner_tx.into(); @@ -1162,6 +1162,16 @@ mod tests { validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx_high_fee.clone()).unwrap(); assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT - 5); assert!(mining_manager.get_estimated_size() <= size_limit); + + let too_big_tx = { + let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32 + 2, 0); + let mut inner_tx = (*(heavy_tx.tx)).clone(); + inner_tx.payload = vec![0u8; size_limit]; + heavy_tx.tx = inner_tx.into(); + heavy_tx.calculated_fee = Some(500_000); + heavy_tx + }; + assert!(validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), too_big_tx.clone()).is_err()); } fn validate_and_insert_mutable_transaction( diff --git a/mining/src/mempool/validate_and_insert_transaction.rs b/mining/src/mempool/validate_and_insert_transaction.rs index 0eead6244..3eddac459 100644 --- a/mining/src/mempool/validate_and_insert_transaction.rs +++ b/mining/src/mempool/validate_and_insert_transaction.rs @@ -137,11 +137,6 @@ impl Mempool { return Err(RuleError::RejectDuplicate(transaction_id)); } - let tx_size = transaction.mempool_estimated_bytes(); - if tx_size > self.config.mempool_size_limit { - return Err(RuleError::RejectTxTooBig(transaction_id, tx_size, self.config.mempool_size_limit)); - } - if !self.config.accept_non_standard { self.check_transaction_standard_in_isolation(transaction)?; } From 29c3d61b507a4acd80580a111756b1d4d04048db Mon Sep 17 00:00:00 2001 From: Ori Newman Date: Tue, 27 Aug 2024 17:37:21 +0300 Subject: [PATCH 14/14] Raise DEFAULT_MEMPOOL_SIZE_LIMIT to 1GB --- mining/src/mempool/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mining/src/mempool/config.rs b/mining/src/mempool/config.rs index 1ed3a376e..b90a36577 100644 --- a/mining/src/mempool/config.rs +++ b/mining/src/mempool/config.rs @@ -1,7 +1,7 @@ use kaspa_consensus_core::constants::TX_VERSION; pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: usize = 1_000_000; -pub(crate) const DEFAULT_MEMPOOL_SIZE_LIMIT: usize = 500_000_000; +pub(crate) const DEFAULT_MEMPOOL_SIZE_LIMIT: usize = 1_000_000_000; pub(crate) const DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS: u64 = 5; pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 24 * 60 * 60;