Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Semaphore tracing feature (for tracing prune readers vs writers time) #526

Merged
merged 5 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion kaspad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,25 @@ kaspa-utxoindex.workspace = true
kaspa-wrpc-server.workspace = true

async-channel.workspace = true
cfg-if.workspace = true
clap.workspace = true
dhat = { workspace = true, optional = true }
serde.workspace = true
dirs.workspace = true
futures-util.workspace = true
log.workspace = true
num_cpus.workspace = true
rand.workspace = true
rayon.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] }
workflow-log.workspace = true

toml = "0.8.10"
serde_with = "3.7.0"

[features]
heap = ["dhat", "kaspa-alloc/heap"]
devnet-prealloc = ["kaspa-consensus/devnet-prealloc"]
semaphore-trace = ["kaspa-utils/semaphore-trace"]
8 changes: 7 additions & 1 deletion kaspad/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,13 @@ impl Runtime {
let log_dir = get_log_dir(args);

// Initialize the logger
kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level);
cfg_if::cfg_if! {
if #[cfg(feature = "semaphore-trace")] {
kaspa_core::log::init_logger(log_dir.as_deref(), &format!("{},{}=debug", args.log_level, kaspa_utils::sync::semaphore_module_path()));
} else {
kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level);
}
};

// Configure the panic behavior
// As we log the panic, we want to set it up after the logger
Expand Down
25 changes: 12 additions & 13 deletions mining/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use kaspa_consensusmanager::{spawn_blocking, ConsensusProxy};
use kaspa_core::{debug, error, info, time::Stopwatch, warn};
use kaspa_mining_errors::{manager::MiningManagerError, mempool::RuleError};
use parking_lot::RwLock;
use std::{ops::Mul, sync::Arc};
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;

pub struct MiningManager {
Expand Down Expand Up @@ -1044,29 +1044,28 @@ fn feerate_stats(transactions: Vec<Transaction>, calculated_fees: Vec<u64>) -> O
return None;
}
if transactions.len() != calculated_fees.len() + 1 {
error!("Transactions length must be one more than `calculated_fees` length");
error!(
"[feerate_stats] block template transactions length ({}) is expected to be one more than `calculated_fees` length ({})",
transactions.len(),
calculated_fees.len()
);
return None;
}
debug_assert!(transactions[0].is_coinbase());
let mut fees_and_masses = calculated_fees
let mut feerates = calculated_fees
.into_iter()
.zip(transactions
.iter()
// skip coinbase tx
.skip(1)
.map(Transaction::mass))
.map(|(fee, mass)| fee as f64 / mass as f64)
.collect_vec();
feerates.sort_unstable_by(f64::total_cmp);

// Sort by fee rate without performing division for each comparison.
// Using multiplication instead of division is faster and avoids the need
// to convert all values to floats. Division is only performed later when
// calculating the min, max, and median fee rates.
fees_and_masses.sort_unstable_by(|(lhs_fee, lhs_mass), (rhs_fee, rhs_mass)| lhs_fee.mul(rhs_mass).cmp(&rhs_fee.mul(lhs_mass)));

let div_as_f64 = |(fee, mass)| fee as f64 / mass as f64;
let max = div_as_f64(fees_and_masses[fees_and_masses.len() - 1]);
let min = div_as_f64(fees_and_masses[0]);
let median = div_as_f64(fees_and_masses[fees_and_masses.len() / 2]);
let max = feerates[feerates.len() - 1];
let min = feerates[0];
let median = feerates[feerates.len() / 2];

Some(Stats { max, median, min })
}
Expand Down
2 changes: 2 additions & 0 deletions simpa/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ kaspa-perf-monitor.workspace = true
kaspa-utils.workspace = true

async-channel.workspace = true
cfg-if.workspace = true
clap.workspace = true
dhat = { workspace = true, optional = true }
futures-util.workspace = true
Expand All @@ -38,3 +39,4 @@ tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] }

[features]
heap = ["dhat", "kaspa-alloc/heap"]
semaphore-trace = ["kaspa-utils/semaphore-trace"]
15 changes: 13 additions & 2 deletions simpa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ use kaspa_consensus_core::{
BlockHashSet, BlockLevel, HashMapCustomHasher,
};
use kaspa_consensus_notify::root::ConsensusNotificationRoot;
use kaspa_core::{info, task::service::AsyncService, task::tick::TickService, time::unix_now, trace, warn};
use kaspa_core::{
info,
task::{service::AsyncService, tick::TickService},
time::unix_now,
trace, warn,
};
use kaspa_database::prelude::ConnBuilder;
use kaspa_database::{create_temp_db, load_existing_db};
use kaspa_hashes::Hash;
Expand Down Expand Up @@ -133,7 +138,13 @@ fn main() {
let args = Args::parse();

// Initialize the logger
kaspa_core::log::init_logger(None, &args.log_level);
cfg_if::cfg_if! {
if #[cfg(feature = "semaphore-trace")] {
kaspa_core::log::init_logger(None, &format!("{},{}=debug", args.log_level, kaspa_utils::sync::semaphore_module_path()));
} else {
kaspa_core::log::init_logger(None, &args.log_level);
}
};

