Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(node): create a RecordCache struct #2368

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 65 additions & 44 deletions sn_networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use sn_protocol::{
storage::{RecordHeader, RecordKind, RecordType},
NetworkAddress, PrettyPrintRecordKey,
};
use std::collections::VecDeque;
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -68,6 +67,54 @@ const MAX_STORE_COST: u64 = 1_000_000;
// Min store cost for a chunk.
const MIN_STORE_COST: u64 = 1;

/// FIFO simple cache of records to reduce read times
struct RecordCache {
records_cache: HashMap<Key, (Record, SystemTime)>,
cache_size: usize,
}

impl RecordCache {
fn new(cache_size: usize) -> Self {
RecordCache {
records_cache: HashMap::new(),
cache_size,
}
}

fn remove(&mut self, key: &Key) -> Option<(Record, SystemTime)> {
self.records_cache.remove(key)
}

fn get(&self, key: &Key) -> Option<&(Record, SystemTime)> {
self.records_cache.get(key)
}

fn push_back(&mut self, key: Key, record: Record) {
self.free_up_space();

let _ = self.records_cache.insert(key, (record, SystemTime::now()));
}

fn free_up_space(&mut self) {
while self.records_cache.len() >= self.cache_size {
self.remove_oldest_entry()
}
}

fn remove_oldest_entry(&mut self) {
let mut oldest_timestamp = SystemTime::now();

for (_record, timestamp) in self.records_cache.values() {
if *timestamp < oldest_timestamp {
oldest_timestamp = *timestamp;
}
}

self.records_cache
.retain(|_key, (_record, timestamp)| *timestamp != oldest_timestamp);
}
}

/// A `RecordStore` that stores records on disk.
pub struct NodeRecordStore {
/// The address of the peer owning the store
Expand All @@ -79,10 +126,7 @@ pub struct NodeRecordStore {
/// Additional index organizing records by distance bucket
records_by_bucket: HashMap<u32, HashSet<Key>>,
/// FIFO simple cache of records to reduce read times
records_cache: VecDeque<Record>,
/// A map from record keys to their indices in the cache
/// allowing for more efficient cache management
records_cache_map: HashMap<Key, usize>,
records_cache: RecordCache,
/// Send network events to the node layer.
network_event_sender: mpsc::Sender<NetworkEvent>,
/// Send cmds to the network layer. Used to interact with self in an async fashion.
Expand Down Expand Up @@ -288,8 +332,7 @@ impl NodeRecordStore {
config,
records,
records_by_bucket: HashMap::new(),
records_cache: VecDeque::with_capacity(cache_size),
records_cache_map: HashMap::with_capacity(cache_size),
records_cache: RecordCache::new(cache_size),
network_event_sender,
local_swarm_cmd_sender: swarm_cmd_sender,
responsible_distance_range: None,
Expand Down Expand Up @@ -571,35 +614,22 @@ impl NodeRecordStore {
let record_key = PrettyPrintRecordKey::from(&r.key).into_owned();
debug!("PUTting a verified Record: {record_key:?}");

// if the cache already has this record in it (eg, a conflicting spend)
// remove it from the cache
// self.records_cache.retain(|record| record.key != r.key);
// Remove from cache if it already exists
if let Some(&index) = self.records_cache_map.get(key) {
if let Some(existing_record) = self.records_cache.remove(index) {
if existing_record.value == r.value {
// we actually just want to keep what we have, and can assume it's been stored properly.

// so we put it back in the cache
self.records_cache.insert(index, existing_record);
// and exit early.
return Ok(());
}
}
self.update_cache_indices(index);
}
// if cache already has the record :
// * if with same content, do nothing and return early
// * if with different content, remove the existing one
if let Some((existing_record, _timestamp)) = self.records_cache.remove(key) {
if existing_record.value == r.value {
// we actually just want to keep what we have, and can assume it's been stored properly.

// Store in the FIFO records cache, removing the oldest if needed
if self.records_cache.len() >= self.config.records_cache_size {
if let Some(old_record) = self.records_cache.pop_front() {
self.records_cache_map.remove(&old_record.key);
// so we put it back in the cache
self.records_cache.push_back(key.clone(), existing_record);
// and exit early.
return Ok(());
}
}

// Push the new record to the back of the cache
self.records_cache.push_back(r.clone());
self.records_cache_map
.insert(key.clone(), self.records_cache.len() - 1);
// Store the new record to the cache
self.records_cache.push_back(key.clone(), r.clone());

self.prune_records_if_needed(key)?;

Expand Down Expand Up @@ -640,15 +670,6 @@ impl NodeRecordStore {
Ok(())
}

/// Update the cache indices after removing an element
fn update_cache_indices(&mut self, start_index: usize) {
for index in start_index..self.records_cache.len() {
if let Some(record) = self.records_cache.get(index) {
self.records_cache_map.insert(record.key.clone(), index);
}
}
}

/// Calculate the cost to store data for our current store state
pub(crate) fn store_cost(&self, key: &Key) -> (AttoTokens, QuotingMetrics) {
let records_stored = self.records.len();
Expand Down Expand Up @@ -730,9 +751,9 @@ impl RecordStore for NodeRecordStore {
// ignored if we don't have the record locally.
let key = PrettyPrintRecordKey::from(k);

let cached_record = self.records_cache.iter().find(|r| r.key == *k);
let cached_record = self.records_cache.get(k);
// first return from FIFO cache if existing there
if let Some(record) = cached_record {
if let Some((record, _timestamp)) = cached_record {
return Some(Cow::Borrowed(record));
}

Expand Down Expand Up @@ -826,7 +847,7 @@ impl RecordStore for NodeRecordStore {
}
}

self.records_cache.retain(|r| r.key != *k);
self.records_cache.remove(k);

#[cfg(feature = "open-metrics")]
if let Some(metric) = &self.record_count_metric {
Expand Down
Loading