Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mempool tweaks #524

Merged
merged 14 commits into from
Aug 27, 2024
32 changes: 32 additions & 0 deletions consensus/core/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub use script_public_key::{
scriptvec, ScriptPublicKey, ScriptPublicKeyT, ScriptPublicKeyVersion, ScriptPublicKeys, ScriptVec, SCRIPT_VECTOR_SIZE,
};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::SeqCst;
use std::{
Expand Down Expand Up @@ -437,6 +438,37 @@ impl<T: AsRef<Transaction>> MutableTransaction<T> {
None
}
}

/// A function for estimating the amount of memory bytes used by this transaction (dedicated to mempool usage).
/// We need consistency between estimation calls so only this function should be used for this purpose since
/// `estimate_mem_bytes` is sensitive to pointer wrappers such as Arc
pub fn mempool_estimated_bytes(&self) -> usize {
self.estimate_mem_bytes()
}

pub fn has_parent(&self, possible_parent: TransactionId) -> bool {
self.tx.as_ref().inputs.iter().any(|x| x.previous_outpoint.transaction_id == possible_parent)
}

pub fn has_parent_in_set(&self, possible_parents: &HashSet<TransactionId>) -> bool {
self.tx.as_ref().inputs.iter().any(|x| possible_parents.contains(&x.previous_outpoint.transaction_id))
}
}

impl<T: AsRef<Transaction>> MemSizeEstimator for MutableTransaction<T> {
fn estimate_mem_bytes(&self) -> usize {
size_of::<Self>()
+ self
.entries
.iter()
.map(|op| {
// size_of::<Option<UtxoEntry>>() already counts SCRIPT_VECTOR_SIZE bytes within, so we only add the delta
size_of::<Option<UtxoEntry>>()
+ op.as_ref().map_or(0, |e| e.script_public_key.script().len().saturating_sub(SCRIPT_VECTOR_SIZE))
})
.sum::<usize>()
+ self.tx.as_ref().estimate_mem_bytes()
}
}

impl<T: AsRef<Transaction>> AsRef<Transaction> for MutableTransaction<T> {
Expand Down
6 changes: 3 additions & 3 deletions mining/errors/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ pub enum RuleError {
#[error("replace by fee found more than one double spending transaction in the mempool")]
RejectRbfTooManyDoubleSpendingTransactions,

/// New behavior: a transaction is rejected if the mempool is full
#[error("number of high-priority transactions in mempool ({0}) has reached the maximum allowed ({1})")]
RejectMempoolIsFull(usize, u64),
/// a transaction is rejected if the mempool is full
#[error("transaction could not be added to the mempool because it's full with transactions with higher priority")]
RejectMempoolIsFull,

/// An error emitted by mining\src\mempool\check_transaction_standard.rs
#[error("transaction {0} is not standard: {1}")]
Expand Down
5 changes: 5 additions & 0 deletions mining/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,11 @@ impl MiningManager {
pub fn unknown_transactions(&self, transactions: Vec<TransactionId>) -> Vec<TransactionId> {
self.mempool.read().unknown_transactions(transactions)
}

#[cfg(test)]
pub(crate) fn get_estimated_size(&self) -> usize {
self.mempool.read().get_estimated_size()
}
}

/// Async proxy for the mining manager
Expand Down
68 changes: 67 additions & 1 deletion mining/src/manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod tests {
model::frontier::selectors::TakeAllSelector,
tx::{Orphan, Priority, RbfPolicy},
},
model::tx_query::TransactionQuery,
model::{tx_insert::TransactionInsertion, tx_query::TransactionQuery},
testutils::consensus_mock::ConsensusMock,
MiningCounters,
};
Expand All @@ -35,6 +35,7 @@ mod tests {
pay_to_address_script, pay_to_script_hash_signature_script,
test_helpers::{create_transaction, create_transaction_with_change, op_true_script},
};
use kaspa_utils::mem_size::MemSizeEstimator;
use std::{iter::once, sync::Arc};
use tokio::sync::mpsc::{error::TryRecvError, unbounded_channel};

Expand Down Expand Up @@ -1116,6 +1117,71 @@ mod tests {
// TODO: extend the test according to the golang scenario
}

// This is a sanity test for the mempool eviction policy. We check that if the mempool reached to its maximum
// (in bytes) a high paying transaction will evict as much transactions as needed so it can enter the
// mempool.
// TODO: Add a test where we try to add a heavy transaction with fee rate that's higher than some of the mempool
// transactions, but not enough, so the transaction will be rejected nonetheless.
#[test]
fn test_evict() {
coderofstuff marked this conversation as resolved.
Show resolved Hide resolved
const TX_COUNT: usize = 10;
let txs = (0..TX_COUNT).map(|i| create_transaction_with_utxo_entry(i as u32, 0)).collect_vec();

let consensus = Arc::new(ConsensusMock::new());
let counters = Arc::new(MiningCounters::default());
let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS);
let tx_size = txs[0].mempool_estimated_bytes();
let size_limit = TX_COUNT * tx_size;
config.mempool_size_limit = size_limit;
let mining_manager = MiningManager::with_config(config, None, counters);

for tx in txs {
validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), tx).unwrap();
}
assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT);

