Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tracing #567

Merged
merged 2 commits into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
411 changes: 295 additions & 116 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ async-channel = "1.4.2"
async-jsonrpc-client = { version = "0.3.0", default-features = false, features = ["http-tokio"] }
clap = "2.33.3"
ctrlc = {version = "3.2.1", features = ["termination"]}
env_logger = "0.8.3"
futures = "0.3.13"
log = "0.4.14"
serde_json = "1.0"
Expand All @@ -53,9 +52,15 @@ hex = "0.4"
async-trait = "0.1"
semver = "1.0"
rayon = "1.5"
sentry = "0.23.0"
sentry-log = "0.23.0"
thiserror = "1.0"
# For latest tracing
sentry = { git = "https://github.com/getsentry/sentry-rust", rev = "df694a49595d6890c510d80b85cfbb4b5ae6159a" }
sentry-tracing = { git = "https://github.com/getsentry/sentry-rust", rev = "df694a49595d6890c510d80b85cfbb4b5ae6159a" }
tracing = { version = "0.1", features = ["attributes"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "parking_lot", "smallvec", "tracing-log"] }
tracing-opentelemetry = "0.16"
opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] }
opentelemetry = { version = "0.16", features = ["rt-tokio"] }

[target.'cfg(all(not(target_env = "msvc"), not(target_os="macos")))'.dependencies]
tikv-jemallocator = { version = "0.4.0", features = ["unprefixed_malloc_on_supported_platforms"] }
Expand Down
7 changes: 7 additions & 0 deletions crates/block-producer/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use std::{
time::{Duration, Instant},
};
use tokio::sync::Mutex;
use tracing::instrument;

const MAX_BLOCK_OUTPUT_PARAM_RETRY_COUNT: usize = 10;
const TRANSACTION_SCRIPT_ERROR: &str = "TransactionScriptError";
Expand Down Expand Up @@ -79,7 +80,9 @@ fn generate_custodian_cells(
deposit_cells.iter().map(to_custodian).collect()
}

