Skip to content

Commit c9882b1

Browse files
authored
feat(l1,l2,replay): mempool max size parameter (#4639)
**Motivation** We want to make the mempool max size a parameter <!-- Why does this pull request exist? What are its goals? --> **Description** Adds the mempool max size as a parameter to ethrex command <!-- A clear and concise general description of the changes this PR introduces --> <!-- Link to issues: Resolves #111, Resolves #222 --> Closes #4600
1 parent cbcfb5d commit c9882b1

File tree

13 files changed

+119
-56
lines changed

13 files changed

+119
-56
lines changed

cmd/ethrex/bench/build_block_benchmark.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use criterion::{
1111
measurement::{Measurement, ValueFormatter},
1212
};
1313
use ethrex_blockchain::{
14-
Blockchain, BlockchainType,
14+
Blockchain, BlockchainOptions, BlockchainType,
1515
payload::{BuildPayloadArgs, PayloadBuildResult, create_payload},
1616
};
1717
use ethrex_common::{
@@ -236,8 +236,11 @@ pub fn build_block_benchmark(c: &mut Criterion<GasMeasurement>) {
236236
let (store_with_genesis, genesis) = setup_genesis(&addresses).await;
237237
let block_chain = Blockchain::new(
238238
store_with_genesis.clone(),
239-
BlockchainType::L1, // TODO: Should we support L2?
240-
false,
239+
BlockchainOptions {
240+
r#type: BlockchainType::L1, // TODO: Should we support L2?
241+
perf_logs_enabled: false,
242+
..Default::default()
243+
},
241244
);
242245
fill_mempool(&block_chain, accounts).await;
243246

cmd/ethrex/bench/import_blocks_benchmark.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use ethrex::{
33
cli::{import_blocks, remove_db},
44
utils::{default_datadir, init_datadir},
55
};
6-
use ethrex_blockchain::BlockchainType;
6+
use ethrex_blockchain::{BlockchainOptions, BlockchainType};
77
use ethrex_config::networks::Network;
88

99
#[inline]
@@ -23,7 +23,10 @@ fn block_import() {
2323
"../../fixtures/blockchain/l2-1k-erc20.rlp",
2424
&datadir,
2525
genesis,
26-
blockchain_type,
26+
BlockchainOptions {
27+
r#type: blockchain_type,
28+
..Default::default()
29+
},
2730
))
2831
.expect("Failed to import blocks on the Tokio runtime");
2932
}

cmd/ethrex/cli.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
};
77

88
use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand};
9-
use ethrex_blockchain::{BlockchainType, error::ChainError};
9+
use ethrex_blockchain::{BlockchainOptions, BlockchainType, error::ChainError};
1010
use ethrex_common::types::{Block, Genesis};
1111
use ethrex_p2p::sync::SyncMode;
1212
use ethrex_p2p::types::Node;
@@ -107,6 +107,14 @@ pub struct Options {
107107
long_help = "Possible values: info, debug, trace, warn, error",
108108
help_heading = "Node options")]
109109
pub log_level: Level,
110+
#[arg(
111+
help = "Maximum size of the mempool in number of transactions",
112+
long = "mempool.maxsize",
113+
default_value_t = 10_000,
114+
value_name = "MEMPOOL_MAX_SIZE",
115+
help_heading = "Node options"
116+
)]
117+
pub mempool_max_size: usize,
110118
#[arg(
111119
long = "http.addr",
112120
default_value = "0.0.0.0",
@@ -237,6 +245,7 @@ impl Default for Options {
237245
metrics_enabled: Default::default(),
238246
dev: Default::default(),
239247
force: false,
248+
mempool_max_size: Default::default(),
240249
extra_data: get_minimal_client_version(),
241250
}
242251
}
@@ -331,7 +340,17 @@ impl Subcommand {
331340
} else {
332341
BlockchainType::L1
333342
};
334-
import_blocks(&path, &opts.datadir, genesis, blockchain_type).await?;
343+
import_blocks(
344+
&path,
345+
&opts.datadir,
346+
genesis,
347+
BlockchainOptions {
348+
max_mempool_size: opts.mempool_max_size,
349+
r#type: blockchain_type,
350+
..Default::default()
351+
},
352+
)
353+
.await?;
335354
}
336355
Subcommand::Export { path, first, last } => {
337356
export_blocks(&path, &opts.datadir, first, last).await
@@ -378,12 +397,12 @@ pub async fn import_blocks(
378397
path: &str,
379398
datadir: &Path,
380399
genesis: Genesis,
381-
blockchain_type: BlockchainType,
400+
blockchain_opts: BlockchainOptions,
382401
) -> Result<(), ChainError> {
383402
let start_time = Instant::now();
384403
init_datadir(datadir);
385404
let store = init_store(datadir, genesis).await;
386-
let blockchain = init_blockchain(store.clone(), blockchain_type, false);
405+
let blockchain = init_blockchain(store.clone(), blockchain_opts);
387406
let path_metadata = metadata(path).expect("Failed to read path");
388407

389408
// If it's an .rlp file it will be just one chain, but if it's a directory there can be multiple chains.

cmd/ethrex/initializers.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
read_jwtsecret_file, read_node_config_file,
66
},
77
};
8-
use ethrex_blockchain::{Blockchain, BlockchainType};
8+
use ethrex_blockchain::{Blockchain, BlockchainOptions, BlockchainType};
99
use ethrex_common::types::Genesis;
1010
use ethrex_config::networks::Network;
1111