let heavy_tx_low_fee = {
let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32, 0);
let mut inner_tx = (*(heavy_tx.tx)).clone();
inner_tx.payload = vec![0u8; TX_COUNT / 2 * tx_size - inner_tx.estimate_mem_bytes()];
heavy_tx.tx = inner_tx.into();
heavy_tx.calculated_fee = Some(2081);
heavy_tx
};
assert!(validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx_low_fee.clone()).is_err());
assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT);

let heavy_tx_high_fee = {
let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32 + 1, 0);
let mut inner_tx = (*(heavy_tx.tx)).clone();
inner_tx.payload = vec![0u8; TX_COUNT / 2 * tx_size - inner_tx.estimate_mem_bytes()];
heavy_tx.tx = inner_tx.into();
heavy_tx.calculated_fee = Some(500_000);
heavy_tx
};
validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), heavy_tx_high_fee.clone()).unwrap();
assert_eq!(mining_manager.get_all_transactions(TransactionQuery::TransactionsOnly).0.len(), TX_COUNT - 5);
assert!(mining_manager.get_estimated_size() <= size_limit);

let too_big_tx = {
let mut heavy_tx = create_transaction_with_utxo_entry(TX_COUNT as u32 + 2, 0);
let mut inner_tx = (*(heavy_tx.tx)).clone();
inner_tx.payload = vec![0u8; size_limit];
heavy_tx.tx = inner_tx.into();
heavy_tx.calculated_fee = Some(500_000);
heavy_tx
};
assert!(validate_and_insert_mutable_transaction(&mining_manager, consensus.as_ref(), too_big_tx.clone()).is_err());
}

fn validate_and_insert_mutable_transaction(
mining_manager: &MiningManager,
consensus: &dyn ConsensusApi,
tx: MutableTransaction,
) -> Result<TransactionInsertion, MiningManagerError> {
mining_manager.validate_and_insert_mutable_transaction(consensus, tx, Priority::Low, Orphan::Allowed, RbfPolicy::Forbidden)
}

