Skip to content

Commit

Permalink
Added filesystem-backed storage caching strategy
Browse files Browse the repository at this point in the history
Signed-off-by: wadeking98 <wkingnumber2@gmail.com>
  • Loading branch information
wadeking98 committed Jan 23, 2024
1 parent 98722bb commit b52ea45
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 57 deletions.
5 changes: 3 additions & 2 deletions indy-vdr-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use hyper_tls::HttpsConnector;
#[cfg(unix)]
use hyper_unix_connector::UnixConnector;

use indy_vdr::pool::cache::{memcache::MemCacheStorageTTL, Cache};
use indy_vdr::pool::cache::{strategy::CacheStrategyTTL, Cache};
#[cfg(feature = "tls")]
use rustls_pemfile::{certs, pkcs8_private_keys};
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -428,7 +428,8 @@ where
I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let cache = if config.cache {
let mem_storage = MemCacheStorageTTL::new(1024, Duration::from_secs(86400).as_millis());
let mem_storage =
CacheStrategyTTL::new(1024, Duration::from_secs(86400).as_millis(), None, None);
let mem_cache = Cache::new(mem_storage);
Some(mem_cache)
} else {
Expand Down
6 changes: 0 additions & 6 deletions libindy_vdr/src/pool/cache/fscache.rs

This file was deleted.

13 changes: 6 additions & 7 deletions libindy_vdr/src/pool/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use async_lock::RwLock;
use async_trait::async_trait;
use std::sync::Arc;

mod helpers;
pub mod memcache;
pub mod fscache;
pub mod storage;
pub mod strategy;

#[async_trait]
pub trait CacheStorage<K, V>: Send + Sync + 'static {
pub trait CacheStrategy<K, V>: Send + Sync + 'static {
async fn get(&self, key: &K) -> Option<V>;

async fn remove(&mut self, key: &K) -> Option<V>;
Expand All @@ -16,11 +15,11 @@ pub trait CacheStorage<K, V>: Send + Sync + 'static {
}

pub struct Cache<K, V> {
storage: Arc<RwLock<dyn CacheStorage<K, V>>>,
storage: Arc<RwLock<dyn CacheStrategy<K, V>>>,
}

