Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap sidecar in arcs #11554

Merged
merged 26 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions crates/transaction-pool/src/blobstore/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use parking_lot::{Mutex, RwLock};
use reth_primitives::BlobTransactionSidecar;
use schnellru::{ByLength, LruMap};
use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc};
use futures_util::TryStreamExt;
use tracing::{debug, trace};

/// How many [`BlobTransactionSidecar`] to cache in memory.
Expand Down Expand Up @@ -104,7 +105,7 @@ impl BlobStore for DiskFileBlobStore {
stat
}

fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.inner.get_one(tx)
}

Expand All @@ -115,14 +116,14 @@ impl BlobStore for DiskFileBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
self.inner.get_all(txs)
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
Expand Down Expand Up @@ -165,7 +166,7 @@ impl BlobStore for DiskFileBlobStore {

struct DiskFileBlobStoreInner {
blob_dir: PathBuf,
blob_cache: Mutex<LruMap<TxHash, BlobTransactionSidecar, ByLength>>,
blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
size_tracker: BlobStoreSize,
file_lock: RwLock<()>,
txs_to_delete: RwLock<HashSet<B256>>,
Expand Down Expand Up @@ -206,7 +207,7 @@ impl DiskFileBlobStoreInner {
fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
let mut buf = Vec::with_capacity(data.fields_len());
data.encode(&mut buf);
self.blob_cache.lock().insert(tx, data);
self.blob_cache.lock().insert(tx, Arc::new(data));
let size = self.write_one_encoded(tx, &buf)?;

self.size_tracker.add_size(size);
Expand All @@ -228,7 +229,7 @@ impl DiskFileBlobStoreInner {
{
let mut cache = self.blob_cache.lock();
for (tx, data) in txs {
cache.insert(tx, data);
cache.insert(tx, Arc::new(data));
}
}
let mut add = 0;
Expand Down Expand Up @@ -279,15 +280,17 @@ impl DiskFileBlobStoreInner {
}

/// Retrieves the blob for the given transaction hash from the blob cache or disk.
fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if let Some(blob) = self.blob_cache.lock().get(&tx) {
return Ok(Some(blob.clone()))
}
let blob = self.read_one(tx)?;

if let Some(blob) = &blob {
self.blob_cache.lock().insert(tx, blob.clone());
self.blob_cache.lock().insert(tx, Arc::new(blob.clone()));
kdonthi marked this conversation as resolved.
Show resolved Hide resolved
}
Ok(blob)

Ok(blob.map(|e| Arc::new(e)))
}

/// Returns the path to the blob file for the given transaction hash.
Expand All @@ -312,6 +315,7 @@ impl DiskFileBlobStoreInner {
}
}
};

BlobTransactionSidecar::decode(&mut data.as_slice())
.map(Some)
.map_err(BlobStoreError::DecodeError)
Expand Down Expand Up @@ -375,7 +379,7 @@ impl DiskFileBlobStoreInner {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let mut res = Vec::with_capacity(txs.len());
let mut cache_miss = Vec::new();
{
Expand All @@ -397,8 +401,9 @@ impl DiskFileBlobStoreInner {
}
let mut cache = self.blob_cache.lock();
for (tx, data) in from_disk {
cache.insert(tx, data.clone());
res.push((tx, data));
let arc = Arc::new(data.clone());
cache.insert(tx, arc.clone());
res.push((tx, arc.clone()));
}

Ok(res)
Expand All @@ -408,7 +413,7 @@ impl DiskFileBlobStoreInner {
///
/// Returns an error if there are any missing blobs.
#[inline]
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let mut res = Vec::with_capacity(txs.len());
for tx in txs {
let blob = self.get_one(tx)?.ok_or_else(|| BlobStoreError::MissingSidecar(tx))?;
Expand Down Expand Up @@ -491,6 +496,7 @@ pub enum OpenDiskFileBlobStore {

#[cfg(test)]
mod tests {
use std::ops::Deref;
use super::*;
use std::sync::atomic::Ordering;

Expand Down Expand Up @@ -519,11 +525,14 @@ mod tests {
let blobs = rng_blobs(10);
let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
store.insert_all(blobs.clone()).unwrap();

// all cached
for (tx, blob) in &blobs {
assert!(store.is_cached(tx));
assert_eq!(store.get(*tx).unwrap().unwrap(), *blob);
let a = store.get(*tx).unwrap().unwrap().read().unwrap();
assert_eq!(a, *blob);
}

let all = store.get_all(all_hashes.clone()).unwrap();
for (tx, blob) in all {
assert!(blobs.contains(&(tx, blob)), "missing blob {tx:?}");
Expand Down
14 changes: 7 additions & 7 deletions crates/transaction-pool/src/blobstore/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct InMemoryBlobStore {
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
/// Storage for all blob data.
store: RwLock<HashMap<B256, BlobTransactionSidecar>>,
store: RwLock<HashMap<B256, Arc<BlobTransactionSidecar>>>,
size_tracker: BlobStoreSize,
}

Expand Down Expand Up @@ -75,7 +75,7 @@ impl BlobStore for InMemoryBlobStore {
}

// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let store = self.inner.store.read();
Ok(store.get(&tx).cloned())
}
Expand All @@ -88,7 +88,7 @@ impl BlobStore for InMemoryBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let mut items = Vec::with_capacity(txs.len());
let store = self.inner.store.read();
for tx in txs {
Expand All @@ -100,7 +100,7 @@ impl BlobStore for InMemoryBlobStore {
Ok(items)
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let mut items = Vec::with_capacity(txs.len());
let store = self.inner.store.read();
for tx in txs {
Expand Down Expand Up @@ -150,7 +150,7 @@ impl BlobStore for InMemoryBlobStore {

/// Removes the given blob from the store and returns the size of the blob that was removed.
#[inline]
fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) -> usize {
fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>, tx: &B256) -> usize {
store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
}

Expand All @@ -159,11 +159,11 @@ fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) ->
/// We don't need to handle the size updates for replacements because transactions are unique.
#[inline]
fn insert_size(
store: &mut HashMap<B256, BlobTransactionSidecar>,
store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>,
tx: B256,
blob: BlobTransactionSidecar,
) -> usize {
let add = blob.size();
store.insert(tx, blob);
store.insert(tx, Arc::new(blob));
add
}
7 changes: 4 additions & 3 deletions crates/transaction-pool/src/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{
fmt,
sync::atomic::{AtomicUsize, Ordering},
};
use std::sync::Arc;
pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};

pub mod disk;
Expand Down Expand Up @@ -44,7 +45,7 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn cleanup(&self) -> BlobStoreCleanupStat;

/// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Checks if the given transaction hash is in the blob store.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
Expand All @@ -58,13 +59,13 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

/// Returns the exact [`BlobTransactionSidecar`] for the given transaction hashes in the exact
/// order they were requested.
///
/// Returns an error if any of the blobs are not found in the blob store.
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_by_versioned_hashes(
Expand Down
7 changes: 4 additions & 3 deletions crates/transaction-pool/src/blobstore/noop.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobTransactionSidecar};
use alloy_eips::eip4844::BlobAndProofV1;
use alloy_primitives::B256;
Expand Down Expand Up @@ -28,7 +29,7 @@ impl BlobStore for NoopBlobStore {
BlobStoreCleanupStat::default()
}

fn get(&self, _tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, _tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}

Expand All @@ -39,11 +40,11 @@ impl BlobStore for NoopBlobStore {
fn get_all(
&self,
_txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(vec![])
}
Expand Down
8 changes: 5 additions & 3 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ pub mod identifier;
mod ordering;
mod traits;

type SharedBlobTransactionSidecar = Arc<BlobTransactionSidecar>;
kdonthi marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(any(test, feature = "test-utils"))]
/// Common test helpers for mocking a pool
pub mod test_utils;
Expand Down Expand Up @@ -517,21 +519,21 @@ where
self.pool.unique_senders()
}

fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(&self, tx_hash: TxHash) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get(tx_hash)
}

fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
self.pool.blob_store().get_all(tx_hashes)
}

fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get_exact(tx_hashes)
}

Expand Down
3 changes: 2 additions & 1 deletion crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ use std::{
sync::Arc,
time::Instant,
};
use std::ops::Deref;
use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
mod events;
Expand Down Expand Up @@ -306,7 +307,7 @@ where
/// Caution: this assumes the given transaction is eip-4844
fn get_blob_transaction(&self, transaction: TransactionSigned) -> Option<BlobTransaction> {
if let Ok(Some(sidecar)) = self.blob_store.get(transaction.hash()) {
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, sidecar) {
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, *(sidecar.clone())) {
return Some(blob)
}
}
Expand Down
Loading