From da0dc1ef5ae41716772fd2c704cafbbf9a932dfd Mon Sep 17 00:00:00 2001
From: qima
Date: Thu, 31 Oct 2024 08:14:59 +0800
Subject: [PATCH] chore(node): create a RecordCache struct

---
 sn_networking/src/record_store.rs | 109 ++++++++++++++++++------------
 1 file changed, 65 insertions(+), 44 deletions(-)

diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs
index 254ec6380a..18ec044eaf 100644
--- a/sn_networking/src/record_store.rs
+++ b/sn_networking/src/record_store.rs
@@ -35,7 +35,6 @@ use sn_protocol::{
     storage::{RecordHeader, RecordKind, RecordType},
     NetworkAddress, PrettyPrintRecordKey,
 };
-use std::collections::VecDeque;
 use std::{
     borrow::Cow,
     collections::{HashMap, HashSet},
@@ -68,6 +67,54 @@ const MAX_STORE_COST: u64 = 1_000_000;
 // Min store cost for a chunk.
 const MIN_STORE_COST: u64 = 1;
 
+/// FIFO simple cache of records to reduce read times
+struct RecordCache {
+    records_cache: HashMap<Key, (Record, SystemTime)>,
+    cache_size: usize,
+}
+
+impl RecordCache {
+    fn new(cache_size: usize) -> Self {
+        RecordCache {
+            records_cache: HashMap::new(),
+            cache_size,
+        }
+    }
+
+    fn remove(&mut self, key: &Key) -> Option<(Record, SystemTime)> {
+        self.records_cache.remove(key)
+    }
+
+    fn get(&self, key: &Key) -> Option<&(Record, SystemTime)> {
+        self.records_cache.get(key)
+    }
+
+    fn push_back(&mut self, key: Key, record: Record) {
+        self.free_up_space();
+
+        let _ = self.records_cache.insert(key, (record, SystemTime::now()));
+    }
+
+    fn free_up_space(&mut self) {
+        while self.records_cache.len() >= self.cache_size {
+            self.remove_oldest_entry()
+        }
+    }
+
+    fn remove_oldest_entry(&mut self) {
+        let mut oldest_timestamp = SystemTime::now();
+
+        for (_record, timestamp) in self.records_cache.values() {
+            if *timestamp < oldest_timestamp {
+                oldest_timestamp = *timestamp;
+            }
+        }
+
+        self.records_cache
+            .retain(|_key, (_record, timestamp)| *timestamp != oldest_timestamp);
+    }
+}
+
 /// A `RecordStore` that stores records on disk.
 pub struct NodeRecordStore {
     /// The address of the peer owning the store
@@ -79,10 +126,7 @@ pub struct NodeRecordStore {
     /// Additional index organizing records by distance bucket
     records_by_bucket: HashMap<u32, HashSet<Key>>,
     /// FIFO simple cache of records to reduce read times
-    records_cache: VecDeque<Record>,
-    /// A map from record keys to their indices in the cache
-    /// allowing for more efficient cache management
-    records_cache_map: HashMap<Key, usize>,
+    records_cache: RecordCache,
     /// Send network events to the node layer.
     network_event_sender: mpsc::Sender<NetworkEvent>,
     /// Send cmds to the network layer. Used to interact with self in an async fashion.
@@ -288,8 +332,7 @@ impl NodeRecordStore {
             config,
             records,
             records_by_bucket: HashMap::new(),
-            records_cache: VecDeque::with_capacity(cache_size),
-            records_cache_map: HashMap::with_capacity(cache_size),
+            records_cache: RecordCache::new(cache_size),
             network_event_sender,
             local_swarm_cmd_sender: swarm_cmd_sender,
             responsible_distance_range: None,
@@ -571,35 +614,22 @@
         let record_key = PrettyPrintRecordKey::from(&r.key).into_owned();
         debug!("PUTting a verified Record: {record_key:?}");
 
-        // if the cache already has this record in it (eg, a conflicting spend)
-        // remove it from the cache
-        // self.records_cache.retain(|record| record.key != r.key);
-        // Remove from cache if it already exists
-        if let Some(&index) = self.records_cache_map.get(key) {
-            if let Some(existing_record) = self.records_cache.remove(index) {
-                if existing_record.value == r.value {
-                    // we actually just want to keep what we have, and can assume it's been stored properly.
-
-                    // so we put it back in the cache
-                    self.records_cache.insert(index, existing_record);
-                    // and exit early.
-                    return Ok(());
-                }
-            }
-            self.update_cache_indices(index);
-        }
+        // if cache already has the record :
+        //   * if with same content, do nothing and return early
+        //   * if with different content, remove the existing one
+        if let Some((existing_record, _timestamp)) = self.records_cache.remove(key) {
+            if existing_record.value == r.value {
+                // we actually just want to keep what we have, and can assume it's been stored properly.
 
-        // Store in the FIFO records cache, removing the oldest if needed
-        if self.records_cache.len() >= self.config.records_cache_size {
-            if let Some(old_record) = self.records_cache.pop_front() {
-                self.records_cache_map.remove(&old_record.key);
+                // so we put it back in the cache
+                self.records_cache.push_back(key.clone(), existing_record);
+                // and exit early.
+                return Ok(());
             }
         }
 
-        // Push the new record to the back of the cache
-        self.records_cache.push_back(r.clone());
-        self.records_cache_map
-            .insert(key.clone(), self.records_cache.len() - 1);
+        // Store the new record to the cache
+        self.records_cache.push_back(key.clone(), r.clone());
 
         self.prune_records_if_needed(key)?;
 
@@ -640,15 +670,6 @@
         Ok(())
     }
 
-    /// Update the cache indices after removing an element
-    fn update_cache_indices(&mut self, start_index: usize) {
-        for index in start_index..self.records_cache.len() {
-            if let Some(record) = self.records_cache.get(index) {
-                self.records_cache_map.insert(record.key.clone(), index);
-            }
-        }
-    }
-
     /// Calculate the cost to store data for our current store state
     pub(crate) fn store_cost(&self, key: &Key) -> (AttoTokens, QuotingMetrics) {
         let records_stored = self.records.len();
@@ -730,9 +751,9 @@ impl RecordStore for NodeRecordStore {
         // ignored if we don't have the record locally.
         let key = PrettyPrintRecordKey::from(k);
 
-        let cached_record = self.records_cache.iter().find(|r| r.key == *k);
+        let cached_record = self.records_cache.get(k);
         // first return from FIFO cache if existing there
-        if let Some(record) = cached_record {
+        if let Some((record, _timestamp)) = cached_record {
             return Some(Cow::Borrowed(record));
         }
 
@@ -826,7 +847,7 @@
             }
         }
 
-        self.records_cache.retain(|r| r.key != *k);
+        self.records_cache.remove(k);
 
         #[cfg(feature = "open-metrics")]
         if let Some(metric) = &self.record_count_metric {