Skip to content

Commit

Permalink
mempool tweaks
Browse files Browse the repository at this point in the history
* add case to test_evict
* remove explicit check for too big transactions
* raise DEFAULT_MEMPOOL_SIZE_LIMIT to 1GB
* move to estimated size
* remove estimated_size field
  • Loading branch information
x100111010 committed Oct 20, 2024
1 parent 8d49801 commit 1ced37a
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 84 deletions.
32 changes: 32 additions & 0 deletions consensus/core/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use serde::{Deserialize, Serialize};
use spectre_utils::hex::ToHex;
use spectre_utils::mem_size::MemSizeEstimator;
use spectre_utils::{serde_bytes, serde_bytes_fixed_ref};
use std::collections::HashSet;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::SeqCst;
use std::{
Expand Down Expand Up @@ -437,6 +438,37 @@ impl<T: AsRef<Transaction>> MutableTransaction<T> {
None
}
}

/// A function for estimating the amount of memory bytes used by this transaction (dedicated to mempool usage).
/// We need consistency between estimation calls so only this function should be used for this purpose since
/// `estimate_mem_bytes` is sensitive to pointer wrappers such as Arc
pub fn mempool_estimated_bytes(&self) -> usize {
self.estimate_mem_bytes()
}

pub fn has_parent(&self, possible_parent: TransactionId) -> bool {
self.tx.as_ref().inputs.iter().any(|x| x.previous_outpoint.transaction_id == possible_parent)
}

pub fn has_parent_in_set(&self, possible_parents: &HashSet<TransactionId>) -> bool {
self.tx.as_ref().inputs.iter().any(|x| possible_parents.contains(&x.previous_outpoint.transaction_id))
}
}

impl<T: AsRef<Transaction>> MemSizeEstimator for MutableTransaction<T> {
fn estimate_mem_bytes(&self) -> usize {
size_of::<Self>()
+ self
.entries
.iter()
.map(|op| {
// size_of::<Option<UtxoEntry>>() already counts SCRIPT_VECTOR_SIZE bytes within, so we only add the delta
size_of::<Option<UtxoEntry>>()
+ op.as_ref().map_or(0, |e| e.script_public_key.script().len().saturating_sub(SCRIPT_VECTOR_SIZE))
})
.sum::<usize>()
+ self.tx.as_ref().estimate_mem_bytes()
}
}

impl<T: AsRef<Transaction>> AsRef<Transaction> for MutableTransaction<T> {
Expand Down
6 changes: 3 additions & 3 deletions mining/errors/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ pub enum RuleError {
#[error("replace by fee found more than one double spending transaction in the mempool")]
RejectRbfTooManyDoubleSpendingTransactions,

/// New behavior: a transaction is rejected if the mempool is full
#[error("number of high-priority transactions in mempool ({0}) has reached the maximum allowed ({1})")]
RejectMempoolIsFull(usize, u64),
/// a transaction is rejected if the mempool is full
#[error("transaction could not be added to the mempool because it's full with transactions with higher priority")]
RejectMempoolIsFull,

/// An error emitted by mining\src\mempool\check_transaction_standard.rs
#[error("transaction {0} is not standard: {1}")]
Expand Down
5 changes: 5 additions & 0 deletions mining/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,11 @@ impl MiningManager {
pub fn unknown_transactions(&self, transactions: Vec<TransactionId>) -> Vec<TransactionId> {
self.mempool.read().unknown_transactions(transactions)
}

#[cfg(test)]
pub(crate) fn get_estimated_size(&self) -> usize {
self.mempool.read().get_estimated_size()
}
}

/// Async proxy for the mining manager
Expand Down
68 changes: 67 additions & 1 deletion mining/src/manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod tests {
model::frontier::selectors::TakeAllSelector,
tx::{Orphan, Priority, RbfPolicy},
},
model::tx_query::TransactionQuery,
model::{tx_insert::TransactionInsertion, tx_query::TransactionQuery},
testutils::consensus_mock::ConsensusMock,
MiningCounters,
};
Expand All @@ -35,6 +35,7 @@ mod tests {
pay_to_address_script, pay_to_script_hash_signature_script,
test_helpers::{create_transaction, create_transaction_with_change, op_true_script},
};
use spectre_utils::mem_size::MemSizeEstimator;
use std::{iter::once, sync::Arc};
use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel};