impl<K: 'static, V: 'static> Cache<K, V> {
pub fn new(storage: impl CacheStorage<K, V>) -> Self {
pub fn new(storage: impl CacheStrategy<K, V>) -> Self {
Self {
storage: Arc::new(RwLock::new(storage)),
}
Expand All @@ -36,7 +35,7 @@ impl<K: 'static, V: 'static> Cache<K, V> {
}
}

// need to implement Clone manually because Mutex<dyn CacheStorage> doesn't implement Clone
// need to implement Clone manually because Mutex<dyn CacheStrategy> doesn't implement Clone
impl<K, V> Clone for Cache<K, V> {
fn clone(&self) -> Self {
Self {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use serde::{de::DeserializeOwned, Serialize};
use sled::{self, IVec, Tree};
use sled::{self, Tree};
use std::{
collections::{BTreeMap, HashMap},
hash::Hash,
Expand All @@ -13,36 +13,46 @@ pub trait OrderedStore<O, V>: Send + Sync {
fn insert(&mut self, key: O, value: V) -> Option<V>;
fn remove(&mut self, key: &O) -> Option<V>;
}
impl<V: Serialize + DeserializeOwned> OrderedStore<IVec, V> for Tree {
impl<V: Serialize + DeserializeOwned> OrderedStore<u128, V> for Tree {
fn len(&self) -> usize {
Tree::len(self)
}
fn first_key_value(&self) -> Option<(IVec, V)> {
fn first_key_value(&self) -> Option<(u128, V)> {
match self.first() {
Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| (k, v)),
Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| {
(
u128::from_be_bytes(k.as_ref().try_into().unwrap_or([0; 16])),
v,
)
}),
_ => None,
}
}
fn last_key_value(&self) -> Option<(IVec, V)> {
fn last_key_value(&self) -> Option<(u128, V)> {
match self.last() {
Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| (k, v)),
Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| {
(
u128::from_be_bytes(k.as_ref().try_into().unwrap_or([0; 16])),
v,
)
}),
_ => None,
}
}
fn get(&self, key: &IVec) -> Option<V> {
match self.get(key) {
fn get(&self, key: &u128) -> Option<V> {
match Tree::get(self, key.to_be_bytes()).map(|v| v) {
Ok(Some(v)) => serde_json::from_slice(v.as_ref()).ok(),
_ => None,
}
}
fn insert(&mut self, key: IVec, value: V) -> Option<V> {
match Tree::insert(self, key, serde_json::to_vec(&value).unwrap()) {
fn insert(&mut self, key: u128, value: V) -> Option<V> {
match Tree::insert(self, key.to_be_bytes(), serde_json::to_vec(&value).unwrap()) {
Ok(Some(v)) => serde_json::from_slice(v.as_ref()).ok(),
_ => None,
}
}
fn remove(&mut self, key: &IVec) -> Option<V> {
match Tree::remove(self, key).map(|v| v) {
fn remove(&mut self, key: &u128) -> Option<V> {
match Tree::remove(self, key.to_be_bytes()).map(|v| v) {
Ok(Some(v)) => serde_json::from_slice(&v).ok(),
_ => None,
}
Expand Down Expand Up @@ -77,13 +87,13 @@ pub struct OrderedHashMap<K, O, V>(
),
);

impl<K: Clone + Send + Sync, O: Ord + Copy + Send + Sync, V> OrderedHashMap<K, O, V> {
impl<K: Clone + Send + Sync, O: Ord + Clone + Send + Sync, V> OrderedHashMap<K, O, V> {
pub fn new(order: impl OrderedStore<O, Vec<K>> + 'static) -> Self {
Self((HashMap::new(), Box::new(order)))
}
}

impl<K: Hash + Eq + Clone, O: Ord + Copy, V: Clone> OrderedHashMap<K, O, V> {
impl<K: Hash + Eq + Clone, O: Ord + Clone, V: Clone> OrderedHashMap<K, O, V> {
pub fn len(&self) -> usize {
let (lookup, _) = &self.0;
lookup.len()
Expand All @@ -103,7 +113,7 @@ impl<K: Hash + Eq + Clone, O: Ord + Copy, V: Clone> OrderedHashMap<K, O, V> {
keys.first().and_then(|key| {
lookup
.get(key)
.and_then(|(o, v)| Some((key.clone(), *o, v.clone())))
.and_then(|(o, v)| Some((key.clone(), o.clone(), v.clone())))
})
})
}
Expand Down Expand Up @@ -131,7 +141,7 @@ impl<K: Hash + Eq + Clone, O: Ord + Copy, V: Clone> OrderedHashMap<K, O, V> {
keys.retain(|k| *k != key);
// insert modified keys back into btree
if !keys.is_empty() {
order_lookup.insert(*old_order, keys);
order_lookup.insert(old_order.clone(), keys);
}
}
}
Expand All @@ -142,7 +152,7 @@ impl<K: Hash + Eq + Clone, O: Ord + Copy, V: Clone> OrderedHashMap<K, O, V> {
}
None => vec![key.clone()],
};
order_lookup.insert(order, keys);
order_lookup.insert(order.clone(), keys);
lookup
.insert(key, (order, value))
.and_then(|(_, v)| Some(v))
Expand All @@ -156,7 +166,7 @@ impl<K: Hash + Eq + Clone, O: Ord + Copy, V: Clone> OrderedHashMap<K, O, V> {
keys.retain(|k| k != key);
// insert remaining keys back in
if !keys.is_empty() {
order_lookup.insert(order, keys);
order_lookup.insert(order.clone(), keys);
}
}
None => {}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,51 @@
use super::helpers::OrderedHashMap;
use super::CacheStorage;
use super::storage::OrderedHashMap;
use super::CacheStrategy;
use async_lock::Mutex;
use async_trait::async_trait;
use std::{collections::BTreeMap, fmt::Debug, hash::Hash, sync::Arc, time::SystemTime};
/// A simple in-memory cache that uses timestamps to expire entries. Once the cache fills up, the oldest entry is evicted.
/// A simple cache that uses timestamps to expire entries. Once the cache fills up, the oldest entry is evicted.
/// Uses a hashmap for lookups and a BTreeMap for ordering by age
pub struct MemCacheStorageTTL<K, V> {
pub struct CacheStrategyTTL<K, V> {
store: OrderedHashMap<K, u128, V>,
capacity: usize,
startup_time: SystemTime,
create_time: SystemTime,
expire_after: u128,
}

impl<K: Clone + Send + Sync + 'static, V> MemCacheStorageTTL<K, V> {
impl<K: Clone + Send + Sync + 'static, V> CacheStrategyTTL<K, V> {
/// Create a new cache with the given capacity and expiration time in milliseconds
pub fn new(capacity: usize, expire_after: u128) -> Self {
/// If store_type is None, the cache will use an in-memory hashmap and BTreeMap
/// create_time is used as a starting point to generate timestamps; if it is None, the cache will use UNIX_EPOCH as the cache start time
pub fn new(
capacity: usize,
expire_after: u128,
store_type: Option<OrderedHashMap<K, u128, V>>,
create_time: Option<SystemTime>,
) -> Self {
Self {
store: OrderedHashMap::new(BTreeMap::new()),
store: match store_type {
Some(store) => store,
None => OrderedHashMap::new(BTreeMap::new()),
},
capacity,
startup_time: SystemTime::now(),
create_time: match create_time {
Some(time) => time,
None => SystemTime::UNIX_EPOCH,
},
expire_after,
}
}
}

#[async_trait]
impl<K: Hash + Eq + Send + Sync + 'static + Clone + Debug, V: Clone + Send + Sync + 'static>
CacheStorage<K, V> for MemCacheStorageTTL<K, V>
CacheStrategy<K, V> for CacheStrategyTTL<K, V>
{
async fn get(&self, key: &K) -> Option<V> {
match self.store.get(key) {
Some((ts, v)) => {
let current_time = SystemTime::now()
.duration_since(self.startup_time)
.duration_since(self.create_time)
.unwrap()
.as_millis();
if current_time < *ts {
Expand All @@ -49,7 +62,7 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone + Debug, V: Clone + Send + Syn
}
async fn insert(&mut self, key: K, value: V) -> Option<V> {
let current_ts = SystemTime::now()
.duration_since(self.startup_time)
.duration_since(self.create_time)
.unwrap()
.as_millis();

Expand Down Expand Up @@ -78,25 +91,28 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone + Debug, V: Clone + Send + Syn
}
}

/// A simple in-memory LRU cache. Once the cache fills up, the least recently used entry is evicted.
/// A simple LRU cache. Once the cache fills up, the least recently used entry is evicted.
/// Uses a hashmap for lookups and a BTreeMap for ordering by least recently used
pub struct MemCacheStorageLRU<K, V> {
pub struct CacheStrategyLRU<K, V> {
// The store is wrapped in an arc and a mutex so that get() can be immutable
store: Arc<Mutex<OrderedHashMap<K, u64, V>>>,
store: Arc<Mutex<OrderedHashMap<K, u128, V>>>,
capacity: usize,
}

impl<K: Clone + Send + Sync + 'static, V> MemCacheStorageLRU<K, V> {
pub fn new(capacity: usize) -> Self {
impl<K: Clone + Send + Sync + 'static, V> CacheStrategyLRU<K, V> {
pub fn new(capacity: usize, store_type: Option<OrderedHashMap<K, u128, V>>) -> Self {
Self {
store: Arc::new(Mutex::new(OrderedHashMap::new(BTreeMap::new()))),
store: Arc::new(Mutex::new(match store_type {
Some(store) => store,
None => OrderedHashMap::new(BTreeMap::new()),
})),
capacity,
}
}
}
#[async_trait]
impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'static>
CacheStorage<K, V> for MemCacheStorageLRU<K, V>
CacheStrategy<K, V> for CacheStrategyLRU<K, V>
{
async fn get(&self, key: &K) -> Option<V> {
// move the key to the end of the LRU index
Expand Down Expand Up @@ -140,12 +156,12 @@ mod tests {
use std::thread;

use super::*;
use crate::pool::cache::Cache;
use crate::pool::cache::{storage::OrderedHashMap, Cache};
use futures_executor::block_on;

#[rstest]
fn test_cache_lru() {
let cache = Cache::new(MemCacheStorageLRU::new(2));
let cache = Cache::new(CacheStrategyLRU::new(2, None));
block_on(async {
cache.insert("key".to_string(), "value".to_string()).await;
assert_eq!(
Expand All @@ -168,9 +184,43 @@ mod tests {
});
}

#[rstest]
fn test_fs_cache_lru() {
    // Exercise the LRU strategy backed by an on-disk sled tree instead of
    // the default in-memory BTreeMap; capacity is 2 entries.
    let path = "test_fs_cache_lru";
    let db = sled::open(path).unwrap();
    let tree = db.open_tree(path).unwrap();
    let fs_store: OrderedHashMap<String, u128, String> = OrderedHashMap::new(tree);
    let cache = Cache::new(CacheStrategyLRU::new(2, Some(fs_store)));
    block_on(async {
        cache.insert("key".to_string(), "value".to_string()).await;
        assert_eq!(
            cache.get(&"key".to_string()).await,
            Some("value".to_string())
        );
        cache.insert("key1".to_string(), "value1".to_string()).await;
        cache.insert("key2".to_string(), "value2".to_string()).await;
        // cache holds at most 2 entries, so the least recently used ("key") is gone
        assert_eq!(cache.get(&"key".to_string()).await, None);
        cache.insert("key3".to_string(), "value3".to_string()).await;
        // insert key3 a second time (same key, same value)
        cache.insert("key3".to_string(), "value3".to_string()).await;
        cache.get(&"key2".to_string()).await; // refresh key2 in the LRU order
        cache.insert("key4".to_string(), "value4".to_string()).await;
        // key3 should be evicted; key2 was refreshed and must survive
        assert_eq!(
            cache.remove(&"key2".to_string()).await,
            Some("value2".to_string())
        );
        assert_eq!(cache.remove(&"key3".to_string()).await, None);

        // cleanup
        std::fs::remove_dir_all(path).unwrap();
    });
}

#[rstest]
fn test_cache_ttl() {
let cache = Cache::new(MemCacheStorageTTL::new(2, 5));
let cache = Cache::new(CacheStrategyTTL::new(2, 5, None, None));
block_on(async {
cache.insert("key".to_string(), "value".to_string()).await;
thread::sleep(std::time::Duration::from_millis(1));
Expand All @@ -197,4 +247,43 @@ mod tests {
assert_eq!(cache.get(&"key5".to_string()).await, None);
});
}

#[rstest]
fn test_fs_cache_ttl() {
    // Exercise the TTL strategy backed by an on-disk sled tree; capacity is
    // 2 entries and entries expire 5 ms after insertion.
    let path = "test_fs_cache_ttl";
    let db = sled::open(path).unwrap();
    let tree = db.open_tree(path).unwrap();
    let fs_store: OrderedHashMap<String, u128, String> = OrderedHashMap::new(tree);
    let cache = Cache::new(CacheStrategyTTL::new(2, 5, Some(fs_store), None));
    block_on(async {
        cache.insert("key".to_string(), "value".to_string()).await;
        thread::sleep(std::time::Duration::from_millis(1));
        assert_eq!(
            cache.get(&"key".to_string()).await,
            Some("value".to_string())
        );
        cache.insert("key1".to_string(), "value1".to_string()).await;
        thread::sleep(std::time::Duration::from_millis(1));
        cache.insert("key2".to_string(), "value2".to_string()).await;
        // capacity reached, so the oldest entry ("key") was dropped
        assert_eq!(cache.get(&"key".to_string()).await, None);
        thread::sleep(std::time::Duration::from_millis(1));
        cache.insert("key3".to_string(), "value3".to_string()).await;
        cache.get(&"key2".to_string()).await;
        cache.insert("key4".to_string(), "value4".to_string()).await;
        // key2 should be evicted (TTL ordering ignores reads, unlike LRU)
        assert_eq!(cache.remove(&"key2".to_string()).await, None);
        assert_eq!(
            cache.remove(&"key3".to_string()).await,
            Some("value3".to_string())
        );
        cache.insert("key5".to_string(), "value5".to_string()).await;
        // wait past the 5 ms TTL so key5 expires
        thread::sleep(std::time::Duration::from_millis(6));
        assert_eq!(cache.get(&"key5".to_string()).await, None);

        // cleanup
        std::fs::remove_dir_all(path).unwrap();
    });
}
}
Loading

0 comments on commit b52ea45

Please sign in to comment.