Commit

Added support for LRU memCache
Signed-off-by: wadeking98 <wkingnumber2@gmail.com>
wadeking98 committed Jan 18, 2024
1 parent ea01382 commit 43623a9
Showing 5 changed files with 112 additions and 118 deletions.
40 changes: 9 additions & 31 deletions indy-vdr-proxy/src/main.rs
@@ -37,7 +37,6 @@ use hyper_tls::HttpsConnector;
 use hyper_unix_connector::UnixConnector;

 use indy_vdr::pool::cache::{Cache, MemCacheStorage};
-use indy_vdr::pool::RequestResultMeta;
 #[cfg(feature = "tls")]
 use rustls_pemfile::{certs, pkcs8_private_keys};
 #[cfg(feature = "tls")]
@@ -183,23 +182,11 @@ async fn init_app_state(
     Ok(state)
 }

-async fn run_pools(
-    state: Rc<RefCell<AppState>>,
-    init_refresh: bool,
-    interval_refresh: u32,
-    cache: Option<Cache<String, (String, RequestResultMeta)>>,
-) {
+async fn run_pools(state: Rc<RefCell<AppState>>, init_refresh: bool, interval_refresh: u32) {
     let mut pool_states = HashMap::new();

     for (namespace, pool_state) in &state.clone().borrow().pool_states {
-        let pool_state = match create_pool(
-            state.clone(),
-            namespace.as_str(),
-            init_refresh,
-            cache.clone(),
-        )
-        .await
-        {
+        let pool_state = match create_pool(state.clone(), namespace.as_str(), init_refresh).await {
             Ok(pool) => {
                 let pool = Some(pool.clone());
                 PoolState {
@@ -227,7 +214,7 @@
     if interval_refresh > 0 {
         loop {
             select! {
-                refresh_result = refresh_pools(state.clone(), interval_refresh, cache.clone()) => {
+                refresh_result = refresh_pools(state.clone(), interval_refresh) => {
                     match refresh_result {
                         Ok(upd_pool_states) => {
                             state.borrow_mut().pool_states = upd_pool_states;
@@ -272,14 +259,13 @@ async fn create_pool(
     state: Rc<RefCell<AppState>>,
     namespace: &str,
     refresh: bool,
-    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<LocalPool> {
     let pool_states = &state.borrow().pool_states;
     let pool_state = pool_states.get(namespace).unwrap();
     let pool =
         PoolBuilder::new(PoolConfig::default(), pool_state.transactions.clone()).into_local()?;
     let refresh_pool = if refresh {
-        refresh_pool(state.clone(), &pool, 0, cache).await?
+        refresh_pool(state.clone(), &pool, 0).await?
     } else {
         None
     };
@@ -290,14 +276,12 @@ async fn refresh_pools(
     state: Rc<RefCell<AppState>>,
     // pool_states: HashMap<String, PoolState>,
     delay_mins: u32,
-    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<HashMap<String, PoolState>> {
     let mut upd_pool_states = HashMap::new();
     let pool_states = &state.borrow().pool_states;
     for (namespace, pool_state) in pool_states {
         if let Some(pool) = &pool_state.pool {
-            let upd_pool = match refresh_pool(state.clone(), pool, delay_mins, cache.clone()).await
-            {
+            let upd_pool = match refresh_pool(state.clone(), pool, delay_mins).await {
                 Ok(p) => p,
                 Err(err) => {
                     eprintln!(
@@ -324,14 +308,13 @@ async fn refresh_pool(
     state: Rc<RefCell<AppState>>,
     pool: &LocalPool,
     delay_mins: u32,
-    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<Option<LocalPool>> {
     let n_pools = state.borrow().pool_states.len() as u32;
     if delay_mins > 0 {
         tokio::time::sleep(Duration::from_secs((delay_mins * 60 / n_pools) as u64)).await
     }

-    let (txns, _meta) = perform_refresh(pool, cache).await?;
+    let (txns, _meta) = perform_refresh(pool).await?;
     if let Some(txns) = txns {
         let pool = PoolBuilder::new(PoolConfig::default(), txns)
             .refreshed(true)
@@ -445,19 +428,14 @@ where
     I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
 {
     let cache = if config.cache {
-        let mem_storage = MemCacheStorage::new();
-        let mem_cache = Cache::new(mem_storage, 86400);
+        let mem_storage = MemCacheStorage::new(1024);
+        let mem_cache = Cache::new(mem_storage);
         Some(mem_cache)
     } else {
         None
     };

-    let until_done = run_pools(
-        state.clone(),
-        config.init_refresh,
-        config.interval_refresh,
-        cache.clone(),
-    );
+    let until_done = run_pools(state.clone(), config.init_refresh, config.interval_refresh);
     let svc = make_service_fn(move |_| {
         let state = state.clone();
         let cache = cache.clone();
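Side note (not part of the commit): the net effect of the main.rs changes is that the proxy's cache is now bounded by entry count rather than by a time-to-live, and the pool-refresh path no longer threads a cache through at all. A minimal sketch of the new wiring, collapsing the hunk above into one expression:

    // Sketch only, mirroring the diff above: 1024 is the LRU capacity in entries,
    // replacing the old 86400-second expiration offset.
    let cache = if config.cache {
        Some(Cache::new(MemCacheStorage::new(1024)))
    } else {
        None
    };

Because Cache is Clone (its storage lives behind an Arc, as shown in cache.rs below), the same handle can be cheaply cloned into each request handler, which is what the make_service_fn closure above does with cache.clone().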
2 changes: 1 addition & 1 deletion libindy_vdr/Cargo.toml
@@ -60,8 +60,8 @@ thiserror = "1.0"
 time = { version = "=0.3.20", features = ["parsing"] }
 url = "2.2.2"
 zmq = "0.9"
-tokio = { version = "1.35.1", features = ["full"] }
 async-trait = "0.1.77"
+async-lock = "3.3.0"

 [dev-dependencies]
 rstest = "0.18"
147 changes: 80 additions & 67 deletions libindy_vdr/src/pool/cache.rs
@@ -1,112 +1,120 @@
 use async_trait::async_trait;
 use std::{
-    collections::HashMap,
+    collections::{BTreeMap, HashMap},
     hash::Hash,
     sync::Arc,
-    time::{SystemTime, UNIX_EPOCH},
 };

-use tokio::sync::RwLock;
+use async_lock::Mutex;

 #[async_trait]
 pub trait CacheStorage<K, V>: Send + Sync + 'static {
-    async fn get(&self, key: &K) -> Option<(V, u64)>;
+    // Needs to be mutable bc some implementations may need to update the the LRU index of the cache
+    async fn get(&mut self, key: &K) -> Option<V>;

-    async fn remove(&mut self, key: &K) -> Option<(V, u64)>;
+    async fn remove(&mut self, key: &K) -> Option<V>;

-    async fn insert(&mut self, key: K, value: V, expiration: u64) -> Option<(V, u64)>;
+    async fn insert(&mut self, key: K, value: V) -> Option<V>;
 }

 pub struct Cache<K, V> {
-    storage: Arc<RwLock<dyn CacheStorage<K, V>>>,
-    expiration_offset: u64,
+    storage: Arc<Mutex<dyn CacheStorage<K, V>>>,
 }

 impl<K: 'static, V: 'static> Cache<K, V> {
-    pub fn new(storage: impl CacheStorage<K, V>, expiration_offset: u64) -> Self {
+    pub fn new(storage: impl CacheStorage<K, V>) -> Self {
         Self {
-            storage: Arc::new(RwLock::new(storage)),
-            expiration_offset,
+            storage: Arc::new(Mutex::new(storage)),
         }
     }
-    pub async fn get(&self, key: &K) -> Option<V> {
-        match self.storage.read().await.get(key).await {
-            Some((item, expiry)) => {
-                if expiry > 0
-                    && expiry
-                        < SystemTime::now()
-                            .duration_since(UNIX_EPOCH)
-                            .unwrap()
-                            .as_secs()
-                {
-                    None
-                } else {
-                    Some(item)
-                }
-            }
-            None => None,
-        }
+    pub async fn get(&mut self, key: &K) -> Option<V> {
+        self.storage.lock().await.get(key).await
     }
     pub async fn remove(&mut self, key: &K) -> Option<V> {
-        match self.storage.write().await.remove(key).await {
-            Some(item) => Some(item.0),
-            None => None,
-        }
+        self.storage.lock().await.remove(key).await
     }
     pub async fn insert(&mut self, key: K, value: V) -> Option<V> {
-        let exp_timestamp = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .unwrap()
-            .as_secs()
-            + self.expiration_offset;
-        match self
-            .storage
-            .write()
-            .await
-            .insert(key, value, exp_timestamp)
-            .await
-        {
-            Some(item) => Some(item.0),
-            None => None,
-        }
+        self.storage.lock().await.insert(key, value).await
     }
 }

-// need to implement Clone manually because RwLock<dyn CacheStorage> doesn't implement Clone
+// need to implement Clone manually because Mutex<dyn CacheStorage> doesn't implement Clone
 impl<K, V> Clone for Cache<K, V> {
     fn clone(&self) -> Self {
         Self {
             storage: self.storage.clone(),
-            expiration_offset: self.expiration_offset,
         }
     }
 }

+/// A simple in-memory LRU cache
+/// Uses a hashmap for lookups and a BTreeMap for ordering by least recently used
 pub struct MemCacheStorage<K, V> {
-    cache: HashMap<K, (V, u64)>,
+    cache_lookup: HashMap<K, (V, u64)>,
+    cache_order: BTreeMap<u64, K>,
+    capacity: usize,
 }

 impl<K, V> MemCacheStorage<K, V> {
-    pub fn new() -> Self {
+    pub fn new(capacity: usize) -> Self {
         Self {
-            cache: HashMap::new(),
+            cache_lookup: HashMap::new(),
+            cache_order: BTreeMap::new(),
+            capacity,
         }
     }
 }
 #[async_trait]
-impl<K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static> CacheStorage<K, V>
-    for MemCacheStorage<K, V>
+impl<K: Hash + Eq + Send + Sync + 'static + Clone, V: Clone + Send + Sync + 'static>
+    CacheStorage<K, V> for MemCacheStorage<K, V>
 {
-    async fn get(&self, key: &K) -> Option<(V, u64)> {
-        self.cache.get(key).map(|(v, e)| (v.clone(), *e))
+    async fn get(&mut self, key: &K) -> Option<V> {
+        // move the key to the end of the LRU index
+        // this is O(log(n)) in the worst case, but in the average case it's close to O(1)
+        match self.cache_lookup.get(key) {
+            Some((v, ts)) => {
+                self.cache_order.remove(ts).unwrap();
+                self.cache_order.entry(ts + 1).or_insert(key.clone());
+                Some(v.clone())
+            }
+            None => None,
+        }
     }
-    async fn remove(&mut self, key: &K) -> Option<(V, u64)> {
-        self.cache.remove(key)
+    async fn remove(&mut self, key: &K) -> Option<V> {
+        let lru_val = self.cache_lookup.remove(key);
+        match lru_val {
+            Some((v, ts)) => {
+                self.cache_order.remove(&ts);
+                Some(v)
+            }
+            None => None,
+        }
     }
-    async fn insert(&mut self, key: K, value: V, expiration: u64) -> Option<(V, u64)> {
-        self.cache
-            .insert(key, (value, expiration))
-            .map(|(v, e)| (v.clone(), e))
+    async fn insert(&mut self, key: K, value: V) -> Option<V> {
+        // this will be O(log(n)) in all cases when cache is at capacity since we need to fetch the first and last element from the btree
+        let highest_lru = self
+            .cache_order
+            .last_key_value()
+            .map(|(ts, _)| ts + 1)
+            .unwrap_or(0);
+        if self.cache_lookup.len() >= self.capacity {
+            // remove the LRU item
+            let (lru_ts, lru_key) = match self.cache_order.first_key_value() {
+                Some((ts, key)) => (*ts, key.clone()),
+                None => return None,
+            };
+            self.cache_lookup.remove(&lru_key);
+            self.cache_order.remove(&lru_ts);
+        };
+
+        self.cache_order.insert(highest_lru, key.clone());
+        match self
+            .cache_lookup
+            .insert(key.clone(), (value.clone(), highest_lru))
+        {
+            Some((v, _)) => Some(v),
+            None => None,
+        }
     }
 }

@@ -118,21 +126,26 @@ mod tests {
     #[rstest]
     fn test_cache() {
         use super::*;
-        use std::{thread, time::Duration};

-        let mut cache = Cache::new(MemCacheStorage::new(), 1);
+        let mut cache = Cache::new(MemCacheStorage::new(2));
         block_on(async {
             cache.insert("key".to_string(), "value".to_string()).await;
             assert_eq!(
                 cache.get(&"key".to_string()).await,
                 Some("value".to_string())
             );
-            thread::sleep(Duration::from_secs(2));
+            cache.insert("key1".to_string(), "value1".to_string()).await;
+            cache.insert("key2".to_string(), "value2".to_string()).await;
             assert_eq!(cache.get(&"key".to_string()).await, None);
+            cache.insert("key3".to_string(), "value3".to_string()).await;
+            cache.get(&"key2".to_string()).await; // move key2 to the end of the LRU index
+            cache.insert("key4".to_string(), "value4".to_string()).await;
+            // key3 should be evicted
             assert_eq!(
-                cache.remove(&"key".to_string()).await,
-                Some("value".to_string())
+                cache.remove(&"key2".to_string()).await,
+                Some("value2".to_string())
             );
+            assert_eq!(cache.remove(&"key3".to_string()).await, None);
         });
     }
 }
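To illustrate the reworked CacheStorage trait (get now takes &mut self so a backend can update its recency bookkeeping on reads), here is a hypothetical alternative backend that is not part of the commit: an unbounded HashMap store with no eviction. UnboundedMemStorage and its entries field are invented names for this sketch, and it assumes the CacheStorage trait and async_trait shown above are in scope.

    use async_trait::async_trait;
    use std::{collections::HashMap, hash::Hash};

    // Hypothetical backend used only to show the trait shape.
    pub struct UnboundedMemStorage<K, V> {
        entries: HashMap<K, V>,
    }

    #[async_trait]
    impl<K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static> CacheStorage<K, V>
        for UnboundedMemStorage<K, V>
    {
        async fn get(&mut self, key: &K) -> Option<V> {
            // no LRU index to maintain, so the &mut receiver is simply unused here
            self.entries.get(key).cloned()
        }
        async fn remove(&mut self, key: &K) -> Option<V> {
            self.entries.remove(key)
        }
        async fn insert(&mut self, key: K, value: V) -> Option<V> {
            self.entries.insert(key, value)
        }
    }

Any such backend can be passed to Cache::new unchanged, since Cache only ever talks to the trait object behind its Mutex.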
5 changes: 2 additions & 3 deletions libindy_vdr/src/pool/helpers.rs
@@ -93,9 +93,8 @@ pub async fn perform_pool_catchup_request<T: Pool>(
 /// Perform a pool ledger status request followed by a catchup request if necessary
 pub async fn perform_refresh<T: Pool>(
     pool: &T,
-    cache: Option<Cache<String, (String, RequestResultMeta)>>,
 ) -> VdrResult<(Option<PoolTransactions>, RequestResultMeta)> {
-    let (result, meta) = perform_pool_status_request(pool, cache).await?;
+    let (result, meta) = perform_pool_status_request(pool, None).await?;
     trace!("Got status result: {:?}", &result);
     match result {
         RequestResult::Reply(target) => match target {
@@ -224,7 +223,7 @@ pub async fn perform_ledger_request<T: Pool>(
     let cache_key = prepared.get_cache_key()?;

     if is_read_req {
-        if let Some(ref cache) = cache_opt {
+        if let Some(mut cache) = cache_opt.clone() {
             if let Some((response, meta)) = cache.get(&cache_key).await {
                 return Ok((RequestResult::Reply(response), meta));
             }
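The perform_ledger_request hunk above shows only the cache lookup for read requests; the switch from Some(ref cache) to Some(mut cache) with cache_opt.clone() is needed because get now takes &mut self, and cloning the Cache handle is cheap (it clones an Arc). For orientation, a hypothetical caller-level sketch of the overall read-through pattern follows; cached_read and fetch_from_ledger are invented names, and the library's real write-back point is outside the lines shown here.

    // Hypothetical read-through helper around the new Cache API.
    async fn cached_read(cache: &mut Cache<String, String>, key: String) -> String {
        if let Some(hit) = cache.get(&key).await {
            return hit; // served from the LRU cache
        }
        let fresh = fetch_from_ledger(&key).await;
        cache.insert(key, fresh.clone()).await; // populate for the next caller
        fresh
    }

    // Stand-in for an actual ledger request.
    async fn fetch_from_ledger(_key: &str) -> String {
        "reply".to_string()
    }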