Skip to content

Commit

Permalink
perf: use BTreeSet in FeeQueue (#641)
Browse files Browse the repository at this point in the history
This also increases FeeQueue MAX_QUEUE_SIZE and adds benchmarks for FeeQueue
  • Loading branch information
Yin Guanhao authored Apr 11, 2022
1 parent 0b1756d commit 07357c1
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 17 deletions.
1 change: 1 addition & 0 deletions crates/benches/benches/bench_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ criterion_main! {
benchmarks::init_db::init_db,
benchmarks::sudt::sudt,
benchmarks::smt::smt,
benchmarks::fee_queue::fee_queue,
}
182 changes: 182 additions & 0 deletions crates/benches/benches/benchmarks/fee_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
use criterion::{criterion_group, Bencher, Criterion};
use gw_common::{h256_ext::H256Ext, state::State, H256};
use gw_config::GenesisConfig;
use gw_generator::genesis::init_genesis;
use gw_mem_pool::fee::{
queue::FeeQueue,
types::{FeeEntry, FeeItem},
};
use gw_store::{
mem_pool_state::MemStore, state::state_db::StateContext, traits::chain_store::ChainStore, Store,
};
use gw_types::{
bytes::Bytes,
packed::{
AllowedTypeHash, L2BlockCommittedInfo, L2Transaction, RawL2Transaction, RollupConfig,
},
prelude::{Builder, Entity, Pack, PackVec, Unpack},
};

const MAX_QUEUE_SIZE: usize = 100_000;

fn bench_add_full(b: &mut Bencher) {
let mut queue = FeeQueue::new();

let store = Store::open_tmp().expect("open store");
setup_genesis(&store);
{
let db = store.begin_transaction();
let genesis = db.get_tip_block().expect("tip");
assert_eq!(genesis.raw().number().unpack(), 0);
let mut state = db.state_tree(StateContext::AttachBlock(1)).expect("state");

// create accounts
for i in 0..4 {
state.create_account(H256::from_u32(i)).unwrap();
}

db.commit().expect("commit");
}

for i in 0..(MAX_QUEUE_SIZE as u32) {
let entry1 = FeeEntry {
item: FeeItem::Tx(
L2Transaction::new_builder()
.raw(RawL2Transaction::new_builder().nonce(i.pack()).build())
.build(),
),
fee: 100 * 1000,
cycles_limit: 1000,
sender: 2,
order: queue.len(),
};
queue.add(entry1);
}

assert_eq!(queue.len(), MAX_QUEUE_SIZE);

b.iter(|| {
let entry1 = FeeEntry {
item: FeeItem::Tx(
L2Transaction::new_builder()
.raw(
RawL2Transaction::new_builder()
.nonce(10001u32.pack())
.build(),
)
.build(),
),
fee: 100 * 1000,
cycles_limit: 1000,
sender: 2,
order: queue.len(),
};
queue.add(entry1);
});
}

fn bench_add_fetch_20(b: &mut Bencher) {
let mut queue = FeeQueue::new();

let store = Store::open_tmp().expect("open store");
setup_genesis(&store);
{
let db = store.begin_transaction();
let genesis = db.get_tip_block().expect("tip");
assert_eq!(genesis.raw().number().unpack(), 0);
let mut state = db.state_tree(StateContext::AttachBlock(1)).expect("state");

// create accounts
for i in 0..4 {
state.create_account(H256::from_u32(i)).unwrap();
}

db.commit().expect("commit");
}
let snap = store.get_snapshot();

for i in 0..(MAX_QUEUE_SIZE as u32) - 100 {
let entry1 = FeeEntry {
item: FeeItem::Tx(
L2Transaction::new_builder()
.raw(RawL2Transaction::new_builder().nonce(i.pack()).build())
.build(),
),
fee: 100 * 1000,
cycles_limit: 1000,
sender: 2,
order: queue.len(),
};
queue.add(entry1);
}

let mem_store = MemStore::new(snap);
let tree = mem_store.state().unwrap();

b.iter(|| {
for i in 0..20 {
let entry1 = FeeEntry {
item: FeeItem::Tx(
L2Transaction::new_builder()
.raw(
RawL2Transaction::new_builder()
.nonce((MAX_QUEUE_SIZE + i).pack())
.build(),
)
.build(),
),
fee: 100 * 1000,
cycles_limit: 1000,
sender: 2,
order: queue.len(),
};
queue.add(entry1);
}
queue.fetch(&tree, 20)
});
}

pub fn bench(c: &mut Criterion) {
c.bench_function("FeeQueue add when full", |b| {
bench_add_full(b);
});
c.bench_function("FeeQueue add and fetch 20", |b| {
bench_add_fetch_20(b);
});
}

criterion_group! {
name = fee_queue;
config = Criterion::default().sample_size(10);
targets = bench
}

const ALWAYS_SUCCESS_CODE_HASH: [u8; 32] = [42u8; 32];

fn setup_genesis(store: &Store) {
let rollup_type_hash = H256::from_u32(42);
let rollup_config = RollupConfig::new_builder()
.allowed_eoa_type_hashes(
vec![AllowedTypeHash::from_unknown(ALWAYS_SUCCESS_CODE_HASH)].pack(),
)
.finality_blocks(0.pack())
.build();
let genesis_config = GenesisConfig {
timestamp: 0,
meta_contract_validator_type_hash: Default::default(),
rollup_config: rollup_config.into(),
rollup_type_hash: {
let h: [u8; 32] = rollup_type_hash.into();
h.into()
},
secp_data_dep: Default::default(),
};
let genesis_committed_info = L2BlockCommittedInfo::default();
init_genesis(
store,
&genesis_config,
genesis_committed_info,
Bytes::default(),
)
.unwrap();
}
1 change: 1 addition & 0 deletions crates/benches/benches/benchmarks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod fee_queue;
pub mod init_db;
pub mod smt;
pub mod sudt;
38 changes: 22 additions & 16 deletions crates/mem-pool/src/fee/queue.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::Result;
use gw_common::state::State;
use std::collections::{BinaryHeap, HashMap};
use std::collections::{BTreeSet, HashMap};
use tracing::instrument;

/// Max queue size
const MAX_QUEUE_SIZE: usize = 10000;
const MAX_QUEUE_SIZE: usize = 100_000;
/// Drop size when queue is full
const DROP_SIZE: usize = 100;

Expand All @@ -13,20 +13,23 @@ use super::types::FeeEntry;
/// Txs & withdrawals queue sorted by fee rate
pub struct FeeQueue {
// priority queue to store tx and withdrawal
queue: BinaryHeap<FeeEntry>,
queue: BTreeSet<FeeEntry>,
}

impl FeeQueue {
#[inline]
pub fn new() -> Self {
Self {
queue: BinaryHeap::with_capacity(MAX_QUEUE_SIZE + DROP_SIZE),
queue: BTreeSet::new(),
}
}

#[inline]
pub fn len(&self) -> usize {
self.queue.len()
}

#[inline]
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
Expand All @@ -41,32 +44,34 @@ impl FeeQueue {
entry.item.kind(),
hex::encode(entry.item.hash().as_slice())
);
self.queue.push(entry);
self.queue.insert(entry);

// drop items if full
if self.is_full() {
let mut new_queue = BinaryHeap::with_capacity(MAX_QUEUE_SIZE + DROP_SIZE);
let expected_len = self.queue.len().saturating_sub(DROP_SIZE);
while let Some(entry) = self.queue.pop() {
new_queue.push(entry);
if new_queue.len() >= expected_len {
break;
}
if let Some(first_to_keep) = self.queue.iter().nth(DROP_SIZE + 1).cloned() {
self.queue = self.queue.split_off(&first_to_keep);
}
self.queue = new_queue;
log::debug!(
"QueueLen: {} | Fee queue is full, drop {} items, new size: {}",
"QueueLen: {} | Fee queue is full, drop {} items",
self.len(),
DROP_SIZE,
expected_len
);
}
}

#[inline]
pub fn is_full(&self) -> bool {
self.queue.len() > MAX_QUEUE_SIZE
}

fn pop_last(&mut self) -> Option<FeeEntry> {
if let Some(entry) = self.queue.iter().next_back().cloned() {
self.queue.take(&entry)
} else {
None
}
}

/// Fetch items by fee sort
#[instrument(skip_all, fields(count = count))]
pub fn fetch(&mut self, state: &impl State, count: usize) -> Result<Vec<FeeEntry>> {
Expand All @@ -77,7 +82,7 @@ impl FeeQueue {
let mut future_queue = Vec::default();

// Fetch item from PQ
while let Some(entry) = self.queue.pop() {
while let Some(entry) = self.pop_last() {
let nonce = match fetched_senders.get(&entry.sender) {
Some(&nonce) => nonce,
None => state.get_nonce(entry.sender)?,
Expand Down Expand Up @@ -138,6 +143,7 @@ impl FeeQueue {
}

impl Default for FeeQueue {
#[inline]
fn default() -> Self {
Self::new()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/mem-pool/src/fee/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl PartialOrd for FeeItem {
}
}

#[derive(PartialEq, Eq)]
#[derive(PartialEq, Eq, Clone)]
pub struct FeeEntry {
/// item: tx or withdrawal
pub item: FeeItem,
Expand Down

0 comments on commit 07357c1

Please sign in to comment.