Expand Down Expand Up @@ -1116,6 +1117,71 @@ mod tests {
// TODO: extend the test according to the golang scenario
}

// This is a sanity test for the mempool eviction policy. We check that if the mempool reached to its maximum
// (in bytes) a high paying transaction will evict as much transactions as needed so it can enter the
// mempool.
// TODO: Add a test where we try to add a heavy transaction with fee rate that's higher than some of the mempool
// transactions, but not enough, so the transaction will be rejected nonetheless.
#[test]
fn test_evict() {
const TX_COUNT: usize = 10;
let txs = (0..TX_COUNT).map(|i| create_transaction_with_utxo_entry(i as u32, 0)).collect_vec();

let consensus = Arc::new(ConsensusMock::new());
let counters = Arc::new(MiningCounters::default());
let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS);
let tx_size = txs[0].mempool_estimated_bytes();
let size_limit = TX_COUNT * tx_size;
config.mempool_size_limit = size_limit;
let mining_manager = MiningManager::with_config(config, None, counters);

for tx in txs {
validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), tx).unwrap();
}
assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT);

let heavy_tx_low_fee = {
let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32, 0);
let mut inner_tx = (*(heavy_tx.tx)).clone();
inner_tx.payload = vec![0u8; TX_COUNT / 2 * tx_size - inner_tx.estimate_mem_bytes()];
heavy_tx.tx = inner_tx.into();
heavy_tx.calculated_fee = Some(2081);
heavy_tx
};
assert!(validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx_low_fee.clone()).is_err());
assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT);

let heavy_tx_high_fee = {
let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32 + 1, 0);
let mut inner_tx = (*(heavy_tx.tx)).clone();
inner_tx.payload = vec![0u8; TX_COUNT / 2 * tx_size - inner_tx.estimate_mem_bytes()];
heavy_tx.tx = inner_tx.into();
heavy_tx.calculated_fee = Some(500_000);
heavy_tx
};
validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx_high_fee.clone()).unwrap();
assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT - 5);
assert!(mining_manager.get_estimated_size() <= size_limit);

let too_big_tx = {
let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32 + 2, 0);
let mut inner_tx = (*(heavy_tx.tx)).clone();
inner_tx.payload = vec![0u8; size_limit];
heavy_tx.tx = inner_tx.into();
heavy_tx.calculated_fee = Some(500_000);
heavy_tx
};
assert!(validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), too_big_tx.clone()).is_err());
}

fn validate_and_insert_mutable_transaction(
mining_manager: &MiningManager,
consensus: &dyn ConsensusApi,
tx: MutableTransaction,
) -> Result<TransactionInsertion, MiningManagerError> {
mining_manager.validate_and_insert_mutable_transaction(consensus, tx, Priority::Low, Orphan::Allowed, RbfPolicy::Forbidden)
}

