Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap sidecar in arcs #11554

Merged
merged 26 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/ethereum/payload/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ where
let mut payload = EthBuiltPayload::new(attributes.id, sealed_block, total_fees, Some(executed));

// extend the payload with the blob sidecars from the executed txs
payload.extend_sidecars(blob_sidecars);
let blob_sidecars_deref = blob_sidecars.into_iter().map(|x| (*x).clone()).collect();
payload.extend_sidecars(blob_sidecars_deref);

Ok(BuildOutcome::Better { payload, cached_reads })
}
47 changes: 31 additions & 16 deletions crates/transaction-pool/src/blobstore/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl BlobStore for DiskFileBlobStore {
stat
}

fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.inner.get_one(tx)
}

Expand All @@ -114,14 +114,17 @@ impl BlobStore for DiskFileBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
self.inner.get_all(txs)
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(Vec::new())
}
Expand Down Expand Up @@ -164,7 +167,7 @@ impl BlobStore for DiskFileBlobStore {

struct DiskFileBlobStoreInner {
blob_dir: PathBuf,
blob_cache: Mutex<LruMap<TxHash, BlobTransactionSidecar, ByLength>>,
blob_cache: Mutex<LruMap<TxHash, Arc<BlobTransactionSidecar>, ByLength>>,
size_tracker: BlobStoreSize,
file_lock: RwLock<()>,
txs_to_delete: RwLock<HashSet<B256>>,
Expand Down Expand Up @@ -205,7 +208,7 @@ impl DiskFileBlobStoreInner {
fn insert_one(&self, tx: B256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError> {
let mut buf = Vec::with_capacity(data.rlp_encoded_fields_length());
data.rlp_encode_fields(&mut buf);
self.blob_cache.lock().insert(tx, data);
self.blob_cache.lock().insert(tx, Arc::new(data));
let size = self.write_one_encoded(tx, &buf)?;

self.size_tracker.add_size(size);
Expand All @@ -227,7 +230,7 @@ impl DiskFileBlobStoreInner {
{
let mut cache = self.blob_cache.lock();
for (tx, data) in txs {
cache.insert(tx, data);
cache.insert(tx, Arc::new(data));
}
}
let mut add = 0;
Expand Down Expand Up @@ -278,15 +281,19 @@ impl DiskFileBlobStoreInner {
}

/// Retrieves the blob for the given transaction hash from the blob cache or disk.
fn get_one(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_one(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if let Some(blob) = self.blob_cache.lock().get(&tx) {
return Ok(Some(blob.clone()))
}
let blob = self.read_one(tx)?;

if let Some(blob) = &blob {
self.blob_cache.lock().insert(tx, blob.clone());
let blob_arc = Arc::new(blob.clone());
self.blob_cache.lock().insert(tx, blob_arc.clone());
return Ok(Some(blob_arc))
}
Ok(blob)

Ok(None)
}

/// Returns the path to the blob file for the given transaction hash.
Expand Down Expand Up @@ -374,7 +381,7 @@ impl DiskFileBlobStoreInner {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let mut res = Vec::with_capacity(txs.len());
let mut cache_miss = Vec::new();
{
Expand All @@ -396,8 +403,9 @@ impl DiskFileBlobStoreInner {
}
let mut cache = self.blob_cache.lock();
for (tx, data) in from_disk {
cache.insert(tx, data.clone());
res.push((tx, data));
let arc = Arc::new(data.clone());
cache.insert(tx, arc.clone());
res.push((tx, arc.clone()));
}

Ok(res)
Expand All @@ -407,7 +415,10 @@ impl DiskFileBlobStoreInner {
///
/// Returns an error if there are any missing blobs.
#[inline]
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
txs.into_iter()
.map(|tx| self.get_one(tx)?.ok_or(BlobStoreError::MissingSidecar(tx)))
.collect()
Expand Down Expand Up @@ -486,9 +497,10 @@ pub enum OpenDiskFileBlobStore {

#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::Ordering;

use super::*;

fn tmp_store() -> (DiskFileBlobStore, tempfile::TempDir) {
let dir = tempfile::tempdir().unwrap();
let store = DiskFileBlobStore::open(dir.path(), Default::default()).unwrap();
Expand All @@ -514,14 +526,17 @@ mod tests {
let blobs = rng_blobs(10);
let all_hashes = blobs.iter().map(|(tx, _)| *tx).collect::<Vec<_>>();
store.insert_all(blobs.clone()).unwrap();

// all cached
for (tx, blob) in &blobs {
assert!(store.is_cached(tx));
assert_eq!(store.get(*tx).unwrap().unwrap(), *blob);
let b = (*(store.get(*tx).unwrap().unwrap())).clone();
assert_eq!(b, *blob);
}

let all = store.get_all(all_hashes.clone()).unwrap();
for (tx, blob) in all {
assert!(blobs.contains(&(tx, blob)), "missing blob {tx:?}");
assert!(blobs.contains(&(tx, (*blob).clone())), "missing blob {tx:?}");
}

assert!(store.contains(all_hashes[0]).unwrap());
Expand Down
21 changes: 11 additions & 10 deletions crates/transaction-pool/src/blobstore/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct InMemoryBlobStore {
#[derive(Debug, Default)]
struct InMemoryBlobStoreInner {
/// Storage for all blob data.
store: RwLock<HashMap<B256, BlobTransactionSidecar>>,
store: RwLock<HashMap<B256, Arc<BlobTransactionSidecar>>>,
size_tracker: BlobStoreSize,
}

Expand Down Expand Up @@ -75,7 +75,7 @@ impl BlobStore for InMemoryBlobStore {
}

// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(self.inner.store.read().get(&tx).cloned())
}

Expand All @@ -86,16 +86,17 @@ impl BlobStore for InMemoryBlobStore {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
let store = self.inner.store.read();
Ok(txs.into_iter().filter_map(|tx| store.get(&tx).map(|item| (tx, item.clone()))).collect())
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
let store = self.inner.store.read();
txs.into_iter()
.map(|tx| store.get(&tx).cloned().ok_or_else(|| BlobStoreError::MissingSidecar(tx)))
.collect()
Ok(txs.into_iter().filter_map(|tx| store.get(&tx).cloned()).collect())
}

fn get_by_versioned_hashes(
Expand Down Expand Up @@ -134,7 +135,7 @@ impl BlobStore for InMemoryBlobStore {

/// Removes the given blob from the store and returns the size of the blob that was removed.
#[inline]
fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) -> usize {
fn remove_size(store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>, tx: &B256) -> usize {
store.remove(tx).map(|rem| rem.size()).unwrap_or_default()
}

Expand All @@ -143,11 +144,11 @@ fn remove_size(store: &mut HashMap<B256, BlobTransactionSidecar>, tx: &B256) ->
/// We don't need to handle the size updates for replacements because transactions are unique.
#[inline]
fn insert_size(
store: &mut HashMap<B256, BlobTransactionSidecar>,
store: &mut HashMap<B256, Arc<BlobTransactionSidecar>>,
tx: B256,
blob: BlobTransactionSidecar,
) -> usize {
let add = blob.size();
store.insert(tx, blob);
store.insert(tx, Arc::new(blob));
add
}
12 changes: 8 additions & 4 deletions crates/transaction-pool/src/blobstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ pub use noop::NoopBlobStore;
use reth_primitives::BlobTransactionSidecar;
use std::{
fmt,
sync::atomic::{AtomicUsize, Ordering},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
pub use tracker::{BlobStoreCanonTracker, BlobStoreUpdates};

Expand Down Expand Up @@ -44,7 +47,7 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn cleanup(&self) -> BlobStoreCleanupStat;

/// Retrieves the decoded blob data for the given transaction hash.
fn get(&self, tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get(&self, tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Checks if the given transaction hash is in the blob store.
fn contains(&self, tx: B256) -> Result<bool, BlobStoreError>;
Expand All @@ -58,13 +61,14 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
fn get_all(
&self,
txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

/// Returns the exact [`BlobTransactionSidecar`] for the given transaction hashes in the exact
/// order they were requested.
///
/// Returns an error if any of the blobs are not found in the blob store.
fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
fn get_exact(&self, txs: Vec<B256>)
-> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_by_versioned_hashes(
Expand Down
10 changes: 7 additions & 3 deletions crates/transaction-pool/src/blobstore/noop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobTransactionSidecar};
use alloy_eips::eip4844::BlobAndProofV1;
use alloy_primitives::B256;
use std::sync::Arc;

/// A blobstore implementation that does nothing
#[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Default)]
Expand Down Expand Up @@ -28,7 +29,7 @@ impl BlobStore for NoopBlobStore {
BlobStoreCleanupStat::default()
}

fn get(&self, _tx: B256) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get(&self, _tx: B256) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}

Expand All @@ -39,11 +40,14 @@ impl BlobStore for NoopBlobStore {
fn get_all(
&self,
_txs: Vec<B256>,
) -> Result<Vec<(B256, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(B256, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}

fn get_exact(&self, txs: Vec<B256>) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
fn get_exact(
&self,
txs: Vec<B256>,
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if txs.is_empty() {
return Ok(vec![])
}
Expand Down
9 changes: 6 additions & 3 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,21 +547,24 @@ where
self.pool.unique_senders()
}

fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(
&self,
tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get(tx_hash)
}

fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
self.pool.blob_store().get_all(tx_hashes)
}

fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
self.pool.blob_store().get_exact(tx_hashes)
}

Expand Down
3 changes: 2 additions & 1 deletion crates/transaction-pool/src/maintain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
.flatten()
.and_then(|sidecar| {
PooledTransactionsElementEcRecovered::try_from_blob_transaction(
tx, sidecar,
tx,
(*sidecar).clone(),
)
.ok()
})
Expand Down
9 changes: 6 additions & 3 deletions crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,21 +261,24 @@ impl TransactionPool for NoopTransactionPool {
Default::default()
}

fn get_blob(&self, _tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError> {
fn get_blob(
&self,
_tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError> {
Ok(None)
}

fn get_all_blobs(
&self,
_tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError> {
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError> {
Ok(vec![])
}

fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError> {
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError> {
if tx_hashes.is_empty() {
return Ok(vec![])
}
Expand Down
2 changes: 1 addition & 1 deletion crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ where
/// Caution: this assumes the given transaction is eip-4844
fn get_blob_transaction(&self, transaction: TransactionSigned) -> Option<BlobTransaction> {
if let Ok(Some(sidecar)) = self.blob_store.get(transaction.hash()) {
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, sidecar) {
if let Ok(blob) = BlobTransaction::try_from_signed(transaction, (*sidecar).clone()) {
return Some(blob)
}
}
Expand Down
9 changes: 6 additions & 3 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,10 @@ pub trait TransactionPool: Send + Sync + Clone {

/// Returns the [BlobTransactionSidecar] for the given transaction hash if it exists in the blob
/// store.
fn get_blob(&self, tx_hash: TxHash) -> Result<Option<BlobTransactionSidecar>, BlobStoreError>;
fn get_blob(
&self,
tx_hash: TxHash,
) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Returns all [BlobTransactionSidecar] for the given transaction hashes if they exists in the
/// blob store.
Expand All @@ -441,7 +444,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_all_blobs(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<(TxHash, BlobTransactionSidecar)>, BlobStoreError>;
) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

/// Returns the exact [BlobTransactionSidecar] for the given transaction hashes in the order
/// they were requested.
Expand All @@ -450,7 +453,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_all_blobs_exact(
&self,
tx_hashes: Vec<TxHash>,
) -> Result<Vec<BlobTransactionSidecar>, BlobStoreError>;
) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

/// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
fn get_blobs_for_versioned_hashes(
Expand Down
Loading
Loading