Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into remove-web3-indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
classicalliu committed Jun 2, 2022
2 parents 17f8ba5 + c77e019 commit 6c2ec12
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 78 deletions.
38 changes: 15 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 23 additions & 10 deletions crates/block-producer/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use gw_mem_pool::{
};
use gw_p2p_network::P2PNetwork;
use gw_rpc_client::{
ckb_client::CKBClient, contract::ContractsCellDepManager, indexer_client::CKBIndexerClient,
rpc_client::RPCClient,
ckb_client::CKBClient, contract::ContractsCellDepManager, error::RPCRequestError,
indexer_client::CKBIndexerClient, rpc_client::RPCClient,
};
use gw_rpc_server::{
registry::{Registry, RegistryArgs},
Expand All @@ -49,7 +49,10 @@ use gw_types::{
packed::{Byte32, CellDep, NumberHash, RollupConfig, Script},
prelude::*,
};
use gw_utils::{genesis_info::CKBGenesisInfo, since::EpochNumberWithFraction, wallet::Wallet};
use gw_utils::{
exponential_backoff::ExponentialBackoff, genesis_info::CKBGenesisInfo,
since::EpochNumberWithFraction, wallet::Wallet,
};
use semver::Version;
use std::{
collections::HashMap,
Expand Down Expand Up @@ -300,7 +303,7 @@ impl ChainTask {
}
}

async fn run(mut self) -> Result<()> {
async fn run(&mut self, backoff: &mut ExponentialBackoff) -> Result<()> {
// get tip
let (mut tip_number, mut tip_hash) = {
let tip = self.rpc_client.get_tip().await?;
Expand All @@ -327,6 +330,7 @@ impl ChainTask {
tip_hash = _tip_hash;
last_event_time = Instant::now();
}
backoff.reset();
tokio::time::sleep(self.poll_interval).await;
}
}
Expand Down Expand Up @@ -572,8 +576,11 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
let offchain_mock_context = base
.init_offchain_mock_context(block_producer_config)
.await?;
let mem_pool_provider =
DefaultMemPoolProvider::new(base.rpc_client.clone(), base.store.clone());
let mem_pool_provider = DefaultMemPoolProvider::new(
base.rpc_client.clone(),
base.store.clone(),
config.mem_pool.mem_block.clone(),
);
let notify_controller = {
let opt_ws_listen = config.rpc_server.err_receipt_ws_listen.as_ref();
opt_ws_listen.map(|_| NotifyService::new().start())
Expand Down Expand Up @@ -871,15 +878,22 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
withdrawal_unlocker,
cleaner,
};
let chain_task = ChainTask::create(
let mut backoff = ExponentialBackoff::new(Duration::from_secs(1));
let mut chain_task = ChainTask::create(
rpc_client,
Duration::from_secs(3),
ctx,
shutdown_send,
shutdown_event_recv,
);
if let Err(err) = chain_task.run().await {
log::error!("chain polling loop exit unexpected, error: {}", err);
while let Err(err) = chain_task.run(&mut backoff).await {
if err.is::<RPCRequestError>() {
log::error!("chain polling loop request error, will retry: {}", err);
tokio::time::sleep(backoff.next_sleep()).await;
} else {
log::error!("chain polling loop exit unexpected, error: {}", err);
break;
}
}
});
}
Expand Down Expand Up @@ -1092,7 +1106,6 @@ fn is_hardfork_switch_eq(l: &HardForkSwitch, r: &HardForkSwitch) -> bool {

fn is_l1_query_error(err: &anyhow::Error) -> bool {
use crate::poller::QueryL1TxError;
use gw_rpc_client::error::RPCRequestError;

// TODO: filter rpc request method?
err.downcast_ref::<RPCRequestError>().is_some()
Expand Down
19 changes: 19 additions & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,14 @@ pub struct MemPoolConfig {
pub restore_path: PathBuf,
pub publish: Option<PublishMemPoolConfig>,
pub subscribe: Option<SubscribeMemPoolConfig>,
pub mem_block: MemBlockConfig,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct MemBlockConfig {
pub max_deposits: usize,
pub max_withdrawals: usize,
pub max_txs: usize,
}

// Field default value for backward config file compitability
Expand All @@ -298,6 +306,17 @@ impl Default for MemPoolConfig {
restore_path: default_restore_path(),
publish: None,
subscribe: None,
mem_block: MemBlockConfig::default(),
}
}
}

impl Default for MemBlockConfig {
fn default() -> Self {
Self {
max_deposits: 100,
max_withdrawals: 100,
max_txs: 1000,
}
}
}
Expand Down
6 changes: 0 additions & 6 deletions crates/mem-pool/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
/// MAX deposits in the mem block
pub const MAX_MEM_BLOCK_DEPOSITS: usize = 100;
/// MAX withdrawals in the mem block
pub const MAX_MEM_BLOCK_WITHDRAWALS: usize = 100;
/// MAX withdrawals in the mem block
pub const MAX_MEM_BLOCK_TXS: usize = 2000;
/// MIN CKB deposit capacity, calculated from custodian cell size
pub const MIN_CKB_DEPOSIT_CAPACITY: u64 = 298_00000000;
/// MIN Simple UDT deposit capacity, calculated from custodian cell size + simple UDT script
Expand Down
14 changes: 10 additions & 4 deletions crates/mem-pool/src/default_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use anyhow::{bail, Result};
use async_trait::async_trait;
use gw_config::MemBlockConfig;
use gw_rpc_client::rpc_client::RPCClient;
use gw_store::{traits::chain_store::ChainStore, Store};
use gw_types::{
Expand All @@ -12,7 +13,7 @@ use gw_types::{
use tracing::instrument;

use crate::{
constants::{MAX_MEM_BLOCK_DEPOSITS, MIN_CKB_DEPOSIT_CAPACITY, MIN_SUDT_DEPOSIT_CAPACITY},
constants::{MIN_CKB_DEPOSIT_CAPACITY, MIN_SUDT_DEPOSIT_CAPACITY},
custodian::query_finalized_custodians,
traits::MemPoolProvider,
};
Expand All @@ -21,11 +22,16 @@ pub struct DefaultMemPoolProvider {
/// RPC client
rpc_client: RPCClient,
store: Store,
mem_block_config: MemBlockConfig,
}

impl DefaultMemPoolProvider {
pub fn new(rpc_client: RPCClient, store: Store) -> Self {
DefaultMemPoolProvider { rpc_client, store }
pub fn new(rpc_client: RPCClient, store: Store, mem_block_config: MemBlockConfig) -> Self {
DefaultMemPoolProvider {
rpc_client,
store,
mem_block_config,
}
}
}

Expand Down Expand Up @@ -75,7 +81,7 @@ impl MemPoolProvider for DefaultMemPoolProvider {
let rpc_client = self.rpc_client.clone();
rpc_client
.query_deposit_cells(
MAX_MEM_BLOCK_DEPOSITS,
self.mem_block_config.max_deposits,
MIN_CKB_DEPOSIT_CAPACITY,
MIN_SUDT_DEPOSIT_CAPACITY,
)
Expand Down
15 changes: 8 additions & 7 deletions crates/mem-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use gw_common::{
builtins::CKB_SUDT_ACCOUNT_ID, ckb_decimal::CKBCapacity, registry_address::RegistryAddress,
state::State, H256,
};
use gw_config::{MemPoolConfig, NodeMode};
use gw_config::{MemBlockConfig, MemPoolConfig, NodeMode};
use gw_dynamic_config::manager::DynamicConfigManager;
use gw_generator::{
constants::L2TX_MAX_CYCLES,
Expand Down Expand Up @@ -52,7 +52,6 @@ use tokio::sync::{broadcast, Mutex};
use tracing::instrument;

use crate::{
constants::{MAX_MEM_BLOCK_TXS, MAX_MEM_BLOCK_WITHDRAWALS},
custodian::AvailableCustodians,
mem_block::MemBlock,
restore_manager::RestoreManager,
Expand Down Expand Up @@ -103,6 +102,7 @@ pub struct MemPool {
mem_pool_state: Arc<MemPoolState>,
dynamic_config_manager: Arc<ArcSwap<DynamicConfigManager>>,
new_tip_publisher: broadcast::Sender<(H256, u64)>,
mem_block_config: MemBlockConfig,
}

pub struct MemPoolCreateArgs {
Expand Down Expand Up @@ -197,6 +197,7 @@ impl MemPool {
mem_pool_state,
dynamic_config_manager,
new_tip_publisher,
mem_block_config: config.mem_block,
};
mem_pool.restore_pending_withdrawals().await?;

Expand Down Expand Up @@ -259,7 +260,7 @@ impl MemPool {
}

pub fn is_mem_txs_full(&self, expect_slots: usize) -> bool {
self.mem_block.txs().len().saturating_add(expect_slots) > MAX_MEM_BLOCK_TXS
self.mem_block.txs().len().saturating_add(expect_slots) > self.mem_block_config.max_txs
}

pub fn pending_restored_tx_hashes(&mut self) -> &mut VecDeque<H256> {
Expand Down Expand Up @@ -294,10 +295,10 @@ impl MemPool {

// reject if mem block is full
// TODO: we can use the pool as a buffer
if self.mem_block.txs().len() >= MAX_MEM_BLOCK_TXS {
if self.mem_block.txs().len() >= self.mem_block_config.max_txs {
return Err(anyhow!(
"Mem block is full, MAX_MEM_BLOCK_TXS: {}",
MAX_MEM_BLOCK_TXS
self.mem_block_config.max_txs
));
}

Expand Down Expand Up @@ -645,13 +646,13 @@ impl MemPool {
withdrawals.retain(|w| filter_withdrawals(&mem_state, w));

// package withdrawals
if withdrawals.len() < MAX_MEM_BLOCK_WITHDRAWALS {
if withdrawals.len() < self.mem_block_config.max_withdrawals {
for entry in self.pending().values() {
if let Some(withdrawal) = entry.withdrawals.first() {
if filter_withdrawals(&mem_state, withdrawal) {
withdrawals.push(withdrawal.clone());
}
if withdrawals.len() >= MAX_MEM_BLOCK_WITHDRAWALS {
if withdrawals.len() >= self.mem_block_config.max_withdrawals {
break;
}
}
Expand Down
Loading

0 comments on commit 6c2ec12

Please sign in to comment.