fn sweep_compare_modified_template_to_built(
consensus: &dyn ConsensusApi,
address_prefix: Prefix,
Expand Down
19 changes: 11 additions & 8 deletions mining/src/mempool/config.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use spectre_consensus_core::constants::TX_VERSION;

pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: u32 = 1_000_000;
pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: usize = 1_000_000;
pub(crate) const DEFAULT_MEMPOOL_SIZE_LIMIT: usize = 1_000_000_000;
pub(crate) const DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS: u64 = 5;

pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 60;
pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 24 * 60 * 60;
pub(crate) const DEFAULT_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10;
pub(crate) const DEFAULT_ACCEPTED_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 120;
pub(crate) const DEFAULT_ACCEPTED_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10;
pub(crate) const DEFAULT_ORPHAN_EXPIRE_INTERVAL_SECONDS: u64 = 60;
pub(crate) const DEFAULT_ORPHAN_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10;

pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_MASS: u64 = 100_000;

// TODO: when rusty-spectre nodes run most of the network, consider increasing this value
pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_COUNT: u64 = 50;
pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_COUNT: u64 = 500;

/// DEFAULT_MINIMUM_RELAY_TRANSACTION_FEE specifies the minimum transaction fee for a transaction to be accepted to
/// the mempool and relayed. It is specified in sompi per 1kg (or 1000 grams) of transaction mass.
Expand All @@ -28,7 +27,8 @@ pub(crate) const DEFAULT_MAXIMUM_STANDARD_TRANSACTION_VERSION: u16 = TX_VERSION;

#[derive(Clone, Debug)]
pub struct Config {
pub maximum_transaction_count: u32,
pub maximum_transaction_count: usize,
pub mempool_size_limit: usize,
pub maximum_build_block_template_attempts: u64,
pub transaction_expire_interval_daa_score: u64,
pub transaction_expire_scan_interval_daa_score: u64,
Expand All @@ -51,7 +51,8 @@ pub struct Config {
impl Config {
#[allow(clippy::too_many_arguments)]
pub fn new(
maximum_transaction_count: u32,
maximum_transaction_count: usize,
mempool_size_limit: usize,
maximum_build_block_template_attempts: u64,
transaction_expire_interval_daa_score: u64,
transaction_expire_scan_interval_daa_score: u64,
Expand All @@ -72,6 +73,7 @@ impl Config {
) -> Self {
Self {
maximum_transaction_count,
mempool_size_limit,
maximum_build_block_template_attempts,
transaction_expire_interval_daa_score,
transaction_expire_scan_interval_daa_score,
Expand All @@ -97,6 +99,7 @@ impl Config {
pub const fn build_default(target_milliseconds_per_block: u64, relay_non_std_transactions: bool, max_block_mass: u64) -> Self {
Self {
maximum_transaction_count: DEFAULT_MAXIMUM_TRANSACTION_COUNT,
mempool_size_limit: DEFAULT_MEMPOOL_SIZE_LIMIT,
maximum_build_block_template_attempts: DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS,
transaction_expire_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS * 1000 / target_milliseconds_per_block,
transaction_expire_scan_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS * 1000
Expand All @@ -121,7 +124,7 @@ impl Config {
}

pub fn apply_ram_scale(mut self, ram_scale: f64) -> Self {
self.maximum_transaction_count = (self.maximum_transaction_count as f64 * ram_scale.min(1.0)) as u32; // Allow only scaling down
self.maximum_transaction_count = (self.maximum_transaction_count as f64 * ram_scale.min(1.0)) as usize; // Allow only scaling down
self
}

Expand Down
5 changes: 5 additions & 0 deletions mining/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ impl Mempool {
.filter(|transaction_id| !(self.transaction_pool.has(transaction_id) || self.orphan_pool.has(transaction_id)));
self.accepted_transactions.unaccepted(&mut not_in_pools_txs)
}

#[cfg(test)]
pub(crate) fn get_estimated_size(&self) -> usize {
self.transaction_pool.get_estimated_size()
}
}

pub mod tx {
Expand Down
9 changes: 7 additions & 2 deletions mining/src/mempool/model/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use feerate_key::FeerateTransactionKey;
use rand::{distributions::Uniform, prelude::Distribution, Rng};
use search_tree::SearchTree;
use selectors::{SequenceSelector, SequenceSelectorInput, TakeAllSelector};
use spectre_consensus_core::block::TemplateTransactionSelector;
use spectre_consensus_core::{block::TemplateTransactionSelector, tx::Transaction};
use spectre_core::trace;
use std::collections::HashSet;
use std::{collections::HashSet, iter::FusedIterator, sync::Arc};

pub(crate) mod feerate_key;
pub(crate) mod search_tree;
Expand Down Expand Up @@ -254,6 +254,11 @@ impl Frontier {
}
estimator
}

/// Returns an iterator to the transactions in the frontier in increasing feerate order
pub fn ascending_iter(&self) -> impl DoubleEndedIterator<Item = &Arc<Transaction>> + ExactSizeIterator + FusedIterator {
self.search_tree.ascending_iter().map(|key| &key.tx)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 1ced37a

Please sign in to comment.