Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/refactor some CKB/godwoken RPC related code #702

Merged
merged 5 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 20 additions & 44 deletions crates/block-producer/src/challenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::cleaner::{Cleaner, Verifier};
use crate::test_mode_control::TestModeControl;
use crate::types::ChainEvent;
use crate::utils;
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, Context, Result};
use ckb_types::prelude::{Builder, Entity, Reader};
use gw_chain::chain::{Chain, ChallengeCell, SyncEvent};
use gw_challenge::cancel_challenge::{
Expand Down Expand Up @@ -41,7 +41,7 @@ use tokio::sync::Mutex;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;

const MAX_CANCEL_CYCLES: u64 = 7000_0000;
const MAX_CANCEL_TX_BYTES: u64 = ckb_chain_spec::consensus::MAX_BLOCK_BYTES;
Expand Down Expand Up @@ -291,7 +291,12 @@ impl Challenger {
let verifier_tx_hash = self.rpc_client.send_transaction(&tx).await?;
log::info!("Create verifier in tx {}", to_hex(&verifier_tx_hash));

self.wait_tx_proposed(verifier_tx_hash).await?;
tokio::time::timeout(
Duration::from_secs(30),
self.rpc_client.ckb.wait_tx_proposed(verifier_tx_hash),
)
.await
.with_context(|| format!("waiting for tx proposed 0x{}", to_hex(&verifier_tx_hash)))??;

// Build cancellation transaction
let challenge_input = to_input_cell_info(challenge_cell);
Expand Down Expand Up @@ -372,13 +377,17 @@ impl Challenger {
let challenge_cell = to_cell_info(challenge_cell);
let challenge_tx_block_number = {
let tx_hash: H256 = challenge_cell.out_point.tx_hash().unpack();
let tx_status = self.rpc_client.get_transaction_status(tx_hash).await?;
let tx_status = self.rpc_client.ckb.get_transaction_status(tx_hash).await?;
if !matches!(tx_status, Some(TxStatus::Committed)) {
log::debug!("challenge tx isn't committed");
return Ok(());
}

let query = self.rpc_client.get_transaction_block_number(tx_hash).await;
let query = self
.rpc_client
.ckb
.get_transaction_block_number(tx_hash)
.await;
query?.ok_or_else(|| anyhow!("challenge tx block number not found"))?
};

Expand Down Expand Up @@ -614,7 +623,12 @@ impl Challenger {
}

log::debug!("can't find a owner cell for verifier, try wait verifier tx committed");
self.wait_tx_committed(verifier_tx_hash).await?;
tokio::time::timeout(
Duration::from_secs(30),
self.rpc_client.ckb.wait_tx_committed(verifier_tx_hash),
)
.await
.with_context(|| format!("wait for tx committed 0x{}", to_hex(&verifier_tx_hash)))??;

let owner_lock = self.wallet.lock_script().to_owned();
let cell = {
Expand All @@ -625,44 +639,6 @@ impl Challenger {
Ok(to_input_cell_info(cell))
}

async fn wait_tx_proposed(&self, tx_hash: H256) -> Result<()> {
let timeout = Duration::new(30, 0);
let now = Instant::now();

loop {
match self.rpc_client.get_transaction_status(tx_hash).await? {
Some(TxStatus::Proposed) | Some(TxStatus::Committed) => return Ok(()),
Some(TxStatus::Pending) => (),
None => return Err(anyhow!("tx hash {} not found", to_hex(&tx_hash))),
}

if now.elapsed() >= timeout {
return Err(anyhow!("wait tx hash {} timeout", to_hex(&tx_hash)));
}

tokio::time::sleep(Duration::new(3, 0)).await;
}
}

async fn wait_tx_committed(&self, tx_hash: H256) -> Result<()> {
let timeout = Duration::new(30, 0);
let now = Instant::now();

loop {
match self.rpc_client.get_transaction_status(tx_hash).await? {
Some(TxStatus::Committed) => return Ok(()),
Some(TxStatus::Proposed) | Some(TxStatus::Pending) => (),
None => return Err(anyhow!("tx hash {} not found", to_hex(&tx_hash))),
}

if now.elapsed() >= timeout {
return Err(anyhow!("wait tx hash {} timeout", to_hex(&tx_hash)));
}

tokio::time::sleep(Duration::new(3, 0)).await;
}
}

async fn dry_run_transaction(&self, tx: &Transaction, action: &str) -> Result<()> {
match self.rpc_client.dry_run_transaction(tx).await {
Ok(cycles) => {
Expand Down
10 changes: 7 additions & 3 deletions crates/block-producer/src/cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ impl Cleaner {
let rpc_client = &self.rpc_client;
let tip_l1_block_number = rpc_client.get_tip().await?.number().unpack();
for tx_hash in consumed_txs {
let tx_status = rpc_client.get_transaction_status(tx_hash).await?;
let tx_status = rpc_client.ckb.get_transaction_status(tx_hash).await?;
if !matches!(tx_status, Some(TxStatus::Committed)) {
continue;
}

if let Some(block_nubmer) = rpc_client.get_transaction_block_number(tx_hash).await? {
if let Some(block_nubmer) = rpc_client.ckb.get_transaction_block_number(tx_hash).await?
{
if block_nubmer < tip_l1_block_number.saturating_sub(L1_FINALITY_BLOCKS) {
confirmed.insert(tx_hash);
}
Expand Down Expand Up @@ -140,7 +141,9 @@ impl Cleaner {
let rpc_client = &self.rpc_client;
for (idx, tx_hash) in consumed_txs {
let consumed = match tx_hash {
Some(tx_hash) => !matches!(rpc_client.get_transaction_status(tx_hash).await?, None),
Some(tx_hash) => {
!matches!(rpc_client.ckb.get_transaction_status(tx_hash).await?, None)
}
None => false,
};
if consumed {
Expand All @@ -152,6 +155,7 @@ impl Cleaner {
verifiers.get(idx).expect("exists").to_owned().0
};
let verifier_status = rpc_client
.ckb
.get_transaction_status(verifier.tx_hash())
.await?;
if !matches!(verifier_status, Some(TxStatus::Committed)) {
Expand Down
14 changes: 11 additions & 3 deletions crates/block-producer/src/debugger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ pub async fn build_mock_transaction(
.ok_or_else(|| anyhow!("can't find input cell"))?;

let input_tx_hash = input.previous_output().tx_hash().unpack();
let input_block_hash = match rpc_client.get_transaction_status(input_tx_hash).await? {
let input_block_hash = match rpc_client.ckb.get_transaction_status(input_tx_hash).await? {
Some(TxStatus::Committed) => {
let block_hash = rpc_client.get_transaction_block_hash(input_tx_hash).await?;
let block_hash = rpc_client
.ckb
.get_transaction_block_hash(input_tx_hash)
.await?;
Some(block_hash.ok_or_else(|| anyhow!("not found input cell tx hash"))?)
}
_ => None,
Expand Down Expand Up @@ -160,9 +163,14 @@ pub async fn build_mock_transaction(
}
.ok_or_else(|| anyhow!("can't find dep cell"))?;
let dep_cell_tx_hash = cell_dep.out_point().tx_hash().unpack();
let dep_cell_block_hash = match rpc_client.get_transaction_status(dep_cell_tx_hash).await? {
let dep_cell_block_hash = match rpc_client
.ckb
.get_transaction_status(dep_cell_tx_hash)
.await?
{
Some(TxStatus::Committed) => {
let query = rpc_client
.ckb
.get_transaction_block_hash(dep_cell_tx_hash)
.await?;
Some(query.ok_or_else(|| anyhow!("not found dep cell tx hash"))?)
Expand Down
40 changes: 16 additions & 24 deletions crates/block-producer/src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use gw_chain::chain::{
Chain, ChallengeCell, L1Action, L1ActionContext, RevertL1ActionContext, RevertedL1Action,
SyncParam,
};
use gw_jsonrpc_types::ckb_jsonrpc_types::{BlockNumber, HeaderView, TransactionWithStatus, Uint32};
use gw_jsonrpc_types::ckb_jsonrpc_types::{BlockNumber, HeaderView, Uint32};
use gw_rpc_client::{
indexer_types::{Order, Pagination, ScriptType, SearchKey, SearchKeyFilter, Tx},
rpc_client::RPCClient,
Expand Down Expand Up @@ -233,18 +233,18 @@ impl ChainUpdater {
}
}

let tx: Option<TransactionWithStatus> = self
let tx_with_status = self
.rpc_client
.ckb
.request(
"get_transaction",
Some(ClientParams::Array(vec![json!(tx_hash)])),
)
.await?;
let tx_with_status =
tx.ok_or_else(|| QueryL1TxError::new(tx_hash, anyhow!("cannot locate tx")))?;
.get_transaction_with_status(tx_hash.0.into())
.await?
.ok_or_else(|| QueryL1TxError::new(tx_hash, anyhow!("cannot locate tx")))?;
let tx = {
let tx: ckb_types::packed::Transaction = tx_with_status.transaction.inner.into();
let tx: ckb_types::packed::Transaction = tx_with_status
.transaction
.ok_or_else(|| QueryL1TxError::new(tx_hash, anyhow!("cannot locate tx")))?
.inner
.into();
Transaction::new_unchecked(tx.as_bytes())
};
let block_hash = tx_with_status.tx_status.block_hash.ok_or_else(|| {
Expand Down Expand Up @@ -335,13 +335,13 @@ impl ChainUpdater {
let rpc_client = &self.rpc_client;
let tx_hash: gw_common::H256 =
From::<[u8; 32]>::from(committed_info.transaction_hash().unpack());
let tx_status = rpc_client.get_transaction_status(tx_hash).await?;
let tx_status = rpc_client.ckb.get_transaction_status(tx_hash).await?;
if !matches!(tx_status, Some(TxStatus::Committed)) {
return Ok(false);
}

let block_hash: [u8; 32] = committed_info.block_hash().unpack();
let l1_block_hash = rpc_client.get_transaction_block_hash(tx_hash).await?;
let l1_block_hash = rpc_client.ckb.get_transaction_block_hash(tx_hash).await?;
Ok(l1_block_hash == Some(block_hash))
}

Expand Down Expand Up @@ -524,20 +524,12 @@ impl ChainUpdater {
// Load cell denoted by the transaction input
let tx_hash: H256 = input.previous_output().tx_hash().unpack();
let index = input.previous_output().index().unpack();
let tx: Option<TransactionWithStatus> = self
let tx = self
.rpc_client
.ckb
.request(
"get_transaction",
Some(ClientParams::Array(vec![json!(tx_hash)])),
)
.await?;
let tx_with_status =
tx.ok_or_else(|| QueryL1TxError::new(&tx_hash, anyhow!("cannot locate tx")))?;
let tx = {
let tx: ckb_types::packed::Transaction = tx_with_status.transaction.inner.into();
Transaction::new_unchecked(tx.as_bytes())
};
.get_transaction(tx_hash.0.into())
.await?
.ok_or_else(|| QueryL1TxError::new(&tx_hash, anyhow!("cannot locate tx")))?;
let cell_output = tx
.raw()
.outputs()
Expand Down
1 change: 1 addition & 0 deletions crates/block-producer/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ impl BaseInitComponents {
let secp_data: Bytes = {
let out_point = config.genesis.secp_data_dep.out_point.clone();
rpc_client
.ckb
.get_transaction(out_point.tx_hash.0.into())
.await?
.ok_or_else(|| anyhow!("can not found transaction: {:?}", out_point.tx_hash))?
Expand Down
2 changes: 1 addition & 1 deletion crates/block-producer/src/withdrawal_unlocker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl FinalizedWithdrawalUnlocker {
// Check unlock tx
let mut drop_txs = vec![];
for (tx_hash, withdrawal_to_unlock) in self.unlock_txs.iter() {
match rpc_client.get_transaction_status(*tx_hash).await {
match rpc_client.ckb.get_transaction_status(*tx_hash).await {
Err(err) => {
// Always drop this unlock tx and retry to avoid "lock" withdrawal cell
log::info!("[unlock withdrawal] get unlock tx failed {}, drop it", err);
Expand Down
Loading