Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v0.9.x refactor: output response if rpc-client fail parsing response #520

Merged
merged 1 commit into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 39 additions & 44 deletions crates/block-producer/src/poller.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#![allow(clippy::mutable_key_type)]

use crate::{types::ChainEvent, utils::to_result};
use crate::types::ChainEvent;
use anyhow::{anyhow, Result};
use async_jsonrpc_client::{Params as ClientParams, Transport};
use async_jsonrpc_client::Params as ClientParams;
use ckb_fixed_hash::H256;
use gw_chain::chain::{
Chain, ChallengeCell, L1Action, L1ActionContext, RevertL1ActionContext, RevertedL1Action,
Expand Down Expand Up @@ -154,21 +154,19 @@ impl ChainUpdater {
// here needs revising, once we relax this constraint for more performance.
let mut last_cursor = None;
loop {
let txs: Pagination<Tx> = to_result(
self.rpc_client
.indexer
.client()
.request(
"get_transactions",
Some(ClientParams::Array(vec![
json!(search_key),
json!(order),
json!(limit),
json!(last_cursor),
])),
)
.await?,
)?;
let txs: Pagination<Tx> = self
.rpc_client
.indexer
.request(
"get_transactions",
Some(ClientParams::Array(vec![
json!(search_key),
json!(order),
json!(limit),
json!(last_cursor),
])),
)
.await?;
if txs.objects.is_empty() {
break;
}
Expand Down Expand Up @@ -198,15 +196,14 @@ impl ChainUpdater {
}
self.last_tx_hash = Some(tx_hash.clone());

let tx: Option<TransactionWithStatus> = to_result(
self.rpc_client
.ckb
.request(
"get_transaction",
Some(ClientParams::Array(vec![json!(tx_hash)])),
)
.await?,
)?;
let tx: Option<TransactionWithStatus> = self
.rpc_client
.ckb
.request(
"get_transaction",
Some(ClientParams::Array(vec![json!(tx_hash)])),
)
.await?;
let tx_with_status =
tx.ok_or_else(|| anyhow::anyhow!("Cannot locate transaction: {:x}", tx_hash))?;
let tx = {
Expand All @@ -216,15 +213,14 @@ impl ChainUpdater {
let block_hash = tx_with_status.tx_status.block_hash.ok_or_else(|| {
anyhow::anyhow!("Transaction {:x} is not committed on chain!", tx_hash)
})?;
let header_view: Option<HeaderView> = to_result(
self.rpc_client
.ckb
.request(
"get_header",
Some(ClientParams::Array(vec![json!(block_hash)])),
)
.await?,
)?;
let header_view: Option<HeaderView> = self
.rpc_client
.ckb
.request(
"get_header",
Some(ClientParams::Array(vec![json!(block_hash)])),
)
.await?;
let header_view =
header_view.ok_or_else(|| anyhow::anyhow!("Cannot locate block: {:x}", block_hash))?;
let l2block_committed_info = L2BlockCommittedInfo::new_builder()
Expand Down Expand Up @@ -461,15 +457,14 @@ impl ChainUpdater {
// Load cell denoted by the transaction input
let tx_hash: H256 = input.previous_output().tx_hash().unpack();
let index = input.previous_output().index().unpack();
let tx: Option<TransactionWithStatus> = to_result(
self.rpc_client
.ckb
.request(
"get_transaction",
Some(ClientParams::Array(vec![json!(tx_hash)])),
)
.await?,
)?;
let tx: Option<TransactionWithStatus> = self
.rpc_client
.ckb
.request(
"get_transaction",
Some(ClientParams::Array(vec![json!(tx_hash)])),
)
.await?;
let tx_with_status =
tx.ok_or_else(|| anyhow::anyhow!("Cannot locate transaction: {:x}", tx_hash))?;
let tx = {
Expand Down
15 changes: 6 additions & 9 deletions crates/block-producer/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
types::ChainEvent,
};
use anyhow::{anyhow, Context, Result};
use async_jsonrpc_client::HttpClient;
use ckb_types::core::hardfork::HardForkSwitch;
use gw_chain::chain::Chain;
use gw_challenge::offchain::{OffChainMockContext, OffChainMockContextBuildArgs};
Expand All @@ -29,7 +28,10 @@ use gw_mem_pool::{
traits::MemPoolErrorTxHandler,
};
use gw_poa::PoA;
use gw_rpc_client::{contract::ContractsCellDepManager, rpc_client::RPCClient};
use gw_rpc_client::{
ckb_client::CKBClient, contract::ContractsCellDepManager, indexer_client::CKBIndexerClient,
rpc_client::RPCClient,
};
use gw_rpc_server::{
registry::{Registry, RegistryArgs},
server::start_jsonrpc_server,
Expand Down Expand Up @@ -60,7 +62,6 @@ const MIN_CKB_VERSION: &str = "0.40.0";
const SMOL_THREADS_ENV_VAR: &str = "SMOL_THREADS";
const DEFAULT_RUNTIME_THREADS: usize = 8;
const EVENT_TIMEOUT_SECONDS: u64 = 30;
const DEFAULT_HTTP_TIMEOUT: Duration = Duration::from_secs(15);

async fn poll_loop(
rpc_client: RPCClient,
Expand Down Expand Up @@ -252,12 +253,8 @@ impl BaseInitComponents {
};
let rollup_type_script: Script = config.chain.rollup_type_script.clone().into();
let rpc_client = {
let indexer_client = HttpClient::builder()
.timeout(DEFAULT_HTTP_TIMEOUT)
.build(config.rpc_client.indexer_url.to_owned())?;
let ckb_client = HttpClient::builder()
.timeout(DEFAULT_HTTP_TIMEOUT)
.build(config.rpc_client.ckb_url.to_owned())?;
let indexer_client = CKBIndexerClient::with_url(&config.rpc_client.indexer_url)?;
let ckb_client = CKBClient::with_url(&config.rpc_client.ckb_url)?;
let rollup_type_script =
ckb_types::packed::Script::new_unchecked(rollup_type_script.as_bytes());
RPCClient::new(
Expand Down
9 changes: 5 additions & 4 deletions crates/replay-chain/src/setup.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{path::PathBuf, sync::Arc};

use anyhow::{anyhow, Context as AnyHowContext, Result};
use async_jsonrpc_client::HttpClient;
use ckb_types::{bytes::Bytes, prelude::Entity};
use gw_chain::chain::Chain;
use gw_config::{Config, StoreConfig};
Expand All @@ -15,7 +14,9 @@ use gw_generator::{
genesis::init_genesis,
Generator,
};
use gw_rpc_client::rpc_client::RPCClient;
use gw_rpc_client::{
ckb_client::CKBClient, indexer_client::CKBIndexerClient, rpc_client::RPCClient,
};
use gw_store::Store;
use gw_types::{offchain::RollupContext, packed::RollupConfig, prelude::Unpack};

Expand Down Expand Up @@ -61,8 +62,8 @@ pub fn setup(args: SetupArgs) -> Result<Context> {
};
let secp_data: Bytes = {
let rpc_client = {
let indexer_client = HttpClient::new(config.rpc_client.indexer_url.to_owned())?;
let ckb_client = HttpClient::new(config.rpc_client.ckb_url.to_owned())?;
let indexer_client = CKBIndexerClient::with_url(&config.rpc_client.indexer_url)?;
let ckb_client = CKBClient::with_url(&config.rpc_client.ckb_url)?;
let rollup_type_script =
ckb_types::packed::Script::new_unchecked(rollup_type_script.as_bytes());
RPCClient::new(
Expand Down
76 changes: 76 additions & 0 deletions crates/rpc-client/src/ckb_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crate::utils::{to_result, DEFAULT_HTTP_TIMEOUT};
use anyhow::{anyhow, bail, Result};
use async_jsonrpc_client::{HttpClient, Params as ClientParams, Transport};
use gw_jsonrpc_types::blockchain::CellDep;
use serde::de::DeserializeOwned;
use serde_json::json;

#[derive(Clone)]
pub struct CKBClient(HttpClient);

impl CKBClient {
pub fn new(ckb_client: HttpClient) -> Self {
Self(ckb_client)
}

pub fn with_url(url: &str) -> Result<Self> {
let client = HttpClient::builder()
.timeout(DEFAULT_HTTP_TIMEOUT)
.build(url)?;
Ok(Self::new(client))
}

fn client(&self) -> &HttpClient {
&self.0
}

pub async fn request<T: DeserializeOwned>(
&self,
method: &str,
params: Option<ClientParams>,
) -> Result<T> {
let response = self
.client()
.request(method, params)
.await
.map_err(|err| anyhow!("ckb client error, method: {} error: {}", method, err))?;
let response_str = response.to_string();
match to_result(response) {
Ok(r) => Ok(r),
Err(err) => {
log::error!(
"[ckb-client] Failed to parse response, method: {}, response: {}",
method,
response_str
);
Err(err)
}
}
}

pub async fn query_type_script(
&self,
contract: &str,
cell_dep: CellDep,
) -> Result<gw_jsonrpc_types::blockchain::Script> {
use gw_jsonrpc_types::ckb_jsonrpc_types::TransactionWithStatus;

let tx_hash = cell_dep.out_point.tx_hash;
let get_transaction = self.request(
"get_transaction",
Some(ClientParams::Array(vec![json!(tx_hash)])),
);
let tx = match to_result::<Option<TransactionWithStatus>>(get_transaction.await?)? {
Some(tx_with_status) => tx_with_status.transaction.inner,
None => bail!("{} {} tx not found", contract, tx_hash),
};

match tx.outputs.get(cell_dep.out_point.index.value() as usize) {
Some(output) => match output.type_.as_ref() {
Some(script) => Ok(script.to_owned().into()),
None => Err(anyhow!("{} {} tx hasn't type script", contract, tx_hash)),
},
None => Err(anyhow!("{} {} tx index not found", contract, tx_hash)),
}
}
}
35 changes: 4 additions & 31 deletions crates/rpc-client/src/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Instant;

use anyhow::{anyhow, bail, Result};
use arc_swap::{ArcSwap, Guard};
use async_jsonrpc_client::{HttpClient, Params as ClientParams, Transport};
use async_jsonrpc_client::Params as ClientParams;
use gw_config::{BlockProducerConfig, ContractTypeScriptConfig, ContractsCellDep};
use gw_jsonrpc_types::blockchain::{CellDep, Script};
use gw_types::packed::RollupConfig;
Expand All @@ -13,7 +13,6 @@ use serde_json::json;

use crate::indexer_types::{Cell, Order, Pagination, ScriptType, SearchKey};
use crate::rpc_client::RPCClient;
use crate::utils::to_result;

// Used in block producer and challenge
#[derive(Clone)]
Expand Down Expand Up @@ -145,32 +144,6 @@ pub async fn query_cell_deps(
})
}

pub async fn query_type_script(
ckb_client: &HttpClient,
contract: &str,
cell_dep: CellDep,
) -> Result<Script> {
use gw_jsonrpc_types::ckb_jsonrpc_types::TransactionWithStatus;

let tx_hash = cell_dep.out_point.tx_hash;
let get_transaction = ckb_client.request(
"get_transaction",
Some(ClientParams::Array(vec![json!(tx_hash)])),
);
let tx = match to_result::<Option<TransactionWithStatus>>(get_transaction.await?)? {
Some(tx_with_status) => tx_with_status.transaction.inner,
None => bail!("{} {} tx not found", contract, tx_hash),
};

match tx.outputs.get(cell_dep.out_point.index.value() as usize) {
Some(output) => match output.type_.as_ref() {
Some(script) => Ok(script.to_owned().into()),
None => Err(anyhow!("{} {} tx hasn't type script", contract, tx_hash)),
},
None => Err(anyhow!("{} {} tx index not found", contract, tx_hash)),
}
}

// For old config compatibility
#[allow(deprecated)]
#[deprecated]
Expand All @@ -179,7 +152,7 @@ pub async fn query_type_script_from_old_config(
config: &BlockProducerConfig,
) -> Result<ContractTypeScriptConfig> {
let query = |contract: &'static str, cell_dep: CellDep| -> _ {
query_type_script(&rpc_client.ckb, contract, cell_dep)
rpc_client.ckb.query_type_script(contract, cell_dep)
};

let state_validator = query("state validator", config.rollup_cell_type_dep.clone()).await?;
Expand Down Expand Up @@ -230,7 +203,7 @@ async fn query_by_type_script(
let order = Order::Desc;
let limit = Uint32::from(1);

let get_contract_cell = rpc_client.indexer.client().request(
let get_contract_cell = rpc_client.indexer.request(
"get_cells",
Some(ClientParams::Array(vec![
json!(search_key),
Expand All @@ -239,7 +212,7 @@ async fn query_by_type_script(
])),
);

let mut cells: Pagination<Cell> = to_result(get_contract_cell.await?)?;
let mut cells: Pagination<Cell> = get_contract_cell.await?;
match cells.objects.pop() {
Some(cell) => Ok(Into::into(CellDep {
dep_type: DepType::Code,
Expand Down
Loading