Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions cmd/ethrex/bench/build_block_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use criterion::{
measurement::{Measurement, ValueFormatter},
};
use ethrex_blockchain::{
Blockchain, BlockchainType,
Blockchain, BlockchainOptions, BlockchainType,
payload::{BuildPayloadArgs, PayloadBuildResult, create_payload},
};
use ethrex_common::{
Expand Down Expand Up @@ -236,8 +236,11 @@ pub fn build_block_benchmark(c: &mut Criterion<GasMeasurement>) {
let (store_with_genesis, genesis) = setup_genesis(&addresses).await;
let block_chain = Blockchain::new(
store_with_genesis.clone(),
BlockchainType::L1, // TODO: Should we support L2?
false,
BlockchainOptions {
r#type: BlockchainType::L1, // TODO: Should we support L2?
perf_logs_enabled: false,
..Default::default()
},
);
fill_mempool(&block_chain, accounts).await;

Expand Down
7 changes: 5 additions & 2 deletions cmd/ethrex/bench/import_blocks_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use ethrex::{
cli::{import_blocks, remove_db},
utils::{default_datadir, init_datadir},
};
use ethrex_blockchain::BlockchainType;
use ethrex_blockchain::{BlockchainOptions, BlockchainType};
use ethrex_config::networks::Network;

#[inline]
Expand All @@ -23,7 +23,10 @@ fn block_import() {
"../../fixtures/blockchain/l2-1k-erc20.rlp",
&datadir,
genesis,
blockchain_type,
BlockchainOptions {
r#type: blockchain_type,
..Default::default()
},
))
.expect("Failed to import blocks on the Tokio runtime");
}
Expand Down
27 changes: 23 additions & 4 deletions cmd/ethrex/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
};

use clap::{ArgAction, Parser as ClapParser, Subcommand as ClapSubcommand};
use ethrex_blockchain::{BlockchainType, error::ChainError};
use ethrex_blockchain::{BlockchainOptions, BlockchainType, error::ChainError};
use ethrex_common::types::{Block, Genesis};
use ethrex_p2p::sync::SyncMode;
use ethrex_p2p::types::Node;
Expand Down Expand Up @@ -107,6 +107,14 @@ pub struct Options {
long_help = "Possible values: info, debug, trace, warn, error",
help_heading = "Node options")]
pub log_level: Level,
#[arg(
help = "Maximum size of the mempool in number of transactions",
long = "mempool.maxsize",
default_value_t = 10_000,
value_name = "MEMPOOL_MAX_SIZE",
help_heading = "Node options"
)]
pub mempool_max_size: usize,
#[arg(
long = "http.addr",
default_value = "0.0.0.0",
Expand Down Expand Up @@ -237,6 +245,7 @@ impl Default for Options {
metrics_enabled: Default::default(),
dev: Default::default(),
force: false,
mempool_max_size: Default::default(),
extra_data: get_minimal_client_version(),
}
}
Expand Down Expand Up @@ -331,7 +340,17 @@ impl Subcommand {
} else {
BlockchainType::L1
};
import_blocks(&path, &opts.datadir, genesis, blockchain_type).await?;
import_blocks(
&path,
&opts.datadir,
genesis,
BlockchainOptions {
max_mempool_size: opts.mempool_max_size,
r#type: blockchain_type,
..Default::default()
},
)
.await?;
}
Subcommand::Export { path, first, last } => {
export_blocks(&path, &opts.datadir, first, last).await
Expand Down Expand Up @@ -378,12 +397,12 @@ pub async fn import_blocks(
path: &str,
datadir: &Path,
genesis: Genesis,
blockchain_type: BlockchainType,
blockchain_opts: BlockchainOptions,
) -> Result<(), ChainError> {
let start_time = Instant::now();
init_datadir(datadir);
let store = init_store(datadir, genesis).await;
let blockchain = init_blockchain(store.clone(), blockchain_type, false);
let blockchain = init_blockchain(store.clone(), blockchain_opts);
let path_metadata = metadata(path).expect("Failed to read path");

// If it's an .rlp file it will be just one chain, but if it's a directory there can be multiple chains.
Expand Down
19 changes: 11 additions & 8 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
read_jwtsecret_file, read_node_config_file,
},
};
use ethrex_blockchain::{Blockchain, BlockchainType};
use ethrex_blockchain::{Blockchain, BlockchainOptions, BlockchainType};
use ethrex_common::types::Genesis;
use ethrex_config::networks::Network;

