From b52ea45db9e3a832e9fbb3b376f7f1e91a265cc0 Mon Sep 17 00:00:00 2001 From: wadeking98 Date: Tue, 23 Jan 2024 13:44:34 -0800 Subject: [PATCH] added storage filesystem caching strategy Signed-off-by: wadeking98 --- indy-vdr-proxy/src/main.rs | 5 +- libindy_vdr/src/pool/cache/fscache.rs | 6 - libindy_vdr/src/pool/cache/mod.rs | 13 +- .../src/pool/cache/{helpers.rs => storage.rs} | 46 +++--- .../pool/cache/{memcache.rs => strategy.rs} | 133 +++++++++++++++--- libindy_vdr/src/pool/helpers.rs | 4 +- 6 files changed, 150 insertions(+), 57 deletions(-) delete mode 100644 libindy_vdr/src/pool/cache/fscache.rs rename libindy_vdr/src/pool/cache/{helpers.rs => storage.rs} (79%) rename libindy_vdr/src/pool/cache/{memcache.rs => strategy.rs} (54%) diff --git a/indy-vdr-proxy/src/main.rs b/indy-vdr-proxy/src/main.rs index 4f8eb225..6bd59285 100644 --- a/indy-vdr-proxy/src/main.rs +++ b/indy-vdr-proxy/src/main.rs @@ -36,7 +36,7 @@ use hyper_tls::HttpsConnector; #[cfg(unix)] use hyper_unix_connector::UnixConnector; -use indy_vdr::pool::cache::{memcache::MemCacheStorageTTL, Cache}; +use indy_vdr::pool::cache::{strategy::CacheStrategyTTL, Cache}; #[cfg(feature = "tls")] use rustls_pemfile::{certs, pkcs8_private_keys}; #[cfg(feature = "tls")] @@ -428,7 +428,8 @@ where I::Error: Into>, { let cache = if config.cache { - let mem_storage = MemCacheStorageTTL::new(1024, Duration::from_secs(86400).as_millis()); + let mem_storage = + CacheStrategyTTL::new(1024, Duration::from_secs(86400).as_millis(), None, None); let mem_cache = Cache::new(mem_storage); Some(mem_cache) } else { diff --git a/libindy_vdr/src/pool/cache/fscache.rs b/libindy_vdr/src/pool/cache/fscache.rs deleted file mode 100644 index f3b4f820..00000000 --- a/libindy_vdr/src/pool/cache/fscache.rs +++ /dev/null @@ -1,6 +0,0 @@ -use sled; - -pub fn test() { - let sled = sled::open("my_db").unwrap(); - sled.insert(b"yo!", vec![1, 2, 3]).unwrap(); -} diff --git a/libindy_vdr/src/pool/cache/mod.rs b/libindy_vdr/src/pool/cache/mod.rs index 2fb6483c..5845be1f 100644 --- a/libindy_vdr/src/pool/cache/mod.rs +++ b/libindy_vdr/src/pool/cache/mod.rs @@ -2,12 +2,11 @@ use async_lock::RwLock; use async_trait::async_trait; use std::sync::Arc; -mod helpers; -pub mod memcache; -pub mod fscache; +pub mod storage; +pub mod strategy; #[async_trait] -pub trait CacheStorage: Send + Sync + 'static { +pub trait CacheStrategy: Send + Sync + 'static { async fn get(&self, key: &K) -> Option; async fn remove(&mut self, key: &K) -> Option; @@ -16,11 +15,11 @@ pub trait CacheStorage: Send + Sync + 'static { } pub struct Cache { - storage: Arc>>, + storage: Arc>>, } impl Cache { - pub fn new(storage: impl CacheStorage) -> Self { + pub fn new(storage: impl CacheStrategy) -> Self { Self { storage: Arc::new(RwLock::new(storage)), } @@ -36,7 +35,7 @@ impl Cache { } } -// need to implement Clone manually because Mutex doesn't implement Clone +// need to implement Clone manually because Mutex doesn't implement Clone impl Clone for Cache { fn clone(&self) -> Self { Self { diff --git a/libindy_vdr/src/pool/cache/helpers.rs b/libindy_vdr/src/pool/cache/storage.rs similarity index 79% rename from libindy_vdr/src/pool/cache/helpers.rs rename to libindy_vdr/src/pool/cache/storage.rs index d29751f6..2775b06e 100644 --- a/libindy_vdr/src/pool/cache/helpers.rs +++ b/libindy_vdr/src/pool/cache/storage.rs @@ -1,5 +1,5 @@ use serde::{de::DeserializeOwned, Serialize}; -use sled::{self, IVec, Tree}; +use sled::{self, Tree}; use std::{ collections::{BTreeMap, HashMap}, hash::Hash, @@ -13,36 +13,46 @@ pub trait OrderedStore: Send + Sync { fn insert(&mut self, key: O, value: V) -> Option; fn remove(&mut self, key: &O) -> Option; } -impl OrderedStore for Tree { +impl OrderedStore for Tree { fn len(&self) -> usize { Tree::len(self) } - fn first_key_value(&self) -> Option<(IVec, V)> { + fn first_key_value(&self) -> Option<(u128, V)> { match self.first() { - Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| (k, v)), + Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| { + ( + u128::from_be_bytes(k.as_ref().try_into().unwrap_or([0; 16])), + v, + ) + }), _ => None, } } - fn last_key_value(&self) -> Option<(IVec, V)> { + fn last_key_value(&self) -> Option<(u128, V)> { match self.last() { - Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| (k, v)), + Ok(Some((k, v))) => serde_json::from_slice(v.as_ref()).ok().map(|v| { + ( + u128::from_be_bytes(k.as_ref().try_into().unwrap_or([0; 16])), + v, + ) + }), _ => None, } } - fn get(&self, key: &IVec) -> Option { - match self.get(key) { + fn get(&self, key: &u128) -> Option { + match Tree::get(self, key.to_be_bytes()).map(|v| v) { Ok(Some(v)) => serde_json::from_slice(v.as_ref()).ok(), _ => None, } } - fn insert(&mut self, key: IVec, value: V) -> Option { - match Tree::insert(self, key, serde_json::to_vec(&value).unwrap()) { + fn insert(&mut self, key: u128, value: V) -> Option { + match Tree::insert(self, key.to_be_bytes(), serde_json::to_vec(&value).unwrap()) { Ok(Some(v)) => serde_json::from_slice(v.as_ref()).ok(), _ => None, } } - fn remove(&mut self, key: &IVec) -> Option { - match Tree::remove(self, key).map(|v| v) { + fn remove(&mut self, key: &u128) -> Option { + match Tree::remove(self, key.to_be_bytes()).map(|v| v) { Ok(Some(v)) => serde_json::from_slice(&v).ok(), _ => None, } @@ -77,13 +87,13 @@ pub struct OrderedHashMap( ), ); -impl OrderedHashMap { +impl OrderedHashMap { pub fn new(order: impl OrderedStore> + 'static) -> Self { Self((HashMap::new(), Box::new(order))) } } -impl OrderedHashMap { +impl OrderedHashMap { pub fn len(&self) -> usize { let (lookup, _) = &self.0; lookup.len() @@ -103,7 +113,7 @@ impl OrderedHashMap { keys.first().and_then(|key| { lookup .get(key) - .and_then(|(o, v)| Some((key.clone(), *o, v.clone()))) + .and_then(|(o, v)| Some((key.clone(), o.clone(), v.clone()))) }) }) } @@ -131,7 +141,7 @@ impl OrderedHashMap { keys.retain(|k| *k != key); // insert modified keys back into btree if !keys.is_empty() { - order_lookup.insert(*old_order, keys); + order_lookup.insert(old_order.clone(), keys); } } } @@ -142,7 +152,7 @@ impl OrderedHashMap { } None => vec![key.clone()], }; - order_lookup.insert(order, keys); + order_lookup.insert(order.clone(), keys); lookup .insert(key, (order, value)) .and_then(|(_, v)| Some(v)) @@ -156,7 +166,7 @@ impl OrderedHashMap { keys.retain(|k| k != key); // insert remaining keys back in if !keys.is_empty() { - order_lookup.insert(order, keys); + order_lookup.insert(order.clone(), keys); } } None => {} diff --git a/libindy_vdr/src/pool/cache/memcache.rs b/libindy_vdr/src/pool/cache/strategy.rs similarity index 54% rename from libindy_vdr/src/pool/cache/memcache.rs rename to libindy_vdr/src/pool/cache/strategy.rs index a5e00dbd..abd02190 100644 --- a/libindy_vdr/src/pool/cache/memcache.rs +++ b/libindy_vdr/src/pool/cache/strategy.rs @@ -1,24 +1,37 @@ -use super::helpers::OrderedHashMap; -use super::CacheStorage; +use super::storage::OrderedHashMap; +use super::CacheStrategy; use async_lock::Mutex; use async_trait::async_trait; use std::{collections::BTreeMap, fmt::Debug, hash::Hash, sync::Arc, time::SystemTime}; -/// A simple in-memory cache that uses timestamps to expire entries. Once the cache fills up, the oldest entry is evicted. +/// A simple cache that uses timestamps to expire entries. Once the cache fills up, the oldest entry is evicted. /// Uses a hashmap for lookups and a BTreeMap for ordering by age -pub struct MemCacheStorageTTL { +pub struct CacheStrategyTTL { store: OrderedHashMap, capacity: usize, - startup_time: SystemTime, + create_time: SystemTime, expire_after: u128, } -impl MemCacheStorageTTL { +impl CacheStrategyTTL { /// Create a new cache with the given capacity and expiration time in milliseconds - pub fn new(capacity: usize, expire_after: u128) -> Self { + /// If store_type is None, the cache will use an in-memory hashmap and BTreeMap + /// cache_time is used as a starting point to generate timestamps if it is None, the cache will use the UNIX_EPOCH as the cache start time + pub fn new( + capacity: usize, + expire_after: u128, + store_type: Option>, + create_time: Option, + ) -> Self { Self { - store: OrderedHashMap::new(BTreeMap::new()), + store: match store_type { + Some(store) => store, + None => OrderedHashMap::new(BTreeMap::new()), + }, capacity, - startup_time: SystemTime::now(), + create_time: match create_time { + Some(time) => time, + None => SystemTime::UNIX_EPOCH, + }, expire_after, } } @@ -26,13 +39,13 @@ impl MemCacheStorageTTL { #[async_trait] impl - CacheStorage for MemCacheStorageTTL + CacheStrategy for CacheStrategyTTL { async fn get(&self, key: &K) -> Option { match self.store.get(key) { Some((ts, v)) => { let current_time = SystemTime::now() - .duration_since(self.startup_time) + .duration_since(self.create_time) .unwrap() .as_millis(); if current_time < *ts { @@ -49,7 +62,7 @@ impl Option { let current_ts = SystemTime::now() - .duration_since(self.startup_time) + .duration_since(self.create_time) .unwrap() .as_millis(); @@ -78,25 +91,28 @@ impl { +pub struct CacheStrategyLRU { // The store is wrapped in an arc and a mutex so that get() can be immutable - store: Arc>>, + store: Arc>>, capacity: usize, } -impl MemCacheStorageLRU { - pub fn new(capacity: usize) -> Self { +impl CacheStrategyLRU { + pub fn new(capacity: usize, store_type: Option>) -> Self { Self { - store: Arc::new(Mutex::new(OrderedHashMap::new(BTreeMap::new()))), + store: Arc::new(Mutex::new(match store_type { + Some(store) => store, + None => OrderedHashMap::new(BTreeMap::new()), + })), capacity, } } } #[async_trait] impl - CacheStorage for MemCacheStorageLRU + CacheStrategy for CacheStrategyLRU { async fn get(&self, key: &K) -> Option { // move the key to the end of the LRU index @@ -140,12 +156,12 @@ mod tests { use std::thread; use super::*; - use crate::pool::cache::Cache; + use crate::pool::cache::{storage::OrderedHashMap, Cache}; use futures_executor::block_on; #[rstest] fn test_cache_lru() { - let cache = Cache::new(MemCacheStorageLRU::new(2)); + let cache = Cache::new(CacheStrategyLRU::new(2, None)); block_on(async { cache.insert("key".to_string(), "value".to_string()).await; assert_eq!( @@ -168,9 +184,43 @@ mod tests { }); } + #[rstest] + fn test_fs_cache_lru() { + let cache_location = "test_fs_cache_lru"; + let tree = sled::open(cache_location) + .unwrap() + .open_tree(cache_location) + .unwrap(); + let storage: OrderedHashMap = OrderedHashMap::new(tree); + let cache = Cache::new(CacheStrategyLRU::new(2, Some(storage))); + block_on(async { + cache.insert("key".to_string(), "value".to_string()).await; + assert_eq!( + cache.get(&"key".to_string()).await, + Some("value".to_string()) + ); + cache.insert("key1".to_string(), "value1".to_string()).await; + cache.insert("key2".to_string(), "value2".to_string()).await; + assert_eq!(cache.get(&"key".to_string()).await, None); + cache.insert("key3".to_string(), "value3".to_string()).await; + cache.insert("key3".to_string(), "value3".to_string()).await; + cache.get(&"key2".to_string()).await; // move key2 to the end of the LRU index + cache.insert("key4".to_string(), "value4".to_string()).await; + // key3 should be evicted + assert_eq!( + cache.remove(&"key2".to_string()).await, + Some("value2".to_string()) + ); + assert_eq!(cache.remove(&"key3".to_string()).await, None); + + // cleanup + std::fs::remove_dir_all(cache_location).unwrap(); + }); + } + #[rstest] fn test_cache_ttl() { - let cache = Cache::new(MemCacheStorageTTL::new(2, 5)); + let cache = Cache::new(CacheStrategyTTL::new(2, 5, None, None)); block_on(async { cache.insert("key".to_string(), "value".to_string()).await; thread::sleep(std::time::Duration::from_millis(1)); @@ -197,4 +247,43 @@ mod tests { assert_eq!(cache.get(&"key5".to_string()).await, None); }); } + + #[rstest] + fn test_fs_cache_ttl() { + let cache_location = "test_fs_cache_ttl"; + let tree = sled::open(cache_location) + .unwrap() + .open_tree(cache_location) + .unwrap(); + let storage: OrderedHashMap = OrderedHashMap::new(tree); + let cache = Cache::new(CacheStrategyTTL::new(2, 5, Some(storage), None)); + block_on(async { + cache.insert("key".to_string(), "value".to_string()).await; + thread::sleep(std::time::Duration::from_millis(1)); + assert_eq!( + cache.get(&"key".to_string()).await, + Some("value".to_string()) + ); + cache.insert("key1".to_string(), "value1".to_string()).await; + thread::sleep(std::time::Duration::from_millis(1)); + cache.insert("key2".to_string(), "value2".to_string()).await; + assert_eq!(cache.get(&"key".to_string()).await, None); + thread::sleep(std::time::Duration::from_millis(1)); + cache.insert("key3".to_string(), "value3".to_string()).await; + cache.get(&"key2".to_string()).await; + cache.insert("key4".to_string(), "value4".to_string()).await; + // key2 should be evicted + assert_eq!(cache.remove(&"key2".to_string()).await, None); + assert_eq!( + cache.remove(&"key3".to_string()).await, + Some("value3".to_string()) + ); + cache.insert("key5".to_string(), "value5".to_string()).await; + thread::sleep(std::time::Duration::from_millis(6)); + assert_eq!(cache.get(&"key5".to_string()).await, None); + + // cleanup + std::fs::remove_dir_all(cache_location).unwrap(); + }); + } } diff --git a/libindy_vdr/src/pool/helpers.rs b/libindy_vdr/src/pool/helpers.rs index 75354b80..058270a3 100644 --- a/libindy_vdr/src/pool/helpers.rs +++ b/libindy_vdr/src/pool/helpers.rs @@ -223,7 +223,7 @@ pub async fn perform_ledger_request( let cache_key = prepared.get_cache_key()?; if is_read_req { - if let Some(mut cache) = cache_opt.clone() { + if let Some(cache) = cache_opt.clone() { if let Some((response, meta)) = cache.get(&cache_key).await { return Ok((RequestResult::Reply(response), meta)); } @@ -240,7 +240,7 @@ pub async fn perform_ledger_request( return result; } } - if let Some(mut cache) = cache_opt { + if let Some(cache) = cache_opt { cache .insert(cache_key, (response.to_string(), meta.clone())) .await;