@@ -114,16 +114,12 @@ pub fn open_store(datadir: &Path) -> Store {
114114
}
115115
}
116116

117-
pub fn init_blockchain(
118-
store: Store,
119-
blockchain_type: BlockchainType,
120-
perf_logs_enabled: bool,
121-
) -> Arc<Blockchain> {
117+
pub fn init_blockchain(store: Store, blockchain_opts: BlockchainOptions) -> Arc<Blockchain> {
122118
#[cfg(feature = "revm")]
123119
info!("Initiating blockchain with revm");
124120
#[cfg(not(feature = "revm"))]
125121
info!("Initiating blockchain with levm");
126-
Blockchain::new(store, blockchain_type, perf_logs_enabled).into()
122+
Blockchain::new(store, blockchain_opts).into()
127123
}
128124

129125
#[allow(clippy::too_many_arguments)]
@@ -390,7 +386,14 @@ pub async fn init_l1(
390386
#[cfg(feature = "sync-test")]
391387
set_sync_block(&store).await;
392388

393-
let blockchain = init_blockchain(store.clone(), BlockchainType::L1, true);
389+
let blockchain = init_blockchain(
390+
store.clone(),
391+
BlockchainOptions {
392+
max_mempool_size: opts.mempool_max_size,
393+
perf_logs_enabled: true,
394+
r#type: BlockchainType::L1,
395+
},
396+
);
394397

395398
let signer = get_signer(datadir);
396399

cmd/ethrex/l2/initializers.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,13 @@ pub async fn init_l2(
170170
let store = init_store(&datadir, genesis).await;
171171
let rollup_store = init_rollup_store(&rollup_store_dir).await;
172172

173-
let blockchain = init_blockchain(store.clone(), BlockchainType::L2, true);
173+
let blockchain_opts = ethrex_blockchain::BlockchainOptions {
174+
max_mempool_size: opts.node_opts.mempool_max_size,
175+
r#type: BlockchainType::L2,
176+
perf_logs_enabled: true,
177+
};
178+
179+
let blockchain = init_blockchain(store.clone(), blockchain_opts);
174180

175181
let signer = get_signer(&datadir);
176182

cmd/ethrex_replay/src/cli.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88

99
use clap::{ArgGroup, Parser, Subcommand, ValueEnum};
1010
use ethrex_blockchain::{
11-
Blockchain, BlockchainType,
11+
Blockchain,
1212
fork_choice::apply_fork_choice,
1313
payload::{BuildPayloadArgs, PayloadBuildResult, create_payload},
1414
};
@@ -896,7 +896,10 @@ pub async fn replay_custom_l1_blocks(
896896
store_inner
897897
};
898898

899-
let blockchain = Arc::new(Blockchain::new(store.clone(), BlockchainType::L1, false));
899+
let blockchain = Arc::new(Blockchain::new(
900+
store.clone(),
901+
ethrex_blockchain::BlockchainOptions::default(),
902+
));
900903

901904
let blocks = produce_l1_blocks(
902905
blockchain.clone(),

crates/blockchain/blockchain.rs

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use ethrex_metrics::metrics_blocks::METRICS_BLOCKS;
4848
use ethrex_common::types::BlobsBundle;
4949

5050
const MAX_PAYLOADS: usize = 10;
51+
const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000;
5152

5253
//TODO: Implement a struct Chain or BlockChain to encapsulate
5354
//functionality and canonical chain state and config
@@ -67,14 +68,30 @@ pub struct Blockchain {
6768
/// This will be set to true once the initial sync has taken place and wont be set to false after
6869
/// This does not reflect whether there is an ongoing sync process
6970
is_synced: AtomicBool,
70-
/// Whether performance logs should be emitted
71-
pub perf_logs_enabled: bool,
72-
pub r#type: BlockchainType,
71+
pub options: BlockchainOptions,
7372
/// Mapping from a payload id to either a complete payload or a payload build task
7473
/// We need to keep completed payloads around in case consensus requests them twice
7574
pub payloads: Arc<TokioMutex<Vec<(u64, PayloadOrTask)>>>,
7675
}
7776

77+
#[derive(Debug, Clone)]
78+
pub struct BlockchainOptions {
79+
pub max_mempool_size: usize,
80+
/// Whether performance logs should be emitted
81+
pub perf_logs_enabled: bool,
82+
pub r#type: BlockchainType,
83+
}
84+
85+
impl Default for BlockchainOptions {
86+
fn default() -> Self {
87+
Self {
88+
max_mempool_size: MAX_MEMPOOL_SIZE_DEFAULT,
89+
perf_logs_enabled: false,
90+
r#type: BlockchainType::default(),
91+
}
92+
}
93+
}
94+
7895
#[derive(Debug, Clone)]
7996
pub struct BatchBlockProcessingFailure {
8097
pub last_valid_hash: H256,
@@ -94,25 +111,23 @@ fn log_batch_progress(batch_size: u32, current_block: u32) {
94111
}
95112

96113
impl Blockchain {
97-
pub fn new(store: Store, blockchain_type: BlockchainType, perf_logs_enabled: bool) -> Self {
114+
pub fn new(store: Store, blockchain_opts: BlockchainOptions) -> Self {
98115
Self {
99116
storage: store,
100-
mempool: Mempool::new(),
117+
mempool: Mempool::new(blockchain_opts.max_mempool_size),
101118
is_synced: AtomicBool::new(false),
102-
r#type: blockchain_type,
103119
payloads: Arc::new(TokioMutex::new(Vec::new())),
104-
perf_logs_enabled,
120+
options: blockchain_opts,
105121
}
106122
}
107123

108124
pub fn default_with_store(store: Store) -> Self {
109125
Self {
110126
storage: store,
111-
mempool: Mempool::new(),
127+
mempool: Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT),
112128
is_synced: AtomicBool::new(false),
113-
r#type: BlockchainType::default(),
114129
payloads: Arc::new(TokioMutex::new(Vec::new())),
115-
perf_logs_enabled: false,
130+
options: BlockchainOptions::default(),
116131
}
117132
}
118133

@@ -204,7 +219,7 @@ impl Blockchain {
204219
let vm_db: DynVmDatabase =
205220
Box::new(StoreVmDatabase::new(self.storage.clone(), parent_hash));
206221
let logger = Arc::new(DatabaseLogger::new(Arc::new(Mutex::new(Box::new(vm_db)))));
207-
let mut vm = match self.r#type {
222+
let mut vm = match self.options.r#type {
208223
BlockchainType::L1 => Evm::new_from_db_for_l1(logger.clone()),
209224
BlockchainType::L2 => Evm::new_from_db_for_l2(logger.clone()),
210225
};
@@ -422,7 +437,7 @@ impl Blockchain {
422437
let result = self.store_block(block, account_updates_list, res).await;
423438
let stored = Instant::now();
424439

425-
if self.perf_logs_enabled {
440+
if self.options.perf_logs_enabled {
426441
Self::print_add_block_logs(block, since, executed, merkleized, stored);
427442
}
428443
result
@@ -613,7 +628,7 @@ impl Blockchain {
613628
METRICS_BLOCKS.set_latest_gigagas(throughput);
614629
);
615630

616-
if self.perf_logs_enabled {
631+
if self.options.perf_logs_enabled {
617632
info!(
618633
"[METRICS] Executed and stored: Range: {}, Last block num: {}, Last block gas limit: {}, Total transactions: {}, Total Gas: {}, Throughput: {} Gigagas/s",
619634
blocks_len,
@@ -883,7 +898,7 @@ impl Blockchain {
883898
}
884899

885900
pub fn new_evm(&self, vm_db: StoreVmDatabase) -> Result<Evm, EvmError> {
886-
let evm = match self.r#type {
901+
let evm = match self.options.r#type {
887902
BlockchainType::L1 => Evm::new_for_l1(vm_db),
888903
BlockchainType::L2 => Evm::new_for_l2(vm_db)?,
889904
};

crates/blockchain/mempool.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,25 @@ use ethrex_storage::error::StoreError;
1919
use std::collections::HashSet;
2020
use tracing::warn;
2121

22-
// Max number of transactions in the mempool
23-
const MEMPOOL_MAX_SIZE: usize = 10000; //TODO: Define
24-
25-
// Max number of transactions to let the mempool order queue grow before pruning it
26-
const MEMPOOL_PRUNE_THRESHOLD: usize = MEMPOOL_MAX_SIZE + MEMPOOL_MAX_SIZE / 2;
27-
2822
#[derive(Debug, Default)]
2923
struct MempoolInner {
3024
broadcast_pool: HashSet<H256>,
3125
transaction_pool: HashMap<H256, MempoolTransaction>,
3226
blobs_bundle_pool: HashMap<H256, BlobsBundle>,
3327
txs_by_sender_nonce: BTreeMap<(H160, u64), H256>,
3428
txs_order: VecDeque<H256>,
29+
max_mempool_size: usize,
30+
// Max number of transactions to let the mempool order queue grow before pruning it
31+
mempool_prune_threshold: usize,
3532
}
3633

3734
impl MempoolInner {
38-
fn new() -> Self {
35+
fn new(max_mempool_size: usize) -> Self {
3936
MempoolInner {
40-
txs_order: VecDeque::with_capacity(MEMPOOL_MAX_SIZE * 2),
37+
txs_order: VecDeque::with_capacity(max_mempool_size * 2),
38+
transaction_pool: HashMap::with_capacity(max_mempool_size),
39+
max_mempool_size,
40+
mempool_prune_threshold: max_mempool_size + max_mempool_size / 2,
4141
..Default::default()
4242
}
4343
}
@@ -60,7 +60,7 @@ impl MempoolInner {
6060
/// Remove the oldest transaction in the pool
6161
fn remove_oldest_transaction(&mut self) -> Result<(), StoreError> {
6262
// Remove elements from the order queue until one is present in the pool
63-
while self.transaction_pool.len() >= MEMPOOL_MAX_SIZE {
63+
while self.transaction_pool.len() >= self.max_mempool_size {
6464
if let Some(oldest_hash) = self.txs_order.pop_front() {
6565
self.remove_transaction_with_lock(&oldest_hash)?;
6666
} else {
@@ -81,9 +81,9 @@ pub struct Mempool {
8181
}
8282

8383
impl Mempool {
84-
pub fn new() -> Self {
84+
pub fn new(max_mempool_size: usize) -> Self {
8585
Mempool {
86-
inner: RwLock::new(MempoolInner::new()),
86+
inner: RwLock::new(MempoolInner::new(max_mempool_size)),
8787
}
8888
}
8989

@@ -107,13 +107,13 @@ impl Mempool {
107107
) -> Result<(), StoreError> {
108108
let mut inner = self.write()?;
109109
// Prune the order queue if it has grown too much
110-
if inner.txs_order.len() > MEMPOOL_PRUNE_THRESHOLD {
110+
if inner.txs_order.len() > inner.mempool_prune_threshold {
111111
// NOTE: we do this to avoid borrow checker errors
112112
let txpool = core::mem::take(&mut inner.transaction_pool);
113113
inner.txs_order.retain(|tx| txpool.contains_key(tx));
114114
inner.transaction_pool = txpool;
115115
}
116-
if inner.transaction_pool.len() >= MEMPOOL_MAX_SIZE {
116+
if inner.transaction_pool.len() >= inner.max_mempool_size {
117117
inner.remove_oldest_transaction()?;
118118
}
119119
inner.txs_order.push_back(hash);
@@ -491,6 +491,8 @@ mod tests {
491491
use ethrex_storage::EngineType;
492492
use ethrex_storage::{Store, error::StoreError};
493493

494+
const MEMPOOL_MAX_SIZE_TEST: usize = 10_000;
495+
494496
async fn setup_storage(config: ChainConfig, header: BlockHeader) -> Result<Store, StoreError> {
495497
let store = Store::new("test", EngineType::InMemory)?;
496498
let block_number = header.number;
@@ -846,7 +848,7 @@ mod tests {
846848
let blob_tx = MempoolTransaction::new(blob_tx_decoded, blob_tx_sender);
847849
let plain_tx_hash = plain_tx.hash();
848850
let blob_tx_hash = blob_tx.hash();
849-
let mempool = Mempool::new();
851+
let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST);
850852
let filter =
851853
|tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) };
852854
mempool
@@ -861,7 +863,7 @@ mod tests {
861863
fn blobs_bundle_loadtest() {
862864
// Write a bundle of 6 blobs 10 times
863865
// If this test fails please adjust the max_size in the DB config
864-
let mempool = Mempool::new();
866+
let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST);
865867
for i in 0..300 {
866868
let blobs = [[i as u8; BYTES_PER_BLOB]; 6];
867869
let commitments = [[i as u8; 48]; 6];

0 commit comments

Comments
 (0)