Expand Down Expand Up @@ -114,16 +114,12 @@ pub fn open_store(datadir: &Path) -> Store {
}
}

pub fn init_blockchain(
store: Store,
blockchain_type: BlockchainType,
perf_logs_enabled: bool,
) -> Arc<Blockchain> {
pub fn init_blockchain(store: Store, blockchain_opts: BlockchainOptions) -> Arc<Blockchain> {
#[cfg(feature = "revm")]
info!("Initiating blockchain with revm");
#[cfg(not(feature = "revm"))]
info!("Initiating blockchain with levm");
Blockchain::new(store, blockchain_type, perf_logs_enabled).into()
Blockchain::new(store, blockchain_opts).into()
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -390,7 +386,14 @@ pub async fn init_l1(
#[cfg(feature = "sync-test")]
set_sync_block(&store).await;

let blockchain = init_blockchain(store.clone(), BlockchainType::L1, true);
let blockchain = init_blockchain(
store.clone(),
BlockchainOptions {
max_mempool_size: opts.mempool_max_size,
perf_logs_enabled: true,
r#type: BlockchainType::L1,
},
);

let signer = get_signer(datadir);

Expand Down
8 changes: 7 additions & 1 deletion cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,13 @@ pub async fn init_l2(
let store = init_store(&datadir, genesis).await;
let rollup_store = init_rollup_store(&rollup_store_dir).await;

let blockchain = init_blockchain(store.clone(), BlockchainType::L2, true);
let blockchain_opts = ethrex_blockchain::BlockchainOptions {
max_mempool_size: opts.node_opts.mempool_max_size,
r#type: BlockchainType::L2,
perf_logs_enabled: true,
};

let blockchain = init_blockchain(store.clone(), blockchain_opts);

let signer = get_signer(&datadir);

Expand Down
7 changes: 5 additions & 2 deletions cmd/ethrex_replay/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{

use clap::{ArgGroup, Parser, Subcommand, ValueEnum};
use ethrex_blockchain::{
Blockchain, BlockchainType,
Blockchain,
fork_choice::apply_fork_choice,
payload::{BuildPayloadArgs, PayloadBuildResult, create_payload},
};
Expand Down Expand Up @@ -896,7 +896,10 @@ pub async fn replay_custom_l1_blocks(
store_inner
};

let blockchain = Arc::new(Blockchain::new(store.clone(), BlockchainType::L1, false));
let blockchain = Arc::new(Blockchain::new(
store.clone(),
ethrex_blockchain::BlockchainOptions::default(),
));

let blocks = produce_l1_blocks(
blockchain.clone(),
Expand Down
43 changes: 29 additions & 14 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use ethrex_metrics::metrics_blocks::METRICS_BLOCKS;
use ethrex_common::types::BlobsBundle;

const MAX_PAYLOADS: usize = 10;
const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000;

//TODO: Implement a struct Chain or BlockChain to encapsulate
//functionality and canonical chain state and config
Expand All @@ -67,14 +68,30 @@ pub struct Blockchain {
/// This will be set to true once the initial sync has taken place and wont be set to false after
/// This does not reflect whether there is an ongoing sync process
is_synced: AtomicBool,
/// Whether performance logs should be emitted
pub perf_logs_enabled: bool,
pub r#type: BlockchainType,
pub options: BlockchainOptions,
/// Mapping from a payload id to either a complete payload or a payload build task
/// We need to keep completed payloads around in case consensus requests them twice
pub payloads: Arc<TokioMutex<Vec<(u64, PayloadOrTask)>>>,
}

#[derive(Debug, Clone)]
pub struct BlockchainOptions {
pub max_mempool_size: usize,
/// Whether performance logs should be emitted
pub perf_logs_enabled: bool,
pub r#type: BlockchainType,
}

impl Default for BlockchainOptions {
fn default() -> Self {
Self {
max_mempool_size: MAX_MEMPOOL_SIZE_DEFAULT,
perf_logs_enabled: false,
r#type: BlockchainType::default(),
}
}
}

#[derive(Debug, Clone)]
pub struct BatchBlockProcessingFailure {
pub last_valid_hash: H256,
Expand All @@ -94,25 +111,23 @@ fn log_batch_progress(batch_size: u32, current_block: u32) {
}

impl Blockchain {
pub fn new(store: Store, blockchain_type: BlockchainType, perf_logs_enabled: bool) -> Self {
pub fn new(store: Store, blockchain_opts: BlockchainOptions) -> Self {
Self {
storage: store,
mempool: Mempool::new(),
mempool: Mempool::new(blockchain_opts.max_mempool_size),
is_synced: AtomicBool::new(false),
r#type: blockchain_type,
payloads: Arc::new(TokioMutex::new(Vec::new())),
perf_logs_enabled,
options: blockchain_opts,
}
}

pub fn default_with_store(store: Store) -> Self {
Self {
storage: store,
mempool: Mempool::new(),
mempool: Mempool::new(MAX_MEMPOOL_SIZE_DEFAULT),
is_synced: AtomicBool::new(false),
r#type: BlockchainType::default(),
payloads: Arc::new(TokioMutex::new(Vec::new())),
perf_logs_enabled: false,
options: BlockchainOptions::default(),
}
}

Expand Down Expand Up @@ -204,7 +219,7 @@ impl Blockchain {
let vm_db: DynVmDatabase =
Box::new(StoreVmDatabase::new(self.storage.clone(), parent_hash));
let logger = Arc::new(DatabaseLogger::new(Arc::new(Mutex::new(Box::new(vm_db)))));
let mut vm = match self.r#type {
let mut vm = match self.options.r#type {
BlockchainType::L1 => Evm::new_from_db_for_l1(logger.clone()),
BlockchainType::L2 => Evm::new_from_db_for_l2(logger.clone()),
};
Expand Down Expand Up @@ -422,7 +437,7 @@ impl Blockchain {
let result = self.store_block(block, account_updates_list, res).await;
let stored = Instant::now();

if self.perf_logs_enabled {
if self.options.perf_logs_enabled {
Self::print_add_block_logs(block, since, executed, merkleized, stored);
}
result
Expand Down Expand Up @@ -613,7 +628,7 @@ impl Blockchain {
METRICS_BLOCKS.set_latest_gigagas(throughput);
);

if self.perf_logs_enabled {
if self.options.perf_logs_enabled {
info!(
"[METRICS] Executed and stored: Range: {}, Last block num: {}, Last block gas limit: {}, Total transactions: {}, Total Gas: {}, Throughput: {} Gigagas/s",
blocks_len,
Expand Down Expand Up @@ -883,7 +898,7 @@ impl Blockchain {
}

pub fn new_evm(&self, vm_db: StoreVmDatabase) -> Result<Evm, EvmError> {
let evm = match self.r#type {
let evm = match self.options.r#type {
BlockchainType::L1 => Evm::new_for_l1(vm_db),
BlockchainType::L2 => Evm::new_for_l2(vm_db)?,
};
Expand Down
32 changes: 17 additions & 15 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,25 @@ use ethrex_storage::error::StoreError;
use std::collections::HashSet;
use tracing::warn;

// Max number of transactions in the mempool
const MEMPOOL_MAX_SIZE: usize = 10000; //TODO: Define

// Max number of transactions to let the mempool order queue grow before pruning it
const MEMPOOL_PRUNE_THRESHOLD: usize = MEMPOOL_MAX_SIZE + MEMPOOL_MAX_SIZE / 2;

#[derive(Debug, Default)]
struct MempoolInner {
broadcast_pool: HashSet<H256>,
transaction_pool: HashMap<H256, MempoolTransaction>,
blobs_bundle_pool: HashMap<H256, BlobsBundle>,
txs_by_sender_nonce: BTreeMap<(H160, u64), H256>,
txs_order: VecDeque<H256>,
max_mempool_size: usize,
// Max number of transactions to let the mempool order queue grow before pruning it
mempool_prune_threshold: usize,
}

impl MempoolInner {
fn new() -> Self {
fn new(max_mempool_size: usize) -> Self {
MempoolInner {
txs_order: VecDeque::with_capacity(MEMPOOL_MAX_SIZE * 2),
txs_order: VecDeque::with_capacity(max_mempool_size * 2),
transaction_pool: HashMap::with_capacity(max_mempool_size),
max_mempool_size,
mempool_prune_threshold: max_mempool_size + max_mempool_size / 2,
..Default::default()
}
}
Expand All @@ -60,7 +60,7 @@ impl MempoolInner {
/// Remove the oldest transaction in the pool
fn remove_oldest_transaction(&mut self) -> Result<(), StoreError> {
// Remove elements from the order queue until one is present in the pool
while self.transaction_pool.len() >= MEMPOOL_MAX_SIZE {
while self.transaction_pool.len() >= self.max_mempool_size {
if let Some(oldest_hash) = self.txs_order.pop_front() {
self.remove_transaction_with_lock(&oldest_hash)?;
} else {
Expand All @@ -81,9 +81,9 @@ pub struct Mempool {
}

impl Mempool {
pub fn new() -> Self {
pub fn new(max_mempool_size: usize) -> Self {
Mempool {
inner: RwLock::new(MempoolInner::new()),
inner: RwLock::new(MempoolInner::new(max_mempool_size)),
}
}

Expand All @@ -107,13 +107,13 @@ impl Mempool {
) -> Result<(), StoreError> {
let mut inner = self.write()?;
// Prune the order queue if it has grown too much
if inner.txs_order.len() > MEMPOOL_PRUNE_THRESHOLD {
if inner.txs_order.len() > inner.mempool_prune_threshold {
// NOTE: we do this to avoid borrow checker errors
let txpool = core::mem::take(&mut inner.transaction_pool);
inner.txs_order.retain(|tx| txpool.contains_key(tx));
inner.transaction_pool = txpool;
}
if inner.transaction_pool.len() >= MEMPOOL_MAX_SIZE {
if inner.transaction_pool.len() >= inner.max_mempool_size {
inner.remove_oldest_transaction()?;
}
inner.txs_order.push_back(hash);
Expand Down Expand Up @@ -491,6 +491,8 @@ mod tests {
use ethrex_storage::EngineType;
use ethrex_storage::{Store, error::StoreError};

const MEMPOOL_MAX_SIZE_TEST: usize = 10_000;

async fn setup_storage(config: ChainConfig, header: BlockHeader) -> Result<Store, StoreError> {
let store = Store::new("test", EngineType::InMemory)?;
let block_number = header.number;
Expand Down Expand Up @@ -846,7 +848,7 @@ mod tests {
let blob_tx = MempoolTransaction::new(blob_tx_decoded, blob_tx_sender);
let plain_tx_hash = plain_tx.hash();
let blob_tx_hash = blob_tx.hash();
let mempool = Mempool::new();
let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST);
let filter =
|tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) };
mempool
Expand All @@ -861,7 +863,7 @@ mod tests {
fn blobs_bundle_loadtest() {
// Write a bundle of 6 blobs 10 times
// If this test fails please adjust the max_size in the DB config
let mempool = Mempool::new();
let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST);
for i in 0..300 {
let blobs = [[i as u8; BYTES_PER_BLOB]; 6];
let commitments = [[i as u8; 48]; 6];
Expand Down
Loading