Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: transaction caching #252

Merged
merged 32 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
584adb0
playing around with transaction caching
wadeking98 Jan 16, 2024
b752781
implemented caching
wadeking98 Jan 17, 2024
4478b2f
finished writing tests
wadeking98 Jan 17, 2024
aa57d69
fixed cache storage to be async
wadeking98 Jan 17, 2024
0b2ddd0
fixed request cache key
wadeking98 Jan 18, 2024
2b99028
updated formatting
wadeking98 Jan 18, 2024
ea01382
fixed tests
wadeking98 Jan 18, 2024
43623a9
Added support for LRU memCache
wadeking98 Jan 18, 2024
8e5f2dc
Added a TTL cache and improved LRU cache
wadeking98 Jan 19, 2024
c293903
changed cache lock to rwlock
wadeking98 Jan 19, 2024
cb79261
updated ttl cache expiration policy
wadeking98 Jan 19, 2024
c54dc01
factored ordered hash map code
wadeking98 Jan 19, 2024
bd281a7
moved cache functionality to module
wadeking98 Jan 19, 2024
5cb1297
skip caching if data is null
wadeking98 Jan 20, 2024
98722bb
boilerplate for fs caching
wadeking98 Jan 23, 2024
b52ea45
added storage filesystem caching strategy
wadeking98 Jan 23, 2024
e811e5d
fixed cache storage
wadeking98 Jan 24, 2024
8838b05
added optional per-insert configurations
wadeking98 Jan 24, 2024
9922aba
code touchup
wadeking98 Jan 25, 2024
3875fae
prevented caching results with null seqno
wadeking98 Jan 25, 2024
e26db67
added ffi bindings for new cache
wadeking98 Jan 25, 2024
0291d7b
cleaned up pool creation with cache
wadeking98 Jan 26, 2024
5c6f58c
merged ttl and lru cache strategies
wadeking98 Jan 26, 2024
20bb31e
removed extraneous sleeps
wadeking98 Jan 26, 2024
96b2db6
created per-ledger cache for FFIs
wadeking98 Jan 29, 2024
59244e8
change to global cache with unique key
wadeking98 Feb 2, 2024
9c3877a
updated ffi cache methods
wadeking98 Feb 2, 2024
df072ba
cleaned up ordered hash map
wadeking98 Feb 2, 2024
19fc142
using new_txns for key prefix
wadeking98 Feb 2, 2024
0224681
quick syntax cleanup
wadeking98 Feb 2, 2024
f0b6530
updated TS wrappers
wadeking98 Feb 5, 2024
12deec6
switched to signed typing for ffis
wadeking98 Feb 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions indy-vdr-proxy/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub struct Config {
pub is_multiple: bool,
pub tls_cert_path: Option<String>,
pub tls_key_path: Option<String>,
pub cache: bool,
pub cache_size: usize,
pub cache_path: Option<String>,
}

