Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, VecDeque},
sync::RwLock,
};

Expand All @@ -17,23 +17,27 @@ use ethrex_common::{
};
use ethrex_storage::error::StoreError;
use std::collections::HashSet;
use tracing::warn;

// Max number of transactions in the mempool
const MEMPOOL_MAX_SIZE: usize = 10000; //TODO: Define

// Max number of transactions to let the mempool order queue grow before pruning it
const MEMPOOL_PRUNE_THRESHOLD: usize = MEMPOOL_MAX_SIZE + MEMPOOL_MAX_SIZE / 2;

#[derive(Debug, Default)]
struct MempoolInner {
broadcast_pool: HashSet<H256>,
transaction_pool: HashMap<H256, MempoolTransaction>,
blobs_bundle_pool: HashMap<H256, BlobsBundle>,
txs_by_sender_nonce: BTreeMap<(H160, u64), H256>,
txs_order: Vec<H256>,
txs_order: VecDeque<H256>,
}

impl MempoolInner {
fn new() -> Self {
MempoolInner {
txs_order: Vec::with_capacity(MEMPOOL_MAX_SIZE),
txs_order: VecDeque::with_capacity(MEMPOOL_MAX_SIZE * 2),
..Default::default()
}
}
Expand All @@ -48,16 +52,23 @@ impl MempoolInner {
self.txs_by_sender_nonce.remove(&(tx.sender(), tx.nonce()));
self.transaction_pool.remove(hash);
self.broadcast_pool.remove(hash);
self.txs_order.retain(|h| h != hash);
};

Ok(())
}

/// Remove the oldest transaction in the pool
fn remove_oldest_transaction(&mut self) -> Result<(), StoreError> {
if let Some(&oldest_hash) = self.txs_order.first() {
self.remove_transaction_with_lock(&oldest_hash)?;
// Remove elements from the order queue until one is present in the pool
while self.transaction_pool.len() >= MEMPOOL_MAX_SIZE {
if let Some(oldest_hash) = self.txs_order.pop_front() {
self.remove_transaction_with_lock(&oldest_hash)?;
} else {
warn!(
"Mempool is full but there are no transactions to remove, this should not happen and will make the mempool grow indefinitely"
);
break;
}
}

Ok(())
Expand Down Expand Up @@ -95,10 +106,17 @@ impl Mempool {
transaction: MempoolTransaction,
) -> Result<(), StoreError> {
let mut inner = self.write()?;
// Prune the order queue if it has grown too much
if inner.txs_order.len() > MEMPOOL_PRUNE_THRESHOLD {
// NOTE: we do this to avoid borrow checker errors
let txpool = core::mem::take(&mut inner.transaction_pool);
inner.txs_order.retain(|tx| txpool.contains_key(tx));
inner.transaction_pool = txpool;
}
if inner.transaction_pool.len() >= MEMPOOL_MAX_SIZE {
inner.remove_oldest_transaction()?;
}
inner.txs_order.push(hash);
inner.txs_order.push_back(hash);
inner
.txs_by_sender_nonce
.insert((transaction.sender(), transaction.nonce()), hash);
Expand Down
Loading