diff --git a/Cargo.lock b/Cargo.lock index c0146d17fe..bf353479fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3420,6 +3420,7 @@ dependencies = [ "ipnet", "itertools 0.11.0", "log", + "once_cell", "parking_lot", "rand 0.8.5", "rlimit", @@ -3803,6 +3804,7 @@ name = "kaspad" version = "0.14.1" dependencies = [ "async-channel 2.2.1", + "cfg-if 1.0.0", "clap 4.5.4", "dhat", "dirs", @@ -5637,6 +5639,7 @@ name = "simpa" version = "0.14.1" dependencies = [ "async-channel 2.2.1", + "cfg-if 1.0.0", "clap 4.5.4", "dhat", "futures", diff --git a/kaspad/Cargo.toml b/kaspad/Cargo.toml index 9f3290a51c..15a408dad5 100644 --- a/kaspad/Cargo.toml +++ b/kaspad/Cargo.toml @@ -41,22 +41,25 @@ kaspa-utxoindex.workspace = true kaspa-wrpc-server.workspace = true async-channel.workspace = true +cfg-if.workspace = true clap.workspace = true dhat = { workspace = true, optional = true } -serde.workspace = true dirs.workspace = true futures-util.workspace = true log.workspace = true num_cpus.workspace = true rand.workspace = true rayon.workspace = true +serde.workspace = true tempfile.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] } workflow-log.workspace = true + toml = "0.8.10" serde_with = "3.7.0" [features] heap = ["dhat", "kaspa-alloc/heap"] devnet-prealloc = ["kaspa-consensus/devnet-prealloc"] +semaphore-trace = ["kaspa-utils/semaphore-trace"] diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index ce4c190335..9ba7a62d3b 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -161,7 +161,13 @@ impl Runtime { let log_dir = get_log_dir(args); // Initialize the logger - kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level); + cfg_if::cfg_if! { + if #[cfg(feature = "semaphore-trace")] { + kaspa_core::log::init_logger(log_dir.as_deref(), &format!("{},{}=debug", args.log_level, kaspa_utils::sync::semaphore_module_path())); + } else { + kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level); + } + }; // Configure the panic behavior // As we log the panic, we want to set it up after the logger diff --git a/mining/src/manager.rs b/mining/src/manager.rs index 477e3deff1..081854698e 100644 --- a/mining/src/manager.rs +++ b/mining/src/manager.rs @@ -35,7 +35,7 @@ use kaspa_consensusmanager::{spawn_blocking, ConsensusProxy}; use kaspa_core::{debug, error, info, time::Stopwatch, warn}; use kaspa_mining_errors::{manager::MiningManagerError, mempool::RuleError}; use parking_lot::RwLock; -use std::{ops::Mul, sync::Arc}; +use std::sync::Arc; use tokio::sync::mpsc::UnboundedSender; pub struct MiningManager { @@ -1044,29 +1044,28 @@ fn feerate_stats(transactions: Vec, calculated_fees: Vec) -> O return None; } if transactions.len() != calculated_fees.len() + 1 { - error!("Transactions length must be one more than `calculated_fees` length"); + error!( + "[feerate_stats] block template transactions length ({}) is expected to be one more than `calculated_fees` length ({})", + transactions.len(), + calculated_fees.len() + ); return None; } debug_assert!(transactions[0].is_coinbase()); - let mut fees_and_masses = calculated_fees + let mut feerates = calculated_fees .into_iter() .zip(transactions .iter() // skip coinbase tx .skip(1) .map(Transaction::mass)) + .map(|(fee, mass)| fee as f64 / mass as f64) .collect_vec(); + feerates.sort_unstable_by(f64::total_cmp); - // Sort by fee rate without performing division for each comparison. - // Using multiplication instead of division is faster and avoids the need - // to convert all values to floats. Division is only performed later when - // calculating the min, max, and median fee rates. - fees_and_masses.sort_unstable_by(|(lhs_fee, lhs_mass), (rhs_fee, rhs_mass)| lhs_fee.mul(rhs_mass).cmp(&rhs_fee.mul(lhs_mass))); - - let div_as_f64 = |(fee, mass)| fee as f64 / mass as f64; - let max = div_as_f64(fees_and_masses[fees_and_masses.len() - 1]); - let min = div_as_f64(fees_and_masses[0]); - let median = div_as_f64(fees_and_masses[fees_and_masses.len() / 2]); + let max = feerates[feerates.len() - 1]; + let min = feerates[0]; + let median = feerates[feerates.len() / 2]; Some(Stats { max, median, min }) } diff --git a/simpa/Cargo.toml b/simpa/Cargo.toml index b52aa6fd93..30162ba4f1 100644 --- a/simpa/Cargo.toml +++ b/simpa/Cargo.toml @@ -22,6 +22,7 @@ kaspa-perf-monitor.workspace = true kaspa-utils.workspace = true async-channel.workspace = true +cfg-if.workspace = true clap.workspace = true dhat = { workspace = true, optional = true } futures-util.workspace = true @@ -38,3 +39,4 @@ tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] } [features] heap = ["dhat", "kaspa-alloc/heap"] +semaphore-trace = ["kaspa-utils/semaphore-trace"] diff --git a/simpa/src/main.rs b/simpa/src/main.rs index 1baecc3e78..1d14a3c681 100644 --- a/simpa/src/main.rs +++ b/simpa/src/main.rs @@ -20,7 +20,12 @@ use kaspa_consensus_core::{ BlockHashSet, BlockLevel, HashMapCustomHasher, }; use kaspa_consensus_notify::root::ConsensusNotificationRoot; -use kaspa_core::{info, task::service::AsyncService, task::tick::TickService, time::unix_now, trace, warn}; +use kaspa_core::{ + info, + task::{service::AsyncService, tick::TickService}, + time::unix_now, + trace, warn, +}; use kaspa_database::prelude::ConnBuilder; use kaspa_database::{create_temp_db, load_existing_db}; use kaspa_hashes::Hash; @@ -133,7 +138,13 @@ fn main() { let args = Args::parse(); // Initialize the logger - kaspa_core::log::init_logger(None, &args.log_level); + cfg_if::cfg_if! { + if #[cfg(feature = "semaphore-trace")] { + kaspa_core::log::init_logger(None, &format!("{},{}=debug", args.log_level, kaspa_utils::sync::semaphore_module_path())); + } else { + kaspa_core::log::init_logger(None, &args.log_level); + } + }; // Configure the panic behavior // As we log the panic, we want to set it up after the logger diff --git a/utils/Cargo.toml b/utils/Cargo.toml index dda05cb0ea..641ecb61ae 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -11,7 +11,6 @@ repository.workspace = true [dependencies] arc-swap.workspace = true -parking_lot.workspace = true async-channel.workspace = true borsh.workspace = true cfg-if.workspace = true @@ -19,12 +18,14 @@ event-listener.workspace = true faster-hex.workspace = true ipnet.workspace = true itertools.workspace = true +log.workspace = true +once_cell.workspace = true +parking_lot.workspace = true serde.workspace = true smallvec.workspace = true thiserror.workspace = true triggered.workspace = true uuid.workspace = true -log.workspace = true wasm-bindgen.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -42,3 +43,6 @@ rand.workspace = true [[bench]] name = "bench" harness = false + +[features] +semaphore-trace = [] diff --git a/utils/src/sync/mod.rs b/utils/src/sync/mod.rs index 40fb147cb3..14afe79772 100644 --- a/utils/src/sync/mod.rs +++ b/utils/src/sync/mod.rs @@ -1,2 +1,7 @@ pub mod rwlock; pub(crate) mod semaphore; + +#[cfg(feature = "semaphore-trace")] +pub fn semaphore_module_path() -> &'static str { + semaphore::get_module_path() +} diff --git a/utils/src/sync/semaphore.rs b/utils/src/sync/semaphore.rs index 4b94e8f2f5..2ea6dcc031 100644 --- a/utils/src/sync/semaphore.rs +++ b/utils/src/sync/semaphore.rs @@ -4,6 +4,64 @@ use std::{ time::Duration, }; +#[cfg(feature = "semaphore-trace")] +mod trace { + use super::*; + use log::debug; + use once_cell::sync::Lazy; + use std::sync::atomic::AtomicU64; + use std::time::SystemTime; + + static SYS_START: Lazy = Lazy::new(SystemTime::now); + + #[inline] + pub(super) fn sys_now() -> u64 { + SystemTime::now().duration_since(*SYS_START).unwrap_or_default().as_micros() as u64 + } + + #[derive(Debug, Default)] + pub struct TraceInner { + readers_start: AtomicU64, + readers_time: AtomicU64, + log_time: AtomicU64, + log_value: AtomicU64, + } + + impl TraceInner { + pub(super) fn mark_readers_start(&self) { + self.readers_start.store(sys_now(), Ordering::Relaxed); + } + + pub(super) fn mark_readers_end(&self) { + let start = self.readers_start.load(Ordering::Relaxed); + let now = sys_now(); + if start < now { + let readers_time = self.readers_time.fetch_add(now - start, Ordering::Relaxed) + now - start; + let log_time = self.log_time.load(Ordering::Relaxed); + if log_time + (Duration::from_secs(10).as_micros() as u64) < now { + let log_value = self.log_value.load(Ordering::Relaxed); + debug!( + "Semaphore: log interval: {:?}, readers time: {:?}, fraction: {:.2}", + Duration::from_micros(now - log_time), + Duration::from_micros(readers_time - log_value), + (readers_time - log_value) as f64 / (now - log_time) as f64 + ); + self.log_value.store(readers_time, Ordering::Relaxed); + self.log_time.store(now, Ordering::Relaxed); + } + } + } + } +} + +#[cfg(feature = "semaphore-trace")] +use trace::*; + +#[cfg(feature = "semaphore-trace")] +pub(crate) fn get_module_path() -> &'static str { + module_path!() +} + /// A low-level non-fair semaphore. The semaphore is non-fair in the sense that clients acquiring /// a lower number of permits might get their allocation before earlier clients which requested more /// permits -- if the semaphore can provide the lower allocation but not the larger. This non-fairness @@ -15,13 +73,28 @@ use std::{ pub(crate) struct Semaphore { counter: AtomicUsize, signal: Event, + #[cfg(feature = "semaphore-trace")] + trace_inner: TraceInner, } impl Semaphore { pub const MAX_PERMITS: usize = usize::MAX; - pub const fn new(available_permits: usize) -> Semaphore { - Semaphore { counter: AtomicUsize::new(available_permits), signal: Event::new() } + pub fn new(available_permits: usize) -> Semaphore { + cfg_if::cfg_if! { + if #[cfg(feature = "semaphore-trace")] { + Semaphore { + counter: AtomicUsize::new(available_permits), + signal: Event::new(), + trace_inner: Default::default(), + } + } else { + Semaphore { + counter: AtomicUsize::new(available_permits), + signal: Event::new(), + } + } + } } /// Tries to acquire `permits` slots from the semaphore. Upon success, returns the acquired slot @@ -33,7 +106,14 @@ impl Semaphore { } match self.counter.compare_exchange_weak(count, count - permits, Ordering::AcqRel, Ordering::Acquire) { - Ok(_) => return Some(count), + Ok(_) => { + #[cfg(feature = "semaphore-trace")] + if permits == 1 && count == Self::MAX_PERMITS { + // permits == 1 indicates a reader, count == Self::MAX_PERMITS indicates it is the first reader + self.trace_inner.mark_readers_start(); + } + return Some(count); + } Err(c) => count = c, } } @@ -75,6 +155,12 @@ impl Semaphore { /// Returns the released slot pub fn release(&self, permits: usize) -> usize { let slot = self.counter.fetch_add(permits, Ordering::AcqRel) + permits; + + #[cfg(feature = "semaphore-trace")] + if permits == 1 && slot == Self::MAX_PERMITS { + // permits == 1 indicates a reader, slot == Self::MAX_PERMITS indicates it is the last reader + self.trace_inner.mark_readers_end(); + } self.signal.notify(permits); slot }