Merge pull request maidsafe#2680 from maqi/total_cost_unit
feat(pricing): add records_per_type into QuotingMetrics
maqi authored Jan 30, 2025
2 parents c5bd1f8 + 773d0d8 commit adb8930
Showing 9 changed files with 129 additions and 46 deletions.
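
Before the per-file diffs, a minimal sketch of the metric shape this commit introduces, inferred from the hunks below. The field names mirror those set in data_payments.rs and record_store.rs, but the field types and example values here are illustrative rather than copied from ant-evm:

#[derive(Debug)]
struct QuotingMetrics {
    data_type: u32,
    data_size: usize,
    close_records_stored: usize,
    // New in this commit: one (data_type_index, count) pair per data type
    // currently held by the node's record store.
    records_per_type: Vec<(u32, u32)>,
    max_records: usize,
    received_payment_count: usize,
    live_time: u64,
}

fn main() {
    // Example values only, to show the intended shape of the new field.
    let metrics = QuotingMetrics {
        data_type: 0,
        data_size: 0,
        close_records_stored: 15,
        records_per_type: vec![(0, 12), (1, 3)], // e.g. 12 chunks, 3 non-chunk records
        max_records: 2048,
        received_payment_count: 0,
        live_time: 0,
    };
    println!("{metrics:?}");
}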
1 change: 1 addition & 0 deletions ant-evm/src/data_payments.rs
@@ -242,6 +242,7 @@ impl PaymentQuote {
data_size: 0,
data_type: 0,
close_records_stored: 0,
records_per_type: vec![],
max_records: 0,
received_payment_count: 0,
live_time: 0,
22 changes: 17 additions & 5 deletions ant-networking/src/cmd.rs
@@ -123,6 +123,7 @@ pub enum LocalSwarmCmd {
AddLocalRecordAsStored {
key: RecordKey,
record_type: ValidationType,
data_type: DataTypes,
},
/// Add a peer to the blocklist
AddPeerToBlockList {
@@ -235,10 +236,14 @@ impl Debug for LocalSwarmCmd {
PrettyPrintRecordKey::from(key)
)
}
LocalSwarmCmd::AddLocalRecordAsStored { key, record_type } => {
LocalSwarmCmd::AddLocalRecordAsStored {
key,
record_type,
data_type,
} => {
write!(
f,
"LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?} }}",
"LocalSwarmCmd::AddLocalRecordAsStored {{ key: {:?}, record_type: {record_type:?}, data_type: {data_type:?} }}",
PrettyPrintRecordKey::from(key)
)
}
@@ -778,7 +783,11 @@ impl SwarmDriver {
return Err(err.into());
};
}
LocalSwarmCmd::AddLocalRecordAsStored { key, record_type } => {
LocalSwarmCmd::AddLocalRecordAsStored {
key,
record_type,
data_type,
} => {
info!(
"Adding Record locally, for {:?} and {record_type:?}",
PrettyPrintRecordKey::from(&key)
@@ -788,7 +797,7 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.mark_as_stored(key, record_type);
.mark_as_stored(key, record_type, data_type);
// Reset counter on any success HDD write.
self.hard_disk_write_error = 0;
}
@@ -1119,7 +1128,10 @@ impl SwarmDriver {
);
let request = Request::Cmd(Cmd::Replicate {
holder: NetworkAddress::from_peer(self.self_peer_id),
keys: all_records,
keys: all_records
.into_iter()
.map(|(addr, val_type, _data_type)| (addr, val_type))
.collect(),
});
for peer_id in replicate_targets {
self.queue_network_swarm_cmd(NetworkSwarmCmd::SendRequest {
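
To make the cmd.rs change above concrete: the local store now yields (address, validation type, data type) triples, but Replicate requests keep their existing (address, validation type) key shape, so the data type is dropped when the payload is built. A standalone sketch of that mapping, with Addr, ValType, and DataType as simplified stand-ins for the crate's NetworkAddress, ValidationType, and DataTypes:

type Addr = String;

#[derive(Debug)]
enum ValType {
    Chunk,
}

#[derive(Debug)]
enum DataType {
    Chunk,
}

fn main() {
    // Records as the store now tracks them locally: (address, validation type, data type) triples.
    let all_records: Vec<(Addr, ValType, DataType)> =
        vec![("record-a".into(), ValType::Chunk, DataType::Chunk)];

    // Same shape as the diff: keep (addr, val_type), discard the data type for replication keys.
    let keys: Vec<(Addr, ValType)> = all_records
        .into_iter()
        .map(|(addr, val_type, _data_type)| (addr, val_type))
        .collect();

    println!("{keys:?}");
}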
101 changes: 73 additions & 28 deletions ant-networking/src/record_store.rs
@@ -19,7 +19,7 @@ use aes_gcm_siv::{
use ant_evm::{QuotingMetrics, U256};
use ant_protocol::{
convert_distance_to_u256,
storage::{RecordHeader, RecordKind, ValidationType},
storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
NetworkAddress, PrettyPrintRecordKey,
};
use hkdf::Hkdf;
@@ -138,7 +138,7 @@ pub struct NodeRecordStore {
/// The configuration of the store.
config: NodeRecordStoreConfig,
/// Main records store remains unchanged for compatibility
records: HashMap<Key, (NetworkAddress, ValidationType)>,
records: HashMap<Key, (NetworkAddress, ValidationType, DataTypes)>,
/// Additional index organizing records by distance
records_by_distance: BTreeMap<U256, Key>,
/// FIFO simple cache of records to reduce read times
@@ -218,7 +218,7 @@ impl NodeRecordStore {
fn update_records_from_an_existing_store(
config: &NodeRecordStoreConfig,
encryption_details: &(Aes256GcmSiv, [u8; 4]),
) -> HashMap<Key, (NetworkAddress, ValidationType)> {
) -> HashMap<Key, (NetworkAddress, ValidationType, DataTypes)> {
let process_entry = |entry: &DirEntry| -> _ {
let path = entry.path();
if path.is_file() {
@@ -269,11 +269,19 @@ impl NodeRecordStore {
}
};

let record_type = match RecordHeader::is_record_of_type_chunk(&record) {
Ok(true) => ValidationType::Chunk,
Ok(false) => {
let xorname_hash = XorName::from_content(&record.value);
ValidationType::NonChunk(xorname_hash)
match RecordHeader::get_data_type(&record) {
Ok(data_type) => {
let validate_type = match data_type {
DataTypes::Chunk => ValidationType::Chunk,
_ => {
let xorname_hash = XorName::from_content(&record.value);
ValidationType::NonChunk(xorname_hash)
}
};

let address = NetworkAddress::from_record_key(&key);
info!("Existing record {address:?} loaded from: {path:?}");
return Some((key, (address, validate_type, data_type)));
}
Err(error) => {
warn!(
@@ -290,11 +298,7 @@ impl NodeRecordStore {
}
return None;
}
};

let address = NetworkAddress::from_record_key(&key);
info!("Existing record loaded: {path:?}");
return Some((key, (address, record_type)));
}
}
None
};
@@ -370,7 +374,7 @@ impl NodeRecordStore {

// Initialize records_by_distance
let mut records_by_distance: BTreeMap<U256, Key> = BTreeMap::new();
for (key, (addr, _record_type)) in records.iter() {
for (key, (addr, _record_type, _data_type)) in records.iter() {
let distance = convert_distance_to_u256(&local_address.distance(addr));
let _ = records_by_distance.insert(distance, key.clone());
}
@@ -588,26 +592,35 @@ impl NodeRecordStore {
pub(crate) fn record_addresses(&self) -> HashMap<NetworkAddress, ValidationType> {
self.records
.iter()
.map(|(_record_key, (addr, record_type))| (addr.clone(), record_type.clone()))
.map(|(_record_key, (addr, record_type, _data_type))| {
(addr.clone(), record_type.clone())
})
.collect()
}

/// Returns the reference to the set of `NetworkAddress::RecordKey` held by the store
pub(crate) fn record_addresses_ref(&self) -> &HashMap<Key, (NetworkAddress, ValidationType)> {
pub(crate) fn record_addresses_ref(
&self,
) -> &HashMap<Key, (NetworkAddress, ValidationType, DataTypes)> {
&self.records
}

/// The follow up to `put_verified`, this only registers the RecordKey
/// in the RecordStore records set. After this it should be safe
/// to return the record as stored.
pub(crate) fn mark_as_stored(&mut self, key: Key, record_type: ValidationType) {
pub(crate) fn mark_as_stored(
&mut self,
key: Key,
validate_type: ValidationType,
data_type: DataTypes,
) {
let addr = NetworkAddress::from_record_key(&key);
let distance = self.local_address.distance(&addr);
let distance_u256 = convert_distance_to_u256(&distance);

// Update main records store
self.records
.insert(key.clone(), (addr.clone(), record_type));
.insert(key.clone(), (addr.clone(), validate_type, data_type));

// Update bucket index
let _ = self.records_by_distance.insert(distance_u256, key.clone());
@@ -686,13 +699,26 @@ impl NodeRecordStore {
let record_key2 = record_key.clone();
spawn(async move {
let key = r.key.clone();
let data_type = match RecordHeader::get_data_type(&r) {
Ok(data_type) => data_type,
Err(err) => {
error!(
"Error get data_type of record {record_key2:?} filename: {filename}, error: {err:?}"
);
return;
}
};
if let Some(bytes) = Self::prepare_record_bytes(r, encryption_details) {
let cmd = match fs::write(&file_path, bytes) {
Ok(_) => {
// vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
info!("Wrote record {record_key2:?} to disk! filename: {filename}");

LocalSwarmCmd::AddLocalRecordAsStored { key, record_type }
LocalSwarmCmd::AddLocalRecordAsStored {
key,
record_type,
data_type,
}
}
Err(err) => {
error!(
@@ -719,6 +745,7 @@ impl NodeRecordStore {
network_size: Option<u64>,
) -> (QuotingMetrics, bool) {
let records_stored = self.records.len();
let records_per_type = self.records_per_type();

let live_time = if let Ok(elapsed) = self.timestamp.elapsed() {
elapsed.as_secs()
@@ -730,6 +757,7 @@
data_type,
data_size,
close_records_stored: records_stored,
records_per_type,
max_records: self.config.max_records,
received_payment_count: self.received_payment_count,
live_time,
@@ -780,6 +808,14 @@ impl NodeRecordStore {
pub(crate) fn set_responsible_distance_range(&mut self, responsible_distance: U256) {
self.responsible_distance_range = Some(responsible_distance);
}

fn records_per_type(&self) -> Vec<(u32, u32)> {
let mut map = BTreeMap::new();
for (_, _, data_type) in self.records.values() {
*map.entry(data_type.get_index()).or_insert(0) += 1;
}
map.into_iter().collect()
}
}

impl RecordStore for NodeRecordStore {
@@ -834,11 +870,15 @@ impl RecordStore for NodeRecordStore {
// otherwise shall be passed further to allow different version of nonchunk
// to be detected or updated.
match self.records.get(&record.key) {
Some((_addr, ValidationType::Chunk)) => {
Some((_addr, ValidationType::Chunk, _data_type)) => {
debug!("Chunk {record_key:?} already exists.");
return Ok(());
}
Some((_addr, ValidationType::NonChunk(existing_content_hash))) => {
Some((
_addr,
ValidationType::NonChunk(existing_content_hash),
_data_type,
)) => {
let content_hash = XorName::from_content(&record.value);
if content_hash == *existing_content_hash {
debug!("A non-chunk record {record_key:?} with same content_hash {content_hash:?} already exists.");
@@ -873,7 +913,7 @@ impl RecordStore for NodeRecordStore {

fn remove(&mut self, k: &Key) {
// Remove from main store
if let Some((addr, _)) = self.records.remove(k) {
if let Some((addr, _, _)) = self.records.remove(k) {
let distance = convert_distance_to_u256(&self.local_address.distance(&addr));
let _ = self.records_by_distance.remove(&distance);
}
@@ -1072,7 +1112,7 @@ mod tests {

// We must also mark the record as stored (which would be triggered after the async write in nodes
// via NetworkEvent::CompletedWrite)
store.mark_as_stored(returned_record_key, ValidationType::Chunk);
store.mark_as_stored(returned_record_key, ValidationType::Chunk, DataTypes::Chunk);

// loop over store.get max_iterations times to ensure async disk write had time to complete.
let max_iterations = 10;
@@ -1149,8 +1189,12 @@ mod tests {
// Wait for the async write operation to complete
if let Some(cmd) = swarm_cmd_receiver.recv().await {
match cmd {
LocalSwarmCmd::AddLocalRecordAsStored { key, record_type } => {
store.mark_as_stored(key, record_type);
LocalSwarmCmd::AddLocalRecordAsStored {
key,
record_type,
data_type,
} => {
store.mark_as_stored(key, record_type, data_type);
}
_ => panic!("Unexpected command received"),
}
@@ -1248,7 +1292,7 @@ mod tests {
.is_ok());

// Mark as stored (simulating the CompletedWrite event)
store.mark_as_stored(record.key.clone(), ValidationType::Chunk);
store.mark_as_stored(record.key.clone(), ValidationType::Chunk, DataTypes::Chunk);

// Verify the chunk is stored
let stored_record = store.get(&record.key);
@@ -1322,6 +1366,7 @@ mod tests {
store.mark_as_stored(
record.key.clone(),
ValidationType::NonChunk(XorName::from_content(&record.value)),
DataTypes::Scratchpad,
);

// Verify the scratchpad is stored
@@ -1416,7 +1461,7 @@ mod tests {
} else {
// We must also mark the record as stored (which would be triggered
// after the async write in nodes via NetworkEvent::CompletedWrite)
store.mark_as_stored(record_key.clone(), ValidationType::Chunk);
store.mark_as_stored(record_key.clone(), ValidationType::Chunk, DataTypes::Chunk);

println!("success sotred len: {:?} ", store.record_addresses().len());
stored_records_at_some_point.push(record_key.clone());
@@ -1532,7 +1577,7 @@ mod tests {
assert!(store.put_verified(record, ValidationType::Chunk).is_ok());
// We must also mark the record as stored (which would be triggered after the async write in nodes
// via NetworkEvent::CompletedWrite)
store.mark_as_stored(record_key.clone(), ValidationType::Chunk);
store.mark_as_stored(record_key.clone(), ValidationType::Chunk, DataTypes::Chunk);

stored_records.push(record_key.clone());
stored_records.sort_by(|a, b| {
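
The core of the record_store.rs changes above is the new records_per_type() helper, which tallies how many records the store holds per data type and hands the tallies to QuotingMetrics as sorted (data_type_index, count) pairs. A self-contained sketch of that aggregation, using a simplified stand-in for the DataTypes enum and its get_index() method (the real indices may differ):

use std::collections::BTreeMap;

enum DataTypes {
    Chunk,
    Scratchpad,
}

impl DataTypes {
    // Stand-in for ant-protocol's DataTypes::get_index().
    fn get_index(&self) -> u32 {
        match self {
            DataTypes::Chunk => 0,
            DataTypes::Scratchpad => 1,
        }
    }
}

// Mirrors NodeRecordStore::records_per_type(): count stored records per data-type index.
fn records_per_type(stored: &[DataTypes]) -> Vec<(u32, u32)> {
    let mut map = BTreeMap::new();
    for data_type in stored {
        *map.entry(data_type.get_index()).or_insert(0u32) += 1;
    }
    map.into_iter().collect()
}

fn main() {
    let stored = [DataTypes::Chunk, DataTypes::Chunk, DataTypes::Scratchpad];
    // Prints [(0, 2), (1, 1)]: two chunks and one scratchpad.
    println!("{:?}", records_per_type(&stored));
}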
16 changes: 12 additions & 4 deletions ant-networking/src/record_store_api.rs
@@ -10,7 +10,10 @@
use crate::error::{NetworkError, Result};
use crate::record_store::{ClientRecordStore, NodeRecordStore};
use ant_evm::{QuotingMetrics, U256};
use ant_protocol::{storage::ValidationType, NetworkAddress};
use ant_protocol::{
storage::{DataTypes, ValidationType},
NetworkAddress,
};
use libp2p::kad::{store::RecordStore, ProviderRecord, Record, RecordKey};
use std::{borrow::Cow, collections::HashMap};

@@ -103,7 +106,7 @@ impl UnifiedRecordStore {

pub(crate) fn record_addresses_ref(
&self,
) -> Result<&HashMap<RecordKey, (NetworkAddress, ValidationType)>> {
) -> Result<&HashMap<RecordKey, (NetworkAddress, ValidationType, DataTypes)>> {
match self {
Self::Client(_) => {
error!("Calling record_addresses_ref at Client. This should not happen");
@@ -185,12 +188,17 @@ impl UnifiedRecordStore {
/// Mark the record as stored in the store.
/// This adds it to records set, so it can now be retrieved
/// (to be done after writes are finalised)
pub(crate) fn mark_as_stored(&mut self, k: RecordKey, record_type: ValidationType) {
pub(crate) fn mark_as_stored(
&mut self,
k: RecordKey,
record_type: ValidationType,
data_type: DataTypes,
) {
match self {
Self::Client(_) => {
error!("Calling mark_as_stored at Client. This should not happen");
}
Self::Node(store) => store.mark_as_stored(k, record_type),
Self::Node(store) => store.mark_as_stored(k, record_type, data_type),
};
}