pub fn load_config() -> Result<Config, String> {
Expand Down Expand Up @@ -81,6 +84,21 @@ pub fn load_config() -> Result<Config, String> {
.long("tls-key")
.value_name("KEY")
.help("Path to the TLS private key file")
).arg(
Arg::new("use-cache")
.long("use-cache").action(ArgAction::SetTrue)
.value_name("CACHE")
.help("Whether to use cache or not")
).arg(
Arg::new("cache-size")
.long("cache-size")
.value_name("CACHE_SIZE")
.help("Size of cache")
).arg(
Arg::new("cache-path")
.long("cache-path")
.value_name("CACHE_PATH")
.help("Path to cache")
);

#[cfg(unix)]
Expand Down Expand Up @@ -139,6 +157,13 @@ pub fn load_config() -> Result<Config, String> {

let tls_cert_path = matches.get_one::<String>("tls-cert").cloned();
let tls_key_path = matches.get_one::<String>("tls-key").cloned();
let cache = matches.get_flag("use-cache");
let cache_size = matches
.get_one::<String>("cache-size")
.map(|ival| ival.parse::<usize>().map_err(|_| "Invalid cache size"))
.transpose()?
.unwrap_or(1000);
let cache_path = matches.get_one::<String>("cache-path").cloned();

Ok(Config {
genesis,
Expand All @@ -152,5 +177,8 @@ pub fn load_config() -> Result<Config, String> {
is_multiple,
tls_cert_path,
tls_key_path,
cache,
cache_size,
cache_path,
})
}
103 changes: 70 additions & 33 deletions indy-vdr-proxy/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::rc::Rc;
use std::time::UNIX_EPOCH;

use hyper::{Body, Method, Request, Response, StatusCode};
use indy_vdr::pool::cache::Cache;
use percent_encoding::percent_decode_str;
use regex::Regex;

Expand Down Expand Up @@ -300,6 +301,7 @@ async fn get_attrib<T: Pool>(
raw: &str,
seq_no: Option<i32>,
timestamp: Option<u64>,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let dest = DidValue::from_str(dest)?;
let request = pool.get_request_builder().build_get_attrib_request(
Expand All @@ -311,7 +313,7 @@ async fn get_attrib<T: Pool>(
seq_no,
timestamp,
)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

Expand All @@ -320,59 +322,80 @@ async fn get_nym<T: Pool>(
nym: &str,
seq_no: Option<i32>,
timestamp: Option<u64>,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let nym = DidValue::from_str(nym)?;
let request = pool
.get_request_builder()
.build_get_nym_request(None, &nym, seq_no, timestamp)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

async fn get_schema<T: Pool>(pool: &T, schema_id: &str) -> VdrResult<ResponseType> {
async fn get_schema<T: Pool>(
pool: &T,
schema_id: &str,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let schema_id = SchemaId::from_str(schema_id)?;
let request = pool
.get_request_builder()
.build_get_schema_request(None, &schema_id)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

async fn get_cred_def<T: Pool>(pool: &T, cred_def_id: &str) -> VdrResult<ResponseType> {
async fn get_cred_def<T: Pool>(
pool: &T,
cred_def_id: &str,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let cred_def_id = CredentialDefinitionId::from_str(cred_def_id)?;
let request = pool
.get_request_builder()
.build_get_cred_def_request(None, &cred_def_id)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

async fn get_revoc_reg_def<T: Pool>(pool: &T, revoc_reg_def_id: &str) -> VdrResult<ResponseType> {
async fn get_revoc_reg_def<T: Pool>(
pool: &T,
revoc_reg_def_id: &str,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let revoc_reg_def_id = RevocationRegistryId::from_str(revoc_reg_def_id)?;
let request = pool
.get_request_builder()
.build_get_revoc_reg_def_request(None, &revoc_reg_def_id)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

async fn get_revoc_reg<T: Pool>(pool: &T, revoc_reg_def_id: &str) -> VdrResult<ResponseType> {
async fn get_revoc_reg<T: Pool>(
pool: &T,
revoc_reg_def_id: &str,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let revoc_reg_def_id = RevocationRegistryId::from_str(revoc_reg_def_id)?;
let request = pool.get_request_builder().build_get_revoc_reg_request(
None,
&revoc_reg_def_id,
timestamp_now(),
)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

async fn get_revoc_reg_delta<T: Pool>(pool: &T, revoc_reg_def_id: &str) -> VdrResult<ResponseType> {
async fn get_revoc_reg_delta<T: Pool>(
pool: &T,
revoc_reg_def_id: &str,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let revoc_reg_def_id = RevocationRegistryId::from_str(revoc_reg_def_id)?;
let request = pool
.get_request_builder()
.build_get_revoc_reg_delta_request(None, &revoc_reg_def_id, None, timestamp_now())?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

Expand All @@ -383,19 +406,25 @@ async fn test_get_validator_info<T: Pool>(pool: &T, pretty: bool) -> VdrResult<S
}
*/

async fn get_taa<T: Pool>(pool: &T) -> VdrResult<ResponseType> {
async fn get_taa<T: Pool>(
pool: &T,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let request = pool
.get_request_builder()
.build_get_txn_author_agreement_request(None, None)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

async fn get_aml<T: Pool>(pool: &T) -> VdrResult<ResponseType> {
async fn get_aml<T: Pool>(
pool: &T,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let request = pool
.get_request_builder()
.build_get_acceptance_mechanisms_request(None, None, None)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

Expand All @@ -404,6 +433,7 @@ async fn get_auth_rule<T: Pool>(
auth_type: Option<String>,
auth_action: Option<String>,
field: Option<String>,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let request = pool.get_request_builder().build_get_auth_rule_request(
None,
Expand All @@ -413,24 +443,30 @@ async fn get_auth_rule<T: Pool>(
None,
None,
)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, cache).await?;
Ok(result.into())
}

async fn get_txn<T: Pool>(pool: &T, ledger: LedgerType, seq_no: i32) -> VdrResult<ResponseType> {
let result = perform_get_txn(pool, ledger.to_id(), seq_no).await?;
async fn get_txn<T: Pool>(
pool: &T,
ledger: LedgerType,
seq_no: i32,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> VdrResult<ResponseType> {
let result = perform_get_txn(pool, ledger.to_id(), seq_no, cache).await?;
Ok(result.into())
}

async fn submit_request<T: Pool>(pool: &T, message: Vec<u8>) -> VdrResult<ResponseType> {
let request = PreparedRequest::from_request_json(message)?;
let result = perform_ledger_request(pool, &request).await?;
let result = perform_ledger_request(pool, &request, None).await?;
Ok(result.into())
}

pub async fn handle_request(
req: Request<Body>,
state: Rc<RefCell<AppState>>,
cache: Option<Cache<String, (String, RequestResultMeta)>>,
) -> Result<Response<Body>, hyper::Error> {
let mut parts = req
.uri()
Expand Down Expand Up @@ -532,12 +568,12 @@ pub async fn handle_request(
let resolver = Resolver::new(pool);
// is DID Url
if did.find('/').is_some() {
match resolver.dereference(did).await {
match resolver.dereference(did, cache.clone()).await {
Ok(result) => Ok(ResponseType::Resolver(result)),
Err(err) => http_status_msg(StatusCode::BAD_REQUEST, err.to_string()),
}
} else {
match resolver.resolve(did).await {
match resolver.resolve(did, cache).await {
Ok(result) => Ok(ResponseType::Resolver(result)),
Err(err) => http_status_msg(StatusCode::BAD_REQUEST, err.to_string()),
}
Expand All @@ -558,8 +594,8 @@ pub async fn handle_request(
}
}
(&Method::GET, "genesis") => get_pool_genesis(&pool).await,
(&Method::GET, "taa") => get_taa(&pool).await,
(&Method::GET, "aml") => get_aml(&pool).await,
(&Method::GET, "taa") => get_taa(&pool, cache.clone()).await,
(&Method::GET, "aml") => get_aml(&pool, cache.clone()).await,
(&Method::GET, "attrib") => {
if let (Some(dest), Some(attrib)) = (parts.next(), parts.next()) {
// NOTE: 'endpoint' is currently the only supported attribute
Expand All @@ -569,7 +605,7 @@ pub async fn handle_request(
let timestamp: Option<u64> = query_params
.get("timestamp")
.and_then(|ts| ts.as_str().parse().ok());
get_attrib(&pool, &dest, &attrib, seq_no, timestamp).await
get_attrib(&pool, &dest, &attrib, seq_no, timestamp, cache.clone()).await
} else {
http_status(StatusCode::NOT_FOUND)
}
Expand All @@ -582,18 +618,19 @@ pub async fn handle_request(
Some(auth_type.to_owned()),
Some(auth_action.to_owned()),
Some("*".to_owned()),
cache.clone(),
)
.await
} else {
http_status(StatusCode::NOT_FOUND)
}
} else {
get_auth_rule(&pool, None, None, None).await // get all
get_auth_rule(&pool, None, None, None, cache.clone()).await // get all
}
}
(&Method::GET, "cred_def") => {
if let Some(cred_def_id) = parts.next() {
get_cred_def(&pool, &cred_def_id).await
get_cred_def(&pool, &cred_def_id, cache.clone()).await
} else {
http_status(StatusCode::NOT_FOUND)
}
Expand All @@ -606,35 +643,35 @@ pub async fn handle_request(
let timestamp: Option<u64> = query_params
.get("timestamp")
.and_then(|ts| ts.as_str().parse().ok());
get_nym(&pool, &nym, seq_no, timestamp).await
get_nym(&pool, &nym, seq_no, timestamp, cache.clone()).await
} else {
http_status(StatusCode::NOT_FOUND)
}
}
(&Method::GET, "rev_reg_def") => {
if let Some(rev_reg_def_id) = parts.next() {
get_revoc_reg_def(&pool, &rev_reg_def_id).await
get_revoc_reg_def(&pool, &rev_reg_def_id, cache.clone()).await
} else {
http_status(StatusCode::NOT_FOUND)
}
}
(&Method::GET, "rev_reg") => {
if let Some(rev_reg_def_id) = parts.next() {
get_revoc_reg(&pool, &rev_reg_def_id).await
get_revoc_reg(&pool, &rev_reg_def_id, cache.clone()).await
} else {
http_status(StatusCode::NOT_FOUND)
}
}
(&Method::GET, "rev_reg_delta") => {
if let Some(rev_reg_def_id) = parts.next() {
get_revoc_reg_delta(&pool, &rev_reg_def_id).await
get_revoc_reg_delta(&pool, &rev_reg_def_id, cache.clone()).await
} else {
http_status(StatusCode::NOT_FOUND)
}
}
(&Method::GET, "schema") => {
if let Some(schema_id) = parts.next() {
get_schema(&pool, &schema_id).await
get_schema(&pool, &schema_id, cache.clone()).await
} else {
http_status(StatusCode::NOT_FOUND)
}
Expand All @@ -644,7 +681,7 @@ pub async fn handle_request(
if let (Ok(ledger), Ok(txn)) =
(LedgerType::try_from(ledger.as_str()), txn.parse::<i32>())
{
get_txn(&pool, ledger, txn).await
get_txn(&pool, ledger, txn, cache.clone()).await
} else {
http_status(StatusCode::NOT_FOUND)
}
Expand Down
Loading
Loading