Skip to content

Commit

Permalink
Log some mempool metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
tiram88 committed Sep 19, 2023
1 parent 1f0c44e commit bb56c70
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 26 deletions.
10 changes: 9 additions & 1 deletion kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ use kaspa_consensus::{consensus::factory::Factory as ConsensusFactory, pipeline:
use kaspa_consensusmanager::ConsensusManager;
use kaspa_core::task::runtime::AsyncRuntime;
use kaspa_index_processor::service::IndexService;
use kaspa_mining::manager::{MiningManager, MiningManagerProxy};
use kaspa_mining::{
manager::{MiningManager, MiningManagerProxy},
monitor::MiningMonitor,
MiningCounters,
};
use kaspa_p2p_flows::{flow_context::FlowContext, service::P2pService};

use kaspa_perf_monitor::builder::Builder as PerfMonitorBuilder;
Expand Down Expand Up @@ -233,6 +237,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
let (notification_send, notification_recv) = unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_send));
let processing_counters = Arc::new(ProcessingCounters::default());
let mining_counters = Arc::new(MiningCounters::default());
let wrpc_borsh_counters = Arc::new(WrpcServerCounters::default());
let wrpc_json_counters = Arc::new(WrpcServerCounters::default());

Expand Down Expand Up @@ -280,11 +285,13 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
let cache_lifetime: Option<u64> = None;
#[cfg(feature = "devnet-prealloc")]
let cache_lifetime = config.block_template_cache_lifetime;
let mining_monitor = Arc::new(MiningMonitor::new(mining_counters.clone(), tick_service.clone()));
let mining_manager = MiningManagerProxy::new(Arc::new(MiningManager::new(
config.target_time_per_block,
false,
config.max_block_mass,
cache_lifetime,
mining_counters,
)));

