Skip to content

Commit

Permalink
15 rc4 updates (#554)
Browse files Browse the repository at this point in the history
* metrics: fix first snapshot sample + cleanup

* Wallet SDK: scan() - fix UtxoContext processing latency during scan. Add UtxoProcessor notification lock to the scan processor.

* cleanup
  • Loading branch information
aspect authored Sep 10, 2024
1 parent d1dc5dd commit 2a99817
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 103 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion metrics/core/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,7 @@ pub fn as_data_size(bytes: f64, si: bool) -> String {
}

/// Format supplied value as a float with 2 decimal places.
fn format_as_float(f: f64, short: bool) -> String {
pub fn format_as_float(f: f64, short: bool) -> String {
if short {
if f < 1000.0 {
format_with_precision(f)
Expand Down
92 changes: 6 additions & 86 deletions metrics/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,25 @@ impl Metrics {
let interval = interval(Duration::from_secs(1));
pin_mut!(interval);

let mut first = true;

loop {
select! {
_ = task_ctl_receiver.recv().fuse() => {
break;
},
_ = interval.next().fuse() => {

// current_metrics_data = MetricsData::new(unixtime_as_millis_f64());

if let Some(rpc) = this.rpc() {
// if let Err(err) = this.sample_metrics(rpc.clone(), &mut current_metrics_data).await {
match this.sample_metrics(rpc.clone()).await {
Ok(incoming_data) => {
let last_metrics_data = current_metrics_data;
current_metrics_data = incoming_data;
this.data.lock().unwrap().replace(current_metrics_data.clone());

if let Some(sink) = this.sink() {
if first {
first = false;
} else if let Some(sink) = this.sink() {
let snapshot = MetricsSnapshot::from((&last_metrics_data, &current_metrics_data));
if let Some(future) = sink(snapshot) {
future.await.ok();
Expand All @@ -100,7 +101,6 @@ impl Metrics {

}
Err(err) => {
// current_metrics_data = last_metrics_data.clone();
log_trace!("Metrics::sample_metrics() error: {}", err);
}
}
Expand All @@ -120,87 +120,7 @@ impl Metrics {
Ok(())
}

// --- samplers

async fn sample_metrics(self: &Arc<Self>, rpc: Arc<dyn RpcApi>) -> Result<MetricsData> {
// let GetMetricsResponse {
// server_time: _,
// consensus_metrics,
// connection_metrics,
// bandwidth_metrics,
// process_metrics,
// storage_metrics,
// custom_metrics: _,
// } =
let response = rpc.get_metrics(true, true, true, true, true, false).await?;

MetricsData::try_from(response)

// if let Some(consensus_metrics) = consensus_metrics {
// data.node_blocks_submitted_count = consensus_metrics.node_blocks_submitted_count;
// data.node_headers_processed_count = consensus_metrics.node_headers_processed_count;
// data.node_dependencies_processed_count = consensus_metrics.node_dependencies_processed_count;
// data.node_bodies_processed_count = consensus_metrics.node_bodies_processed_count;
// data.node_transactions_processed_count = consensus_metrics.node_transactions_processed_count;
// data.node_chain_blocks_processed_count = consensus_metrics.node_chain_blocks_processed_count;
// data.node_mass_processed_count = consensus_metrics.node_mass_processed_count;
// // --
// data.node_database_blocks_count = consensus_metrics.node_database_blocks_count;
// data.node_database_headers_count = consensus_metrics.node_database_headers_count;
// data.network_mempool_size = consensus_metrics.network_mempool_size;
// data.network_tip_hashes_count = consensus_metrics.network_tip_hashes_count;
// data.network_difficulty = consensus_metrics.network_difficulty;
// data.network_past_median_time = consensus_metrics.network_past_median_time;
// data.network_virtual_parent_hashes_count = consensus_metrics.network_virtual_parent_hashes_count;
// data.network_virtual_daa_score = consensus_metrics.network_virtual_daa_score;
// }

// if let Some(connection_metrics) = connection_metrics {
// data.node_borsh_live_connections = connection_metrics.borsh_live_connections;
// data.node_borsh_connection_attempts = connection_metrics.borsh_connection_attempts;
// data.node_borsh_handshake_failures = connection_metrics.borsh_handshake_failures;
// data.node_json_live_connections = connection_metrics.json_live_connections;
// data.node_json_connection_attempts = connection_metrics.json_connection_attempts;
// data.node_json_handshake_failures = connection_metrics.json_handshake_failures;
// data.node_active_peers = connection_metrics.active_peers;
// }

// if let Some(bandwidth_metrics) = bandwidth_metrics {
// data.node_borsh_bytes_tx = bandwidth_metrics.borsh_bytes_tx;
// data.node_borsh_bytes_rx = bandwidth_metrics.borsh_bytes_rx;
// data.node_json_bytes_tx = bandwidth_metrics.json_bytes_tx;
// data.node_json_bytes_rx = bandwidth_metrics.json_bytes_rx;
// data.node_p2p_bytes_tx = bandwidth_metrics.p2p_bytes_tx;
// data.node_p2p_bytes_rx = bandwidth_metrics.p2p_bytes_rx;
// data.node_grpc_user_bytes_tx = bandwidth_metrics.grpc_bytes_tx;
// data.node_grpc_user_bytes_rx = bandwidth_metrics.grpc_bytes_rx;

// data.node_total_bytes_tx = bandwidth_metrics.borsh_bytes_tx
// + bandwidth_metrics.json_bytes_tx
// + bandwidth_metrics.p2p_bytes_tx
// + bandwidth_metrics.grpc_bytes_tx;

// data.node_total_bytes_rx = bandwidth_metrics.borsh_bytes_rx
// + bandwidth_metrics.json_bytes_rx
// + bandwidth_metrics.p2p_bytes_rx
// + bandwidth_metrics.grpc_bytes_rx;
// }

// if let Some(process_metrics) = process_metrics {
// data.node_resident_set_size_bytes = process_metrics.resident_set_size;
// data.node_virtual_memory_size_bytes = process_metrics.virtual_memory_size;
// data.node_cpu_cores = process_metrics.core_num;
// data.node_cpu_usage = process_metrics.cpu_usage;
// data.node_file_handles = process_metrics.fd_num;
// data.node_disk_io_read_bytes = process_metrics.disk_io_read_bytes;
// data.node_disk_io_write_bytes = process_metrics.disk_io_write_bytes;
// data.node_disk_io_read_per_sec = process_metrics.disk_io_read_per_sec;
// data.node_disk_io_write_per_sec = process_metrics.disk_io_write_per_sec;
// }

// if let Some(storage_metrics) = storage_metrics {
// data.node_storage_size_bytes = storage_metrics.storage_size_bytes;
// }
// Ok(())
MetricsData::try_from(rpc.get_metrics(true, true, true, true, true, false).await?)
}
}
4 changes: 3 additions & 1 deletion wallet/core/src/imports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ pub use crate::wallet::*;
pub use crate::{storage, utils};

pub use ahash::{AHashMap, AHashSet};
pub use async_std::sync::{Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard};
pub use async_std::sync::{
Mutex as AsyncMutex, MutexGuard as AsyncMutexGuard, RwLock as AsyncRwLock, RwLockReadGuard as AsyncRwLockReadGuard,
};
pub use async_trait::async_trait;
pub use borsh::{BorshDeserialize, BorshSerialize};
pub use cfg_if::cfg_if;
Expand Down
12 changes: 7 additions & 5 deletions wallet/core/src/utxo/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl UtxoContext {
}
Ok(())
} else {
log_warn!("ignoring duplicate utxo entry");
// log_warn!("Warning: Ignoring duplicate UTXO entry");
Ok(())
}
}
Expand Down Expand Up @@ -347,7 +347,7 @@ impl UtxoContext {
remove_mature_ids.push(id);
}
} else if context.outgoing.get(&utxo.transaction_id()).is_none() {
log_error!("Error: UTXO not found in UtxoContext map!");
// log_warm!("Warning: UTXO not found in UtxoContext map!");
}
}

Expand All @@ -374,10 +374,10 @@ impl UtxoContext {
context.mature.sorted_insert_binary_asc_by_key(utxo_entry.clone(), |entry| entry.amount_as_ref());
} else {
log_error!("Error: non-pending utxo promotion!");
unreachable!("Error: non-pending utxo promotion!");
}
}

// sanity check
if self.context().outgoing.get(&txid).is_some() {
unreachable!("Error: promotion of the outgoing transaction!");
}
Expand Down Expand Up @@ -421,7 +421,7 @@ impl UtxoContext {
let mut context = self.context();

let mut pending = vec![];
let mut mature = vec![];
let mut mature = Vec::with_capacity(utxo_entries.len());

let params = NetworkParams::from(self.processor().network_id()?);

Expand All @@ -444,14 +444,16 @@ impl UtxoContext {
}
Maturity::Confirmed => {
mature.push(utxo_entry.clone());
context.mature.sorted_insert_binary_asc_by_key(utxo_entry.clone(), |entry| entry.amount_as_ref());
}
}
} else {
log_warn!("ignoring duplicate utxo entry");
}
}

context.mature.extend(mature.iter().cloned());
context.mature.sort_by_key(|entry| entry.amount());

(pending, mature)
};

Expand Down
10 changes: 4 additions & 6 deletions wallet/core/src/utxo/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ use kaspa_rpc_core::{
notify::connection::{ChannelConnection, ChannelType},
Notification,
};
// use workflow_core::task;
// use kaspa_metrics_core::{Metrics,Metric};

pub struct Inner {
/// Coinbase UTXOs in stasis
Expand All @@ -58,7 +56,7 @@ pub struct Inner {
sync_proc: SyncMonitor,
multiplexer: Multiplexer<Box<Events>>,
wallet_bus: Option<Channel<WalletBusMessage>>,
notification_guard: AsyncMutex<()>,
notification_guard: AsyncRwLock<()>,
connect_disconnect_guard: AsyncMutex<()>,
metrics: Arc<Metrics>,
metrics_kinds: Mutex<Vec<MetricsUpdateKind>>,
Expand Down Expand Up @@ -161,8 +159,8 @@ impl UtxoProcessor {
&self.inner.multiplexer
}

pub async fn notification_lock(&self) -> AsyncMutexGuard<()> {
self.inner.notification_guard.lock().await
pub async fn notification_lock(&self) -> AsyncRwLockReadGuard<()> {
self.inner.notification_guard.read().await
}

pub fn sync_proc(&self) -> &SyncMonitor {
Expand Down Expand Up @@ -577,7 +575,7 @@ impl UtxoProcessor {
}

async fn handle_notification(&self, notification: Notification) -> Result<()> {
let _lock = self.notification_lock().await;
let _lock = self.inner.notification_guard.write().await;

match notification {
Notification::VirtualDaaScoreChanged(virtual_daa_score_changed_notification) => {
Expand Down
9 changes: 6 additions & 3 deletions wallet/core/src/utxo/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ impl Scan {
}

pub async fn scan(&self, utxo_context: &UtxoContext) -> Result<()> {
// block notifications while scanning...
let _lock = utxo_context.processor().notification_lock().await;

match &self.provider {
Provider::AddressManager(address_manager) => self.scan_with_address_manager(address_manager, utxo_context).await,
Provider::AddressSet(addresses) => self.scan_with_address_set(addresses, utxo_context).await,
Expand Down Expand Up @@ -86,9 +89,9 @@ impl Scan {

let ts = Instant::now();
let resp = utxo_context.processor().rpc_api().get_utxos_by_addresses(addresses).await?;
let elapsed_msec = ts.elapsed().as_secs_f32();
if elapsed_msec > 1.0 {
log_warn!("get_utxos_by_address() fetched {} entries in: {} msec", resp.len(), elapsed_msec);
let elapsed_sec = ts.elapsed().as_secs_f32();
if elapsed_sec > 1.0 {
log_warn!("get_utxos_by_address() fetched {} entries in: {} msec", resp.len(), elapsed_sec);
}
yield_executor().await;

Expand Down

0 comments on commit 2a99817

Please sign in to comment.