Skip to content

Commit

Permalink
Enhance Mempool performance (kaspanet#226)
Browse files Browse the repository at this point in the history
* Split mempool atomic validate and insert transaction in 3 steps

* Process tx relay flow received txs in batch

* Use a single blocking task per MiningManagerProxy fn

* Split parallel txs validation in chunks of max block mass

* Abstract expire_low_priority_transactions into Pool trait

* Making room in the mempool for a new transaction won't remove chained txs nor parent txs of the new transaction

* Refine lock granularity on Mempool and Consensus while processing unorphaned transactions (wip)

* Fix failing test

* Enhance performance & refine lock granularity on Mempool and Consensus while revalidating high priority transactions

* Comments

* Fix upper bound of transactions chunk

* Ensure a chunk has at least 1 tx

* Prevent add twice the same tx to the mempool

* Clear transaction entries before revalidation

* Add some logs and comments

* Add logs to debug transactions removals

* On accepted block do not remove orphan tx redeemers

* Add 2 TODOs

* Fix a bug of high priority transactions being unexpectedly orphaned or rejected

* Refactor transaction removal reason into an enum

* Add an accepted transaction ids cache to the mempool and use it to prevent reentrance in mempool, broadcasting to and asking from peers

* Improve the filtering of unknown transactions in tx relay

* Enhance tx removal logging

* Add mempool stats

* Process new and unorphaned blocks in topological order

* Run revalidation of HP txs in a dedicated task

* Some profiling and debug logs

* Run expiration of LP txs in a dedicated task

* remove some stopwatch calls which were timing locks

* crucial: fix exploding complexity of `handle_new_block_transactions`/`remove_transaction`

* fixes in `on_new_block`

* refactor block template cache into `Inner`

* make `block_template_cache` a non-blocking call (never blocks)

* Log build_block_template retries

* While revalidating HP txs, only recheck transaction entries

* Fix accepted count during revalidation

* mempool bmk: use client pools + various improvements

* Improve the topological sorting of transactions

* Return transaction descendants BFS ordered + some optimizations

* Group expiration and revalidation of mempool txs in one task

* Refine the schedule of the cleaning task

* ignore perf logs

* maintain mempool ready transactions in a dedicated set

* Bound the returned candidate transactions to a maximum

* Reduces the max execution time of build block template

* lint

* Add mempool lock granularity to get_all_transactions

* Restore block template cache lifetime & make it customizable in devnet-prealloc feature

* Restore block template cache lifetime & make it customizable in devnet-prealloc feature

* Relax a bit the BBT maximum attempts constraint

* Refactor multiple `contained_by_txs` fns into one generic

* Test selector transaction rejects & fix empty template returned by `select_transactions` upon selector reuse

* Log some mempool metrics

* Handle new block and then new block template

* turn tx selector into an ongoing process with persistent state (wip: some tests are broken; selector is not used correctly by builder)

* use tx selector for BBT (wip: virtual processor retry logic)

* virtual processor selector retry logic

* make BBT fallible by some selector criteria + comments and some docs

* add an infallible mode to virtual processor `build_block_template()`

* constants for tx selector successful decision

* Add e-tps to logged mempool metrics

* avoid realloc

* Address review comments

* Use number of ready txs in e-tps & enhance mempool lock

* Ignore failing send for clean tokio shutdown

* Log double spends

* Log tx script cache stats (wip)

* Ease atomic lock ordering & enhance counter updates

* Enhance tx throughput stats log line

* More robust management of cached data life cycle

* Log mempool sampled instead of exact lengths

* avoid passing consensus to orphan pool

* rename ro `validate_transaction_unacceptance` and move to before the orphan case (accepted txs will usually be orphan)

* rename `cleaning` -> `mempool_scanning`

* keep intervals aligned using a round-up formula (rather than a loop)

* design fix: avoid exposing full collections as mut. This violates encapsulation logic since collections can be completely modified externally; while in tx pools it is important to make sure various internal collections are maintained consistently (for instance the `ready_transactions` field on `TransactionsPool` needs careful maintenance)

* minor: close all pool receivers on op error

* `remove_transaction`: no need to manually update parent-child relations in the case `remove_redeemers=false`. This is already done via `remove_transaction_from_sets` -> `transaction_pool.remove_transaction`. + a few minor changes

* encapsulate `remove_transaction_utxos` into `transaction_pool`

* no need to `remove_redeemers_of` for the initial removed tx since this happens as part of:
`remove_from_transaction_pool_and_update_orphans` -> `orphan_pool.update_orphans_after_transaction_removed` -> `orphan_pool.remove_redeemers_of`

* inline `remove_from_transaction_pool_and_update_orphans`

* remove redeemers of expired low-prio txs + register scan time and daa score after collection (bug fix)

* change mempool monitor logs to debug

* make tps logging more accurate

* import bmk improvements from mempool-perf-stats branch

* make `config.block_template_cache_lifetime` non-feature dependent

---------

Co-authored-by: Michael Sutton <msutton@cs.huji.ac.il>
  • Loading branch information
tiram88 and michaelsutton authored Oct 6, 2023
1 parent 51a1240 commit a59214e
Show file tree
Hide file tree
Showing 61 changed files with 3,006 additions and 728 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 25 additions & 4 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;

use crate::{
acceptance_data::AcceptanceData,
block::{Block, BlockTemplate},
block::{Block, BlockTemplate, TemplateBuildMode, TemplateTransactionSelector},
block_count::BlockCount,
blockstatus::BlockStatus,
coinbase::MinerData,
Expand All @@ -27,7 +27,12 @@ pub type BlockValidationFuture = BoxFuture<'static, BlockProcessResult<BlockStat
/// Abstracts the consensus external API
#[allow(unused_variables)]
pub trait ConsensusApi: Send + Sync {
fn build_block_template(&self, miner_data: MinerData, txs: Vec<Transaction>) -> Result<BlockTemplate, RuleError> {
fn build_block_template(
&self,
miner_data: MinerData,
tx_selector: Box<dyn TemplateTransactionSelector>,
build_mode: TemplateBuildMode,
) -> Result<BlockTemplate, RuleError> {
unimplemented!()
}

Expand All @@ -40,8 +45,24 @@ pub trait ConsensusApi: Send + Sync {
}

/// Populates the mempool transaction with maximally found UTXO entry data and proceeds to full transaction
/// validation if all are found. If validation is successful, also [`transaction.calculated_fee`] is expected to be populated
fn validate_mempool_transaction_and_populate(&self, transaction: &mut MutableTransaction) -> TxResult<()> {
/// validation if all are found. If validation is successful, also [`transaction.calculated_fee`] is expected to be populated.
fn validate_mempool_transaction(&self, transaction: &mut MutableTransaction) -> TxResult<()> {
unimplemented!()
}

/// Populates the mempool transactions with maximally found UTXO entry data and proceeds to full transactions
/// validation if all are found. If validation is successful, also [`transaction.calculated_fee`] is expected to be populated.
fn validate_mempool_transactions_in_parallel(&self, transactions: &mut [MutableTransaction]) -> Vec<TxResult<()>> {
unimplemented!()
}

/// Populates the mempool transaction with maximally found UTXO entry data.
fn populate_mempool_transaction(&self, transaction: &mut MutableTransaction) -> TxResult<()> {
unimplemented!()
}

/// Populates the mempool transactions with maximally found UTXO entry data.
fn populate_mempool_transactions_in_parallel(&self, transactions: &mut [MutableTransaction]) -> Vec<TxResult<()>> {
unimplemented!()
}

Expand Down
34 changes: 33 additions & 1 deletion consensus/core/src/block.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::sync::Arc;

use crate::{coinbase::MinerData, header::Header, tx::Transaction};
use crate::{
coinbase::MinerData,
header::Header,
tx::{Transaction, TransactionId},
};
use kaspa_hashes::Hash;

/// A mutable block structure where header and transactions within can still be mutated.
Expand Down Expand Up @@ -64,6 +68,34 @@ impl Block {
}
}

/// An abstraction for a recallable transaction selector with persistent state
pub trait TemplateTransactionSelector {
/// Expected to return a batch of transactions which were not previously selected.
/// The batch will typically contain sufficient transactions to fill the block
/// mass (along with the previously unrejected txs), or will drain the selector
fn select_transactions(&mut self) -> Vec<Transaction>;

/// Should be used to report invalid transactions obtained from the *most recent*
/// `select_transactions` call. Implementors should use this call to internally
/// track the selection state and discard the rejected tx from internal occupation calculations
fn reject_selection(&mut self, tx_id: TransactionId);

/// Determine whether this was an overall successful selection episode
fn is_successful(&self) -> bool;
}

/// Block template build mode
#[derive(Clone, Copy, Debug)]
pub enum TemplateBuildMode {
/// Block template build can possibly fail if `TemplateTransactionSelector::is_successful` deems the operation unsuccessful.
///
/// In such a case, the build fails with `BlockRuleError::InvalidTransactionsInNewBlock`.
Standard,

/// Block template build always succeeds. The built block contains only the validated transactions.
Infallible,
}

/// A block template for miners.
#[derive(Debug, Clone)]
pub struct BlockTemplate {
Expand Down
3 changes: 3 additions & 0 deletions consensus/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct Config {

pub externalip: Option<IpAddress>,

pub block_template_cache_lifetime: Option<u64>,

#[cfg(feature = "devnet-prealloc")]
pub initial_utxo_set: Arc<UtxoCollection>,
}
Expand All @@ -82,6 +84,7 @@ impl Config {
user_agent_comments: Default::default(),
externalip: None,
p2p_listen_address: ContextualNetAddress::unspecified(),
block_template_cache_lifetime: None,

#[cfg(feature = "devnet-prealloc")]
initial_utxo_set: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions consensus/core/src/errors/block.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::{collections::HashMap, fmt::Display};

use crate::{
constants,
Expand Down Expand Up @@ -140,7 +140,7 @@ pub enum RuleError {
InvalidTransactionsInUtxoContext(usize, usize),

#[error("invalid transactions in new block template")]
InvalidTransactionsInNewBlock(Vec<(TransactionId, TxRuleError)>),
InvalidTransactionsInNewBlock(HashMap<TransactionId, TxRuleError>),

#[error("DAA window data has only {0} entries")]
InsufficientDaaWindowSize(usize),
Expand Down
6 changes: 6 additions & 0 deletions consensus/core/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ impl<T: AsRef<Transaction>> MutableTransaction<T> {
}
}

impl<T: AsRef<Transaction>> AsRef<Transaction> for MutableTransaction<T> {
fn as_ref(&self) -> &Transaction {
self.tx.as_ref()
}
}

/// Private struct used to wrap a [`MutableTransaction`] as a [`VerifiableTransaction`]
struct MutableTransactionVerifiableWrapper<'a, T: AsRef<Transaction>> {
inner: &'a MutableTransaction<T>,
Expand Down
6 changes: 6 additions & 0 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use kaspa_database::{
registry::DatabaseStorePrefixes,
};

use kaspa_txscript::caches::TxScriptCacheCounters;
use parking_lot::RwLock;
use rocksdb::WriteBatch;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -153,6 +154,7 @@ pub struct Factory {
db_parallelism: usize,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
}

impl Factory {
Expand All @@ -163,6 +165,7 @@ impl Factory {
db_parallelism: usize,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
) -> Self {
let mut config = config.clone();
#[cfg(feature = "devnet-prealloc")]
Expand All @@ -175,6 +178,7 @@ impl Factory {
db_parallelism,
notification_root,
counters,
tx_script_cache_counters,
}
}
}
Expand Down Expand Up @@ -208,6 +212,7 @@ impl ConsensusFactory for Factory {
session_lock.clone(),
self.notification_root.clone(),
self.counters.clone(),
self.tx_script_cache_counters.clone(),
entry.creation_timestamp,
));

Expand Down Expand Up @@ -236,6 +241,7 @@ impl ConsensusFactory for Factory {
session_lock.clone(),
self.notification_root.clone(),
self.counters.clone(),
self.tx_script_cache_counters.clone(),
entry.creation_timestamp,
));

Expand Down
32 changes: 26 additions & 6 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
use kaspa_consensus_core::{
acceptance_data::AcceptanceData,
api::{BlockValidationFuture, ConsensusApi},
block::{Block, BlockTemplate},
block::{Block, BlockTemplate, TemplateBuildMode, TemplateTransactionSelector},
block_count::BlockCount,
blockhash::BlockHashExtensions,
blockstatus::BlockStatus,
Expand Down Expand Up @@ -70,6 +70,7 @@ use kaspa_consensusmanager::{SessionLock, SessionReadGuard};
use kaspa_database::prelude::StoreResultExtensions;
use kaspa_hashes::Hash;
use kaspa_muhash::MuHash;
use kaspa_txscript::caches::TxScriptCacheCounters;

use std::thread::{self, JoinHandle};
use std::{
Expand Down Expand Up @@ -132,6 +133,7 @@ impl Consensus {
pruning_lock: SessionLock,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
creation_timestamp: u64,
) -> Self {
let params = &config.params;
Expand All @@ -147,7 +149,7 @@ impl Consensus {
// Services and managers
//

let services = ConsensusServices::new(db.clone(), storage.clone(), config.clone());
let services = ConsensusServices::new(db.clone(), storage.clone(), config.clone(), tx_script_cache_counters);

//
// Processor channels
Expand Down Expand Up @@ -353,8 +355,13 @@ impl Consensus {
}

impl ConsensusApi for Consensus {
fn build_block_template(&self, miner_data: MinerData, txs: Vec<Transaction>) -> Result<BlockTemplate, RuleError> {
self.virtual_processor.build_block_template(miner_data, txs)
fn build_block_template(
&self,
miner_data: MinerData,
tx_selector: Box<dyn TemplateTransactionSelector>,
build_mode: TemplateBuildMode,
) -> Result<BlockTemplate, RuleError> {
self.virtual_processor.build_block_template(miner_data, tx_selector, build_mode)
}

fn validate_and_insert_block(&self, block: Block) -> BlockValidationFuture {
Expand All @@ -367,11 +374,24 @@ impl ConsensusApi for Consensus {
Box::pin(result)
}

fn validate_mempool_transaction_and_populate(&self, transaction: &mut MutableTransaction) -> TxResult<()> {
self.virtual_processor.validate_mempool_transaction_and_populate(transaction)?;
fn validate_mempool_transaction(&self, transaction: &mut MutableTransaction) -> TxResult<()> {
self.virtual_processor.validate_mempool_transaction(transaction)?;
Ok(())
}

fn validate_mempool_transactions_in_parallel(&self, transactions: &mut [MutableTransaction]) -> Vec<TxResult<()>> {
self.virtual_processor.validate_mempool_transactions_in_parallel(transactions)
}

fn populate_mempool_transaction(&self, transaction: &mut MutableTransaction) -> TxResult<()> {
self.virtual_processor.populate_mempool_transaction(transaction)?;
Ok(())
}

fn populate_mempool_transactions_in_parallel(&self, transactions: &mut [MutableTransaction]) -> Vec<TxResult<()>> {
self.virtual_processor.populate_mempool_transactions_in_parallel(transactions)
}

fn calculate_transaction_mass(&self, transaction: &Transaction) -> u64 {
self.services.mass_calculator.calc_tx_mass(transaction)
}
Expand Down
9 changes: 8 additions & 1 deletion consensus/src/consensus/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
};

use itertools::Itertools;
use kaspa_txscript::caches::TxScriptCacheCounters;
use std::sync::Arc;

pub type DbGhostdagManager =
Expand Down Expand Up @@ -65,7 +66,12 @@ pub struct ConsensusServices {
}

impl ConsensusServices {
pub fn new(db: Arc<DB>, storage: Arc<ConsensusStorage>, config: Arc<Config>) -> Arc<Self> {
pub fn new(
db: Arc<DB>,
storage: Arc<ConsensusStorage>,
config: Arc<Config>,
tx_script_cache_counters: Arc<TxScriptCacheCounters>,
) -> Arc<Self> {
let params = &config.params;

let statuses_service = MTStatusesService::new(storage.statuses_store.clone());
Expand Down Expand Up @@ -144,6 +150,7 @@ impl ConsensusServices {
params.ghostdag_k,
params.coinbase_payload_script_public_key_max_len,
params.coinbase_maturity,
tx_script_cache_counters,
);

let pruning_point_manager = PruningPointManager::new(
Expand Down
39 changes: 33 additions & 6 deletions consensus/src/consensus/test_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,17 @@ impl TestConsensus {
/// Creates a test consensus instance based on `config` with the provided `db` and `notification_sender`
pub fn with_db(db: Arc<DB>, config: &Config, notification_sender: Sender<Notification>) -> Self {
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let counters = Default::default();
let tx_script_cache_counters = Default::default();
let consensus = Arc::new(Consensus::new(
db,
Arc::new(config.clone()),
Default::default(),
notification_root,
counters,
tx_script_cache_counters,
0,
));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { params: config.params.clone(), consensus, block_builder, db_lifetime: Default::default() }
Expand All @@ -60,8 +69,17 @@ impl TestConsensus {
pub fn with_notifier(config: &Config, notification_sender: Sender<Notification>) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let counters = Default::default();
let tx_script_cache_counters = Default::default();
let consensus = Arc::new(Consensus::new(
db,
Arc::new(config.clone()),
Default::default(),
notification_root,
counters,
tx_script_cache_counters,
0,
));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { consensus, block_builder, params: config.params.clone(), db_lifetime }
Expand All @@ -72,8 +90,17 @@ impl TestConsensus {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
let counters = Default::default();
let tx_script_cache_counters = Default::default();
let consensus = Arc::new(Consensus::new(
db,
Arc::new(config.clone()),
Default::default(),
notification_root,
counters,
tx_script_cache_counters,
0,
));
let block_builder = TestBlockBuilder::new(consensus.virtual_processor.clone());

Self { consensus, block_builder, params: config.params.clone(), db_lifetime }
Expand Down
Loading

0 comments on commit a59214e

Please sign in to comment.