Skip to content

Commit

Permalink
Merge pull request #733 from zeroqn/backport-feat-block-producer-trac…
Browse files Browse the repository at this point in the history
…e-chain-task-run

backport: feat(block-producer): trace chain task run func
  • Loading branch information
zeroqn committed Jun 20, 2022
2 parents ebef0fd + 21a2304 commit 659f5e8
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 34 deletions.
2 changes: 1 addition & 1 deletion crates/block-producer/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl BlockProducer {
self.last_submitted_tx_hash.clone()
}

#[instrument(skip_all, fields(event = %event))]
#[instrument(skip_all, name = "block producer handle_event")]
pub async fn handle_event(&mut self, event: ChainEvent) -> Result<()> {
if let Some(ref tests_control) = self.tests_control {
match tests_control.payload().await {
Expand Down
2 changes: 2 additions & 0 deletions crates/block-producer/src/challenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use gw_utils::genesis_info::CKBGenesisInfo;
use gw_utils::transaction_skeleton::TransactionSkeleton;
use gw_utils::wallet::Wallet;
use tokio::sync::Mutex;
use tracing::instrument;

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
Expand Down Expand Up @@ -110,6 +111,7 @@ impl Challenger {
}
}

#[instrument(skip_all, name = "challenger handle_event")]
pub async fn handle_event(&mut self, event: ChainEvent) -> Result<()> {
if let Some(ref tests_control) = self.tests_control {
match tests_control.payload().await {
Expand Down
2 changes: 2 additions & 0 deletions crates/block-producer/src/cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use gw_types::core::Status;
use gw_types::offchain::{global_state_from_slice, CellInfo, InputCellInfo, TxStatus};
use gw_types::packed::{CellDep, CellInput, Transaction, WitnessArgs};
use gw_types::prelude::Unpack;
use tracing::instrument;

use std::collections::HashSet;
use std::convert::TryFrom;
Expand Down Expand Up @@ -72,6 +73,7 @@ impl Cleaner {
}
}

#[instrument(skip_all, name = "cleaner handle_event")]
pub async fn handle_event(&self, _event: ChainEvent) -> Result<()> {
if matches!(self.query_rollup_status().await?, Status::Halting) {
return Ok(());
Expand Down
2 changes: 1 addition & 1 deletion crates/block-producer/src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl ChainUpdater {
}

// Start syncing
#[instrument(skip_all, fields(event = %_event))]
#[instrument(skip_all, name = "chain updater handle_event")]
pub async fn handle_event(&mut self, _event: ChainEvent) -> Result<()> {
let initial_syncing = !self.initialized;
// Always start from last valid tip on l1
Expand Down
117 changes: 85 additions & 32 deletions crates/block-producer/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use tokio::{
spawn,
sync::{broadcast, mpsc, Mutex},
};
use tracing::{info_span, instrument};

const MIN_CKB_VERSION: &str = "0.40.0";
const EVENT_TIMEOUT_SECONDS: u64 = 30;
Expand All @@ -76,6 +77,21 @@ struct ChainTaskContext {
withdrawal_unlocker: Option<FinalizedWithdrawalUnlocker>,
cleaner: Option<Arc<Cleaner>>,
}

struct ChainTaskRunStatus {
opt_tip_number_hash: Option<(u64, H256)>,
last_event_time: Instant,
}

impl Default for ChainTaskRunStatus {
fn default() -> Self {
ChainTaskRunStatus {
opt_tip_number_hash: None,
last_event_time: Instant::now(),
}
}
}

struct ChainTask {
rpc_client: RPCClient,
poll_interval: Duration,
Expand Down Expand Up @@ -137,6 +153,7 @@ impl ChainTask {
}
}

#[instrument(skip_all, fields(tip_number = tip_number, tip_hash = %tip_hash.pack()))]
async fn sync_next(
&self,
tip_number: u64,
Expand Down Expand Up @@ -303,36 +320,32 @@ impl ChainTask {
}
}

async fn run(&mut self, backoff: &mut ExponentialBackoff) -> Result<()> {
// How to get tip_number and tip_hash only once? then loop chain task run only?
#[instrument(skip_all, err(Debug))]
async fn run(&mut self, status: &ChainTaskRunStatus) -> Result<ChainTaskRunStatus> {
// get tip
let (mut tip_number, mut tip_hash) = {
let tip = self.rpc_client.get_tip().await?;
let tip_number: u64 = tip.number().unpack();
let tip_hash: H256 = tip.block_hash().unpack();
(tip_number, tip_hash)
let (tip_number, tip_hash) = match status.opt_tip_number_hash {
Some((number, hash)) => (number, hash),
None => {
let tip = self.rpc_client.get_tip().await?;
let tip_number: u64 = tip.number().unpack();
let tip_hash: H256 = tip.block_hash().unpack();
(tip_number, tip_hash)
}
};

let mut last_event_time = Instant::now();
let opt_tip_number_hash = self
.metrics_monitor
.instrument(self.sync_next(tip_number, tip_hash, &status.last_event_time))
.await?;

loop {
// Exit if shutdown event is received.
if self.shutdown_event.try_recv().is_ok() {
log::info!("ChainTask existed successfully");
return Ok(());
}
let updated_status = ChainTaskRunStatus {
opt_tip_number_hash: opt_tip_number_hash
.or_else(|| status.opt_tip_number_hash.to_owned()),
last_event_time: Instant::now(),
};

if let Some((_tip_number, _tip_hash)) = self
.metrics_monitor
.instrument(self.sync_next(tip_number, tip_hash, &last_event_time))
.await?
{
tip_number = _tip_number;
tip_hash = _tip_hash;
last_event_time = Instant::now();
}
backoff.reset();
tokio::time::sleep(self.poll_interval).await;
}
Ok(updated_status)
}
}

Expand Down Expand Up @@ -870,6 +883,8 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
let shutdown_send = shutdown_send.clone();
move || {
rt_handle.block_on(async move {
use tracing::Instrument;

let _tx = chain_task_ended_tx;
let ctx = ChainTaskContext {
chain_updater,
Expand All @@ -886,13 +901,51 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
shutdown_send,
shutdown_event_recv,
);
while let Err(err) = chain_task.run(&mut backoff).await {
if err.is::<RPCRequestError>() {
log::error!("chain polling loop request error, will retry: {}", err);
tokio::time::sleep(backoff.next_sleep()).await;
} else {
log::error!("chain polling loop exit unexpected, error: {}", err);
break;

let mut run_status = ChainTaskRunStatus::default();
loop {
// Exit if shutdown event is received.
if chain_task.shutdown_event.try_recv().is_ok() {
log::info!("ChainTask existed successfully");
return;
}

let run_span = info_span!("chain_task_run");
match chain_task
.run(&run_status)
.instrument(run_span.clone())
.await
{
Ok(updated_status) => {
run_status = updated_status;
backoff.reset();

let sleep_span =
info_span!(parent: &run_span, "chain_task interval sleep");
tokio::time::sleep(chain_task.poll_interval)
.instrument(sleep_span)
.await;
}
Err(err) if err.is::<RPCRequestError>() => {
// Reset status and refresh tip number hash
run_status = ChainTaskRunStatus::default();
let backoff_sleep = backoff.next_sleep();
log::error!(
"chain polling loop request error, will retry in {}s: {}",
backoff_sleep.as_secs(),
err
);

let sleep_span =
info_span!(parent: &run_span, "chain_task backoff sleep");
tokio::time::sleep(backoff_sleep)
.instrument(sleep_span)
.await;
}
Err(err) => {
log::error!("chain polling loop exit unexpected, error: {}", err);
break;
}
}
}
});
Expand Down
2 changes: 2 additions & 0 deletions crates/block-producer/src/withdrawal_unlocker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use gw_utils::fee::fill_tx_fee;
use gw_utils::genesis_info::CKBGenesisInfo;
use gw_utils::transaction_skeleton::TransactionSkeleton;
use gw_utils::wallet::Wallet;
use tracing::instrument;

use crate::types::ChainEvent;
use crate::utils;
Expand Down Expand Up @@ -50,6 +51,7 @@ impl FinalizedWithdrawalUnlocker {
}
}

#[instrument(skip_all, name = "withdrawal unlocker handle_event")]
pub async fn handle_event(&mut self, _event: &ChainEvent) -> Result<()> {
let unlocked = &self.unlocked_set;
let rpc_client = &self.unlocker.rpc_client;
Expand Down

0 comments on commit 659f5e8

Please sign in to comment.