let flow_context = Arc::new(FlowContext::new(
Expand Down Expand Up @@ -333,6 +340,7 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
async_runtime.register(grpc_service);
async_runtime.register(p2p_service);
async_runtime.register(consensus_monitor);
async_runtime.register(mining_monitor);
async_runtime.register(perf_monitor);
let wrpc_service_tasks: usize = 2; // num_cpus::get() / 2;
// Register wRPC servers based on command line arguments
Expand Down
78 changes: 78 additions & 0 deletions mining/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,88 @@
use std::sync::atomic::{AtomicU64, Ordering};

use mempool::tx::Priority;

mod block_template;
pub(crate) mod cache;
pub mod errors;
pub mod manager;
mod manager_tests;
pub mod mempool;
pub mod model;
pub mod monitor;

#[cfg(test)]
pub mod testutils;

#[derive(Default)]
pub struct MiningCounters {
pub high_priority_tx_counts: AtomicU64,
pub low_priority_tx_counts: AtomicU64,
pub block_tx_counts: AtomicU64,
pub tx_accepted_counts: AtomicU64,
pub input_counts: AtomicU64,
pub output_counts: AtomicU64,
}

impl MiningCounters {
pub fn snapshot(&self) -> MempoolCountersSnapshot {
MempoolCountersSnapshot {
high_priority_tx_counts: self.high_priority_tx_counts.load(Ordering::Relaxed),
low_priority_tx_counts: self.low_priority_tx_counts.load(Ordering::Relaxed),
block_tx_counts: self.block_tx_counts.load(Ordering::Relaxed),
tx_accepted_counts: self.tx_accepted_counts.load(Ordering::Relaxed),
input_counts: self.input_counts.load(Ordering::Relaxed),
output_counts: self.output_counts.load(Ordering::Relaxed),
}
}

pub fn increase_tx_counts(&self, value: u64, priority: Priority) {
match priority {
Priority::Low => {
self.low_priority_tx_counts.fetch_add(value, Ordering::SeqCst);
}
Priority::High => {
self.high_priority_tx_counts.fetch_add(value, Ordering::SeqCst);
}
}
}
}

#[derive(Debug, PartialEq, Eq)]
pub struct MempoolCountersSnapshot {
pub high_priority_tx_counts: u64,
pub low_priority_tx_counts: u64,
pub block_tx_counts: u64,
pub tx_accepted_counts: u64,
pub input_counts: u64,
pub output_counts: u64,
}

impl MempoolCountersSnapshot {
pub fn in_tx_counts(&self) -> u64 {
self.high_priority_tx_counts + self.low_priority_tx_counts
}

pub fn collision_ratio(&self) -> f64 {
if self.block_tx_counts > 0 {
(self.block_tx_counts - self.tx_accepted_counts) as f64 / self.block_tx_counts as f64
} else {
0f64
}
}
}

impl core::ops::Sub for &MempoolCountersSnapshot {
type Output = MempoolCountersSnapshot;

fn sub(self, rhs: Self) -> Self::Output {
Self::Output {
high_priority_tx_counts: self.high_priority_tx_counts.checked_sub(rhs.high_priority_tx_counts).unwrap_or_default(),
low_priority_tx_counts: self.low_priority_tx_counts.checked_sub(rhs.low_priority_tx_counts).unwrap_or_default(),
block_tx_counts: self.block_tx_counts.checked_sub(rhs.block_tx_counts).unwrap_or_default(),
tx_accepted_counts: self.tx_accepted_counts.checked_sub(rhs.tx_accepted_counts).unwrap_or_default(),
input_counts: self.input_counts.checked_sub(rhs.input_counts).unwrap_or_default(),
output_counts: self.output_counts.checked_sub(rhs.output_counts).unwrap_or_default(),
}
}
}
14 changes: 10 additions & 4 deletions mining/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
owner_txs::{GroupedOwnerTransactions, ScriptPublicKeySet},
topological_sort::IntoIterTopologically,
},
MiningCounters,
};
use itertools::Itertools;
use kaspa_consensus_core::{
Expand All @@ -36,6 +37,7 @@ pub struct MiningManager {
config: Arc<Config>,
block_template_cache: BlockTemplateCache,
mempool: RwLock<Mempool>,
counters: Arc<MiningCounters>,
}

impl MiningManager {
Expand All @@ -44,16 +46,17 @@ impl MiningManager {
relay_non_std_transactions: bool,
max_block_mass: u64,
cache_lifetime: Option<u64>,
counters: Arc<MiningCounters>,
) -> Self {
let config = Config::build_default(target_time_per_block, relay_non_std_transactions, max_block_mass);
Self::with_config(config, cache_lifetime)
Self::with_config(config, cache_lifetime, counters)
}

pub(crate) fn with_config(config: Config, cache_lifetime: Option<u64>) -> Self {
pub(crate) fn with_config(config: Config, cache_lifetime: Option<u64>, counters: Arc<MiningCounters>) -> Self {
let config = Arc::new(config);
let mempool = RwLock::new(Mempool::new(config.clone()));
let mempool = RwLock::new(Mempool::new(config.clone(), counters.clone()));
let block_template_cache = BlockTemplateCache::new(cache_lifetime);
Self { config, block_template_cache, mempool }
Self { config, block_template_cache, mempool, counters }
}

pub fn get_block_template(&self, consensus: &dyn ConsensusApi, miner_data: &MinerData) -> MiningManagerResult<BlockTemplate> {
Expand Down Expand Up @@ -246,6 +249,7 @@ impl MiningManager {
// We include the original accepted transaction as well
accepted_transactions.push(accepted_transaction);
accepted_transactions.extend(self.validate_and_insert_unorphaned_transactions(consensus, unorphaned_transactions));
self.counters.increase_tx_counts(1, priority);

Ok(accepted_transactions)
} else {
Expand Down Expand Up @@ -297,6 +301,7 @@ impl MiningManager {
) {
Ok(Some(accepted_transaction)) => {
accepted_transactions.push(accepted_transaction.clone());
self.counters.increase_tx_counts(1, priority);
mempool.get_unorphaned_transactions_after_accepted_transaction(&accepted_transaction)
}
Ok(None) => vec![],
Expand Down Expand Up @@ -385,6 +390,7 @@ impl MiningManager {
match mempool.post_validate_and_insert_transaction(consensus, validation_result, transaction, priority, orphan) {
Ok(Some(accepted_transaction)) => {
accepted_transactions.push(accepted_transaction.clone());
self.counters.increase_tx_counts(1, priority);
mempool.get_unorphaned_transactions_after_accepted_transaction(&accepted_transaction)
}
Ok(None) => {
Expand Down
31 changes: 21 additions & 10 deletions mining/src/manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod tests {
},
model::candidate_tx::CandidateTransaction,
testutils::consensus_mock::ConsensusMock,
MiningCounters,
};
use kaspa_addresses::{Address, Prefix, Version};
use kaspa_consensus_core::{
Expand Down Expand Up @@ -41,7 +42,8 @@ mod tests {
fn test_validate_and_insert_transaction() {
const TX_COUNT: u32 = 10;
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);
let transactions_to_insert = (0..TX_COUNT).map(|i| create_transaction_with_utxo_entry(i, 0)).collect::<Vec<_>>();
for transaction in transactions_to_insert.iter() {
let result = mining_manager.validate_and_insert_mutable_transaction(
Expand Down Expand Up @@ -113,7 +115,8 @@ mod tests {
#[test]
fn test_simulated_error_in_consensus() {
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);

// Build an invalid transaction with some gas and inform the consensus mock about the result it should return
// when the mempool will submit this transaction for validation.
Expand Down Expand Up @@ -143,7 +146,8 @@ mod tests {
#[test]
fn test_insert_double_transactions_to_mempool() {
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);

let transaction = create_transaction_with_utxo_entry(0, 0);

Expand Down Expand Up @@ -185,7 +189,8 @@ mod tests {
#[test]
fn test_double_spend_in_mempool() {
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);

let transaction = create_child_and_parent_txs_and_add_parent_to_consensus(&consensus);
assert!(
Expand Down Expand Up @@ -233,7 +238,8 @@ mod tests {
#[test]
fn test_handle_new_block_transactions() {
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);

const TX_COUNT: u32 = 10;
let transactions_to_insert = (0..TX_COUNT).map(|i| create_transaction_with_utxo_entry(i, 0)).collect::<Vec<_>>();
Expand Down Expand Up @@ -292,7 +298,8 @@ mod tests {
// will be removed from the mempool.
fn test_double_spend_with_block() {
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);

let transaction_in_the_mempool = create_transaction_with_utxo_entry(0, 0);
let result = mining_manager.validate_and_insert_transaction(
Expand Down Expand Up @@ -322,7 +329,8 @@ mod tests {
#[test]
fn test_orphan_transactions() {
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);

// Before each parent transaction we add a transaction that funds it and insert the funding transaction in the consensus.
const TX_PAIRS_COUNT: usize = 5;
Expand Down Expand Up @@ -575,7 +583,8 @@ mod tests {
let mut config = Config::build_default(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS);
// Limit the orphan pool to 2 transactions
config.maximum_orphan_transaction_count = 2;
let mining_manager = MiningManager::with_config(config.clone(), None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::with_config(config.clone(), None, counters);

// Create pairs of transaction parent-and-child pairs according to the test vector
let (parent_txs, child_txs) = create_arrays_of_parent_and_children_transactions(&consensus, tests.len());
Expand Down Expand Up @@ -655,7 +664,8 @@ mod tests {
#[test]
fn test_revalidate_high_priority_transactions() {
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);

// Create two valid transactions that double-spend each other (child_tx_1, child_tx_2)
let (parent_tx, child_tx_1) = create_parent_and_children_transactions(&consensus, vec![3000 * SOMPI_PER_KASPA]);
Expand Down Expand Up @@ -718,7 +728,8 @@ mod tests {
#[test]
fn test_modify_block_template() {
let consensus = Arc::new(ConsensusMock::new());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None);
let counters = Arc::new(MiningCounters::default());
let mining_manager = MiningManager::new(TARGET_TIME_PER_BLOCK, false, MAX_BLOCK_MASS, None, counters);

// Before each parent transaction we add a transaction that funds it and insert the funding transaction in the consensus.
const TX_PAIRS_COUNT: usize = 12;
Expand Down
14 changes: 10 additions & 4 deletions mining/src/mempool/check_transaction_standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,10 @@ impl Mempool {
#[cfg(test)]
mod tests {
use super::*;
use crate::mempool::config::{Config, DEFAULT_MINIMUM_RELAY_TRANSACTION_FEE};
use crate::{
mempool::config::{Config, DEFAULT_MINIMUM_RELAY_TRANSACTION_FEE},
MiningCounters,
};
use kaspa_addresses::{Address, Prefix, Version};
use kaspa_consensus_core::{
config::params::Params,
Expand Down Expand Up @@ -282,7 +285,8 @@ mod tests {
let params: Params = net.into();
let mut config = Config::build_default(params.target_time_per_block, false, params.max_block_mass);
config.minimum_relay_transaction_fee = test.minimum_relay_transaction_fee;
let mempool = Mempool::new(Arc::new(config));
let counters = Arc::new(MiningCounters::default());
let mempool = Mempool::new(Arc::new(config), counters);

let got = mempool.minimum_required_transaction_relay_fee(test.size);
if got != test.want {
Expand Down Expand Up @@ -366,7 +370,8 @@ mod tests {
let params: Params = net.into();
let mut config = Config::build_default(params.target_time_per_block, false, params.max_block_mass);
config.minimum_relay_transaction_fee = test.minimum_relay_transaction_fee;
let mempool = Mempool::new(Arc::new(config));
let counters = Arc::new(MiningCounters::default());
let mempool = Mempool::new(Arc::new(config), counters);

println!("test_is_transaction_output_dust test '{}' ", test.name);
let res = mempool.is_transaction_output_dust(&test.tx_out);
Expand Down Expand Up @@ -544,7 +549,8 @@ mod tests {
for net in NetworkType::iter() {
let params: Params = net.into();
let config = Config::build_default(params.target_time_per_block, false, params.max_block_mass);
let mempool = Mempool::new(Arc::new(config));
let counters = Arc::new(MiningCounters::default());
let mempool = Mempool::new(Arc::new(config), counters);

// Ensure standard-ness is as expected.
println!("test_check_transaction_standard_in_isolation test '{}' ", test.name);
Expand Down
9 changes: 7 additions & 2 deletions mining/src/mempool/handle_new_block_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use kaspa_consensus_core::{
tx::{Transaction, TransactionId},
};
use kaspa_core::time::Stopwatch;
use std::collections::HashSet;
use std::{collections::HashSet, sync::atomic::Ordering};

impl Mempool {
pub(crate) fn handle_new_block_transactions(
Expand All @@ -32,7 +32,12 @@ impl Mempool {
}
self.remove_double_spends(transaction)?;
self.orphan_pool.remove_orphan(&transaction_id, false, TxRemovalReason::Accepted, "")?;
self.accepted_transactions.add(transaction_id, block_daa_score);
self.counters.block_tx_counts.fetch_add(1, Ordering::SeqCst);
if self.accepted_transactions.add(transaction_id, block_daa_score) {
self.counters.tx_accepted_counts.fetch_add(1, Ordering::SeqCst);
self.counters.input_counts.fetch_add(transaction.inputs.len() as u64, Ordering::SeqCst);
self.counters.output_counts.fetch_add(transaction.outputs.len() as u64, Ordering::SeqCst);
}
unorphaned_transactions.extend(self.get_unorphaned_transactions_after_accepted_transaction(transaction));
}
Ok(unorphaned_transactions)
Expand Down
14 changes: 9 additions & 5 deletions mining/src/mempool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::model::{
candidate_tx::CandidateTransaction,
owner_txs::{GroupedOwnerTransactions, ScriptPublicKeySet},
use crate::{
model::{
candidate_tx::CandidateTransaction,
owner_txs::{GroupedOwnerTransactions, ScriptPublicKeySet},
},
MiningCounters,
};

use self::{
Expand Down Expand Up @@ -46,14 +49,15 @@ pub(crate) struct Mempool {
orphan_pool: OrphanPool,
accepted_transactions: AcceptedTransactions,
last_stat_report_time: u64,
counters: Arc<MiningCounters>,
}

impl Mempool {
pub(crate) fn new(config: Arc<Config>) -> Self {
pub(crate) fn new(config: Arc<Config>, counters: Arc<MiningCounters>) -> Self {
let transaction_pool = TransactionsPool::new(config.clone());
let orphan_pool = OrphanPool::new(config.clone());
let accepted_transactions = AcceptedTransactions::new(config.clone());
Self { config, transaction_pool, orphan_pool, accepted_transactions, last_stat_report_time: unix_now() }
Self { config, transaction_pool, orphan_pool, accepted_transactions, last_stat_report_time: unix_now(), counters }
}

pub(crate) fn get_transaction(
Expand Down
Loading

0 comments on commit bb56c70

Please sign in to comment.