From b156cb98494463c5951d1ff6e17f98e5102f4980 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 15 Sep 2023 17:13:44 +0200 Subject: [PATCH] feat: add blob transactions subpool (#4608) --- crates/transaction-pool/src/pool/blob.rs | 167 ++++++++++++++++++++ crates/transaction-pool/src/pool/mod.rs | 1 + crates/transaction-pool/src/pool/parked.rs | 2 +- crates/transaction-pool/src/pool/pending.rs | 2 +- crates/transaction-pool/src/pool/txpool.rs | 5 + 5 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 crates/transaction-pool/src/pool/blob.rs diff --git a/crates/transaction-pool/src/pool/blob.rs b/crates/transaction-pool/src/pool/blob.rs new file mode 100644 index 000000000000..4bda855e5d5e --- /dev/null +++ b/crates/transaction-pool/src/pool/blob.rs @@ -0,0 +1,167 @@ +#![allow(dead_code, unused)] +use crate::{ + identifier::TransactionId, pool::size::SizeTracker, PoolTransaction, ValidPoolTransaction, +}; +use std::{ + cmp::Ordering, + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; + +/// A set of __all__ validated blob transactions in the pool. +/// +/// The purpose of this pool is keep track of blob transactions that are either pending or queued +/// and to evict the worst blob transactions once the sub-pool is full. +/// +/// This expects that certain constraints are met: +/// - blob transactions are always gap less +pub(crate) struct BlobTransactions { + /// Keeps track of transactions inserted in the pool. + /// + /// This way we can determine when transactions were submitted to the pool. + submission_id: u64, + /// _All_ Transactions that are currently inside the pool grouped by their identifier. + by_id: BTreeMap>>, + /// _All_ transactions sorted by blob priority. + all: BTreeSet>, + /// Keeps track of the size of this pool. + /// + /// See also [`PoolTransaction::size`](crate::traits::PoolTransaction::size). + size_of: SizeTracker, +} + +// === impl BlobTransactions === + +impl BlobTransactions { + /// Adds a new transactions to the pending queue. + /// + /// # Panics + /// + /// - If the transaction is not a blob tx. + /// - If the transaction is already included. + pub(crate) fn add_transaction(&mut self, tx: Arc>) { + assert!(tx.is_eip4844(), "transaction is not a blob tx"); + let id = *tx.id(); + assert!( + !self.by_id.contains_key(&id), + "transaction already included {:?}", + self.by_id.contains_key(&id) + ); + let submission_id = self.next_id(); + + // keep track of size + self.size_of += tx.size(); + + self.by_id.insert(id, tx.clone()); + + let ord = BlobOrd { submission_id }; + let transaction = BlobTransaction { ord, transaction: tx }; + self.all.insert(transaction); + } + + /// Removes the transaction from the pool + pub(crate) fn remove_transaction( + &mut self, + id: &TransactionId, + ) -> Option>> { + // remove from queues + let tx = self.by_id.remove(id)?; + + // TODO: remove from ordered set + // self.best.remove(&tx); + + // keep track of size + self.size_of -= tx.transaction.size(); + + Some(tx) + } + + fn next_id(&mut self) -> u64 { + let id = self.submission_id; + self.submission_id = self.submission_id.wrapping_add(1); + id + } + + /// The reported size of all transactions in this pool. + pub(crate) fn size(&self) -> usize { + self.size_of.into() + } + + /// Number of transactions in the entire pool + pub(crate) fn len(&self) -> usize { + self.by_id.len() + } + + /// Returns `true` if the transaction with the given id is already included in this pool. + #[cfg(test)] + #[allow(unused)] + pub(crate) fn contains(&self, id: &TransactionId) -> bool { + self.by_id.contains_key(id) + } +} + +impl Default for BlobTransactions { + fn default() -> Self { + Self { + submission_id: 0, + by_id: Default::default(), + all: Default::default(), + size_of: Default::default(), + } + } +} + +/// A transaction that is ready to be included in a block. +struct BlobTransaction { + /// Actual blob transaction. + transaction: Arc>, + /// The value that determines the order of this transaction. + ord: BlobOrd, +} + +impl Eq for BlobTransaction {} + +impl PartialEq for BlobTransaction { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl PartialOrd for BlobTransaction { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for BlobTransaction { + fn cmp(&self, other: &Self) -> Ordering { + self.ord.cmp(&other.ord) + } +} + +#[derive(Debug)] +struct BlobOrd { + /// Identifier that tags when transaction was submitted in the pool. + pub(crate) submission_id: u64, + // TODO(mattsse): add ord values +} + +impl Eq for BlobOrd {} + +impl PartialEq for BlobOrd { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl PartialOrd for BlobOrd { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for BlobOrd { + fn cmp(&self, other: &Self) -> Ordering { + other.submission_id.cmp(&self.submission_id) + } +} diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 9786d3e9867a..aadca293a060 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -110,6 +110,7 @@ pub use listener::{AllTransactionsEvents, TransactionEvents}; use reth_rlp::Encodable; mod best; +mod blob; mod parked; pub(crate) mod pending; pub(crate) mod size; diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index e04070908d22..075f244f0edb 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -17,7 +17,7 @@ use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc}; pub(crate) struct ParkedPool { /// Keeps track of transactions inserted in the pool. /// - /// This way we can determine when transactions where submitted to the pool. + /// This way we can determine when transactions were submitted to the pool. submission_id: u64, /// _All_ Transactions that are currently inside the pool grouped by their identifier. by_id: FnvHashMap>, diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index 286b699b328a..8ccd96efa807 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -28,7 +28,7 @@ pub(crate) struct PendingPool { ordering: T, /// Keeps track of transactions inserted in the pool. /// - /// This way we can determine when transactions where submitted to the pool. + /// This way we can determine when transactions were submitted to the pool. submission_id: u64, /// _All_ Transactions that are currently inside the pool grouped by their identifier. by_id: BTreeMap>, diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index b760dac5d45c..51e6d7e54564 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -6,6 +6,7 @@ use crate::{ metrics::TxPoolMetrics, pool::{ best::BestTransactions, + blob::BlobTransactions, parked::{BasefeeOrd, ParkedPool, QueuedOrd}, pending::PendingPool, state::{SubPool, TxState}, @@ -86,6 +87,9 @@ pub struct TxPool { /// Holds all parked transactions that currently violate the dynamic fee requirement but could /// be moved to pending if the base fee changes in their favor (decreases) in future blocks. basefee_pool: ParkedPool>, + /// All blob transactions in the pool + #[allow(unused)] + blob_transactions: BlobTransactions, /// All transactions in the pool. all_transactions: AllTransactions, /// Transaction pool metrics @@ -102,6 +106,7 @@ impl TxPool { pending_pool: PendingPool::new(ordering), queued_pool: Default::default(), basefee_pool: Default::default(), + blob_transactions: Default::default(), all_transactions: AllTransactions::new(&config), config, metrics: Default::default(),