Skip to content

Commit

Permalink
Added a TTL cache and improved LRU cache
Browse files Browse the repository at this point in the history
Signed-off-by: wadeking98 <wkingnumber2@gmail.com>
  • Loading branch information
wadeking98 committed Jan 19, 2024
1 parent 43623a9 commit 8e5f2dc
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 32 deletions.
4 changes: 2 additions & 2 deletions indy-vdr-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use hyper_tls::HttpsConnector;
#[cfg(unix)]
use hyper_unix_connector::UnixConnector;

use indy_vdr::pool::cache::{Cache, MemCacheStorage};
use indy_vdr::pool::cache::{Cache, MemCacheStorageTTL};
#[cfg(feature = "tls")]
use rustls_pemfile::{certs, pkcs8_private_keys};
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -428,7 +428,7 @@ where
I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let cache = if config.cache {
let mem_storage = MemCacheStorage::new(1024);
let mem_storage = MemCacheStorageTTL::new(1024);
let mem_cache = Cache::new(mem_storage);
Some(mem_cache)
} else {
Expand Down
192 changes: 162 additions & 30 deletions libindy_vdr/src/pool/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use async_trait::async_trait;
use std::{
collections::{BTreeMap, HashMap},
hash::Hash,
ops::DerefMut,
sync::Arc,
time::SystemTime,
};

use async_lock::Mutex;

#[async_trait]
pub trait CacheStorage<K, V>: Send + Sync + 'static {
// Needs to be mutable because some implementations may need to update the LRU index of the cache
async fn get(&mut self, key: &K) -> Option<V>;
async fn get(&self, key: &K) -> Option<V>;

async fn remove(&mut self, key: &K) -> Option<V>;

Expand Down Expand Up @@ -47,72 +48,170 @@ impl<K, V> Clone for Cache<K, V> {
}
}

/// A simple in-memory LRU cache
/// A simple in-memory cache that uses timestamps to expire entries. Once the cache fills up, the oldest entry is evicted.
/// Uses a hashmap for lookups and a BTreeMap for ordering by age
// NOTE(review): no TTL duration is stored here and `get` performs no expiry
// check — as written, entries only leave the cache via capacity eviction in
// `insert` or an explicit `remove`. Confirm whether time-based expiry is
// intended to be added later.
pub struct MemCacheStorageTTL<K, V> {
    // Lookup map from key -> (value, insert timestamp in ms since startup),
    // paired with an ordering index from timestamp -> keys inserted at that
    // timestamp (a Vec because several keys can share one millisecond).
    store: (HashMap<K, (V, u128)>, BTreeMap<u128, Vec<K>>),
    // Maximum number of entries; reaching it triggers oldest-entry eviction on insert.
    capacity: usize,
    // Reference point for computing relative millisecond timestamps.
    startup_time: SystemTime,
}

impl<K, V> MemCacheStorageTTL<K, V> {
pub fn new(capacity: usize) -> Self {
Self {
store: (HashMap::new(), BTreeMap::new()),
capacity,
startup_time: SystemTime::now(),
}
}
}

