Skip to content

Commit

Permalink
fix: stop package more withdrawals if mem pool is recovering
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr committed May 25, 2022
1 parent 93f19bb commit f9896af
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 55 deletions.
102 changes: 53 additions & 49 deletions crates/mem-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@ impl MemPool {
#[instrument(skip_all)]
pub async fn notify_new_tip(&mut self, new_tip: H256) -> Result<()> {
// reset pool state
self.reset(Some(self.current_tip.0), Some(new_tip)).await?;
if self.current_tip.0 != new_tip {
self.reset(Some(self.current_tip.0), Some(new_tip)).await?;
}
Ok(())
}

Expand Down Expand Up @@ -613,15 +615,51 @@ impl MemPool {
self.remove_unexecutables(&mut mem_state, &db).await?;

log::info!("[mem-pool] reset reinject txs: {} mem-block txs: {} reinject withdrawals: {} mem-block withdrawals: {}", reinject_txs.len(), mem_block_txs.len(), reinject_withdrawals.len(), mem_block_withdrawals.len());
// re-inject withdrawals
let withdrawals_iter = reinject_withdrawals
.into_iter()
.chain(mem_block_withdrawals);
// re-inject txs
let txs_iter = reinject_txs.into_iter().chain(mem_block_txs);

if self.node_mode != NodeMode::ReadOnly {
self.prepare_next_mem_block(&db, &mut mem_state, withdrawals_iter, txs_iter)
// re-inject txs
let txs = reinject_txs.into_iter().chain(mem_block_txs).collect();
let is_mem_pool_recovery = old_tip.is_none();

// re-inject withdrawals
let mut withdrawals: Vec<_> = reinject_withdrawals.into_iter().collect();
if is_mem_pool_recovery {
// recovery mem block withdrawals
withdrawals.extend(mem_block_withdrawals);
} else {
// packages more withdrawals
fn filter_withdrawals(
state: &MemStateTree<'_>,
withdrawal: &WithdrawalRequestExtra,
) -> bool {
let id = state
.get_account_id_by_script_hash(
&withdrawal.raw().account_script_hash().unpack(),
)
.expect("get id")
.expect("id exist");
let nonce = state.get_nonce(id).expect("get nonce");
let expected_nonce: u32 = withdrawal.raw().nonce().unpack();
// ignore withdrawal mismatch the nonce
nonce == expected_nonce
}
withdrawals.retain(|w| filter_withdrawals(&mem_state, w));

// package withdrawals
if withdrawals.len() < MAX_MEM_BLOCK_WITHDRAWALS {
for entry in self.pending().values() {
if let Some(withdrawal) = entry.withdrawals.first() {
if filter_withdrawals(&mem_state, withdrawal) {
withdrawals.push(withdrawal.clone());
}
if withdrawals.len() >= MAX_MEM_BLOCK_WITHDRAWALS {
break;
}
}
}
}
}

self.prepare_next_mem_block(&db, &mut mem_state, withdrawals, txs)
.await?;
}

Expand Down Expand Up @@ -681,21 +719,18 @@ impl MemPool {
}

/// Prepare for next mem block
#[instrument(skip_all, fields(withdrawals_count = withdrawals.size_hint().1, txs_count = txs.size_hint().1))]
async fn prepare_next_mem_block<
WithdrawalIter: Iterator<Item = WithdrawalRequestExtra>,
TxIter: Iterator<Item = L2Transaction> + Clone,
>(
#[instrument(skip_all, fields(withdrawals_count = withdrawals.len(), txs_count = txs.len()))]
async fn prepare_next_mem_block(
&mut self,
db: &StoreTransaction,
state: &mut MemStateTree<'_>,
withdrawals: WithdrawalIter,
txs: TxIter,
withdrawals: Vec<WithdrawalRequestExtra>,
txs: Vec<L2Transaction>,
) -> Result<()> {
// check order of inputs
{
let mut id_to_nonce: HashMap<u32, u32> = HashMap::default();
for tx in txs.clone() {
for tx in &txs {
let id: u32 = tx.raw().from_id().unpack();
let nonce: u32 = tx.raw().nonce().unpack();
if let Some(&prev_nonce) = id_to_nonce.get(&id) {
Expand All @@ -712,7 +747,6 @@ impl MemPool {
}
// Handle state before txs
// withdrawal
let withdrawals: Vec<WithdrawalRequestExtra> = withdrawals.collect();
self.finalize_withdrawals(state, withdrawals.clone())
.await?;
// deposits
Expand Down Expand Up @@ -884,7 +918,7 @@ impl MemPool {
async fn finalize_withdrawals(
&mut self,
state: &mut MemStateTree<'_>,
mut withdrawals: Vec<WithdrawalRequestExtra>,
withdrawals: Vec<WithdrawalRequestExtra>,
) -> Result<()> {
// check mem block state
assert!(self.mem_block.withdrawals().is_empty());
Expand All @@ -893,36 +927,6 @@ impl MemPool {
assert!(self.mem_block.finalized_custodians().is_none());
assert!(self.mem_block.txs().is_empty());

fn filter_withdrawals(
state: &MemStateTree<'_>,
withdrawal: &WithdrawalRequestExtra,
) -> bool {
let id = state
.get_account_id_by_script_hash(&withdrawal.raw().account_script_hash().unpack())
.expect("get id")
.expect("id exist");
let nonce = state.get_nonce(id).expect("get nonce");
let expected_nonce: u32 = withdrawal.raw().nonce().unpack();
// ignore withdrawal mismatch the nonce
nonce == expected_nonce
}

// find withdrawals from pending
withdrawals.retain(|withdrawal| filter_withdrawals(state, withdrawal));
// package withdrawals
if withdrawals.len() < MAX_MEM_BLOCK_WITHDRAWALS {
for entry in self.pending().values() {
if let Some(withdrawal) = entry.withdrawals.first() {
if filter_withdrawals(state, withdrawal) {
withdrawals.push(withdrawal.clone());
}
if withdrawals.len() >= MAX_MEM_BLOCK_WITHDRAWALS {
break;
}
}
}
}

let max_withdrawal_capacity = std::u128::MAX;
let finalized_custodians = {
// query withdrawals from ckb-indexer
Expand Down
13 changes: 11 additions & 2 deletions crates/tests/src/testing_tool/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,22 @@ pub async fn construct_block(
mem_pool: &mut MemPool,
deposit_requests: Vec<DepositRequest>,
) -> anyhow::Result<ProduceBlockResult> {
construct_block_with_timestamp(chain, mem_pool, deposit_requests, 0).await
construct_block_with_timestamp(chain, mem_pool, deposit_requests, 0, true).await
}

pub async fn construct_block_with_timestamp(
chain: &Chain,
mem_pool: &mut MemPool,
deposit_requests: Vec<DepositRequest>,
timestamp: u64,
refresh_mem_pool: bool,
) -> anyhow::Result<ProduceBlockResult> {
if !refresh_mem_pool {
assert!(
deposit_requests.is_empty(),
"skip refresh mem pool, bug deposits isn't empty"
)
}
let stake_cell_owner_lock_hash = H256::zero();
let db = chain.store().begin_transaction();
let generator = chain.generator();
Expand Down Expand Up @@ -520,7 +527,9 @@ pub async fn construct_block_with_timestamp(
};
mem_pool.set_provider(Box::new(provider));
// refresh mem block
mem_pool.reset_mem_block().await?;
if refresh_mem_pool {
mem_pool.reset_mem_block().await?;
}
let provider = DummyMemPoolProvider {
deposit_cells: Vec::default(),
fake_blocktime: Duration::from_millis(0),
Expand Down
6 changes: 4 additions & 2 deletions crates/tests/src/tests/restore_mem_pool_pending_withdrawal.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::time::Duration;

use crate::testing_tool::chain::{build_sync_tx, construct_block, restart_chain, setup_chain};
use crate::testing_tool::chain::{
build_sync_tx, construct_block, construct_block_with_timestamp, restart_chain, setup_chain,
};
use crate::testing_tool::common::random_always_success_script;
use crate::testing_tool::mem_pool_provider::DummyMemPoolProvider;

Expand Down Expand Up @@ -188,7 +190,7 @@ async fn test_restore_mem_pool_pending_withdrawal() {
let block_result = {
let mem_pool = chain.mem_pool().as_ref().unwrap();
let mut mem_pool = mem_pool.lock().await;
construct_block(&chain, &mut mem_pool, vec![])
construct_block_with_timestamp(&chain, &mut mem_pool, vec![], 0, false)
.await
.unwrap()
};
Expand Down
4 changes: 2 additions & 2 deletions crates/tests/src/tests/unlock_withdrawal_to_owner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async fn test_build_unlock_to_owner_tx() {
let withdrawal_block_result = {
let mem_pool = chain.mem_pool().as_ref().unwrap();
let mut mem_pool = mem_pool.lock().await;
construct_block_with_timestamp(&chain, &mut mem_pool, vec![], BLOCK_TIMESTAMP)
construct_block_with_timestamp(&chain, &mut mem_pool, vec![], BLOCK_TIMESTAMP, true)
.await
.unwrap()
};
Expand Down Expand Up @@ -546,7 +546,7 @@ async fn test_build_unlock_to_owner_tx() {
let provider = DummyMemPoolProvider::default();
mem_pool.set_provider(Box::new(provider));
mem_pool.reset_mem_block().await.unwrap();
construct_block_with_timestamp(&chain, &mut mem_pool, vec![], BLOCK_TIMESTAMP2)
construct_block_with_timestamp(&chain, &mut mem_pool, vec![], BLOCK_TIMESTAMP2, true)
.await
.unwrap()
};
Expand Down

0 comments on commit f9896af

Please sign in to comment.