Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(provider): LRUCache Layer #954

Merged
merged 57 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
0d56877
in-memory cache implementation
yash-atreya Jun 21, 2024
df7d861
load and dump cache from fs
yash-atreya Jun 24, 2024
cc8b11d
use RwLock
yash-atreya Jun 24, 2024
8a1e353
doc nits
yash-atreya Jun 24, 2024
03c50c5
feat: CacheConfig, load/save at specific paths
yash-atreya Jun 24, 2024
0fc93a1
RequestType enum
yash-atreya Jun 24, 2024
9b95833
params hash
yash-atreya Jun 24, 2024
f79a128
clone and arc `CacheProvider`
yash-atreya Jun 24, 2024
e6da7f7
add: get_block_by_hash
yash-atreya Jun 24, 2024
902a24b
todos
yash-atreya Jun 24, 2024
863c868
Merge branch 'main' into yash/provider-cache
yash-atreya Jul 29, 2024
bb05544
refactor: port to transport layer
yash-atreya Aug 1, 2024
5fa27f4
rm provider cache layer
yash-atreya Aug 1, 2024
12a76c3
use parking_lot::RwLock + tracing nits
yash-atreya Aug 6, 2024
64343b0
cleanup nits
yash-atreya Aug 6, 2024
9fee441
nit
yash-atreya Aug 6, 2024
cb1c1b5
move cache instance to layer
yash-atreya Aug 7, 2024
6e9c7f4
Merge branch 'main' into yash/provider-cache
yash-atreya Aug 21, 2024
47ee0c6
resolve conflicts
yash-atreya Sep 19, 2024
5eb4af2
Revert "refactor: port to transport layer"
yash-atreya Sep 20, 2024
3e9538e
use provider cache
yash-atreya Sep 20, 2024
efa027a
use macro
yash-atreya Sep 20, 2024
ab44b80
cached get_proof
yash-atreya Sep 20, 2024
ae9e123
nit
yash-atreya Sep 20, 2024
9f646b2
nit
yash-atreya Sep 20, 2024
c25371a
Merge branch 'main' into yash/provider-cache
yash-atreya Sep 24, 2024
947bd91
use parking_lot
yash-atreya Sep 24, 2024
5653119
Merge branch 'main' into yash/provider-cache
yash-atreya Sep 27, 2024
c82901c
make params hash independent of client
yash-atreya Sep 27, 2024
32a7beb
fix
yash-atreya Sep 27, 2024
0478ded
cache_rpc_call_with_block!
yash-atreya Sep 27, 2024
fa8346c
fix: request type
yash-atreya Sep 27, 2024
6e0c676
redirect reqs with block tags to rpc
yash-atreya Sep 27, 2024
70bc5e1
nits
yash-atreya Sep 27, 2024
f8d1ab6
get_accounts
yash-atreya Sep 27, 2024
d8d4094
chain_id
yash-atreya Sep 27, 2024
d67142c
cfg gate wasm
yash-atreya Sep 30, 2024
8d0e89a
Merge branch 'main' into yash/provider-cache
yash-atreya Oct 4, 2024
2545bb0
rm get_accounts and get_chain_id
yash-atreya Oct 4, 2024
3214f63
rm related tests
yash-atreya Oct 4, 2024
dcfc38a
tests: run_with_temp_dir
yash-atreya Oct 4, 2024
0c88fe9
feat: SharedCache
yash-atreya Oct 4, 2024
d1c5e09
make CacheProvider generic over Network
yash-atreya Oct 4, 2024
cc0c1eb
add more methods
yash-atreya Oct 4, 2024
ce08a88
fmt
yash-atreya Oct 4, 2024
ccb67c1
docs
yash-atreya Oct 4, 2024
55fe2ff
docs
yash-atreya Oct 4, 2024
93d139d
nit
yash-atreya Oct 4, 2024
dcc4481
fix
yash-atreya Oct 4, 2024
60bd432
clippy
yash-atreya Oct 4, 2024
b77279f
mv SharedCache
yash-atreya Oct 5, 2024
8facd6f
use schnellru
yash-atreya Oct 5, 2024
62cfbeb
feat: get_derialized
yash-atreya Oct 5, 2024
6716c93
nits
yash-atreya Oct 5, 2024
19fd125
fix save_cache
yash-atreya Oct 8, 2024
6ec0026
nit
yash-atreya Oct 8, 2024
7e47495
nit
yash-atreya Oct 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/json-rpc/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,15 @@ impl<Payload, ErrData> ResponsePacket<Payload, ErrData> {
Self::Batch(batch) => batch.iter().filter(|res| ids.contains(&res.id)).collect(),
}
}

