Skip to content

Commit

Permalink
change to global cache with unique key
Browse files Browse the repository at this point in the history
Signed-off-by: wadeking98 <wkingnumber2@gmail.com>
  • Loading branch information
wadeking98 committed Feb 2, 2024
1 parent 96b2db6 commit 3153e5b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 75 deletions.
2 changes: 1 addition & 1 deletion indy-vdr-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ where
storage_type,
None,
);
let cache = Cache::new(strategy);
let cache = Cache::new(strategy, None);
Some(cache)
} else {
None
Expand Down
14 changes: 11 additions & 3 deletions libindy_vdr/src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ mod resolver;

use crate::common::error::prelude::*;
use crate::config::{PoolConfig, LIB_VERSION};
use crate::pool::cache::{
storage::{new_fs_ordered_store, OrderedHashMap},
strategy::CacheStrategyTTL,
Cache,
};
use crate::pool::{FilesystemCache, PoolTransactionsCache, ProtocolVersion};
use crate::utils::Validatable;

use self::error::{set_last_error, ErrorCode};
use self::pool::{LegerCacheConfig, LEDGER_TXN_CACHE_CONFIG, POOL_CACHE, POOL_CONFIG};
use self::pool::{LEDGER_TXN_CACHE, POOL_CACHE, POOL_CONFIG};

pub type CallbackId = i64;

Expand Down Expand Up @@ -76,7 +81,8 @@ pub extern "C" fn indy_vdr_set_cache_directory(path: FfiStr) -> ErrorCode {
pub extern "C" fn indy_vdr_set_ledger_txn_cache(capacity: usize, expire_offset: u64) -> ErrorCode {
catch_err! {
debug!("Setting pool ledger transactions cache: capacity={}, expire_offset={}", capacity, expire_offset);
*write_lock!(LEDGER_TXN_CACHE_CONFIG)? = Some(LegerCacheConfig::new(capacity, expire_offset.into(), None));
let cache = Cache::new(CacheStrategyTTL::new(capacity, expire_offset.into(), None, None), None);
*write_lock!(LEDGER_TXN_CACHE)? = Some(cache);
Ok(ErrorCode::Success)
}
}
Expand All @@ -89,7 +95,9 @@ pub extern "C" fn indy_vdr_set_ledger_txn_fs_cache(
) -> ErrorCode {
catch_err! {
debug!("Setting pool ledger transactions cache: capacity={}, expire_offset={}", capacity, expire_offset);
*write_lock!(LEDGER_TXN_CACHE_CONFIG)? = Some(LegerCacheConfig::new(capacity, expire_offset.into(), Some(path.into_string())));
let store = OrderedHashMap::new(new_fs_ordered_store(path.into())?);
let cache = Cache::new(CacheStrategyTTL::new(capacity, expire_offset.into(), Some(store), None), None);
*write_lock!(LEDGER_TXN_CACHE)? = Some(cache);
Ok(ErrorCode::Success)
}
}
Expand Down
73 changes: 12 additions & 61 deletions libindy_vdr/src/ffi/pool.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
use std::collections::{btree_map::Entry, BTreeMap, HashMap};
use std::os::raw::c_char;
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::{fs, thread};
use std::thread;

use ffi_support::{rust_string_to_c, FfiStr};
use once_cell::sync::Lazy;

use crate::common::error::prelude::*;
use crate::common::handle::ResourceHandle;
use crate::config::PoolConfig;
use crate::pool::cache::storage::{new_fs_ordered_store, OrderedHashMap};
use crate::pool::cache::strategy::CacheStrategyTTL;
use crate::pool::cache::Cache;
use crate::pool::{
InMemoryCache, PoolBuilder, PoolRunner, PoolTransactions, PoolTransactionsCache, RequestMethod,
Expand All @@ -34,50 +31,6 @@ pub struct PoolInstance {
pub node_weights: Option<NodeWeights>,
}

#[derive(Clone)]
pub struct LegerCacheConfig {
pub cache_size: usize,
pub cache_ttl: u128,
pub path: Option<String>,
}

impl LegerCacheConfig {
pub fn new(cache_size: usize, cache_ttl: u128, path: Option<String>) -> Self {
Self {
cache_size,
cache_ttl,
path,
}
}
pub fn create_cache(
&self,
id: Option<String>,
) -> VdrResult<Cache<String, (String, RequestResultMeta)>> {
if let Some(path) = &self.path {
let full_path = Path::new(path).join(id.unwrap_or("default".to_string()));
let full_path_string = full_path
.into_os_string()
.into_string()
.unwrap_or(path.to_string());
fs::create_dir_all(full_path_string.clone())?;
let store = OrderedHashMap::new(new_fs_ordered_store(full_path_string)?);
Ok(Cache::new(CacheStrategyTTL::new(
self.cache_size,
self.cache_ttl,
Some(store),
None,
)))
} else {
Ok(Cache::new(CacheStrategyTTL::new(
self.cache_size,
self.cache_ttl,
None,
None,
)))
}
}
}

pub type NodeWeights = HashMap<String, f32>;

pub static POOL_CONFIG: Lazy<RwLock<PoolConfig>> = Lazy::new(|| RwLock::new(PoolConfig::default()));
Expand All @@ -88,7 +41,7 @@ pub static POOLS: Lazy<RwLock<BTreeMap<PoolHandle, PoolInstance>>> =
pub static POOL_CACHE: Lazy<RwLock<Option<Arc<dyn PoolTransactionsCache>>>> =
Lazy::new(|| RwLock::new(Some(Arc::new(InMemoryCache::new()))));

pub static LEDGER_TXN_CACHE_CONFIG: Lazy<RwLock<Option<LegerCacheConfig>>> =
pub static LEDGER_TXN_CACHE: Lazy<RwLock<Option<Cache<String, (String, RequestResultMeta)>>>> =
Lazy::new(|| RwLock::new(None));

#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -119,12 +72,11 @@ pub extern "C" fn indy_vdr_pool_create(params: FfiStr, handle_p: *mut PoolHandle
"Invalid pool create parameters: must provide transactions or transactions_path"
));
};
let txn_cache_config = read_lock!(LEDGER_TXN_CACHE_CONFIG)?.clone();
let txn_cache = if let Some(config) = txn_cache_config {
config.create_cache(txns.root_hash_base58().ok()).ok()
} else {
None
};
// set this cache with unique key prefix
let mut txn_cache = read_lock!(LEDGER_TXN_CACHE)?.clone();
if let Some(ref mut cache) = txn_cache{
cache.set_key_prefix(txns.root_hash_base58().ok());
}
let mut cached = false;
if let Some(cache) = read_lock!(POOL_CACHE)?.as_ref() {
if let Some(newer_txns) = cache.resolve_latest(&txns)? {
Expand Down Expand Up @@ -159,12 +111,11 @@ fn handle_pool_refresh(
cache.update(&init_txns, latest_txns)?;
}
if let Some(new_txns) = new_txns {
let txn_cache_config = read_lock!(LEDGER_TXN_CACHE_CONFIG)?.clone();
let txn_cache = if let Some(config) = txn_cache_config {
config.create_cache(init_txns.root_hash_base58().ok()).ok()
} else {
None
};
// set this cache with unique key prefix
let mut txn_cache = read_lock!(LEDGER_TXN_CACHE)?.clone();
if let Some(ref mut cache) = txn_cache{
cache.set_key_prefix(init_txns.root_hash_base58().ok());
}
let runner = PoolBuilder::new(config, new_txns).node_weights(node_weights).refreshed(true).into_runner(txn_cache)?;
let mut pools = write_lock!(POOLS)?;
if let Entry::Occupied(mut entry) = pools.entry(pool_handle) {
Expand Down
31 changes: 23 additions & 8 deletions libindy_vdr/src/pool/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_lock::RwLock;
use async_trait::async_trait;
use std::sync::Arc;
use std::{fmt::Display, sync::Arc};

pub mod storage;
pub mod strategy;
Expand All @@ -15,35 +15,50 @@ pub trait CacheStrategy<K, V>: Send + Sync + 'static {
}

pub struct Cache<K, V> {
storage: Arc<RwLock<dyn CacheStrategy<K, V>>>,
storage: Arc<RwLock<dyn CacheStrategy<String, V>>>,
key_prefix: Option<K>,
}

impl<K: 'static, V: 'static> Cache<K, V> {
pub fn new(storage: impl CacheStrategy<K, V>) -> Self {
impl<K: Display + 'static, V: 'static> Cache<K, V> {
fn full_key(&self, key: &K) -> String {
match &self.key_prefix {
Some(prefix) => format!("{}{}", prefix, key),
None => key.to_string(),
}
}
pub fn set_key_prefix(&mut self, key_prefix: Option<K>) {
self.key_prefix = key_prefix;
}
pub fn new(storage: impl CacheStrategy<String, V>, key_prefix: Option<K>) -> Self {
Self {
storage: Arc::new(RwLock::new(storage)),
key_prefix,
}
}
pub async fn get(&self, key: &K) -> Option<V> {
self.storage.read().await.get(key).await
let full_key = self.full_key(key);
self.storage.read().await.get(&full_key).await
}
pub async fn remove(&self, key: &K) -> Option<V> {
self.storage.write().await.remove(key).await
let full_key = self.full_key(key);
self.storage.write().await.remove(&full_key).await
}
pub async fn insert(&self, key: K, value: V, custom_exp_offset: Option<u128>) -> Option<V> {
let full_key = self.full_key(&key);
self.storage
.write()
.await
.insert(key, value, custom_exp_offset)
.insert(full_key, value, custom_exp_offset)
.await
}
}

// need to implement Clone manually because Mutex<dyn CacheStrategy> doesn't implement Clone
impl<K, V> Clone for Cache<K, V> {
impl<K: Display + Clone, V> Clone for Cache<K, V> {
fn clone(&self) -> Self {
Self {
storage: self.storage.clone(),
key_prefix: self.key_prefix.clone(),
}
}
}
4 changes: 2 additions & 2 deletions libindy_vdr/src/pool/cache/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ mod tests {

#[rstest]
fn test_cache_ttl() {
let cache = Cache::new(CacheStrategyTTL::new(2, 5, None, None));
let cache = Cache::new(CacheStrategyTTL::new(2, 5, None, None), None);
let cache_location = "test_fs_cache_ttl";
let tree = sled::open(cache_location)
.unwrap()
.open_tree(cache_location)
.unwrap();
let storage: OrderedHashMap<String, u128, TTLCacheItem<String>> = OrderedHashMap::new(tree);
let fs_cache = Cache::new(CacheStrategyTTL::new(2, 5, Some(storage), None));
let fs_cache = Cache::new(CacheStrategyTTL::new(2, 5, Some(storage), None), None);
let caches = vec![cache, fs_cache];
block_on(async {
for cache in caches {
Expand Down

0 comments on commit 3153e5b

Please sign in to comment.