Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mempool): minor mempool improvements #3113

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions core/lib/mempool/src/mempool_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{hash_map, BTreeSet, HashMap, HashSet};
use std::collections::{hash_map, BTreeSet, HashMap};

use zksync_types::{
l1::L1Tx, l2::L2Tx, Address, ExecuteTransactionCommon, Nonce, PriorityOpId, Transaction,
Expand Down Expand Up @@ -221,22 +221,57 @@ impl MempoolStore {
}

fn gc(&mut self) -> Vec<Address> {
if self.size >= self.capacity {
let index: HashSet<_> = self
if self.size > self.capacity {
let mut transactions = std::mem::take(&mut self.l2_transactions_per_account);
let mut possibly_kept: Vec<_> = self
.l2_priority_queue
.iter()
.map(|pointer| pointer.account)
.rev()
.filter_map(|pointer| {
transactions
.remove(&pointer.account)
.map(|txs| (pointer.account, txs))
})
.collect();
let transactions = std::mem::take(&mut self.l2_transactions_per_account);
let (kept, drained) = transactions

let mut sum = 0;
let mut number_of_accounts_kept = 0;
for (_, txs) in &possibly_kept {
sum += txs.len();
if sum <= self.capacity as usize {
number_of_accounts_kept += 1;
} else {
break;
}
}
if number_of_accounts_kept == 0 && !possibly_kept.is_empty() {
tracing::warn!("mempool capacity is too low to handle txs from single account, consider increasing capacity");
// Keep at least one entry, otherwise mempool won't return any new L2 tx to process.
number_of_accounts_kept = 1;
}
let (kept, drained) = {
let mut drained: Vec<_> = transactions.into_keys().collect();
let also_drained = possibly_kept
.split_off(number_of_accounts_kept)
.into_iter()
perekopskiy marked this conversation as resolved.
Show resolved Hide resolved
.map(|(address, _)| address);
drained.extend(also_drained);

(possibly_kept, drained)
};

let l2_priority_queue = std::mem::take(&mut self.l2_priority_queue);
self.l2_priority_queue = l2_priority_queue
.into_iter()
.partition(|(address, _)| index.contains(address));
self.l2_transactions_per_account = kept;
.rev()
.take(number_of_accounts_kept)
.collect();
self.l2_transactions_per_account = kept.into_iter().collect();
self.size = self
.l2_transactions_per_account
.iter()
.fold(0, |agg, (_, tnxs)| agg + tnxs.len() as u64);
return drained.into_keys().collect();
.fold(0, |agg, (_, txs)| agg + txs.len() as u64);
return drained;
}
vec![]
}
Expand Down
50 changes: 36 additions & 14 deletions core/lib/mempool/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,32 +321,26 @@ fn stashed_accounts() {

#[test]
fn mempool_capacity() {
let mut mempool = MempoolStore::new(PriorityOpId(0), 5);
let mut mempool = MempoolStore::new(PriorityOpId(0), 4);
let account0 = Address::random();
let account1 = Address::random();
let account2 = Address::random();
let account3 = Address::random();
let transactions = vec![
gen_l2_tx(account0, Nonce(0)),
gen_l2_tx(account0, Nonce(1)),
gen_l2_tx(account0, Nonce(2)),
gen_l2_tx(account1, Nonce(1)),
gen_l2_tx(account2, Nonce(1)),
gen_l2_tx_with_timestamp(account1, Nonce(0), unix_timestamp_ms() + 1),
gen_l2_tx_with_timestamp(account2, Nonce(0), unix_timestamp_ms() + 2),
gen_l2_tx(account3, Nonce(1)),
];
mempool.insert(transactions, HashMap::new());
// the mempool is full. Accounts with non-sequential nonces got stashed
// Mempool is full. Accounts with non-sequential nonces and some accounts with lowest score should be purged.
assert_eq!(
HashSet::<_>::from_iter(mempool.get_mempool_info().purged_accounts),
HashSet::<_>::from_iter(vec![account1, account2]),
);
// verify that existing good-to-go transactions and new ones got picked
mempool.insert(
vec![gen_l2_tx_with_timestamp(
account1,
Nonce(0),
unix_timestamp_ms() + 1,
)],
HashMap::new(),
HashSet::from([account2, account3]),
);
// verify that good-to-go transactions are kept.
for _ in 0..3 {
assert_eq!(
mempool
Expand All @@ -363,6 +357,34 @@ fn mempool_capacity() {
.initiator_account(),
account1
);
assert!(!mempool.has_next(&L2TxFilter::default()));
}

#[test]
fn mempool_does_not_purge_all_accounts() {
let mut mempool = MempoolStore::new(PriorityOpId(0), 1);
let account0 = Address::random();
let account1 = Address::random();
let transactions = vec![
gen_l2_tx(account0, Nonce(0)),
gen_l2_tx(account0, Nonce(1)),
gen_l2_tx(account1, Nonce(1)),
];
mempool.insert(transactions, HashMap::new());
// Mempool is full. Account 1 has tx with non-sequential nonce so it should be purged.
// Txs from account 0 have sequential nonces but their number is greater than capacity; they should be kept.
assert_eq!(mempool.get_mempool_info().purged_accounts, vec![account1]);
// verify that good-to-go transactions are kept.
for _ in 0..2 {
assert_eq!(
mempool
.next_transaction(&L2TxFilter::default())
.unwrap()
.initiator_account(),
account0
);
}
assert!(!mempool.has_next(&L2TxFilter::default()));
}

fn gen_l2_tx(address: Address, nonce: Nonce) -> Transaction {
Expand Down
31 changes: 23 additions & 8 deletions core/node/state_keeper/src/mempool_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,35 @@ impl MempoolFetcher {
.await
.context("failed getting pending protocol version")?;

let l2_tx_filter = l2_tx_filter(
self.batch_fee_input_provider.as_ref(),
protocol_version.into(),
)
.await
.context("failed creating L2 transaction filter")?;
let (fee_per_gas, gas_per_pubdata) = if let Some(unsealed_batch) = storage
.blocks_dal()
.get_unsealed_l1_batch()
.await
.context("failed getting unsealed batch")?
{
let (fee_per_gas, gas_per_pubdata) = derive_base_fee_and_gas_per_pubdata(
unsealed_batch.fee_input,
protocol_version.into(),
);
(fee_per_gas, gas_per_pubdata as u32)
} else {
let filter = l2_tx_filter(
self.batch_fee_input_provider.as_ref(),
protocol_version.into(),
)
.await
.context("failed creating L2 transaction filter")?;
itegulov marked this conversation as resolved.
Show resolved Hide resolved

(filter.fee_per_gas, filter.gas_per_pubdata)
};

let transactions = storage
.transactions_dal()
.sync_mempool(
&mempool_info.stashed_accounts,
&mempool_info.purged_accounts,
l2_tx_filter.gas_per_pubdata,
l2_tx_filter.fee_per_gas,
gas_per_pubdata,
fee_per_gas,
self.sync_batch_size,
)
.await
Expand Down
Loading