// Configure the panic behavior
// As we log the panic, we want to set it up after the logger
Expand Down
8 changes: 6 additions & 2 deletions utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ repository.workspace = true

[dependencies]
arc-swap.workspace = true
parking_lot.workspace = true
async-channel.workspace = true
borsh.workspace = true
cfg-if.workspace = true
event-listener.workspace = true
faster-hex.workspace = true
ipnet.workspace = true
itertools.workspace = true
log.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
serde.workspace = true
smallvec.workspace = true
thiserror.workspace = true
triggered.workspace = true
uuid.workspace = true
log.workspace = true
wasm-bindgen.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
Expand All @@ -42,3 +43,6 @@ rand.workspace = true
[[bench]]
name = "bench"
harness = false

[features]
semaphore-trace = []
5 changes: 5 additions & 0 deletions utils/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
pub mod rwlock;
pub(crate) mod semaphore;

#[cfg(feature = "semaphore-trace")]
pub fn semaphore_module_path() -> &'static str {
semaphore::get_module_path()
}
92 changes: 89 additions & 3 deletions utils/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,64 @@ use std::{
time::Duration,
};

#[cfg(feature = "semaphore-trace")]
mod trace {
use super::*;
use log::debug;
use once_cell::sync::Lazy;
use std::sync::atomic::AtomicU64;
use std::time::SystemTime;

static SYS_START: Lazy<SystemTime> = Lazy::new(SystemTime::now);

#[inline]
pub(super) fn sys_now() -> u64 {
SystemTime::now().duration_since(*SYS_START).unwrap_or_default().as_micros() as u64
}

#[derive(Debug, Default)]
pub struct TraceInner {
readers_start: AtomicU64,
readers_time: AtomicU64,
log_time: AtomicU64,
log_value: AtomicU64,
}

impl TraceInner {
pub(super) fn mark_readers_start(&self) {
self.readers_start.store(sys_now(), Ordering::Relaxed);
}

pub(super) fn mark_readers_end(&self) {
let start = self.readers_start.load(Ordering::Relaxed);
let now = sys_now();
if start < now {
let readers_time = self.readers_time.fetch_add(now - start, Ordering::Relaxed) + now - start;
let log_time = self.log_time.load(Ordering::Relaxed);
if log_time + (Duration::from_secs(10).as_micros() as u64) < now {
let log_value = self.log_value.load(Ordering::Relaxed);
debug!(
"Semaphore: log interval: {:?}, readers time: {:?}, fraction: {:.2}",
Duration::from_micros(now - log_time),
Duration::from_micros(readers_time - log_value),
(readers_time - log_value) as f64 / (now - log_time) as f64
);
self.log_value.store(readers_time, Ordering::Relaxed);
self.log_time.store(now, Ordering::Relaxed);
}
}
}
}
}

#[cfg(feature = "semaphore-trace")]
use trace::*;

#[cfg(feature = "semaphore-trace")]
pub(crate) fn get_module_path() -> &'static str {
module_path!()
}

/// A low-level non-fair semaphore. The semaphore is non-fair in the sense that clients acquiring
/// a lower number of permits might get their allocation before earlier clients which requested more
/// permits -- if the semaphore can provide the lower allocation but not the larger. This non-fairness
Expand All @@ -15,13 +73,28 @@ use std::{
pub(crate) struct Semaphore {
counter: AtomicUsize,
signal: Event,
#[cfg(feature = "semaphore-trace")]
trace_inner: TraceInner,
}

impl Semaphore {
pub const MAX_PERMITS: usize = usize::MAX;

pub const fn new(available_permits: usize) -> Semaphore {
Semaphore { counter: AtomicUsize::new(available_permits), signal: Event::new() }
pub fn new(available_permits: usize) -> Semaphore {
cfg_if::cfg_if! {
if #[cfg(feature = "semaphore-trace")] {
Semaphore {
counter: AtomicUsize::new(available_permits),
signal: Event::new(),
trace_inner: Default::default(),
}
} else {
Semaphore {
counter: AtomicUsize::new(available_permits),
signal: Event::new(),
}
}
}
}

/// Tries to acquire `permits` slots from the semaphore. Upon success, returns the acquired slot
Expand All @@ -33,7 +106,14 @@ impl Semaphore {
}

match self.counter.compare_exchange_weak(count, count - permits, Ordering::AcqRel, Ordering::Acquire) {
Ok(_) => return Some(count),
Ok(_) => {
#[cfg(feature = "semaphore-trace")]
if permits == 1 && count == Self::MAX_PERMITS {
// permits == 1 indicates a reader, count == Self::MAX_PERMITS indicates it is the first reader
self.trace_inner.mark_readers_start();
}
return Some(count);
}
Err(c) => count = c,
}
}
Expand Down Expand Up @@ -75,6 +155,12 @@ impl Semaphore {
/// Returns the released slot
pub fn release(&self, permits: usize) -> usize {
let slot = self.counter.fetch_add(permits, Ordering::AcqRel) + permits;

#[cfg(feature = "semaphore-trace")]
if permits == 1 && slot == Self::MAX_PERMITS {
// permits == 1 indicates a reader, slot == Self::MAX_PERMITS indicates it is the last reader
self.trace_inner.mark_readers_end();
}
self.signal.notify(permits);
slot
}
Expand Down