#[async_trait]
impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'static>
    CacheStorage<K, V> for MemCacheStorageTTL<K, V>
{
    /// Look up `key` and return a clone of its value.
    // NOTE(review): despite the TTL name, no expiry check happens here —
    // entries are only removed by capacity eviction in `insert` or by an
    // explicit `remove`. Confirm whether read-time expiry is intended.
    async fn get(&self, key: &K) -> Option<V> {
        let (cache_lookup, _) = &self.store;
        cache_lookup.get(key).map(|(v, _)| v.clone())
    }

    /// Remove `key`, returning its value if it was present. Also drops the key
    /// from the age-ordering index, deleting the whole timestamp bucket when
    /// this was its last key.
    async fn remove(&mut self, key: &K) -> Option<V> {
        let (cache_lookup, cache_order) = &mut self.store;
        match cache_lookup.remove(key) {
            Some((v, ts)) => {
                let bucket = cache_order.get_mut(&ts).unwrap();
                if bucket.len() <= 1 {
                    // last key at this timestamp: drop the whole bucket
                    cache_order.remove(&ts);
                } else {
                    bucket.retain(|k| k != key);
                }
                Some(v)
            }
            None => None,
        }
    }

    /// Insert `key` -> `value`, timestamped with milliseconds since startup.
    /// When the cache is at capacity and `key` is new, the single oldest entry
    /// is evicted first. Returns the previous value for `key`, if any.
    async fn insert(&mut self, key: K, value: V) -> Option<V> {
        let (cache_lookup, cache_order) = &mut self.store;
        let ts = SystemTime::now()
            .duration_since(self.startup_time)
            .unwrap()
            .as_millis();

        // Only evict when the cache is full AND the key is new — overwriting
        // an existing key does not grow the entry count.
        if cache_lookup.len() >= self.capacity && !cache_lookup.contains_key(&key) {
            let (oldest_ts_ref, _) = cache_order.first_key_value().unwrap();
            let oldest_ts = *oldest_ts_ref;
            let oldest_keys = cache_order.get_mut(&oldest_ts).unwrap();
            let removal_key = oldest_keys.first().cloned();
            if oldest_keys.len() <= 1 {
                // last key in the oldest bucket: drop the whole bucket
                cache_order.remove(&oldest_ts);
            } else {
                oldest_keys.swap_remove(0);
            }
            // (fix: the old code also did a no-op `cache_lookup.remove(&key)`
            // here — the guard above guarantees `key` is absent)
            if let Some(removal_key) = removal_key {
                cache_lookup.remove(&removal_key);
            }
        };

        cache_order.entry(ts).or_insert_with(Vec::new).push(key.clone());
        // If `key` already existed, drop its stale entry from its old
        // timestamp bucket so the ordering index stays consistent.
        match cache_lookup.insert(key.clone(), (value, ts)) {
            Some((v, old_ts)) => {
                if let Some(ord_keys) = cache_order.get_mut(&old_ts) {
                    if old_ts == ts {
                        // fix: overwrite within the same millisecond — the
                        // bucket now holds `key` twice (stale + freshly
                        // pushed). The old `retain` stripped BOTH occurrences,
                        // orphaning the lookup entry from the order index;
                        // remove exactly one occurrence instead.
                        if let Some(pos) = ord_keys.iter().position(|k| k == &key) {
                            ord_keys.remove(pos);
                        }
                    } else if ord_keys.len() <= 1 {
                        cache_order.remove(&old_ts);
                    } else {
                        ord_keys.retain(|k| k != &key);
                    }
                }
                Some(v)
            }
            None => None,
        }
    }
}

