Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize mem pool package #500

Merged
merged 3 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions crates/block-producer/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const TRANSACTION_EXCEEDED_MAXIMUM_BLOCK_BYTES_ERROR: &str = "ExceededMaximumBlo
const TRANSACTION_FAILED_TO_RESOLVE_ERROR: &str = "TransactionFailedToResolve";
/// 524_288 we choose this value because it is smaller than the MAX_BLOCK_BYTES which is 597K
const MAX_ROLLUP_WITNESS_SIZE: usize = 1 << 19;
const WAIT_PRODUCE_BLOCK_SECONDS: u64 = 45;
const WAIT_PRODUCE_BLOCK_SECONDS: u64 = 90;

enum SubmitResult {
Submitted,
Expand Down Expand Up @@ -236,6 +236,13 @@ impl BlockProducer {
.as_secs()
< WAIT_PRODUCE_BLOCK_SECONDS
{
log::debug!(
"skip produce new block, last committed is {}s ago",
self.last_committed_l2_block
.committed_at
.elapsed()
.as_secs()
);
return Ok(());
}

Expand Down Expand Up @@ -343,8 +350,20 @@ impl BlockProducer {

// get txs & withdrawal requests from mem pool
let (opt_finalized_custodians, block_param) = {
let t = Instant::now();
log::debug!("[compose_next_block_submit_tx] acquire mem-pool",);
let mem_pool = self.mem_pool.lock().await;
mem_pool.output_mem_block(&OutputParam::new(retry_count))?
log::debug!(
"[compose_next_block_submit_tx] unlock mem-pool {}ms",
t.elapsed().as_millis()
);
let t = Instant::now();
let output = mem_pool.output_mem_block(&OutputParam::new(retry_count))?;
log::debug!(
"[compose_next_block_submit_tx] output mem block {}ms",
t.elapsed().as_millis()
);
output
};
let deposit_cells = block_param.deposits.clone();

Expand Down
21 changes: 18 additions & 3 deletions crates/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use gw_types::{
prelude::{Builder as GWBuilder, Entity as GWEntity, Pack as GWPack, Unpack as GWUnpack},
};
use smol::lock::Mutex;
use std::{collections::HashSet, convert::TryFrom, sync::Arc};
use std::{collections::HashSet, convert::TryFrom, sync::Arc, time::Instant};

#[derive(Debug, Clone)]
pub struct ChallengeCell {
Expand Down Expand Up @@ -252,7 +252,16 @@ impl Chain {
if !self.complete_initial_syncing {
// Do first notify
let tip_block_hash: H256 = self.local_state.tip.hash().into();
smol::block_on(async { mem_pool.lock().await.notify_new_tip(tip_block_hash) })?;
smol::block_on(async {
log::debug!("[complete_initial_syncing] acquire mem-pool",);
let t = Instant::now();
mem_pool.lock().await.notify_new_tip(tip_block_hash)?;
log::debug!(
"[complete_initial_syncing] unlock mem-pool {}ms",
t.elapsed().as_millis()
);
Ok::<(), anyhow::Error>(())
})?;
}
}
self.complete_initial_syncing = true;
Expand Down Expand Up @@ -829,7 +838,13 @@ impl Chain {
&& (is_revert_happend || self.complete_initial_syncing)
{
// update mem pool state
smol::block_on(async { mem_pool.lock().await.notify_new_tip(tip_block_hash) })?;
smol::block_on(async {
log::debug!("[sync] acquire mem-pool",);
let t = Instant::now();
mem_pool.lock().await.notify_new_tip(tip_block_hash)?;
log::debug!("[sync] unlock mem-pool {}ms", t.elapsed().as_millis());
Ok::<(), anyhow::Error>(())
})?;
}
}

Expand Down
11 changes: 9 additions & 2 deletions crates/rpc-server/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ struct RequestSubmitter {
}

impl RequestSubmitter {
const MAX_CHANNEL_SIZE: usize = 1000;
const MAX_CHANNEL_SIZE: usize = 2000;
const MAX_BATCH_SIZE: usize = 20;
const INTERVAL_MS: Duration = Duration::from_millis(100);

Expand Down Expand Up @@ -347,7 +347,13 @@ impl RequestSubmitter {
// check mem block empty slots
loop {
if !self.submit_rx.is_empty() {
log::debug!("[Mem-pool background job] check mem-pool acquire mem_pool",);
let t = Instant::now();
let mem_pool = self.mem_pool.lock().await;
log::debug!(
"[Mem-pool background job] check-mem-pool unlock mem_pool {}ms",
t.elapsed().as_millis()
);
// continue to batch process if we have enough mem block slots
if !mem_pool.is_mem_txs_full(Self::MAX_BATCH_SIZE) {
break;
Expand Down Expand Up @@ -427,10 +433,11 @@ impl RequestSubmitter {
};

if !items.is_empty() {
log::debug!("[Mem-pool background job] acquire mem_pool",);
let t = Instant::now();
let mut mem_pool = self.mem_pool.lock().await;
log::debug!(
"Mem-pool background job: unlock mem_pool {}ms",
"[Mem-pool background job] unlock mem_pool {}ms",
t.elapsed().as_millis()
);
for entry in items {
Expand Down