fn sweep_compare_modified_template_to_built(
consensus: &dyn ConsensusApi,
address_prefix: Prefix,
Expand Down
19 changes: 11 additions & 8 deletions mining/src/mempool/config.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
use kaspa_consensus_core::constants::TX_VERSION;

pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: u32 = 1_000_000;
pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: usize = 1_000_000;
pub(crate) const DEFAULT_MEMPOOL_SIZE_LIMIT: usize = 1_000_000_000;
pub(crate) const DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS: u64 = 5;

pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 60;
pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 24 * 60 * 60;
pub(crate) const DEFAULT_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10;
pub(crate) const DEFAULT_ACCEPTED_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 120;
pub(crate) const DEFAULT_ACCEPTED_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10;
pub(crate) const DEFAULT_ORPHAN_EXPIRE_INTERVAL_SECONDS: u64 = 60;
pub(crate) const DEFAULT_ORPHAN_EXPIRE_SCAN_INTERVAL_SECONDS: u64 = 10;

pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_MASS: u64 = 100_000;

// TODO: when rusty-kaspa nodes run most of the network, consider increasing this value
pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_COUNT: u64 = 50;
pub(crate) const DEFAULT_MAXIMUM_ORPHAN_TRANSACTION_COUNT: u64 = 500;

/// DEFAULT_MINIMUM_RELAY_TRANSACTION_FEE specifies the minimum transaction fee for a transaction to be accepted to
/// the mempool and relayed. It is specified in sompi per 1kg (or 1000 grams) of transaction mass.
Expand All @@ -28,7 +27,8 @@ pub(crate) const DEFAULT_MAXIMUM_STANDARD_TRANSACTION_VERSION: u16 = TX_VERSION;

#[derive(Clone, Debug)]
pub struct Config {
pub maximum_transaction_count: u32,
pub maximum_transaction_count: usize,
pub mempool_size_limit: usize,
pub maximum_build_block_template_attempts: u64,
pub transaction_expire_interval_daa_score: u64,
pub transaction_expire_scan_interval_daa_score: u64,
Expand All @@ -51,7 +51,8 @@ pub struct Config {
impl Config {
#[allow(clippy::too_many_arguments)]
pub fn new(
maximum_transaction_count: u32,
maximum_transaction_count: usize,
mempool_size_limit: usize,
maximum_build_block_template_attempts: u64,
transaction_expire_interval_daa_score: u64,
transaction_expire_scan_interval_daa_score: u64,
Expand All @@ -72,6 +73,7 @@ impl Config {
) -> Self {
Self {
maximum_transaction_count,
mempool_size_limit,
maximum_build_block_template_attempts,
transaction_expire_interval_daa_score,
transaction_expire_scan_interval_daa_score,
Expand All @@ -97,6 +99,7 @@ impl Config {
pub const fn build_default(target_milliseconds_per_block: u64, relay_non_std_transactions: bool, max_block_mass: u64) -> Self {
Self {
maximum_transaction_count: DEFAULT_MAXIMUM_TRANSACTION_COUNT,
mempool_size_limit: DEFAULT_MEMPOOL_SIZE_LIMIT,
maximum_build_block_template_attempts: DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS,
transaction_expire_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS * 1000 / target_milliseconds_per_block,
transaction_expire_scan_interval_daa_score: DEFAULT_TRANSACTION_EXPIRE_SCAN_INTERVAL_SECONDS * 1000
Expand All @@ -121,7 +124,7 @@ impl Config {
}

pub fn apply_ram_scale(mut self, ram_scale: f64) -> Self {
self.maximum_transaction_count = (self.maximum_transaction_count as f64 * ram_scale.min(1.0)) as u32; // Allow only scaling down
self.maximum_transaction_count = (self.maximum_transaction_count as f64 * ram_scale.min(1.0)) as usize; // Allow only scaling down
self
}

Expand Down
5 changes: 5 additions & 0 deletions mining/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ impl Mempool {
.filter(|transaction_id| !(self.transaction_pool.has(transaction_id) || self.orphan_pool.has(transaction_id)));
self.accepted_transactions.unaccepted(&mut not_in_pools_txs)
}

#[cfg(test)]
pub(crate) fn get_estimated_size(&self) -> usize {
self.transaction_pool.get_estimated_size()
}
}

pub mod tx {
Expand Down
9 changes: 7 additions & 2 deletions mining/src/mempool/model/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ use crate::{
};

use feerate_key::FeerateTransactionKey;
use kaspa_consensus_core::block::TemplateTransactionSelector;
use kaspa_consensus_core::{block::TemplateTransactionSelector, tx::Transaction};
use kaspa_core::trace;
use rand::{distributions::Uniform, prelude::Distribution, Rng};
use search_tree::SearchTree;
use selectors::{SequenceSelector, SequenceSelectorInput, TakeAllSelector};
use std::collections::HashSet;
use std::{collections::HashSet, iter::FusedIterator, sync::Arc};

pub(crate) mod feerate_key;
pub(crate) mod search_tree;
Expand Down Expand Up @@ -254,6 +254,11 @@ impl Frontier {
}
estimator
}

/// Returns an iterator to the transactions in the frontier in increasing feerate order
pub fn ascending_iter(&self) -> impl DoubleEndedIterator<Item = &Arc<Transaction>> + ExactSizeIterator + FusedIterator {
self.search_tree.ascending_iter().map(|key| &key.tx)
}
}

#[cfg(test)]
Expand Down
Loading