diff --git a/Cargo.lock b/Cargo.lock index 84b660cd7375..95f8166ce2fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8235,10 +8235,13 @@ dependencies = [ name = "reth-optimism-node" version = "1.1.1" dependencies = [ + "alloy-consensus", "alloy-eips", "alloy-genesis", + "alloy-network", "alloy-primitives", "alloy-rpc-types-engine", + "alloy-signer-local", "clap", "eyre", "futures", @@ -8261,6 +8264,7 @@ dependencies = [ "reth-optimism-consensus", "reth-optimism-evm", "reth-optimism-forks", + "reth-optimism-node", "reth-optimism-payload-builder", "reth-optimism-rpc", "reth-payload-builder", diff --git a/crates/optimism/node/Cargo.toml b/crates/optimism/node/Cargo.toml index 07315a07f4ee..c1e23e3d5719 100644 --- a/crates/optimism/node/Cargo.toml +++ b/crates/optimism/node/Cargo.toml @@ -55,17 +55,23 @@ parking_lot.workspace = true # rpc serde_json.workspace = true +# test-utils dependencies +reth = { workspace = true, optional = true } +reth-e2e-test-utils = { workspace = true, optional = true } +alloy-genesis = { workspace = true, optional = true } +tokio = { workspace = true, optional = true } + [dev-dependencies] -reth.workspace = true +reth-optimism-node = { workspace = true, features = ["test-utils"] } reth-db.workspace = true -reth-e2e-test-utils.workspace = true reth-node-builder = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } reth-revm = { workspace = true, features = ["test-utils"] } -tokio.workspace = true alloy-primitives.workspace = true -alloy-genesis.workspace = true op-alloy-consensus.workspace = true +alloy-signer-local.workspace = true +alloy-network.workspace = true +alloy-consensus.workspace = true futures.workspace = true [features] @@ -79,15 +85,21 @@ optimism = [ "reth-optimism-rpc/optimism", "reth-engine-local/optimism", "reth-optimism-consensus/optimism", - "reth-db/optimism" + "reth-db/optimism", + "reth-optimism-node/optimism" ] asm-keccak = [ "reth-primitives/asm-keccak", "reth/asm-keccak", "alloy-primitives/asm-keccak", - "revm/asm-keccak" + "revm/asm-keccak", + "reth-optimism-node/asm-keccak" ] test-utils = [ + "reth", + "reth-e2e-test-utils", + "alloy-genesis", + "tokio", "reth-node-builder/test-utils", "reth-chainspec/test-utils", "reth-consensus/test-utils", @@ -100,5 +112,6 @@ test-utils = [ "reth-provider/test-utils", "reth-transaction-pool/test-utils", "reth-trie-db/test-utils", - "revm/test-utils" + "revm/test-utils", + "reth-optimism-node/test-utils" ] diff --git a/crates/optimism/node/src/lib.rs b/crates/optimism/node/src/lib.rs index 6419611067e4..7af0f3b8a722 100644 --- a/crates/optimism/node/src/lib.rs +++ b/crates/optimism/node/src/lib.rs @@ -22,6 +22,10 @@ pub use node::OpNode; pub mod txpool; +/// Helpers for running test node instances. +#[cfg(feature = "test-utils")] +pub mod utils; + pub use reth_optimism_payload_builder::{ OpBuiltPayload, OpPayloadBuilder, OpPayloadBuilderAttributes, }; diff --git a/crates/optimism/node/tests/e2e/utils.rs b/crates/optimism/node/src/utils.rs similarity index 79% rename from crates/optimism/node/tests/e2e/utils.rs rename to crates/optimism/node/src/utils.rs index c3b6acddc5a6..b54015fef0cc 100644 --- a/crates/optimism/node/tests/e2e/utils.rs +++ b/crates/optimism/node/src/utils.rs @@ -1,3 +1,4 @@ +use crate::{node::OpAddOns, OpBuiltPayload, OpNode as OtherOpNode, OpPayloadBuilderAttributes}; use alloy_genesis::Genesis; use alloy_primitives::{Address, B256}; use reth::{rpc::types::engine::PayloadAttributes, tasks::TaskManager}; @@ -5,9 +6,6 @@ use reth_e2e_test_utils::{ transaction::TransactionTestContext, wallet::Wallet, Adapter, NodeHelperType, }; use reth_optimism_chainspec::OpChainSpecBuilder; -use reth_optimism_node::{ - node::OpAddOns, OpBuiltPayload, OpNode as OtherOpNode, OpPayloadBuilderAttributes, -}; use reth_payload_builder::EthPayloadBuilderAttributes; use std::sync::Arc; use tokio::sync::Mutex; @@ -15,8 +13,10 @@ use tokio::sync::Mutex; /// Optimism Node Helper type pub(crate) type OpNode = NodeHelperType>>; -pub(crate) async fn setup(num_nodes: usize) -> eyre::Result<(Vec, TaskManager, Wallet)> { - let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap(); +/// Creates the initial setup with `num_nodes` of the node config, started and connected. +pub async fn setup(num_nodes: usize) -> eyre::Result<(Vec, TaskManager, Wallet)> { + let genesis: Genesis = + serde_json::from_str(include_str!("../tests/assets/genesis.json")).unwrap(); reth_e2e_test_utils::setup( num_nodes, Arc::new(OpChainSpecBuilder::base_mainnet().genesis(genesis).ecotone_activated().build()), @@ -27,7 +27,7 @@ pub(crate) async fn setup(num_nodes: usize) -> eyre::Result<(Vec, TaskMa } /// Advance the chain with sequential payloads returning them in the end. -pub(crate) async fn advance_chain( +pub async fn advance_chain( length: usize, node: &mut OpNode, wallet: Arc>, @@ -49,7 +49,7 @@ pub(crate) async fn advance_chain( } /// Helper function to create a new eth payload attributes -pub(crate) fn optimism_payload_attributes(timestamp: u64) -> OpPayloadBuilderAttributes { +pub fn optimism_payload_attributes(timestamp: u64) -> OpPayloadBuilderAttributes { let attributes = PayloadAttributes { timestamp, prev_randao: B256::ZERO, diff --git a/crates/optimism/node/tests/e2e/main.rs b/crates/optimism/node/tests/e2e/main.rs index 3438c766048b..7f4b22ba7e04 100644 --- a/crates/optimism/node/tests/e2e/main.rs +++ b/crates/optimism/node/tests/e2e/main.rs @@ -3,7 +3,4 @@ #[cfg(feature = "optimism")] mod p2p; -#[cfg(feature = "optimism")] -mod utils; - const fn main() {} diff --git a/crates/optimism/node/tests/e2e/p2p.rs b/crates/optimism/node/tests/e2e/p2p.rs index 6240b7814726..3db4cfab8698 100644 --- a/crates/optimism/node/tests/e2e/p2p.rs +++ b/crates/optimism/node/tests/e2e/p2p.rs @@ -1,7 +1,7 @@ -use crate::utils::{advance_chain, setup}; use alloy_rpc_types_engine::PayloadStatusEnum; use futures::StreamExt; use reth::blockchain_tree::error::BlockchainTreeError; +use reth_optimism_node::utils::{advance_chain, setup}; use std::sync::Arc; use tokio::sync::Mutex; diff --git a/crates/optimism/node/tests/it/main.rs b/crates/optimism/node/tests/it/main.rs index b84dd7426c24..d0533fc4541c 100644 --- a/crates/optimism/node/tests/it/main.rs +++ b/crates/optimism/node/tests/it/main.rs @@ -3,4 +3,7 @@ #[cfg(feature = "optimism")] mod builder; +#[cfg(feature = "optimism")] +mod priority; + const fn main() {} diff --git a/crates/optimism/node/tests/it/priority.rs b/crates/optimism/node/tests/it/priority.rs new file mode 100644 index 000000000000..52e3bef3d918 --- /dev/null +++ b/crates/optimism/node/tests/it/priority.rs @@ -0,0 +1,190 @@ +//! Node builder test that customizes priority of transactions in the block. + +use alloy_consensus::TxEip1559; +use alloy_genesis::Genesis; +use alloy_network::TxSignerSync; +use alloy_primitives::{Address, ChainId, TxKind}; +use reth::{args::DatadirArgs, tasks::TaskManager}; +use reth_chainspec::EthChainSpec; +use reth_db::test_utils::create_test_rw_db_with_path; +use reth_e2e_test_utils::{ + node::NodeTestContext, transaction::TransactionTestContext, wallet::Wallet, +}; +use reth_node_api::{FullNodeTypes, NodeTypesWithEngine}; +use reth_node_builder::{ + components::ComponentsBuilder, EngineNodeLauncher, NodeBuilder, NodeConfig, +}; +use reth_optimism_chainspec::{OpChainSpec, OpChainSpecBuilder}; +use reth_optimism_node::{ + args::RollupArgs, + node::{ + OpAddOns, OpConsensusBuilder, OpExecutorBuilder, OpNetworkBuilder, OpPayloadBuilder, + OpPoolBuilder, + }, + utils::optimism_payload_attributes, + OpEngineTypes, OpNode, +}; +use reth_optimism_payload_builder::builder::OpPayloadTransactions; +use reth_primitives::{SealedBlock, Transaction, TransactionSigned, TransactionSignedEcRecovered}; +use reth_provider::providers::BlockchainProvider2; +use reth_transaction_pool::{ + pool::{BestPayloadTransactions, PayloadTransactionsChain, PayloadTransactionsFixed}, + PayloadTransactions, +}; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[derive(Clone, Debug)] +struct CustomTxPriority { + chain_id: ChainId, +} + +impl OpPayloadTransactions for CustomTxPriority { + fn best_transactions( + &self, + pool: Pool, + attr: reth_transaction_pool::BestTransactionsAttributes, + ) -> impl PayloadTransactions + where + Pool: reth_transaction_pool::TransactionPool, + { + // Block composition: + // 1. Best transactions from the pool (up to 250k gas) + // 2. End-of-block transaction created by the node (up to 100k gas) + + // End of block transaction should send a 0-value transfer to a random address. + let sender = Wallet::default().inner; + let mut end_of_block_tx = TxEip1559 { + chain_id: self.chain_id, + nonce: 1, // it will be 2nd tx after L1 block info tx that uses the same sender + gas_limit: 21000, + max_fee_per_gas: 20e9 as u128, + to: TxKind::Call(Address::random()), + value: 0.try_into().unwrap(), + ..Default::default() + }; + let signature = sender.sign_transaction_sync(&mut end_of_block_tx).unwrap(); + let end_of_block_tx = TransactionSignedEcRecovered::from_signed_transaction( + TransactionSigned::from_transaction_and_signature( + Transaction::Eip1559(end_of_block_tx), + signature, + ), + sender.address(), + ); + + PayloadTransactionsChain::new( + BestPayloadTransactions::new(pool.best_transactions_with_attributes(attr)), + // Allow 250k gas for the transactions from the pool + Some(250_000), + PayloadTransactionsFixed::single(end_of_block_tx), + // Allow 100k gas for the end-of-block transaction + Some(100_000), + ) + } +} + +/// Builds the node with custom transaction priority service within default payload builder. +fn build_components( + chain_id: ChainId, +) -> ComponentsBuilder< + Node, + OpPoolBuilder, + OpPayloadBuilder, + OpNetworkBuilder, + OpExecutorBuilder, + OpConsensusBuilder, +> +where + Node: + FullNodeTypes>, +{ + let RollupArgs { disable_txpool_gossip, compute_pending_block, discovery_v4, .. } = + RollupArgs::default(); + ComponentsBuilder::default() + .node_types::() + .pool(OpPoolBuilder::default()) + .payload( + OpPayloadBuilder::new(compute_pending_block) + .with_transactions(CustomTxPriority { chain_id }), + ) + .network(OpNetworkBuilder { disable_txpool_gossip, disable_discovery_v4: !discovery_v4 }) + .executor(OpExecutorBuilder::default()) + .consensus(OpConsensusBuilder::default()) +} + +#[tokio::test] +async fn test_custom_block_priority_config() { + reth_tracing::init_test_tracing(); + + let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap(); + let chain_spec = + Arc::new(OpChainSpecBuilder::base_mainnet().genesis(genesis).ecotone_activated().build()); + + // This wallet is going to send: + // 1. L1 block info tx + // 2. End-of-block custom tx + let wallet = Arc::new(Mutex::new(Wallet::default().with_chain_id(chain_spec.chain().into()))); + + // Configure and launch the node. + let config = NodeConfig::new(chain_spec).with_datadir_args(DatadirArgs { + datadir: reth_db::test_utils::tempdir_path().into(), + ..Default::default() + }); + let db = create_test_rw_db_with_path( + config + .datadir + .datadir + .unwrap_or_chain_default(config.chain.chain(), config.datadir.clone()) + .db(), + ); + let tasks = TaskManager::current(); + let node_handle = NodeBuilder::new(config.clone()) + .with_database(db) + .with_types_and_provider::>() + .with_components(build_components(config.chain.chain_id())) + .with_add_ons(OpAddOns::default()) + .launch_with_fn(|builder| { + let launcher = EngineNodeLauncher::new( + tasks.executor(), + builder.config.datadir(), + Default::default(), + ); + builder.launch_with(launcher) + }) + .await + .expect("Failed to launch node"); + + // Advance the chain with a single block. + let block_payloads = NodeTestContext::new(node_handle.node, optimism_payload_attributes) + .await + .unwrap() + .advance(1, |_| { + let wallet = wallet.clone(); + Box::pin(async move { + let mut wallet = wallet.lock().await; + let tx_fut = TransactionTestContext::optimism_l1_block_info_tx( + wallet.chain_id, + wallet.inner.clone(), + // This doesn't matter in the current test (because it's only one block), + // but make sure you're not reusing the nonce from end-of-block tx + // if they have the same signer. + wallet.inner_nonce * 2, + ); + wallet.inner_nonce += 1; + tx_fut.await + }) + }) + .await + .unwrap(); + assert_eq!(block_payloads.len(), 1); + let (block_payload, _) = block_payloads.first().unwrap(); + let block_payload: SealedBlock = block_payload.block().clone(); + assert_eq!(block_payload.body.transactions.len(), 2); // L1 block info tx + end-of-block custom tx + + // Check that last transaction in the block looks like a transfer to a random address. + let end_of_block_tx = block_payload.body.transactions.last().unwrap(); + let end_of_block_tx = end_of_block_tx.transaction.as_eip1559().unwrap(); + assert_eq!(end_of_block_tx.nonce, 1); + assert_eq!(end_of_block_tx.gas_limit, 21_000); + assert!(end_of_block_tx.input.is_empty()); +} diff --git a/crates/optimism/payload/src/builder.rs b/crates/optimism/payload/src/builder.rs index dc6084f48813..42326de6ea49 100644 --- a/crates/optimism/payload/src/builder.rs +++ b/crates/optimism/payload/src/builder.rs @@ -2,7 +2,7 @@ use std::{fmt::Display, sync::Arc}; -use alloy_consensus::EMPTY_OMMER_ROOT_HASH; +use alloy_consensus::{Transaction, EMPTY_OMMER_ROOT_HASH}; use alloy_eips::merge::BEACON_NONCE; use alloy_primitives::{Address, Bytes, U256}; use alloy_rpc_types_engine::PayloadId; @@ -23,8 +23,7 @@ use reth_primitives::{ use reth_provider::{ProviderError, StateProviderFactory, StateRootProvider}; use reth_revm::database::StateProviderDatabase; use reth_transaction_pool::{ - noop::NoopTransactionPool, BestTransactions, BestTransactionsAttributes, BestTransactionsFor, - TransactionPool, + noop::NoopTransactionPool, BestTransactionsAttributes, PayloadTransactions, TransactionPool, }; use reth_trie::HashedPostState; use revm::{ @@ -39,6 +38,7 @@ use crate::{ payload::{OpBuiltPayload, OpPayloadBuilderAttributes}, }; use op_alloy_consensus::DepositTransaction; +use reth_transaction_pool::pool::BestPayloadTransactions; /// Optimism's payload builder #[derive(Debug, Clone, PartialEq, Eq)] @@ -390,7 +390,7 @@ where } } -/// A type that returns a the [`BestTransactions`] that should be included in the pool. +/// A type that returns a the [`PayloadTransactions`] that should be included in the pool. pub trait OpPayloadTransactions: Clone + Send + Sync + Unpin + 'static { /// Returns an iterator that yields the transaction in the order they should get included in the /// new payload. @@ -398,7 +398,7 @@ pub trait OpPayloadTransactions: Clone + Send + Sync + Unpin + 'static { &self, pool: Pool, attr: BestTransactionsAttributes, - ) -> BestTransactionsFor; + ) -> impl PayloadTransactions; } impl OpPayloadTransactions for () { @@ -406,8 +406,8 @@ impl OpPayloadTransactions for () { &self, pool: Pool, attr: BestTransactionsAttributes, - ) -> BestTransactionsFor { - pool.best_transactions_with_attributes(attr) + ) -> impl PayloadTransactions { + BestPayloadTransactions::new(pool.best_transactions_with_attributes(attr)) } } @@ -730,7 +730,7 @@ where &self, info: &mut ExecutionInfo, db: &mut State, - mut best_txs: BestTransactionsFor, + mut best_txs: impl PayloadTransactions, ) -> Result>, PayloadBuilderError> where DB: Database, @@ -746,19 +746,19 @@ where ); let mut evm = self.evm_config.evm_with_env(&mut *db, env); - while let Some(pool_tx) = best_txs.next() { + while let Some(tx) = best_txs.next(()) { // ensure we still have capacity for this transaction - if info.cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit { + if info.cumulative_gas_used + tx.gas_limit() > block_gas_limit { // we can't fit this transaction into the block, so we need to mark it as // invalid which also removes all dependent transaction from // the iterator before we can continue - best_txs.mark_invalid(&pool_tx); + best_txs.mark_invalid(tx.signer(), tx.nonce()); continue } // A sequencer's block should never contain blob or deposit transactions from the pool. - if pool_tx.is_eip4844() || pool_tx.tx_type() == TxType::Deposit as u8 { - best_txs.mark_invalid(&pool_tx); + if tx.is_eip4844() || tx.tx_type() == TxType::Deposit as u8 { + best_txs.mark_invalid(tx.signer(), tx.nonce()); continue } @@ -767,9 +767,6 @@ where return Ok(Some(BuildOutcomeKind::Cancelled)) } - // convert tx to a signed transaction - let tx = pool_tx.to_recovered_transaction(); - // Configure the environment for the tx. *evm.tx_mut() = self.evm_config.tx_env(tx.as_signed(), tx.signer()); @@ -785,7 +782,7 @@ where // if the transaction is invalid, we can skip it and all of its // descendants trace!(target: "payload_builder", %err, ?tx, "skipping invalid transaction and its descendants"); - best_txs.mark_invalid(&pool_tx); + best_txs.mark_invalid(tx.signer(), tx.nonce()); } continue @@ -819,7 +816,7 @@ where // update add to total fees let miner_fee = tx - .effective_tip_per_gas(Some(base_fee)) + .effective_tip_per_gas(base_fee) .expect("fee is always valid; execution succeeded"); info.total_fees += U256::from(miner_fee) * U256::from(gas_used); diff --git a/crates/transaction-pool/src/pool/best.rs b/crates/transaction-pool/src/pool/best.rs index 068ef9899538..17165611794e 100644 --- a/crates/transaction-pool/src/pool/best.rs +++ b/crates/transaction-pool/src/pool/best.rs @@ -1,10 +1,12 @@ use crate::{ identifier::{SenderId, TransactionId}, pool::pending::PendingTransaction, - PoolTransaction, TransactionOrdering, ValidPoolTransaction, + PayloadTransactions, PoolTransaction, TransactionOrdering, ValidPoolTransaction, }; +use alloy_consensus::Transaction; use alloy_primitives::Address; use core::fmt; +use reth_primitives::TransactionSignedEcRecovered; use std::{ collections::{BTreeMap, BTreeSet, HashSet, VecDeque}, sync::Arc, @@ -48,7 +50,7 @@ impl Iterator for BestTransactionsWithFees { fn next(&mut self) -> Option { // find the next transaction that satisfies the base fee loop { - let best = self.best.next()?; + let best = Iterator::next(&mut self.best)?; // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return // the transaction if best.transaction.max_fee_per_gas() >= self.base_fee as u128 && @@ -205,6 +207,49 @@ impl Iterator for BestTransactions { } } +/// Wrapper struct that allows to convert `BestTransactions` (used in tx pool) to +/// `PayloadTransactions` (used in block composition). +#[derive(Debug)] +pub struct BestPayloadTransactions +where + T: PoolTransaction>, + I: Iterator>>, +{ + invalid: HashSet
, + best: I, +} + +impl BestPayloadTransactions +where + T: PoolTransaction>, + I: Iterator>>, +{ + /// Create a new `BestPayloadTransactions` with the given iterator. + pub fn new(best: I) -> Self { + Self { invalid: Default::default(), best } + } +} + +impl PayloadTransactions for BestPayloadTransactions +where + T: PoolTransaction>, + I: Iterator>>, +{ + fn next(&mut self, _ctx: ()) -> Option { + loop { + let tx = self.best.next()?; + if self.invalid.contains(&tx.sender()) { + continue + } + return Some(tx.to_recovered_transaction()) + } + } + + fn mark_invalid(&mut self, sender: Address, _nonce: u64) { + self.invalid.insert(sender); + } +} + /// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the /// transactions of iter with predicate. /// @@ -350,6 +395,130 @@ where } } +/// An implementation of [`crate::traits::PayloadTransactions`] that yields +/// a pre-defined set of transactions. +/// +/// This is useful to put a sequencer-specified set of transactions into the block +/// and compose it with the rest of the transactions. +#[derive(Debug)] +pub struct PayloadTransactionsFixed { + transactions: Vec, + index: usize, +} + +impl PayloadTransactionsFixed { + /// Constructs a new [`PayloadTransactionsFixed`]. + pub fn new(transactions: Vec) -> Self { + Self { transactions, index: Default::default() } + } + + /// Constructs a new [`PayloadTransactionsFixed`] with a single transaction. + pub fn single(transaction: T) -> Self { + Self { transactions: vec![transaction], index: Default::default() } + } +} + +impl PayloadTransactions for PayloadTransactionsFixed { + fn next(&mut self, _ctx: ()) -> Option { + (self.index < self.transactions.len()).then(|| { + let tx = self.transactions[self.index].clone(); + self.index += 1; + tx + }) + } + + fn mark_invalid(&mut self, _sender: Address, _nonce: u64) {} +} + +/// Wrapper over [`crate::traits::PayloadTransactions`] that combines transactions from multiple +/// `PayloadTransactions` iterators and keeps track of the gas for both of iterators. +/// +/// We can't use [`Iterator::chain`], because: +/// (a) we need to propagate the `mark_invalid` and `no_updates` +/// (b) we need to keep track of the gas +/// +/// Notes that [`PayloadTransactionsChain`] fully drains the first iterator +/// before moving to the second one. +/// +/// If the `before` iterator has transactions that are not fitting into the block, +/// the after iterator will get propagated a `mark_invalid` call for each of them. +#[derive(Debug)] +pub struct PayloadTransactionsChain { + /// Iterator that will be used first + before: B, + /// Allowed gas for the transactions from `before` iterator. If `None`, no gas limit is + /// enforced. + before_max_gas: Option, + /// Gas used by the transactions from `before` iterator + before_gas: u64, + /// Iterator that will be used after `before` iterator + after: A, + /// Allowed gas for the transactions from `after` iterator. If `None`, no gas limit is + /// enforced. + after_max_gas: Option, + /// Gas used by the transactions from `after` iterator + after_gas: u64, +} + +impl PayloadTransactionsChain { + /// Constructs a new [`PayloadTransactionsChain`]. + pub fn new( + before: B, + before_max_gas: Option, + after: A, + after_max_gas: Option, + ) -> Self { + Self { + before, + before_max_gas, + before_gas: Default::default(), + after, + after_max_gas, + after_gas: Default::default(), + } + } +} + +impl PayloadTransactions for PayloadTransactionsChain +where + B: PayloadTransactions, + A: PayloadTransactions, +{ + fn next(&mut self, ctx: ()) -> Option { + while let Some(tx) = self.before.next(ctx) { + if let Some(before_max_gas) = self.before_max_gas { + if self.before_gas + tx.transaction.gas_limit() <= before_max_gas { + self.before_gas += tx.transaction.gas_limit(); + return Some(tx); + } + self.before.mark_invalid(tx.signer(), tx.transaction.nonce()); + self.after.mark_invalid(tx.signer(), tx.transaction.nonce()); + } else { + return Some(tx); + } + } + + while let Some(tx) = self.after.next(ctx) { + if let Some(after_max_gas) = self.after_max_gas { + if self.after_gas + tx.transaction.gas_limit() <= after_max_gas { + self.after_gas += tx.transaction.gas_limit(); + return Some(tx); + } + self.after.mark_invalid(tx.signer(), tx.transaction.nonce()); + } else { + return Some(tx); + } + } + + None + } + + fn mark_invalid(&mut self, sender: Address, nonce: u64) { + self.before.mark_invalid(sender, nonce); + self.after.mark_invalid(sender, nonce); + } +} + #[cfg(test)] mod tests { use super::*; @@ -428,9 +597,9 @@ mod tests { dyn crate::traits::BestTransactions>>, > = Box::new(pool.best()); - let tx = best.next().unwrap(); - best.mark_invalid(&tx); - assert!(best.next().is_none()); + let tx = Iterator::next(&mut best).unwrap(); + crate::traits::BestTransactions::mark_invalid(&mut *best, &tx); + assert!(Iterator::next(&mut best).is_none()); } #[test] @@ -737,4 +906,119 @@ mod tests { assert_eq!(tx.nonce() % 2, 0); } } + + #[test] + fn test_best_transactions_prioritized_senders() { + let mut pool = PendingPool::new(MockOrdering::default()); + let mut f = MockTransactionFactory::default(); + + // Add 5 plain transactions from different senders with increasing gas price + for gas_price in 0..5 { + let tx = MockTransaction::eip1559().with_gas_price(gas_price); + let valid_tx = f.validated(tx); + pool.add_transaction(Arc::new(valid_tx), 0); + } + + // Add another transaction with 0 gas price that's going to be prioritized by sender + let prioritized_tx = MockTransaction::eip1559().with_gas_price(0); + let valid_prioritized_tx = f.validated(prioritized_tx.clone()); + pool.add_transaction(Arc::new(valid_prioritized_tx), 0); + + let prioritized_senders = HashSet::from([prioritized_tx.sender()]); + let best = + BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best()); + + // Verify that the prioritized transaction is returned first + // and the rest are returned in the reverse order of gas price + let mut iter = best.into_iter(); + let top_of_block_tx = iter.next().unwrap(); + assert_eq!(top_of_block_tx.max_fee_per_gas(), 0); + assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender()); + for gas_price in (0..5).rev() { + assert_eq!(iter.next().unwrap().max_fee_per_gas(), gas_price); + } + + // TODO: Test that gas limits for prioritized transactions are respected + } + + #[test] + fn test_best_transactions_chained_iterators() { + let mut priority_pool = PendingPool::new(MockOrdering::default()); + let mut pool = PendingPool::new(MockOrdering::default()); + let mut f = MockTransactionFactory::default(); + + // Block composition + // === + // (1) up to 100 gas: custom top-of-block transaction + // (2) up to 100 gas: transactions from the priority pool + // (3) up to 200 gas: only transactions from address A + // (4) up to 200 gas: only transactions from address B + // (5) until block gas limit: all transactions from the main pool + + // Notes: + // - If prioritized addresses overlap, a single transaction will be prioritized twice and + // therefore use the per-segment gas limit twice. + // - Priority pool and main pool must synchronize between each other to make sure there are + // no conflicts for the same nonce. For example, in this scenario, pools can't reject + // transactions with seemingly incorrect nonces, because previous transactions might be in + // the other pool. + + let address_top_of_block = Address::random(); + let address_in_priority_pool = Address::random(); + let address_a = Address::random(); + let address_b = Address::random(); + let address_regular = Address::random(); + + // Add transactions to the main pool + { + let prioritized_tx_a = + MockTransaction::eip1559().with_gas_price(5).with_sender(address_a); + // without our custom logic, B would be prioritized over A due to gas price: + let prioritized_tx_b = + MockTransaction::eip1559().with_gas_price(10).with_sender(address_b); + let regular_tx = + MockTransaction::eip1559().with_gas_price(15).with_sender(address_regular); + pool.add_transaction(Arc::new(f.validated(prioritized_tx_a)), 0); + pool.add_transaction(Arc::new(f.validated(prioritized_tx_b)), 0); + pool.add_transaction(Arc::new(f.validated(regular_tx)), 0); + } + + // Add transactions to the priority pool + { + let prioritized_tx = + MockTransaction::eip1559().with_gas_price(0).with_sender(address_in_priority_pool); + let valid_prioritized_tx = f.validated(prioritized_tx); + priority_pool.add_transaction(Arc::new(valid_prioritized_tx), 0); + } + + let mut block = PayloadTransactionsChain::new( + PayloadTransactionsFixed::single( + MockTransaction::eip1559().with_sender(address_top_of_block).into(), + ), + Some(100), + PayloadTransactionsChain::new( + BestPayloadTransactions::new(priority_pool.best()), + Some(100), + BestPayloadTransactions::new(BestTransactionsWithPrioritizedSenders::new( + HashSet::from([address_a]), + 200, + BestTransactionsWithPrioritizedSenders::new( + HashSet::from([address_b]), + 200, + pool.best(), + ), + )), + None, + ), + None, + ); + + assert_eq!(block.next(()).unwrap().signer(), address_top_of_block); + assert_eq!(block.next(()).unwrap().signer(), address_in_priority_pool); + assert_eq!(block.next(()).unwrap().signer(), address_a); + assert_eq!(block.next(()).unwrap().signer(), address_b); + assert_eq!(block.next(()).unwrap().signer(), address_regular); + } + + // TODO: Same nonce test } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 0841c0d3d281..76b2490b12fa 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -106,7 +106,10 @@ use crate::{ traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind}, validate::ValidTransaction, }; -pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders}; +pub use best::{ + BestPayloadTransactions, BestTransactionFilter, BestTransactionsWithPrioritizedSenders, + PayloadTransactionsChain, PayloadTransactionsFixed, +}; pub use blob::{blob_tx_priority, fee_delta}; pub use events::{FullTransactionEvent, TransactionEvent}; pub use listener::{AllTransactionsEvents, TransactionEvents}; diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index aa99e7af615c..185c08c109a8 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1501,6 +1501,24 @@ impl Stream for NewSubpoolTransactionStream { } } +/// Iterator that returns transactions for the block building process in the order they should be +/// included in the block. +/// +/// Can include transactions from the pool and other sources (alternative pools, +/// sequencer-originated transactions, etc.). +pub trait PayloadTransactions { + /// Returns the next transaction to include in the block. + fn next( + &mut self, + // In the future, `ctx` can include access to state for block building purposes. + ctx: (), + ) -> Option; + + /// Exclude descendants of the transaction with given sender and nonce from the iterator, + /// because this transaction won't be included in the block. + fn mark_invalid(&mut self, sender: Address, nonce: u64); +} + #[cfg(test)] mod tests { use super::*;