diff --git a/cmd/ethrex/bench/build_block_benchmark.rs b/cmd/ethrex/bench/build_block_benchmark.rs index 7600983047..52470d4558 100644 --- a/cmd/ethrex/bench/build_block_benchmark.rs +++ b/cmd/ethrex/bench/build_block_benchmark.rs @@ -11,7 +11,7 @@ use criterion::{ measurement::{Measurement, ValueFormatter}, }; use ethrex_blockchain::{ - Blockchain, BlockchainType, + Blockchain, BlockchainOptions, BlockchainType, payload::{BuildPayloadArgs, PayloadBuildResult, create_payload}, }; use ethrex_common::{ @@ -236,8 +236,11 @@ pub fn build_block_benchmark(c: &mut Criterion) { let (store_with_genesis, genesis) = setup_genesis(&addresses).await; let block_chain = Blockchain::new( store_with_genesis.clone(), - BlockchainType::L1, // TODO: Should we support L2? - false, + BlockchainOptions { + r#type: BlockchainType::L1, // TODO: Should we support L2? + perf_logs_enabled: false, + ..Default::default() + }, ); fill_mempool(&block_chain, accounts).await; diff --git a/cmd/ethrex/bench/import_blocks_benchmark.rs b/cmd/ethrex/bench/import_blocks_benchmark.rs index 81c7f4e2a2..22451ae5f9 100644 --- a/cmd/ethrex/bench/import_blocks_benchmark.rs +++ b/cmd/ethrex/bench/import_blocks_benchmark.rs @@ -3,7 +3,7 @@ use ethrex::{ cli::{import_blocks, remove_db}, utils::{default_datadir, init_datadir}, }; -use ethrex_blockchain::BlockchainType; +use ethrex_blockchain::{BlockchainOptions, BlockchainType}; use ethrex_config::networks::Network; #[inline] @@ -23,7 +23,10 @@ fn block_import() { "../../fixtures/blockchain/l2-1k-erc20.rlp", &datadir, genesis, - blockchain_type, + BlockchainOptions { + r#type: blockchain_type, + ..Default::default() + }, )) .expect("Failed to import blocks on the Tokio runtime"); } diff --git a/cmd/ethrex/cli.rs b/cmd/ethrex/cli.rs index 302cd776bd..c2c3f82fae 100644 --- a/cmd/ethrex/cli.rs +++ b/cmd/ethrex/cli.rs @@ -6,7 +6,7 @@ use std::{ }; use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand}; -use ethrex_blockchain::{BlockchainType, error::ChainError}; +use ethrex_blockchain::{BlockchainOptions, BlockchainType, error::ChainError}; use ethrex_common::types::{Block, Genesis}; use ethrex_p2p::sync::SyncMode; use ethrex_p2p::types::Node; @@ -107,6 +107,14 @@ pub struct Options { long_help = "Possible values: info, debug, trace, warn, error", help_heading = "Node options")] pub log_level: Level, + #[arg( + help = "Maximum size of the mempool in number of transactions", + long = "mempool.maxsize", + default_value_t = 10_000, + value_name = "MEMPOOL_MAX_SIZE", + help_heading = "Node options" + )] + pub mempool_max_size: usize, #[arg( long = "http.addr", default_value = "0.0.0.0", @@ -237,6 +245,7 @@ impl Default for Options { metrics_enabled: Default::default(), dev: Default::default(), force: false, + mempool_max_size: Default::default(), extra_data: get_minimal_client_version(), } } @@ -331,7 +340,17 @@ impl Subcommand { } else { BlockchainType::L1 }; - import_blocks(&path, &opts.datadir, genesis, blockchain_type).await?; + import_blocks( + &path, + &opts.datadir, + genesis, + BlockchainOptions { + max_mempool_size: opts.mempool_max_size, + r#type: blockchain_type, + ..Default::default() + }, + ) + .await?; } Subcommand::Export { path, first, last } => { export_blocks(&path, &opts.datadir, first, last).await @@ -378,12 +397,12 @@ pub async fn import_blocks( path: &str, datadir: &Path, genesis: Genesis, - blockchain_type: BlockchainType, + blockchain_opts: BlockchainOptions, ) -> Result<(), ChainError> { let start_time = Instant::now(); init_datadir(datadir); let store = init_store(datadir, genesis).await; - let blockchain = init_blockchain(store.clone(), blockchain_type, false); + let blockchain = init_blockchain(store.clone(), blockchain_opts); let path_metadata = metadata(path).expect("Failed to read path"); // If it's an .rlp file it will be just one chain, but if it's a directory there can be multiple chains. diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 51475fc058..e0d330ff70 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -5,7 +5,7 @@ use crate::{ read_jwtsecret_file, read_node_config_file, }, }; -use ethrex_blockchain::{Blockchain, BlockchainType}; +use ethrex_blockchain::{Blockchain, BlockchainOptions, BlockchainType}; use ethrex_common::types::Genesis; use ethrex_config::networks::Network; @@ -114,16 +114,12 @@ pub fn open_store(datadir: &Path) -> Store { } } -pub fn init_blockchain( - store: Store, - blockchain_type: BlockchainType, - perf_logs_enabled: bool, -) -> Arc { +pub fn init_blockchain(store: Store, blockchain_opts: BlockchainOptions) -> Arc { #[cfg(feature = "revm")] info!("Initiating blockchain with revm"); #[cfg(not(feature = "revm"))] info!("Initiating blockchain with levm"); - Blockchain::new(store, blockchain_type, perf_logs_enabled).into() + Blockchain::new(store, blockchain_opts).into() } #[allow(clippy::too_many_arguments)] @@ -390,7 +386,14 @@ pub async fn init_l1( #[cfg(feature = "sync-test")] set_sync_block(&store).await; - let blockchain = init_blockchain(store.clone(), BlockchainType::L1, true); + let blockchain = init_blockchain( + store.clone(), + BlockchainOptions { + max_mempool_size: opts.mempool_max_size, + perf_logs_enabled: true, + r#type: BlockchainType::L1, + }, + ); let signer = get_signer(datadir); diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index 3fb7b489c8..8d256352bd 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -170,7 +170,13 @@ pub async fn init_l2( let store = init_store(&datadir, genesis).await; let rollup_store = init_rollup_store(&rollup_store_dir).await; - let blockchain = init_blockchain(store.clone(), BlockchainType::L2, true); + let blockchain_opts = ethrex_blockchain::BlockchainOptions { + max_mempool_size: opts.node_opts.mempool_max_size, + r#type: BlockchainType::L2, + perf_logs_enabled: true, + }; + + let blockchain = init_blockchain(store.clone(), blockchain_opts); let signer = get_signer(&datadir); diff --git a/cmd/ethrex_replay/src/cli.rs b/cmd/ethrex_replay/src/cli.rs index b34582444c..8cf4ec743a 100644 --- a/cmd/ethrex_replay/src/cli.rs +++ b/cmd/ethrex_replay/src/cli.rs @@ -8,7 +8,7 @@ use std::{ use clap::{ArgGroup, Parser, Subcommand, ValueEnum}; use ethrex_blockchain::{ - Blockchain, BlockchainType, + Blockchain, fork_choice::apply_fork_choice, payload::{BuildPayloadArgs, PayloadBuildResult, create_payload}, }; @@ -896,7 +896,10 @@ pub async fn replay_custom_l1_blocks( store_inner }; - let blockchain = Arc::new(Blockchain::new(store.clone(), BlockchainType::L1, false)); + let blockchain = Arc::new(Blockchain::new( + store.clone(), + ethrex_blockchain::BlockchainOptions::default(), + )); let blocks = produce_l1_blocks( blockchain.clone(), diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 02101bae70..5782369849 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -48,6 +48,7 @@ use ethrex_metrics::metrics_blocks::METRICS_BLOCKS; use ethrex_common::types::BlobsBundle; const MAX_PAYLOADS: usize = 10; +const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000; //TODO: Implement a struct Chain or BlockChain to encapsulate //functionality and canonical chain state and config @@ -67,14 +68,30 @@ pub struct Blockchain { /// This will be set to true once the initial sync has taken place and wont be set to false after /// This does not reflect whether there is an ongoing sync process is_synced: AtomicBool, - /// Whether performance logs should be emitted - pub perf_logs_enabled: bool, - pub r#type: BlockchainType, + pub options: BlockchainOptions, /// Mapping from a payload id to either a complete payload or a payload build task /// We need to keep completed payloads around in case consensus requests them twice pub payloads: Arc>>, } +#[derive(Debug, Clone)] +pub struct BlockchainOptions { + pub max_mempool_size: usize, + /// Whether performance logs should be emitted + pub perf_logs_enabled: bool, + pub r#type: BlockchainType, +} + +impl Default for BlockchainOptions { + fn default() -> Self { + Self { + max_mempool_size: MAX_MEMPOOL_SIZE_DEFAULT, + perf_logs_enabled: false, + r#type: BlockchainType::default(), + } + } +} + #[derive(Debug, Clone)] pub struct BatchBlockProcessingFailure { pub last_valid_hash: H256, @@ -94,25 +111,23 @@ fn log_batch_progress(batch_size: u32, current_block: u32) { } impl Blockchain { - pub fn new(store: Store, blockchain_type: BlockchainType, perf_logs_enabled: bool) -> Self { + pub fn new(store: Store, blockchain_opts: BlockchainOptions) -> Self { Self { storage: store, - mempool: Mempool::new(), + mempool: Mempool::new(blockchain_opts.max_mempool_size), is_synced: AtomicBool::new(false), - r#type: blockchain_type, payloads: Arc::new(TokioMutex::new(Vec::new())), - perf_logs_enabled, + options: blockchain_opts, } } pub fn default_with_store(store: Store) -> Self { Self { storage: store, - mempool: Mempool::new(), + mempool: Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT), is_synced: AtomicBool::new(false), - r#type: BlockchainType::default(), payloads: Arc::new(TokioMutex::new(Vec::new())), - perf_logs_enabled: false, + options: BlockchainOptions::default(), } } @@ -204,7 +219,7 @@ impl Blockchain { let vm_db: DynVmDatabase = Box::new(StoreVmDatabase::new(self.storage.clone(), parent_hash)); let logger = Arc::new(DatabaseLogger::new(Arc::new(Mutex::new(Box::new(vm_db))))); - let mut vm = match self.r#type { + let mut vm = match self.options.r#type { BlockchainType::L1 => Evm::new_from_db_for_l1(logger.clone()), BlockchainType::L2 => Evm::new_from_db_for_l2(logger.clone()), }; @@ -422,7 +437,7 @@ impl Blockchain { let result = self.store_block(block, account_updates_list, res).await; let stored = Instant::now(); - if self.perf_logs_enabled { + if self.options.perf_logs_enabled { Self::print_add_block_logs(block, since, executed, merkleized, stored); } result @@ -613,7 +628,7 @@ impl Blockchain { METRICS_BLOCKS.set_latest_gigagas(throughput); ); - if self.perf_logs_enabled { + if self.options.perf_logs_enabled { info!( "[METRICS] Executed and stored: Range: {}, Last block num: {}, Last block gas limit: {}, Total transactions: {}, Total Gas: {}, Throughput: {} Gigagas/s", blocks_len, @@ -883,7 +898,7 @@ impl Blockchain { } pub fn new_evm(&self, vm_db: StoreVmDatabase) -> Result { - let evm = match self.r#type { + let evm = match self.options.r#type { BlockchainType::L1 => Evm::new_for_l1(vm_db), BlockchainType::L2 => Evm::new_for_l2(vm_db)?, }; diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index 8bf9733725..285e83bed3 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -19,12 +19,6 @@ use ethrex_storage::error::StoreError; use std::collections::HashSet; use tracing::warn; -// Max number of transactions in the mempool -const MEMPOOL_MAX_SIZE: usize = 10000; //TODO: Define - -// Max number of transactions to let the mempool order queue grow before pruning it -const MEMPOOL_PRUNE_THRESHOLD: usize = MEMPOOL_MAX_SIZE + MEMPOOL_MAX_SIZE / 2; - #[derive(Debug, Default)] struct MempoolInner { broadcast_pool: HashSet, @@ -32,12 +26,18 @@ struct MempoolInner { blobs_bundle_pool: HashMap, txs_by_sender_nonce: BTreeMap<(H160, u64), H256>, txs_order: VecDeque, + max_mempool_size: usize, + // Max number of transactions to let the mempool order queue grow before pruning it + mempool_prune_threshold: usize, } impl MempoolInner { - fn new() -> Self { + fn new(max_mempool_size: usize) -> Self { MempoolInner { - txs_order: VecDeque::with_capacity(MEMPOOL_MAX_SIZE * 2), + txs_order: VecDeque::with_capacity(max_mempool_size * 2), + transaction_pool: HashMap::with_capacity(max_mempool_size), + max_mempool_size, + mempool_prune_threshold: max_mempool_size + max_mempool_size / 2, ..Default::default() } } @@ -60,7 +60,7 @@ impl MempoolInner { /// Remove the oldest transaction in the pool fn remove_oldest_transaction(&mut self) -> Result<(), StoreError> { // Remove elements from the order queue until one is present in the pool - while self.transaction_pool.len() >= MEMPOOL_MAX_SIZE { + while self.transaction_pool.len() >= self.max_mempool_size { if let Some(oldest_hash) = self.txs_order.pop_front() { self.remove_transaction_with_lock(&oldest_hash)?; } else { @@ -81,9 +81,9 @@ pub struct Mempool { } impl Mempool { - pub fn new() -> Self { + pub fn new(max_mempool_size: usize) -> Self { Mempool { - inner: RwLock::new(MempoolInner::new()), + inner: RwLock::new(MempoolInner::new(max_mempool_size)), } } @@ -107,13 +107,13 @@ impl Mempool { ) -> Result<(), StoreError> { let mut inner = self.write()?; // Prune the order queue if it has grown too much - if inner.txs_order.len() > MEMPOOL_PRUNE_THRESHOLD { + if inner.txs_order.len() > inner.mempool_prune_threshold { // NOTE: we do this to avoid borrow checker errors let txpool = core::mem::take(&mut inner.transaction_pool); inner.txs_order.retain(|tx| txpool.contains_key(tx)); inner.transaction_pool = txpool; } - if inner.transaction_pool.len() >= MEMPOOL_MAX_SIZE { + if inner.transaction_pool.len() >= inner.max_mempool_size { inner.remove_oldest_transaction()?; } inner.txs_order.push_back(hash); @@ -491,6 +491,8 @@ mod tests { use ethrex_storage::EngineType; use ethrex_storage::{Store, error::StoreError}; + const MEMPOOL_MAX_SIZE_TEST: usize = 10_000; + async fn setup_storage(config: ChainConfig, header: BlockHeader) -> Result { let store = Store::new("test", EngineType::InMemory)?; let block_number = header.number; @@ -846,7 +848,7 @@ mod tests { let blob_tx = MempoolTransaction::new(blob_tx_decoded, blob_tx_sender); let plain_tx_hash = plain_tx.hash(); let blob_tx_hash = blob_tx.hash(); - let mempool = Mempool::new(); + let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST); let filter = |tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) }; mempool @@ -861,7 +863,7 @@ mod tests { fn blobs_bundle_loadtest() { // Write a bundle of 6 blobs 10 times // If this test fails please adjust the max_size in the DB config - let mempool = Mempool::new(); + let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST); for i in 0..300 { let blobs = [[i as u8; BYTES_PER_BLOB]; 6]; let commitments = [[i as u8; 48]; 6]; diff --git a/crates/blockchain/payload.rs b/crates/blockchain/payload.rs index a2e3975382..da186ec699 100644 --- a/crates/blockchain/payload.rs +++ b/crates/blockchain/payload.rs @@ -389,9 +389,10 @@ impl Blockchain { debug!("Building payload"); let base_fee = payload.header.base_fee_per_gas.unwrap_or_default(); - let mut context = PayloadBuildContext::new(payload, &self.storage, self.r#type.clone())?; + let mut context = + PayloadBuildContext::new(payload, &self.storage, self.options.r#type.clone())?; - if let BlockchainType::L1 = self.r#type { + if let BlockchainType::L1 = self.options.r#type { self.apply_system_operations(&mut context)?; } self.apply_withdrawals(&mut context)?; diff --git a/crates/l2/sequencer/block_producer/payload_builder.rs b/crates/l2/sequencer/block_producer/payload_builder.rs index 984fcbfc09..8d1340b937 100644 --- a/crates/l2/sequencer/block_producer/payload_builder.rs +++ b/crates/l2/sequencer/block_producer/payload_builder.rs @@ -45,7 +45,7 @@ pub async fn build_payload( let gas_limit = payload.header.gas_limit; debug!("Building payload"); - let mut context = PayloadBuildContext::new(payload, store, blockchain.r#type.clone())?; + let mut context = PayloadBuildContext::new(payload, store, blockchain.options.r#type.clone())?; fill_transactions( blockchain.clone(), diff --git a/docs/CLI.md b/docs/CLI.md index 082e051c5a..d9d5788cec 100644 --- a/docs/CLI.md +++ b/docs/CLI.md @@ -57,6 +57,11 @@ Node options: [default: INFO] + --mempool.maxsize + Maximum size of the mempool in number of transactions + + [default: 10000] + P2P options: --bootnodes ... Comma separated enode URLs for P2P discovery bootstrap. diff --git a/tooling/ef_tests/blockchain/test_runner.rs b/tooling/ef_tests/blockchain/test_runner.rs index 1fbc2753e2..301b01d423 100644 --- a/tooling/ef_tests/blockchain/test_runner.rs +++ b/tooling/ef_tests/blockchain/test_runner.rs @@ -6,7 +6,7 @@ use crate::{ types::{BlockChainExpectedException, BlockExpectedException, BlockWithRLP, TestUnit}, }; use ethrex_blockchain::{ - Blockchain, BlockchainType, + Blockchain, BlockchainOptions, error::{ChainError, InvalidBlockError}, fork_choice::apply_fork_choice, }; @@ -105,7 +105,7 @@ pub async fn run_ef_test( check_prestate_against_db(test_key, test, &store); // Blockchain EF tests are meant for L1. - let blockchain = Blockchain::new(store.clone(), BlockchainType::L1, false); + let blockchain = Blockchain::new(store.clone(), BlockchainOptions::default()); // Early return if the exception is in the rlp decoding of the block for bf in &test.blocks { diff --git a/tooling/ef_tests/state_v2/src/modules/block_runner.rs b/tooling/ef_tests/state_v2/src/modules/block_runner.rs index dc63679a52..ed579f332f 100644 --- a/tooling/ef_tests/state_v2/src/modules/block_runner.rs +++ b/tooling/ef_tests/state_v2/src/modules/block_runner.rs @@ -1,6 +1,6 @@ use bytes::Bytes; +use ethrex_blockchain::Blockchain; use ethrex_blockchain::get_total_blob_gas; -use ethrex_blockchain::{Blockchain, BlockchainType}; use ethrex_common::constants::DEFAULT_REQUESTS_HASH; use ethrex_common::types::{ Block, BlockBody, BlockHeader, Fork, Receipt, Transaction, compute_receipts_root, @@ -144,7 +144,10 @@ pub async fn run_test(test: &Test, test_case: &TestCase) -> Result<(), RunnerErr // 3. Create Blockchain and add block. - let blockchain = Blockchain::new(store.clone(), BlockchainType::L1, false); + let blockchain = Blockchain::new( + store.clone(), + ethrex_blockchain::BlockchainOptions::default(), + ); let result = blockchain.add_block(&block).await;