Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: feat(block-producer): trace chain task run func #733

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/block-producer/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl BlockProducer {
self.last_submitted_tx_hash.clone()
}

#[instrument(skip_all, fields(event = %event))]
#[instrument(skip_all, name = "block producer handle_event")]
pub async fn handle_event(&mut self, event: ChainEvent) -> Result<()> {
if let Some(ref tests_control) = self.tests_control {
match tests_control.payload().await {
Expand Down
2 changes: 2 additions & 0 deletions crates/block-producer/src/challenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use gw_utils::genesis_info::CKBGenesisInfo;
use gw_utils::transaction_skeleton::TransactionSkeleton;
use gw_utils::wallet::Wallet;
use tokio::sync::Mutex;
use tracing::instrument;

use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
Expand Down Expand Up @@ -110,6 +111,7 @@ impl Challenger {
}
}

#[instrument(skip_all, name = "challenger handle_event")]
pub async fn handle_event(&mut self, event: ChainEvent) -> Result<()> {
if let Some(ref tests_control) = self.tests_control {
match tests_control.payload().await {
Expand Down
2 changes: 2 additions & 0 deletions crates/block-producer/src/cleaner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use gw_types::core::Status;
use gw_types::offchain::{global_state_from_slice, CellInfo, InputCellInfo, TxStatus};
use gw_types::packed::{CellDep, CellInput, Transaction, WitnessArgs};
use gw_types::prelude::Unpack;
use tracing::instrument;

use std::collections::HashSet;
use std::convert::TryFrom;
Expand Down Expand Up @@ -72,6 +73,7 @@ impl Cleaner {
}
}

#[instrument(skip_all, name = "cleaner handle_event")]
pub async fn handle_event(&self, _event: ChainEvent) -> Result<()> {
if matches!(self.query_rollup_status().await?, Status::Halting) {
return Ok(());
Expand Down
2 changes: 1 addition & 1 deletion crates/block-producer/src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl ChainUpdater {
}

// Start syncing
#[instrument(skip_all, fields(event = %_event))]
#[instrument(skip_all, name = "chain updater handle_event")]
pub async fn handle_event(&mut self, _event: ChainEvent) -> Result<()> {
let initial_syncing = !self.initialized;
// Always start from last valid tip on l1
Expand Down
117 changes: 85 additions & 32 deletions crates/block-producer/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use tokio::{
spawn,
sync::{broadcast, mpsc, Mutex},
};
use tracing::{info_span, instrument};

const MIN_CKB_VERSION: &str = "0.40.0";
const EVENT_TIMEOUT_SECONDS: u64 = 30;
Expand All @@ -76,6 +77,21 @@ struct ChainTaskContext {
withdrawal_unlocker: Option<FinalizedWithdrawalUnlocker>,
cleaner: Option<Arc<Cleaner>>,
}

struct ChainTaskRunStatus {
opt_tip_number_hash: Option<(u64, H256)>,
last_event_time: Instant,
}

impl Default for ChainTaskRunStatus {
fn default() -> Self {
ChainTaskRunStatus {
opt_tip_number_hash: None,
last_event_time: Instant::now(),
}
}
}

struct ChainTask {
rpc_client: RPCClient,
poll_interval: Duration,
Expand Down Expand Up @@ -137,6 +153,7 @@ impl ChainTask {
}
}

#[instrument(skip_all, fields(tip_number = tip_number, tip_hash = %tip_hash.pack()))]
async fn sync_next(
&self,
tip_number: u64,
Expand Down Expand Up @@ -303,36 +320,32 @@ impl ChainTask {
}
}

async fn run(&mut self, backoff: &mut ExponentialBackoff) -> Result<()> {
// How to get tip_number and tip_hash only once? then loop chain task run only?
#[instrument(skip_all, err(Debug))]
async fn run(&mut self, status: &ChainTaskRunStatus) -> Result<ChainTaskRunStatus> {
// get tip
let (mut tip_number, mut tip_hash) = {
let tip = self.rpc_client.get_tip().await?;
let tip_number: u64 = tip.number().unpack();
let tip_hash: H256 = tip.block_hash().unpack();
(tip_number, tip_hash)
let (tip_number, tip_hash) = match status.opt_tip_number_hash {
Some((number, hash)) => (number, hash),
None => {
let tip = self.rpc_client.get_tip().await?;
let tip_number: u64 = tip.number().unpack();
let tip_hash: H256 = tip.block_hash().unpack();
(tip_number, tip_hash)
}
};

let mut last_event_time = Instant::now();
let opt_tip_number_hash = self
.metrics_monitor
.instrument(self.sync_next(tip_number, tip_hash, &status.last_event_time))
.await?;

loop {
// Exit if shutdown event is received.
if self.shutdown_event.try_recv().is_ok() {
log::info!("ChainTask existed successfully");
return Ok(());
}
let updated_status = ChainTaskRunStatus {
opt_tip_number_hash: opt_tip_number_hash
.or_else(|| status.opt_tip_number_hash.to_owned()),
last_event_time: Instant::now(),
};

if let Some((_tip_number, _tip_hash)) = self
.metrics_monitor
.instrument(self.sync_next(tip_number, tip_hash, &last_event_time))
.await?
{
tip_number = _tip_number;
tip_hash = _tip_hash;
last_event_time = Instant::now();
}
backoff.reset();
tokio::time::sleep(self.poll_interval).await;
}
Ok(updated_status)
}
}

Expand Down Expand Up @@ -870,6 +883,8 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
let shutdown_send = shutdown_send.clone();
move || {
rt_handle.block_on(async move {
use tracing::Instrument;

let _tx = chain_task_ended_tx;
let ctx = ChainTaskContext {
chain_updater,
Expand All @@ -886,13 +901,51 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
shutdown_send,
shutdown_event_recv,
);
while let Err(err) = chain_task.run(&mut backoff).await {
if err.is::<RPCRequestError>() {
log::error!("chain polling loop request error, will retry: {}", err);
tokio::time::sleep(backoff.next_sleep()).await;
} else {
log::error!("chain polling loop exit unexpected, error: {}", err);
break;

let mut run_status = ChainTaskRunStatus::default();
loop {
// Exit if shutdown event is received.
if chain_task.shutdown_event.try_recv().is_ok() {
log::info!("ChainTask existed successfully");
return;
}

let run_span = info_span!("chain_task_run");
match chain_task
.run(&run_status)
.instrument(run_span.clone())
.await
{
Ok(updated_status) => {
run_status = updated_status;
backoff.reset();

let sleep_span =
info_span!(parent: &run_span, "chain_task interval sleep");
tokio::time::sleep(chain_task.poll_interval)
.instrument(sleep_span)
.await;
}
Err(err) if err.is::<RPCRequestError>() => {
// Reset status and refresh tip number hash
run_status = ChainTaskRunStatus::default();
let backoff_sleep = backoff.next_sleep();
log::error!(
"chain polling loop request error, will retry in {}s: {}",
backoff_sleep.as_secs(),
err
);

let sleep_span =
info_span!(parent: &run_span, "chain_task backoff sleep");
tokio::time::sleep(backoff_sleep)
.instrument(sleep_span)
.await;
}
Err(err) => {
log::error!("chain polling loop exit unexpected, error: {}", err);
break;
}
}
}
});
Expand Down
2 changes: 2 additions & 0 deletions crates/block-producer/src/withdrawal_unlocker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use gw_utils::fee::fill_tx_fee;
use gw_utils::genesis_info::CKBGenesisInfo;
use gw_utils::transaction_skeleton::TransactionSkeleton;
use gw_utils::wallet::Wallet;
use tracing::instrument;

use crate::types::ChainEvent;
use crate::utils;
Expand Down Expand Up @@ -50,6 +51,7 @@ impl FinalizedWithdrawalUnlocker {
}
}

#[instrument(skip_all, name = "withdrawal unlocker handle_event")]
pub async fn handle_event(&mut self, _event: &ChainEvent) -> Result<()> {
let unlocked = &self.unlocked_set;
let rpc_client = &self.unlocker.rpc_client;
Expand Down