From 773d0d8fc19174183ea84d6799bea493a6a08948 Mon Sep 17 00:00:00 2001 From: qima Date: Wed, 29 Jan 2025 23:46:55 +0800 Subject: [PATCH] feat(pricing): add records_per_type into QuotingMetrics --- ant-evm/src/data_payments.rs | 1 + ant-networking/src/cmd.rs | 22 +++-- ant-networking/src/record_store.rs | 101 ++++++++++++++++------ ant-networking/src/record_store_api.rs | 16 +++- ant-networking/src/replication_fetcher.rs | 16 ++-- ant-protocol/src/storage/header.rs | 9 ++ evmlib/src/quoting_metrics.rs | 6 +- evmlib/tests/payment_vault.rs | 3 + evmlib/tests/wallet.rs | 1 + 9 files changed, 129 insertions(+), 46 deletions(-) diff --git a/ant-evm/src/data_payments.rs b/ant-evm/src/data_payments.rs index b0a32a4b22..f8154f44e1 100644 --- a/ant-evm/src/data_payments.rs +++ b/ant-evm/src/data_payments.rs @@ -242,6 +242,7 @@ impl PaymentQuote { data_size: 0, data_type: 0, close_records_stored: 0, + records_per_type: vec![], max_records: 0, received_payment_count: 0, live_time: 0, diff --git a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index ec512f31f7..63059ca4e0 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -123,6 +123,7 @@ pub enum LocalSwarmCmd { AddLocalRecordAsStored { key: RecordKey, record_type: ValidationType, + data_type: DataTypes, }, /// Add a peer to the blocklist AddPeerToBlockList { @@ -235,10 +236,14 @@ impl Debug for LocalSwarmCmd { PrettyPrintRecordKey::from(key) ) } - LocalSwarmCmd::AddLocalRecordAsStored { key, record_type } => { + LocalSwarmCmd::AddLocalRecordAsStored { + key, + record_type, + data_type, + } => { write!( f, - "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?} }}", + "LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}", PrettyPrintRecordKey::from(key) ) } @@ -778,7 +783,11 @@ impl SwarmDriver { return Err(err.into()); }; } - LocalSwarmCmd::AddLocalRecordAsStored { key, record_type } => { + LocalSwarmCmd::AddLocalRecordAsStored { + key, + record_type, + data_type, + } => { info!( "Adding Record locally, for {:?} and {record_type:?}", PrettyPrintRecordKey::from(&key) @@ -788,7 +797,7 @@ impl SwarmDriver { .behaviour_mut() .kademlia .store_mut() - .mark_as_stored(key, record_type); + .mark_as_stored(key, record_type, data_type); // Reset counter on any success HDD write. self.hard_disk_write_error = 0; } @@ -1119,7 +1128,10 @@ impl SwarmDriver { ); let request = Request::Cmd(Cmd::Replicate { holder: NetworkAddress::from_peer(self.self_peer_id), - keys: all_records, + keys: all_records + .into_iter() + .map(|(addr, val_type, _data_type)| (addr, val_type)) + .collect(), }); for peer_id in replicate_targets { self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest { diff --git a/ant-networking/src/record_store.rs b/ant-networking/src/record_store.rs index 4018a69aa7..51538fd555 100644 --- a/ant-networking/src/record_store.rs +++ b/ant-networking/src/record_store.rs @@ -19,7 +19,7 @@ use aes_gcm_siv::{ use ant_evm::{QuotingMetrics, U256}; use ant_protocol::{ convert_distance_to_u256, - storage::{RecordHeader, RecordKind, ValidationType}, + storage::{DataTypes, RecordHeader, RecordKind, ValidationType}, NetworkAddress, PrettyPrintRecordKey, }; use hkdf::Hkdf; @@ -138,7 +138,7 @@ pub struct NodeRecordStore { /// The configuration of the store. config: NodeRecordStoreConfig, /// Main records store remains unchanged for compatibility - records: HashMap, + records: HashMap, /// Additional index organizing records by distance records_by_distance: BTreeMap, /// FIFO simple cache of records to reduce read times @@ -218,7 +218,7 @@ impl NodeRecordStore { fn update_records_from_an_existing_store( config: &NodeRecordStoreConfig, encryption_details: &(Aes256GcmSiv, [u8; 4]), - ) -> HashMap { + ) -> HashMap { let process_entry = |entry: &DirEntry| -> _ { let path = entry.path(); if path.is_file() { @@ -269,11 +269,19 @@ impl NodeRecordStore { } }; - let record_type = match RecordHeader::is_record_of_type_chunk(&record) { - Ok(true) => ValidationType::Chunk, - Ok(false) => { - let xorname_hash = XorName::from_content(&record.value); - ValidationType::NonChunk(xorname_hash) + match RecordHeader::get_data_type(&record) { + Ok(data_type) => { + let validate_type = match data_type { + DataTypes::Chunk => ValidationType::Chunk, + _ => { + let xorname_hash = XorName::from_content(&record.value); + ValidationType::NonChunk(xorname_hash) + } + }; + + let address = NetworkAddress::from_record_key(&key); + info!("Existing record {address:?} loaded from: {path:?}"); + return Some((key, (address, validate_type, data_type))); } Err(error) => { warn!( @@ -290,11 +298,7 @@ impl NodeRecordStore { } return None; } - }; - - let address = NetworkAddress::from_record_key(&key); - info!("Existing record loaded: {path:?}"); - return Some((key, (address, record_type))); + } } None }; @@ -370,7 +374,7 @@ impl NodeRecordStore { // Initialize records_by_distance let mut records_by_distance: BTreeMap = BTreeMap::new(); - for (key, (addr, _record_type)) in records.iter() { + for (key, (addr, _record_type, _data_type)) in records.iter() { let distance = convert_distance_to_u256(&local_address.distance(addr)); let _ = records_by_distance.insert(distance, key.clone()); } @@ -588,26 +592,35 @@ impl NodeRecordStore { pub(crate) fn record_addresses(&self) -> HashMap { self.records .iter() - .map(|(_record_key, (addr, record_type))| (addr.clone(), record_type.clone())) + .map(|(_record_key, (addr, record_type, _data_type))| { + (addr.clone(), record_type.clone()) + }) .collect() } /// Returns the reference to the set of `NetworkAddress::RecordKey` held by the store - pub(crate) fn record_addresses_ref(&self) -> &HashMap { + pub(crate) fn record_addresses_ref( + &self, + ) -> &HashMap { &self.records } /// The follow up to `put_verified`, this only registers the RecordKey /// in the RecordStore records set. After this it should be safe /// to return the record as stored. - pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: ValidationType) { + pub(crate) fn mark_as_stored( + &mut self, + key: Key, + validate_type: ValidationType, + data_type: DataTypes, + ) { let addr = NetworkAddress::from_record_key(&key); let distance = self.local_address.distance(&addr); let distance_u256 = convert_distance_to_u256(&distance); // Update main records store self.records - .insert(key.clone(), (addr.clone(), record_type)); + .insert(key.clone(), (addr.clone(), validate_type, data_type)); // Update bucket index let _ = self.records_by_distance.insert(distance_u256, key.clone()); @@ -686,13 +699,26 @@ impl NodeRecordStore { let record_key2 = record_key.clone(); spawn(async move { let key = r.key.clone(); + let data_type = match RecordHeader::get_data_type(&r) { + Ok(data_type) => data_type, + Err(err) => { + error!( + "Error get data_type of record {record_key2:?} filename: {filename}, error: {err:?}" + ); + return; + } + }; if let Some(bytes) = Self::prepare_record_bytes(r, encryption_details) { let cmd = match fs::write(&file_path, bytes) { Ok(_) => { // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues): info!("Wrote record {record_key2:?} to disk! filename: {filename}"); - LocalSwarmCmd::AddLocalRecordAsStored { key, record_type } + LocalSwarmCmd::AddLocalRecordAsStored { + key, + record_type, + data_type, + } } Err(err) => { error!( @@ -719,6 +745,7 @@ impl NodeRecordStore { network_size: Option, ) -> (QuotingMetrics, bool) { let records_stored = self.records.len(); + let records_per_type = self.records_per_type(); let live_time = if let Ok(elapsed) = self.timestamp.elapsed() { elapsed.as_secs() @@ -730,6 +757,7 @@ impl NodeRecordStore { data_type, data_size, close_records_stored: records_stored, + records_per_type, max_records: self.config.max_records, received_payment_count: self.received_payment_count, live_time, @@ -780,6 +808,14 @@ impl NodeRecordStore { pub(crate) fn set_responsible_distance_range(&mut self, responsible_distance: U256) { self.responsible_distance_range = Some(responsible_distance); } + + fn records_per_type(&self) -> Vec<(u32, u32)> { + let mut map = BTreeMap::new(); + for (_, _, data_type) in self.records.values() { + *map.entry(data_type.get_index()).or_insert(0) += 1; + } + map.into_iter().collect() + } } impl RecordStore for NodeRecordStore { @@ -834,11 +870,15 @@ impl RecordStore for NodeRecordStore { // otherwise shall be passed further to allow different version of nonchunk // to be detected or updated. match self.records.get(&record.key) { - Some((_addr, ValidationType::Chunk)) => { + Some((_addr, ValidationType::Chunk, _data_type)) => { debug!("Chunk {record_key:?} already exists."); return Ok(()); } - Some((_addr, ValidationType::NonChunk(existing_content_hash))) => { + Some(( + _addr, + ValidationType::NonChunk(existing_content_hash), + _data_type, + )) => { let content_hash = XorName::from_content(&record.value); if content_hash == *existing_content_hash { debug!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists."); @@ -873,7 +913,7 @@ impl RecordStore for NodeRecordStore { fn remove(&mut self, k: &Key) { // Remove from main store - if let Some((addr, _)) = self.records.remove(k) { + if let Some((addr, _, _)) = self.records.remove(k) { let distance = convert_distance_to_u256(&self.local_address.distance(&addr)); let _ = self.records_by_distance.remove(&distance); } @@ -1072,7 +1112,7 @@ mod tests { // We must also mark the record as stored (which would be triggered after the async write in nodes // via NetworkEvent::CompletedWrite) - store.mark_as_stored(returned_record_key, ValidationType::Chunk); + store.mark_as_stored(returned_record_key, ValidationType::Chunk, DataTypes::Chunk); // loop over store.get max_iterations times to ensure async disk write had time to complete. let max_iterations = 10; @@ -1149,8 +1189,12 @@ mod tests { // Wait for the async write operation to complete if let Some(cmd) = swarm_cmd_receiver.recv().await { match cmd { - LocalSwarmCmd::AddLocalRecordAsStored { key, record_type } => { - store.mark_as_stored(key, record_type); + LocalSwarmCmd::AddLocalRecordAsStored { + key, + record_type, + data_type, + } => { + store.mark_as_stored(key, record_type, data_type); } _ => panic!("Unexpected command received"), } @@ -1248,7 +1292,7 @@ mod tests { .is_ok()); // Mark as stored (simulating the CompletedWrite event) - store.mark_as_stored(record.key.clone(), ValidationType::Chunk); + store.mark_as_stored(record.key.clone(), ValidationType::Chunk, DataTypes::Chunk); // Verify the chunk is stored let stored_record = store.get(&record.key); @@ -1322,6 +1366,7 @@ mod tests { store.mark_as_stored( record.key.clone(), ValidationType::NonChunk(XorName::from_content(&record.value)), + DataTypes::Scratchpad, ); // Verify the scratchpad is stored @@ -1416,7 +1461,7 @@ mod tests { } else { // We must also mark the record as stored (which would be triggered // after the async write in nodes via NetworkEvent::CompletedWrite) - store.mark_as_stored(record_key.clone(), ValidationType::Chunk); + store.mark_as_stored(record_key.clone(), ValidationType::Chunk, DataTypes::Chunk); println!("success sotred len: {:?} ", store.record_addresses().len()); stored_records_at_some_point.push(record_key.clone()); @@ -1532,7 +1577,7 @@ mod tests { assert!(store.put_verified(record, ValidationType::Chunk).is_ok()); // We must also mark the record as stored (which would be triggered after the async write in nodes // via NetworkEvent::CompletedWrite) - store.mark_as_stored(record_key.clone(), ValidationType::Chunk); + store.mark_as_stored(record_key.clone(), ValidationType::Chunk, DataTypes::Chunk); stored_records.push(record_key.clone()); stored_records.sort_by(|a, b| { diff --git a/ant-networking/src/record_store_api.rs b/ant-networking/src/record_store_api.rs index 88d44735b4..777eff2779 100644 --- a/ant-networking/src/record_store_api.rs +++ b/ant-networking/src/record_store_api.rs @@ -10,7 +10,10 @@ use crate::error::{NetworkError, Result}; use crate::record_store::{ClientRecordStore, NodeRecordStore}; use ant_evm::{QuotingMetrics, U256}; -use ant_protocol::{storage::ValidationType, NetworkAddress}; +use ant_protocol::{ + storage::{DataTypes, ValidationType}, + NetworkAddress, +}; use libp2p::kad::{store::RecordStore, ProviderRecord, Record, RecordKey}; use std::{borrow::Cow, collections::HashMap}; @@ -103,7 +106,7 @@ impl UnifiedRecordStore { pub(crate) fn record_addresses_ref( &self, - ) -> Result<&HashMap> { + ) -> Result<&HashMap> { match self { Self::Client(_) => { error!("Calling record_addresses_ref at Client. This should not happen"); @@ -185,12 +188,17 @@ impl UnifiedRecordStore { /// Mark the record as stored in the store. /// This adds it to records set, so it can now be retrieved /// (to be done after writes are finalised) - pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: ValidationType) { + pub(crate) fn mark_as_stored( + &mut self, + k: RecordKey, + record_type: ValidationType, + data_type: DataTypes, + ) { match self { Self::Client(_) => { error!("Calling mark_as_stored at Client. This should not happen"); } - Self::Node(store) => store.mark_as_stored(k, record_type), + Self::Node(store) => store.mark_as_stored(k, record_type, data_type), }; } diff --git a/ant-networking/src/replication_fetcher.rs b/ant-networking/src/replication_fetcher.rs index 142e3565f7..99c41856bf 100644 --- a/ant-networking/src/replication_fetcher.rs +++ b/ant-networking/src/replication_fetcher.rs @@ -11,7 +11,9 @@ use crate::time::spawn; use crate::{event::NetworkEvent, time::Instant, CLOSE_GROUP_SIZE}; use ant_evm::U256; use ant_protocol::{ - convert_distance_to_u256, storage::ValidationType, NetworkAddress, PrettyPrintRecordKey, + convert_distance_to_u256, + storage::{DataTypes, ValidationType}, + NetworkAddress, PrettyPrintRecordKey, }; use libp2p::{ kad::{KBucketDistance as Distance, RecordKey, K_VALUE}, @@ -88,7 +90,7 @@ impl ReplicationFetcher { &mut self, holder: PeerId, incoming_keys: Vec<(NetworkAddress, ValidationType)>, - locally_stored_keys: &HashMap, + locally_stored_keys: &HashMap, is_fresh_replicate: bool, closest_k_peers: Vec, ) -> Vec<(PeerId, RecordKey)> { @@ -328,7 +330,7 @@ impl ReplicationFetcher { &mut self, holder: &PeerId, incoming_keys: Vec<(NetworkAddress, ValidationType)>, - locally_stored_keys: &HashMap, + locally_stored_keys: &HashMap, closest_k_peers: Vec, ) -> Vec<(PeerId, NetworkAddress, ValidationType)> { match self.is_peer_trustworthy(holder) { @@ -422,7 +424,7 @@ impl ReplicationFetcher { &mut self, holder: &PeerId, incoming_keys: Vec<(NetworkAddress, ValidationType)>, - locally_stored_keys: &HashMap, + locally_stored_keys: &HashMap, mut closest_k_peers: Vec, ) -> Vec<(NetworkAddress, ValidationType)> { // Pre-calculate self_address since it's used multiple times @@ -552,10 +554,10 @@ impl ReplicationFetcher { /// This checks the hash on GraphEntry to ensure we pull in divergent GraphEntry. fn remove_stored_keys( &mut self, - existing_keys: &HashMap, + existing_keys: &HashMap, ) { self.to_be_fetched.retain(|(key, t, _), _| { - if let Some((_addr, record_type)) = existing_keys.get(key) { + if let Some((_addr, record_type, _data_type)) = existing_keys.get(key) { // check the address only against similar record types t != record_type } else { @@ -563,7 +565,7 @@ impl ReplicationFetcher { } }); self.on_going_fetches.retain(|(key, t), _| { - if let Some((_addr, record_type)) = existing_keys.get(key) { + if let Some((_addr, record_type, _data_type)) = existing_keys.get(key) { // check the address only against similar record types t != record_type } else { diff --git a/ant-protocol/src/storage/header.rs b/ant-protocol/src/storage/header.rs index 96c20ca71c..d932f1f19a 100644 --- a/ant-protocol/src/storage/header.rs +++ b/ant-protocol/src/storage/header.rs @@ -152,6 +152,15 @@ impl RecordHeader { let kind = Self::from_record(record)?.kind; Ok(kind == RecordKind::DataOnly(DataTypes::Chunk)) } + + pub fn get_data_type(record: &Record) -> Result { + let kind = Self::from_record(record)?.kind; + match kind { + RecordKind::DataOnly(data_type) | RecordKind::DataWithPayment(data_type) => { + Ok(data_type) + } + } + } } /// Utility to deserialize a `KAD::Record` into any type. diff --git a/evmlib/src/quoting_metrics.rs b/evmlib/src/quoting_metrics.rs index 4042688a4b..be383042b9 100644 --- a/evmlib/src/quoting_metrics.rs +++ b/evmlib/src/quoting_metrics.rs @@ -19,6 +19,8 @@ pub struct QuotingMetrics { pub data_size: usize, /// the records stored pub close_records_stored: usize, + /// each entry to be `(data_type_index, num_of_records_of_that_type)` + pub records_per_type: Vec<(u32, u32)>, /// the max_records configured pub max_records: usize, /// number of times that got paid @@ -36,7 +38,7 @@ impl Debug for QuotingMetrics { fn fmt(&self, formatter: &mut Formatter) -> FmtResult { let density_u256 = self.network_density.map(U256::from_be_bytes); - write!(formatter, "QuotingMetrics {{ data_type: {}, data_size: {}, close_records_stored: {}, max_records: {}, received_payment_count: {}, live_time: {}, network_density: {density_u256:?}, network_size: {:?} }}", - self.data_type, self.data_size, self.close_records_stored, self.max_records, self.received_payment_count, self.live_time, self.network_size) + write!(formatter, "QuotingMetrics {{ data_type: {}, data_size: {}, close_records_stored: {}, records_per_type {:?}, max_records: {}, received_payment_count: {}, live_time: {}, network_density: {density_u256:?}, network_size: {:?} }}", + self.data_type, self.data_size, self.close_records_stored, self.records_per_type, self.max_records, self.received_payment_count, self.live_time, self.network_size) } } diff --git a/evmlib/tests/payment_vault.rs b/evmlib/tests/payment_vault.rs index 24f5a0eede..e79d9a4b4c 100644 --- a/evmlib/tests/payment_vault.rs +++ b/evmlib/tests/payment_vault.rs @@ -126,6 +126,7 @@ async fn test_proxy_reachable_on_arb_sepolia() { data_size: 0, data_type: 0, close_records_stored: 0, + records_per_type: vec![], max_records: 0, received_payment_count: 0, live_time: 0, @@ -148,6 +149,7 @@ async fn test_get_quote_on_arb_sepolia_test() { data_type: 1, // a GraphEntry record data_size: 100, close_records_stored: 10, + records_per_type: vec![(0, 5), (1, 5)], max_records: 16 * 1024, received_payment_count: 0, live_time: 1400, @@ -222,6 +224,7 @@ async fn test_verify_payment_on_local() { data_size: 0, data_type: 0, close_records_stored: 0, + records_per_type: vec![], max_records: 0, received_payment_count: 0, live_time: 0, diff --git a/evmlib/tests/wallet.rs b/evmlib/tests/wallet.rs index 8122bda952..6713879279 100644 --- a/evmlib/tests/wallet.rs +++ b/evmlib/tests/wallet.rs @@ -96,6 +96,7 @@ async fn test_pay_for_quotes_and_data_payment_verification() { data_size: 0, data_type: 0, close_records_stored: 0, + records_per_type: vec![], max_records: 0, received_payment_count: 0, live_time: 0,