Commit 98722bb

boilerplate for fs caching
Signed-off-by: wadeking98 <wkingnumber2@gmail.com>
wadeking98 committed Jan 23, 2024
1 parent 5cb1297 commit 98722bb
Showing 5 changed files with 132 additions and 60 deletions.
1 change: 1 addition & 0 deletions libindy_vdr/Cargo.toml
@@ -62,6 +62,7 @@ url = "2.2.2"
zmq = "0.9"
async-trait = "0.1.77"
async-lock = "3.3.0"
sled = "0.34.7"

[dev-dependencies]
rstest = "0.18"
6 changes: 6 additions & 0 deletions libindy_vdr/src/pool/cache/fscache.rs
@@ -0,0 +1,6 @@
use sled;

pub fn test() {
let sled = sled::open("my_db").unwrap();
sled.insert(b"yo!", vec![1, 2, 3]).unwrap();
}
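The new fscache.rs is only a stub: it opens a sled database at "my_db" and writes a single key. As a minimal sketch of what that boilerplate already provides (illustrative only, not part of the commit, using sled 0.34 calls that exist: open, insert, get, flush), the same handle can read the value back and flush it to disk, which is presumably the persistence the file-system cache will build on:

let db = sled::open("my_db").unwrap();
db.insert(b"yo!", vec![1, 2, 3]).unwrap();
// sled hands back IVec byte buffers; compare them as Vec<u8>
assert_eq!(db.get(b"yo!").unwrap().map(|v| v.to_vec()), Some(vec![1, 2, 3]));
// persist to disk so the entry survives a process restart
db.flush().unwrap();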
154 changes: 109 additions & 45 deletions libindy_vdr/src/pool/cache/helpers.rs
@@ -1,18 +1,89 @@
use serde::{de::DeserializeOwned, Serialize};
use sled::{self, IVec, Tree};
use std::{
collections::{BTreeMap, HashMap},
hash::Hash,
};

pub trait OrderedStore<O, V>: Send + Sync {
fn len(&self) -> usize;
fn first_key_value(&self) -> Option<(O, V)>;
fn last_key_value(&self) -> Option<(O, V)>;
fn get(&self, key: &O) -> Option<V>;
fn insert(&mut self, key: O, value: V) -> Option<V>;
fn remove(&mut self, key: &O) -> Option<V>;
}
impl<V: Serialize + DeserializeOwned> OrderedStore<IVec, V> for Tree {
fn len(&self) -> usize {
Tree::len(self)
}
fn first_key_value(&self) -> Option<(IVec, V)> {
match self.first() {
Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| (k, v)),
_ => None,
}
}
fn last_key_value(&self) -> Option<(IVec, V)> {
match self.last() {
Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| (k, v)),
_ => None,
}
}
fn get(&self, key: &IVec) -> Option<V> {
match self.get(key) {
Ok(Some(v)) => serde_json::from_slice(v.as_ref()).ok(),
_ => None,
}
}
fn insert(&mut self, key: IVec, value: V) -> Option<V> {
match Tree::insert(self, key, serde_json::to_vec(&value).unwrap()) {
Ok(Some(v)) => serde_json::from_slice(v.as_ref()).ok(),
_ => None,
}
}
fn remove(&mut self, key: &IVec) -> Option<V> {
match Tree::remove(self, key) {
Ok(Some(v)) => serde_json::from_slice(&v).ok(),
_ => None,
}
}
}
impl<O: Ord + Copy + Send + Sync, V: Clone + Send + Sync> OrderedStore<O, V> for BTreeMap<O, V> {
fn len(&self) -> usize {
BTreeMap::len(self)
}
fn first_key_value(&self) -> Option<(O, V)> {
BTreeMap::first_key_value(self).map(|(o, v)| (*o, v.clone()))
}
fn last_key_value(&self) -> Option<(O, V)> {
BTreeMap::last_key_value(self).map(|(o, v)| (*o, v.clone()))
}
fn get(&self, key: &O) -> Option<V> {
BTreeMap::get(self, key).map(|v| v.clone())
}
fn insert(&mut self, key: O, value: V) -> Option<V> {
BTreeMap::insert(self, key, value)
}
fn remove(&mut self, key: &O) -> Option<V> {
BTreeMap::remove(self, key)
}
}
/// A hashmap that also maintains a BTreeMap of keys ordered by a given value
/// This is useful for structures that need fast O(1) lookups, but also need to evict the oldest or least recently used entries
pub(crate) struct OrderedHashMap<K, O, V>((HashMap<K, (O, V)>, BTreeMap<O, Vec<K>>));
pub struct OrderedHashMap<K, O, V>(
(
HashMap<K, (O, V)>,
Box<dyn OrderedStore<O, Vec<K>> + Send + Sync>,
),
);

impl<K, O, V> OrderedHashMap<K, O, V> {
pub(crate) fn new() -> Self {
Self((HashMap::new(), BTreeMap::new()))
impl<K: Clone + Send + Sync, O: Ord + Copy + Send + Sync, V> OrderedHashMap<K, O, V> {
pub fn new(order: impl OrderedStore<O, Vec<K>> + 'static) -> Self {
Self((HashMap::new(), Box::new(order)))
}
}

impl<K: Hash + Eq + Clone, O: Ord + Copy, V> OrderedHashMap<K, O, V> {
impl<K: Hash + Eq + Clone, O: Ord + Copy, V: Clone> OrderedHashMap<K, O, V> {
pub fn len(&self) -> usize {
let (lookup, _) = &self.0;
lookup.len()
@@ -23,63 +94,55 @@ impl<K: Hash + Eq + Clone, O: Ord + Copy, V> OrderedHashMap<K, O, V> {
}
fn get_key_value(
&self,
selector: Box<dyn Fn(&BTreeMap<O, Vec<K>>) -> Option<(&O, &Vec<K>)>>,
) -> Option<(&K, &O, &V)> {
selector: Box<
dyn Fn(&Box<dyn OrderedStore<O, Vec<K>> + Send + Sync>) -> Option<(O, Vec<K>)>,
>,
) -> Option<(K, O, V)> {
let (lookup, ordered_lookup) = &self.0;
selector(ordered_lookup).and_then(|(_, keys)| {
keys.first()
.and_then(|key| lookup.get(key).and_then(|(o, v)| Some((key, o, v))))
keys.first().and_then(|key| {
lookup
.get(key)
.and_then(|(o, v)| Some((key.clone(), *o, v.clone())))
})
})
}
/// gets the entry with the lowest order value
pub fn get_first_key_value(&self) -> Option<(&K, &O, &V)> {
pub fn get_first_key_value(&self) -> Option<(K, O, V)> {
self.get_key_value(Box::new(|ordered_lookup| ordered_lookup.first_key_value()))
}
/// gets the entry with the highest order value
pub fn get_last_key_value(&self) -> Option<(&K, &O, &V)> {
pub fn get_last_key_value(&self) -> Option<(K, O, V)> {
self.get_key_value(Box::new(|ordered_lookup| ordered_lookup.last_key_value()))
}
/// re-orders the entry with the given key
/// re-orders the entry with the given new order
pub fn re_order(&mut self, key: &K, new_order: O) {
let (lookup, order_lookup) = &mut self.0;
if let Some((old_order, _)) = lookup.get(key) {
// remove entry in btree
match order_lookup.get_mut(old_order) {
Some(keys) => {
keys.retain(|k| k != key);
if keys.len() == 0 {
order_lookup.remove(old_order);
}
}
None => {}
}
if let Some((_, value)) = self.remove(key) {
self.insert(key.clone(), value, new_order);
}
order_lookup
.entry(new_order)
.or_insert(vec![])
.push(key.clone());
lookup.get_mut(key).map(|(o, _)| *o = new_order);
}
/// inserts a new entry with the given key and value and order
pub fn insert(&mut self, key: K, value: V, order: O) -> Option<V> {
let (lookup, order_lookup) = &mut self.0;

if let Some((old_order, _)) = lookup.get(&key) {
// remove entry in btree
match order_lookup.get_mut(old_order) {
Some(keys) => {
keys.retain(|k| k != &key);
if keys.len() == 0 {
order_lookup.remove(old_order);
}
// if entry already exists, remove it from the btree
if let Some(mut keys) = order_lookup.remove(old_order) {
keys.retain(|k| *k != key);
// insert modified keys back into btree
if !keys.is_empty() {
order_lookup.insert(*old_order, keys);
}
None => {}
}
}
order_lookup
.entry(order)
.or_insert(vec![])
.push(key.clone());
let keys = match order_lookup.remove(&order) {
Some(mut ks) => {
ks.push(key.clone());
ks
}
None => vec![key.clone()],
};
order_lookup.insert(order, keys);
lookup
.insert(key, (order, value))
.and_then(|(_, v)| Some(v))
@@ -88,11 +151,12 @@ impl<K: Hash + Eq + Clone, O: Ord + Copy, V> OrderedHashMap<K, O, V> {
pub fn remove(&mut self, key: &K) -> Option<(O, V)> {
let (lookup, order_lookup) = &mut self.0;
lookup.remove(key).and_then(|(order, v)| {
match order_lookup.get_mut(&order) {
Some(keys) => {
match order_lookup.remove(&order) {
Some(mut keys) => {
keys.retain(|k| k != key);
if keys.len() == 0 {
order_lookup.remove(&order);
// insert remaining keys back in
if !keys.is_empty() {
order_lookup.insert(order, keys);
}
}
None => {}
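With the OrderedStore trait in place, OrderedHashMap can keep its ordered index either in memory (the BTreeMap impl) or on disk (the sled Tree impl) while the HashMap keeps O(1) lookups. A small usage sketch with the BTreeMap backend, valid inside the crate-private helpers module where BTreeMap is already imported; the keys, order values, and strings below are made up for illustration and are not part of the commit:

let mut map: OrderedHashMap<String, u64, String> = OrderedHashMap::new(BTreeMap::new());
map.insert("a".to_string(), "alpha".to_string(), 10);
map.insert("b".to_string(), "beta".to_string(), 5);
// the lowest order value comes back first, i.e. the next eviction candidate
assert_eq!(map.get_first_key_value().map(|(k, _, _)| k), Some("b".to_string()));
// bump "b" to the newest position, as an LRU cache would on access
map.re_order(&"b".to_string(), 20);
assert_eq!(map.get_first_key_value().map(|(k, _, _)| k), Some("a".to_string()));
assert_eq!(map.remove(&"a".to_string()), Some((10, "alpha".to_string())));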
24 changes: 12 additions & 12 deletions libindy_vdr/src/pool/cache/memcache.rs
@@ -2,7 +2,7 @@ use super::helpers::OrderedHashMap
use super::CacheStorage;
use async_lock::Mutex;
use async_trait::async_trait;
use std::{hash::Hash, sync::Arc, time::SystemTime};
use std::{collections::BTreeMap, fmt::Debug, hash::Hash, sync::Arc, time::SystemTime};
/// A simple in-memory cache that uses timestamps to expire entries. Once the cache fills up, the oldest entry is evicted.
/// Uses a hashmap for lookups and a BTreeMap for ordering by age
pub struct MemCacheStorageTTL<K, V> {
@@ -12,11 +12,11 @@ pub struct MemCacheStorageTTL<K, V> {
expire_after: u128,
}

impl<K, V> MemCacheStorageTTL<K, V> {
impl<K: Clone + Send + Sync + 'static, V> MemCacheStorageTTL<K, V> {
/// Create a new cache with the given capacity and expiration time in milliseconds
pub fn new(capacity: usize, expire_after: u128) -> Self {
Self {
store: OrderedHashMap::new(),
store: OrderedHashMap::new(BTreeMap::new()),
capacity,
startup_time: SystemTime::now(),
expire_after,
@@ -25,7 +25,7 @@ impl<K, V> MemCacheStorageTTL<K, V> {
}

#[async_trait]
impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'static>
impl<K: Hash + Eq + Send + Sync + 'static + Clone + Debug, V: Clone + Send + Sync + 'static>
CacheStorage<K, V> for MemCacheStorageTTL<K, V>
{
async fn get(&self, key: &K) -> Option<V> {
@@ -35,7 +35,7 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'sta
.duration_since(self.startup_time)
.unwrap()
.as_millis();
if current_time < ts + self.expire_after {
if current_time < *ts {
Some(v.clone())
} else {
None
@@ -54,12 +54,11 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'sta
.as_millis();

// remove expired entries
let exp_offset = self.expire_after;
while self.store.len() > 0
&& self
.store
.get_first_key_value()
.map(|(_, ts, _)| ts + exp_offset < current_ts)
.map(|(_, ts, _)| ts.clone() < current_ts)
.unwrap_or(false)
{
self.store.remove_first();
@@ -74,7 +73,8 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'sta
}
};

self.store.insert(key, value, current_ts)
let exp_offset = self.expire_after;
self.store.insert(key, value, current_ts + exp_offset)
}
}

@@ -86,10 +86,10 @@ pub struct MemCacheStorageLRU<K, V> {
capacity: usize,
}

impl<K, V> MemCacheStorageLRU<K, V> {
impl<K: Clone + Send + Sync + 'static, V> MemCacheStorageLRU<K, V> {
pub fn new(capacity: usize) -> Self {
Self {
store: Arc::new(Mutex::new(OrderedHashMap::new())),
store: Arc::new(Mutex::new(OrderedHashMap::new(BTreeMap::new()))),
capacity,
}
}
@@ -145,7 +145,7 @@ mod tests {

#[rstest]
fn test_cache_lru() {
let mut cache = Cache::new(MemCacheStorageLRU::new(2));
let cache = Cache::new(MemCacheStorageLRU::new(2));
block_on(async {
cache.insert("key".to_string(), "value".to_string()).await;
assert_eq!(
@@ -170,7 +170,7 @@

#[rstest]
fn test_cache_ttl() {
let mut cache = Cache::new(MemCacheStorageTTL::new(2, 5));
let cache = Cache::new(MemCacheStorageTTL::new(2, 5));
block_on(async {
cache.insert("key".to_string(), "value".to_string()).await;
thread::sleep(std::time::Duration::from_millis(1));
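Note the TTL change above: an entry's order value is now its expiry time (insert time plus expire_after) rather than its insert time, so both the get() check and the eviction loop compare that stamp directly against the current time. A rough sketch of the resulting behaviour, reusing block_on and thread::sleep from the test module; the timings are illustrative only and not from the commit:

let cache = Cache::new(MemCacheStorageTTL::new(2, 5)); // capacity 2, 5 ms TTL
block_on(async {
    cache.insert("key".to_string(), "value".to_string()).await; // stored with order = now + 5
    assert_eq!(
        cache.get(&"key".to_string()).await,
        Some("value".to_string())
    );
    thread::sleep(std::time::Duration::from_millis(6)); // move past the stored expiry
    assert_eq!(cache.get(&"key".to_string()).await, None); // now >= expiry, so get() treats it as expired
});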
7 changes: 4 additions & 3 deletions libindy_vdr/src/pool/cache/mod.rs
@@ -4,6 +4,7 @@ use std::sync::Arc

mod helpers;
pub mod memcache;
pub mod fscache;

#[async_trait]
pub trait CacheStorage<K, V>: Send + Sync + 'static {
@@ -24,13 +25,13 @@ impl<K: 'static, V: 'static> Cache<K, V> {
storage: Arc::new(RwLock::new(storage)),
}
}
pub async fn get(&mut self, key: &K) -> Option<V> {
pub async fn get(&self, key: &K) -> Option<V> {
self.storage.read().await.get(key).await
}
pub async fn remove(&mut self, key: &K) -> Option<V> {
pub async fn remove(&self, key: &K) -> Option<V> {
self.storage.write().await.remove(key).await
}
pub async fn insert(&mut self, key: K, value: V) -> Option<V> {
pub async fn insert(&self, key: K, value: V) -> Option<V> {
self.storage.write().await.insert(key, value).await
}
}
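Since get, remove, and insert now take &self, callers no longer need a mutable Cache binding; the Arc<RwLock<...>> inside Cache coordinates readers and writers, which is why the tests above drop the mut. A hypothetical helper, not from the commit, showing the new call pattern:

async fn warm_up(cache: &Cache<String, String>) {
    cache.insert("k".to_string(), "v".to_string()).await; // takes the write lock internally
    assert_eq!(cache.get(&"k".to_string()).await, Some("v".to_string())); // read lock only
    cache.remove(&"k".to_string()).await;
    assert_eq!(cache.get(&"k".to_string()).await, None);
}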