/// A simple in-memory LRU cache. Once the cache fills up, the least recently used entry is evicted.
/// Uses a hashmap for lookups and a BTreeMap for ordering by least recently used
pub struct MemCacheStorage<K, V> {
cache_lookup: HashMap<K, (V, u64)>,
cache_order: BTreeMap<u64, K>,
pub struct MemCacheStorageLRU<K, V> {
// The store is wrapped in an arc and a mutex so that get() can be immutable
store: Arc<Mutex<(HashMap<K, (V, u64)>, BTreeMap<u64, K>)>>,
capacity: usize,
}

impl<K, V> MemCacheStorage<K, V> {
impl<K, V> MemCacheStorageLRU<K, V> {
pub fn new(capacity: usize) -> Self {
Self {
cache_lookup: HashMap::new(),
cache_order: BTreeMap::new(),
store: Arc::new(Mutex::new((HashMap::new(), BTreeMap::new()))),
capacity,
}
}
}
#[async_trait]
impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'static>
CacheStorage<K, V> for MemCacheStorage<K, V>
CacheStorage<K, V> for MemCacheStorageLRU<K, V>
{
async fn get(&mut self, key: &K) -> Option<V> {
async fn get(&self, key: &K) -> Option<V> {
// move the key to the end of the LRU index
// this is O(log(n)) in the worst case, but in the average case it's close to O(1)
match self.cache_lookup.get(key) {
let mut store_lock = self.store.lock().await;
let (cache_lookup, cache_order) = store_lock.deref_mut();
let highest_lru = cache_order
.last_key_value()
.map(|(hts, _)| hts + 1)
.unwrap_or(0);
match cache_lookup.get_mut(key) {
Some((v, ts)) => {
self.cache_order.remove(ts).unwrap();
self.cache_order.entry(ts + 1).or_insert(key.clone());
cache_order.remove(ts).unwrap();
cache_order.entry(highest_lru).or_insert(key.clone());
*ts = highest_lru;
Some(v.clone())
}
None => None,
}
}
async fn remove(&mut self, key: &K) -> Option<V> {
let lru_val = self.cache_lookup.remove(key);
let mut store_lock = self.store.lock().await;
let (cache_lookup, cache_order) = store_lock.deref_mut();
let lru_val = cache_lookup.remove(key);
match lru_val {
Some((v, ts)) => {
self.cache_order.remove(&ts);
cache_order.remove(&ts);
Some(v)
}
None => None,
}
}
async fn insert(&mut self, key: K, value: V) -> Option<V> {
// this will be O(log(n)) in all cases when cache is at capacity since we need to fetch the first and last element from the btree
let highest_lru = self
.cache_order
let mut store_lock = self.store.lock().await;
let (cache_lookup, cache_order) = store_lock.deref_mut();
let highest_lru = cache_order
.last_key_value()
.map(|(ts, _)| ts + 1)
.unwrap_or(0);
if self.cache_lookup.len() >= self.capacity {
if cache_lookup.len() >= self.capacity && cache_lookup.get(&key).is_none() {
// remove the LRU item
let (lru_ts, lru_key) = match self.cache_order.first_key_value() {
let (lru_ts, lru_key) = match cache_order.first_key_value() {
Some((ts, key)) => (*ts, key.clone()),
None => return None,
};
self.cache_lookup.remove(&lru_key);
self.cache_order.remove(&lru_ts);
cache_lookup.remove(&lru_key);
cache_order.remove(&lru_ts);
};

self.cache_order.insert(highest_lru, key.clone());
match self
.cache_lookup
.insert(key.clone(), (value.clone(), highest_lru))
{
Some((v, _)) => Some(v),
// if value is overwritten when inserting a new key, we need to remove the old key from the order index
cache_order.insert(highest_lru, key.clone());
match cache_lookup.insert(key.clone(), (value.clone(), highest_lru)) {
Some((v, ts)) => {
cache_order.remove(&ts);
Some(v)
}
None => None,
}
}
Expand All @@ -121,13 +220,15 @@ impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'sta
#[cfg(test)]
mod tests {

use std::thread;

use futures_executor::block_on;

#[rstest]
fn test_cache() {
fn test_cache_lru() {
use super::*;

let mut cache = Cache::new(MemCacheStorage::new(2));
let mut cache = Cache::new(MemCacheStorageLRU::new(2));
block_on(async {
cache.insert("key".to_string(), "value".to_string()).await;
assert_eq!(
Expand All @@ -138,6 +239,7 @@ mod tests {
cache.insert("key2".to_string(), "value2".to_string()).await;
assert_eq!(cache.get(&"key".to_string()).await, None);
cache.insert("key3".to_string(), "value3".to_string()).await;
cache.insert("key3".to_string(), "value3".to_string()).await;
cache.get(&"key2".to_string()).await; // move key2 to the end of the LRU index
cache.insert("key4".to_string(), "value4".to_string()).await;
// key3 should be evicted
Expand All @@ -148,4 +250,34 @@ mod tests {
assert_eq!(cache.remove(&"key3".to_string()).await, None);
});
}

#[rstest]
fn test_cache_ttl() {
use super::*;

let mut cache = Cache::new(MemCacheStorageTTL::new(2));
block_on(async {
cache.insert("key".to_string(), "value".to_string()).await;
thread::sleep(std::time::Duration::from_millis(1));
assert_eq!(
cache.get(&"key".to_string()).await,
Some("value".to_string())
);
cache.insert("key1".to_string(), "value1".to_string()).await;
thread::sleep(std::time::Duration::from_millis(1));
cache.insert("key2".to_string(), "value2".to_string()).await;
assert_eq!(cache.get(&"key".to_string()).await, None);
thread::sleep(std::time::Duration::from_millis(1));
cache.insert("key3".to_string(), "value3".to_string()).await;
cache.get(&"key2".to_string()).await;
cache.insert("key4".to_string(), "value4".to_string()).await;
// key2 should be evicted
assert_eq!(cache.remove(&"key2".to_string()).await, None);
assert_eq!(
cache.remove(&"key3".to_string()).await,
Some("value3".to_string())
);
cache.insert("key5".to_string(), "value5".to_string()).await;
});
}
}

0 comments on commit 8e5f2dc

Please sign in to comment.