#[instrument(skip_all)]
async fn resolve_tx_deps(rpc_client: &RPCClient, tx_hash: [u8; 32]) -> Result<Vec<CellInfo>> {
#[instrument(skip_all)]
async fn resolve_dep_group(rpc_client: &RPCClient, dep: CellDep) -> Result<Vec<CellDep>> {
// return dep
if dep.dep_type() == DepType::Code.into() {
Expand Down Expand Up @@ -237,6 +240,7 @@ impl BlockProducer {
self.last_submitted_tx_hash.clone()
}

#[instrument(skip_all, fields(event = %event))]
pub async fn handle_event(&mut self, event: ChainEvent) -> Result<()> {
if let Some(ref tests_control) = self.tests_control {
match tests_control.payload().await {
Expand Down Expand Up @@ -372,6 +376,7 @@ impl BlockProducer {
Ok(())
}

#[instrument(skip_all, fields(retry_count = retry_count))]
async fn compose_next_block_submit_tx(
&mut self,
median_time: Duration,
Expand Down Expand Up @@ -535,6 +540,7 @@ impl BlockProducer {
}
}

#[instrument(skip_all, fields(block = block_number))]
async fn submit_block_tx(
&mut self,
block_number: u64,
Expand Down Expand Up @@ -645,6 +651,7 @@ impl BlockProducer {
}
}

#[instrument(skip_all, fields(block = args.block.raw().number().unpack()))]
async fn complete_tx_skeleton(&self, args: CompleteTxArgs) -> Result<Transaction> {
let CompleteTxArgs {
deposit_cells,
Expand Down
4 changes: 4 additions & 0 deletions crates/block-producer/src/custodian.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use anyhow::Result;
use gw_rpc_client::rpc_client::{QueryResult, RPCClient};
use gw_types::offchain::CollectedCustodianCells;
use tracing::instrument;

pub const MAX_CUSTODIANS: usize = 50;

#[instrument(skip_all, fields(last_finalized_block_number = last_finalized_block_number))]
pub async fn query_mergeable_custodians(
rpc_client: &RPCClient,
collected_custodians: CollectedCustodianCells,
Expand Down Expand Up @@ -31,6 +33,7 @@ pub async fn query_mergeable_custodians(
.await
}

#[instrument(skip_all, fields(last_finalized_block_number = last_finalized_block_number))]
async fn query_mergeable_ckb_custodians(
rpc_client: &RPCClient,
collected: CollectedCustodianCells,
Expand All @@ -49,6 +52,7 @@ async fn query_mergeable_ckb_custodians(
.await
}

#[instrument(skip_all, fields(last_finalized_block_number = last_finalized_block_number))]
async fn query_mergeable_sudt_custodians(
rpc_client: &RPCClient,
collected: CollectedCustodianCells,
Expand Down
1 change: 1 addition & 0 deletions crates/block-producer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod replay_block;
pub mod runner;
pub mod stake;
pub mod test_mode_control;
pub mod trace;
pub mod types;
pub mod utils;
pub mod withdrawal;
Expand Down
22 changes: 5 additions & 17 deletions crates/block-producer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ static GLOBAL_ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

use anyhow::{Context, Result};
use clap::{App, Arg, SubCommand};
use gw_block_producer::{db_block_validator, runner};
use gw_block_producer::{db_block_validator, runner, trace};
use gw_config::Config;
use gw_version::Version;
use sentry_log::LogFilter;
use std::{fs, path::Path};

const COMMAND_RUN: &str = "run";
Expand Down Expand Up @@ -103,15 +102,18 @@ async fn run_cli() -> Result<()> {
(COMMAND_RUN, Some(m)) => {
let config_path = m.value_of(ARG_CONFIG).unwrap();
let config = read_config(&config_path)?;
let _guard = trace::init(config.trace)?;
runner::run(config, m.is_present(ARG_SKIP_CONFIG_CHECK)).await?;
}
(COMMAND_EXAMPLE_CONFIG, Some(m)) => {
let path = m.value_of(ARG_OUTPUT_PATH).unwrap();
let _guard = trace::init(None)?;
generate_example_config(path)?;
}
(COMMAND_VERIFY_DB_BLOCK, Some(m)) => {
let config_path = m.value_of(ARG_CONFIG).unwrap();
let config = read_config(&config_path)?;
let _guard = trace::init(None)?;
let from_block: Option<u64> = m.value_of(ARG_FROM_BLOCK).map(str::parse).transpose()?;
let to_block: Option<u64> = m.value_of(ARG_TO_BLOCK).map(str::parse).transpose()?;
db_block_validator::verify(config, from_block, to_block).await?;
Expand All @@ -120,6 +122,7 @@ async fn run_cli() -> Result<()> {
// default command: start a Godwoken node
let config_path = "./config.toml";
let config = read_config(&config_path)?;
let _guard = trace::init(config.trace)?;
runner::run(config, false).await?;
}
};
Expand All @@ -130,20 +133,5 @@ async fn run_cli() -> Result<()> {
/// Default to number of cpus, pass `worker_threads` to manually configure workers.
#[tokio::main(flavor = "multi_thread")]
async fn main() {
init_log();
run_cli().await.expect("run cli");
}

fn init_log() {
let logger = env_logger::builder()
.parse_env(env_logger::Env::default().default_filter_or("info"))
.build();
let level = logger.filter();
let logger = sentry_log::SentryLogger::with_dest(logger).filter(|md| match md.level() {
log::Level::Error | log::Level::Warn => LogFilter::Event,
_ => LogFilter::Ignore,
});
log::set_boxed_logger(Box::new(logger))
.map(|()| log::set_max_level(level))
.expect("set log");
}
11 changes: 11 additions & 0 deletions crates/block-producer/src/poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use gw_web3_indexer::indexer::Web3Indexer;
use serde_json::json;
use std::{collections::HashSet, sync::Arc};
use tokio::sync::Mutex;
use tracing::instrument;

#[derive(thiserror::Error, Debug)]
#[error("chain updater query l1 tx {tx_hash} error {source}")]
Expand Down Expand Up @@ -79,6 +80,7 @@ impl ChainUpdater {
}

// Start syncing
#[instrument(skip_all, fields(event = %_event))]
pub async fn handle_event(&mut self, _event: ChainEvent) -> Result<()> {
let initial_syncing = !self.initialized;
// Always start from last valid tip on l1
Expand Down Expand Up @@ -135,6 +137,7 @@ impl ChainUpdater {
Ok(())
}

#[instrument(skip_all)]
pub async fn try_sync(&mut self) -> anyhow::Result<()> {
let valid_tip_l1_block_number = {
let chain = self.chain.lock().await;
Expand Down Expand Up @@ -195,6 +198,7 @@ impl ChainUpdater {
Ok(())
}

#[instrument(skip_all)]
pub async fn update(&mut self, txs: &[Tx]) -> anyhow::Result<()> {
for tx in txs.iter() {
self.update_single(&tx.tx_hash).await?;
Expand All @@ -204,6 +208,7 @@ impl ChainUpdater {
Ok(())
}

#[instrument(skip_all)]
async fn update_single(&mut self, tx_hash: &H256) -> anyhow::Result<()> {
if let Some(last_tx_hash) = &self.last_tx_hash {
if last_tx_hash == tx_hash {
Expand Down Expand Up @@ -306,6 +311,7 @@ impl ChainUpdater {
Ok(())
}

#[instrument(skip_all)]
async fn find_l2block_on_l1(&self, committed_info: L2BlockCommittedInfo) -> Result<bool> {
let rpc_client = &self.rpc_client;
let tx_hash: gw_common::H256 =
Expand All @@ -320,6 +326,7 @@ impl ChainUpdater {
Ok(l1_block_hash == Some(block_hash))
}

#[instrument(skip_all)]
async fn revert_to_valid_tip_on_l1(&mut self) -> Result<()> {
let db = { self.chain.lock().await.store().begin_transaction() };
let mut revert_l1_actions = Vec::new();
Expand Down Expand Up @@ -404,6 +411,7 @@ impl ChainUpdater {
Ok(())
}

#[instrument(skip_all)]
fn extract_rollup_action(&self, tx: &Transaction) -> Result<RollupAction> {
let rollup_type_hash: [u8; 32] = {
let hash = self.rollup_type_script.calc_script_hash();
Expand Down Expand Up @@ -438,6 +446,7 @@ impl ChainUpdater {
RollupAction::from_slice(&output_type).map_err(|e| anyhow!("invalid rollup action {}", e))
}

#[instrument(skip_all)]
async fn extract_challenge_context(
&self,
tx: &Transaction,
Expand Down Expand Up @@ -485,6 +494,7 @@ impl ChainUpdater {
unreachable!("challenge output not found");
}

#[instrument(skip_all)]
async fn extract_deposit_requests(
&self,
tx: &Transaction,
Expand Down Expand Up @@ -534,6 +544,7 @@ impl ChainUpdater {
}
}

#[instrument(skip_all)]
fn try_parse_deposit_request(
cell_output: &CellOutput,
cell_data: &Bytes,
Expand Down
3 changes: 3 additions & 0 deletions crates/block-producer/src/produce_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use gw_types::{
},
prelude::*,
};
use tracing::instrument;

#[derive(Clone)]
pub struct ProduceBlockResult {
Expand All @@ -45,6 +46,7 @@ pub struct ProduceBlockParam {
/// this method take txs & withdrawal requests from tx pool and produce a new block
/// the package method should packs the items in order:
/// withdrawals, then deposits, finally the txs. Thus, the state-validator can verify this correctly
#[instrument(skip_all)]
pub fn produce_block(
db: &StoreTransaction,
generator: &Generator,
Expand Down Expand Up @@ -172,6 +174,7 @@ pub fn produce_block(
}

// Generate produce block param
#[instrument(skip_all, fields(mem_block = mem_block.block_info().number().unpack()))]
pub fn generate_produce_block_param(
store: &Store,
mut mem_block: MemBlock,
Expand Down
1 change: 1 addition & 0 deletions crates/block-producer/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ pub async fn run(config: Config, skip_config_check: bool) -> Result<()> {
)),
None => sentry::init(()),
};

// Enable smol threads before smol::spawn
let runtime_threads = match std::env::var(SMOL_THREADS_ENV_VAR) {
Ok(s) => s.parse()?,
Expand Down
51 changes: 51 additions & 0 deletions crates/block-producer/src/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use anyhow::Result;
use gw_config::Trace;
use sentry_tracing::EventFilter;
use tracing_subscriber::prelude::*;

pub struct ShutdownGuard {
trace: Option<Trace>,
}

impl Drop for ShutdownGuard {
fn drop(&mut self) {
if let Some(Trace::Jaeger) = self.trace {
opentelemetry::global::shutdown_tracer_provider(); // Sending remaining spans
}
}
}

pub fn init(trace: Option<Trace>) -> Result<ShutdownGuard> {
let env_filter_layer = tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| tracing_subscriber::EnvFilter::try_new("info"))?;

// NOTE: `traces_sample_rate` in sentry client option is 0.0 by default, which disable sentry
// tracing. Here we just use sentry-log feature.
let sentry_layer = sentry_tracing::layer().event_filter(|md| match md.level() {
&tracing::Level::ERROR | &tracing::Level::WARN => EventFilter::Event,
_ => EventFilter::Ignore,
});

let registry = tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(env_filter_layer)
.with(sentry_layer);

match trace {
Some(Trace::Jaeger) => {
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());

let jaeger_layer = {
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("godwoken")
.install_batch(opentelemetry::runtime::Tokio)?;
tracing_opentelemetry::layer().with_tracer(tracer)
};

registry.with(jaeger_layer).try_init()?
}
None => registry.try_init()?,
}

Ok(ShutdownGuard { trace })
}
8 changes: 8 additions & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ use std::{
path::PathBuf,
};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize, Hash)]
magicalne marked this conversation as resolved.
Show resolved Hide resolved
#[serde(rename_all = "lowercase")]
pub enum Trace {
Jaeger,
}

#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
pub struct Config {
pub node_mode: NodeMode,
Expand All @@ -32,6 +38,8 @@ pub struct Config {
pub store: StoreConfig,
pub sentry_dsn: Option<String>,
#[serde(default)]
pub trace: Option<Trace>,
#[serde(default)]
pub consensus: ConsensusConfig,
pub reload_config_github_url: Option<GithubConfigUrl>,
#[serde(default)]
Expand Down
1 change: 1 addition & 0 deletions crates/generator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ log = "0.4"
hex = "0.4"
tokio = "1.15"
arc-swap = "1.5"
tracing = { version = "0.1", features = ["attributes"] }

[dev-dependencies]
gw-utils = {path = "../utils" }
Loading