/// Returns the underlying response if ResponsePacket is of type Single,
/// otherwise returns `None`.
pub const fn single_response(&self) -> Option<&Response<Payload, ErrData>> {
match self {
Self::Single(single) => Some(single),
Self::Batch(_) => None,
}
}
}

/// An Iterator over the [ErrorPayload]s in a [ResponsePacket].
Expand Down
21 changes: 21 additions & 0 deletions crates/provider/src/provider/trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,9 @@ mod tests {
use alloy_network::AnyNetwork;
use alloy_node_bindings::Anvil;
use alloy_primitives::{address, b256, bytes, keccak256};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_eth::request::TransactionRequest;
use alloy_transport::layers::CacheLayer;

fn init_tracing() {
let _ = tracing_subscriber::fmt::try_init();
Expand Down Expand Up @@ -1022,6 +1024,25 @@ mod tests {
assert_eq!(0, num);
}

#[tokio::test]
async fn test_transport_cache_layer() {
init_tracing();
let anvil = Anvil::new().block_time_f64(0.3).spawn();
let cache_layer = CacheLayer::new(10, "./transport-rpc-cache.txt".into());
let client = ClientBuilder::default().layer(cache_layer).http(anvil.endpoint_url());
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved
let provider = ProviderBuilder::new().on_client(client);
let blk = provider.get_block_by_number(0.into(), true).await.unwrap();
let blk2 = provider.get_block_by_number(0.into(), true).await.unwrap();
assert_eq!(blk, blk2);

tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;

let latest_block_num = provider.get_block_number().await.unwrap();
let blk3 = provider.get_block_by_number(latest_block_num.into(), true).await.unwrap();
let blk4 = provider.get_block_by_number(latest_block_num.into(), true).await.unwrap();
assert_eq!(blk3, blk4);
}

#[cfg(feature = "reqwest")]
#[tokio::test]
async fn object_safety() {
Expand Down
3 changes: 3 additions & 0 deletions crates/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ workspace = true

[dependencies]
alloy-json-rpc.workspace = true
alloy-primitives.workspace = true

base64.workspace = true
futures-util.workspace = true
Expand All @@ -31,6 +32,8 @@ tower.workspace = true
url.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
lru = "0.12"
parking_lot = "0.12.3"
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = { version = "0.4", optional = true }
Expand Down
228 changes: 228 additions & 0 deletions crates/transport/src/layers/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
use crate::{TransportError, TransportErrorKind, TransportFut, TransportResult};
use alloy_json_rpc::{
Id, RequestPacket, Response, ResponsePacket, ResponsePayload, RpcError, SerializedRequest,
};
use alloy_primitives::B256;
use lru::LruCache;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use std::{
io::BufReader,
num::NonZeroUsize,
path::PathBuf,
sync::Arc,
task::{Context, Poll},
};
use tower::{Layer, Service};
use tracing::trace;
/// Cache Layer
#[derive(Debug, Clone)]
pub struct CacheLayer {
/// Config for the cache layer.
config: CacheConfig,
}

impl CacheLayer {
/// Instantiate a new cache layer with the the maximum number of
/// items to store.
#[inline]
pub const fn new(max_items: usize, path: PathBuf) -> Self {
Self { config: CacheConfig { max_items, path } }
}

/// Returns the maximum number of items that can be stored in the cache, set at initialization.
#[inline]
pub const fn max_items(&self) -> usize {
self.config.max_items
}
}

/// Configuration for the cache layer.
/// For future extensibility of the configurations.
#[derive(Debug, Clone)]
pub struct CacheConfig {
/// Maximum number of items to store in the cache.
pub max_items: usize,
/// Path of the cache file.
pub path: PathBuf,
}

impl<S> Layer<S> for CacheLayer {
type Service = CachingService<S>;

fn layer(&self, inner: S) -> Self::Service {
CachingService::new(inner, self.config.clone())
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Caching service.
#[derive(Debug, Clone)]
pub struct CachingService<S> {
/// Inner transport service.
inner: S,
/// Config for the cache layer.
config: CacheConfig,
/// In-memory LRU cache, mapping requests to responses.
cache: Arc<RwLock<LruCache<B256, String>>>,
}

impl<S> Drop for CachingService<S> {
fn drop(&mut self) {
let _ = self.save_cache();
}
}

impl<S> CachingService<S> {
/// Instantiate a new cache service.
pub fn new(inner: S, config: CacheConfig) -> Self {
let cache = Arc::new(RwLock::new(LruCache::<B256, String>::new(
NonZeroUsize::new(config.max_items).unwrap(),
)));
let service = Self { inner, config, cache };

let _loaded = service.load_cache().inspect_err(|e| {
trace!(?e, "Error loading cache");
});

service
}

/// Puts a value into the cache, and returns the old value if it existed.
pub fn put(&self, key: B256, value: String) -> TransportResult<Option<String>> {
let mut cache = self.cache.write();
Ok(cache.put(key, value))
}

/// Gets a value from the cache, if it exists.
pub fn get(&self, key: &B256) -> TransportResult<Option<String>> {
// Need to acquire a write guard to change the order of keys in LRU cache.
let mut cache = self.cache.write();
let val = cache.get(key).cloned();
Ok(val)
}

/// Resolves a `SerializedRequest` into a `RawValue` if it exists in the cache.
pub fn resolve(&self, req: &SerializedRequest) -> TransportResult<Option<Box<RawValue>>> {
let key = req.params_hash();
let value = self.get(&key)?;

match value {
Some(value) => {
let raw = RawValue::from_string(value).map_err(RpcError::ser_err)?;
Ok(Some(raw))
}
None => Ok(None),
}
}

/// Handles a cache hit.
fn handle_cache_hit(&self, id: Id, raw: Box<RawValue>) -> ResponsePacket {
let payload = ResponsePayload::Success(raw);
let response = Response { id, payload };
ResponsePacket::Single(response)
}

/// Saves the cache to a file specified by the path.
/// If the files does not exist, it creates one.
/// If the file exists, it overwrites it.
pub fn save_cache(&self) -> TransportResult<()> {
let path = self.config.path.clone();
let file = std::fs::File::create(path).map_err(TransportErrorKind::custom)?;
let cache = self.cache.read();

// Iterate over the cache and dump to the file.
let entries = cache
.iter()
.map(|(key, value)| FsCacheEntry { key: *key, value: value.clone() })
.collect::<Vec<_>>();
serde_json::to_writer(file, &entries).map_err(TransportErrorKind::custom)?;
Ok(())
}

/// Loads the cache from a file specified by the path.
/// If the file does not exist, it returns without error.
pub fn load_cache(&self) -> TransportResult<()> {
trace!("Loading cache...");
let path = self.config.path.clone();
if !path.exists() {
trace!(?path, "Cache file does not exist.");
return Ok(());
};
let file = std::fs::File::open(path).map_err(TransportErrorKind::custom)?;
let file = BufReader::new(file);
let entries: Vec<FsCacheEntry> =
serde_json::from_reader(file).map_err(TransportErrorKind::custom)?;
let mut cache = self.cache.write();
for entry in entries {
cache.put(entry.key, entry.value);
}

Ok(())
}
}

impl<S> Service<RequestPacket> for CachingService<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>
+ Send
+ 'static
+ Clone,
S::Future: Send + 'static,
{
type Response = ResponsePacket;
type Error = TransportError;
type Future = TransportFut<'static>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), TransportError>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: RequestPacket) -> Self::Future {
let mut inner = self.inner.clone();
let this = self.clone();
match req.clone() {
RequestPacket::Single(ser_req) => {
let params_hash = ser_req.params_hash();
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved
match this.resolve(&ser_req) {
Ok(Some(raw)) => {
let resp = this.handle_cache_hit(ser_req.id().to_owned(), raw);
Box::pin(async move { Ok(resp) })
}
Ok(None) => {
Box::pin(async move {
match inner.call(req).await {
Ok(resp) => {
// Store success response into cache.
if let Some(res) = resp.single_response() {
let ser = res.payload.as_success().unwrap().to_string();
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved
let _ = this.put(params_hash, ser);
}

Ok(resp)
}
Err(e) => Err(e),
}
})
}
Err(e) => Box::pin(async move { Err(e) }),
}
}
RequestPacket::Batch(_) => Box::pin(async move {
// Ignores cache, forwards request.
match inner.call(req).await {
Ok(resp) => Ok(resp),
Err(e) => Err(e),
}
}),
}
}
}

#[derive(Debug, Serialize, Deserialize)]
struct FsCacheEntry {
/// Hash of the request params
key: B256,
/// Serialized response to the request from which the hash was computed.
value: String,
}
5 changes: 5 additions & 0 deletions crates/transport/src/layers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ mod retry;

/// RetryBackoffLayer
pub use retry::{RateLimitRetryPolicy, RetryBackoffLayer, RetryBackoffService, RetryPolicy};

mod cache;

/// CacheLayer
pub use cache::{CacheConfig, CacheLayer, CachingService};