Skip to content

Commit

Permalink
metrics: add a hacky exporter of tokio metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
0xdeafbeef committed Dec 13, 2024
1 parent ebb13e2 commit 7850757
Show file tree
Hide file tree
Showing 8 changed files with 354 additions and 30 deletions.
31 changes: 29 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ itertools = "0.12"
libc = "0.2"
metrics = "0.23"
metrics-exporter-prometheus = "0.15"
metrics-util = "0.17.0"
moka = { version = "0.12", features = ["sync"] }
parking_lot = { version = "0.12.1" }
parking_lot_core = "0.9.9"
Expand Down Expand Up @@ -104,6 +105,7 @@ tikv-jemallocator = { version = "0.6.0", features = [
tikv-jemalloc-ctl = { version = "0.6.0", features = ["stats"] }
tl-proto = "0.4"
tokio = { version = "1", default-features = false }
tokio-metrics = { version = "0.3.1" }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.10", features = ["codec"] }
tower = "0.5"
Expand Down Expand Up @@ -138,9 +140,10 @@ tycho-util = { path = "./util", version = "0.1.4" }
weedb = { version = "0.3.8", git = "https://github.com/broxus/weedb.git", rev = "59728ea0c8703dd28a4c37dee05c1321cd81b966" }

[workspace.lints.rust]
future_incompatible = "warn"
nonstandard_style = "warn"
rust_2018_idioms = "warn"
future_incompatible = { level = "warn", priority = -1 }
nonstandard_style = { level = "warn", priority = -1 }
rust_2018_idioms = { level = "warn", priority = -1 }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }

[workspace.lints.clippy]
all = { level = "warn", priority = -1 }
Expand Down Expand Up @@ -214,6 +217,7 @@ useless_transmute = "warn"
verbose_file_reads = "warn"
zero_sized_map_values = "warn"


[profile.release]
lto = "thin"
codegen-units = 1
Expand Down
3 changes: 3 additions & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ serde = { workspace = true }
serde_json = { workspace = true, features = ["preserve_order"] }
serde_path_to_error = { workspace = true }
tempfile = { workspace = true }
tokio-metrics = { workspace = true, optional = true }
metrics-util = { workspace = true, optional = true }
tikv-jemallocator = { workspace = true, features = [
"unprefixed_malloc_on_supported_platforms",
"background_threads",
Expand Down Expand Up @@ -73,6 +75,7 @@ rustc_version = { workspace = true }
default = ["jemalloc"]
jemalloc = ["dep:tikv-jemallocator", "dep:tikv-jemalloc-ctl", "dep:metrics"]
deadlock-detection = ["parking_lot/deadlock_detection"]
tokio-metrics = ["dep:tokio-metrics", "dep:metrics-util"]
debug = ["tycho-consensus/test"]
lto = ["weedb/lto"]

Expand Down
176 changes: 157 additions & 19 deletions cli/src/cmd/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::PathBuf;

use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use tokio::runtime::Runtime;
use tycho_core::global_config::GlobalConfig;
use tycho_util::cli::logger::{init_logger, set_abort_with_tracing};
use tycho_util::cli::{resolve_public_ip, signal};
Expand Down Expand Up @@ -75,27 +76,24 @@ impl CmdRun {
.stack_size(8 * 1024 * 1024)
.thread_name(|_| "rayon_worker".to_string())
.num_threads(node_config.threads.rayon_threads)
.build_global()
.unwrap();

tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(node_config.threads.tokio_workers)
.build()?
.block_on(async move {
let run_fut = tokio::spawn(self.run_impl(args, node_config));
let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS);
tokio::select! {
res = run_fut => res.unwrap(),
signal = stop_fut => match signal {
Ok(signal) => {
tracing::info!(?signal, "received termination signal");
Ok(())
}
Err(e) => Err(e.into()),
.build_global()?;

let rt = build_tokio_runtime(&node_config)?;

rt.block_on(async move {
let run_fut = tokio::spawn(self.run_impl(args, node_config));
let stop_fut = signal::any_signal(signal::TERMINATION_SIGNALS);
tokio::select! {
res = run_fut => res.unwrap(),
signal = stop_fut => match signal {
Ok(signal) => {
tracing::info!(?signal, "received termination signal");
Ok(())
}
Err(e) => Err(e.into()),
}
})
}
})
}

async fn run_impl(self, args: BaseArgs, node_config: NodeConfig) -> Result<()> {
Expand Down Expand Up @@ -157,6 +155,146 @@ impl CmdRun {
}
}

fn build_tokio_runtime(node_config: &NodeConfig) -> Result<Runtime> {
#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
use std::time::Duration;

#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
use tokio::runtime::{HistogramConfiguration, LogHistogram};

let mut rt = tokio::runtime::Builder::new_multi_thread();

let num_workers = node_config.threads.tokio_workers;
rt.enable_all().worker_threads(num_workers);

#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
let hist_params = LogHistogram::builder()
.min_value(Duration::from_micros(100))
.max_value(Duration::from_secs_f64(3.2))
.max_buckets(NUM_BUCKETS)?;

#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
const NUM_BUCKETS: usize = 16;
#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
{
rt.enable_metrics_poll_time_histogram()
.metrics_poll_time_histogram_configuration(HistogramConfiguration::log(hist_params));
}

let rt = rt.build()?;

#[cfg(all(feature = "tokio-metrics", tokio_unstable))]
rt.spawn(async move {
const fn fill_log_buckets() -> [f64; NUM_BUCKETS] {
let mut boundaries = [0.0; NUM_BUCKETS];
let mut i = 0;
let mut current_micros = 100.0;

while i < NUM_BUCKETS {
boundaries[i] = current_micros / 1_000_000.0;
current_micros *= 2.0;
i += 1;
}

boundaries
}
const LOG_BUCKETS: [f64; NUM_BUCKETS] = fill_log_buckets();


// we can use histogram when https://github.com/metrics-rs/metrics/issues/509 is resolved
// otherwise it will burn CPU and memory
let handle = tokio::runtime::Handle::current();
let runtime_monitor = tokio_metrics::RuntimeMonitor::new(&handle);

const METRIC_NAME: &str = "tycho_tokio_poll_count_time_bucket";
const METRIC_SUM: &str = "tycho_tokio_poll_count_time_sum";
const METRIC_COUNT: &str = "tycho_tokio_poll_count_time_count";

for interval in runtime_monitor.intervals() {
let histogram = interval.poll_count_histogram;

let mut cumulative_count = 0;
let mut sum = 0.0;

// poll time histogram via gauages
for (idx, value) in histogram.iter().enumerate() {
let bucket = LOG_BUCKETS[idx];
cumulative_count += *value;
let le = format!("{:.6}", bucket);
metrics::gauge!(METRIC_NAME, "le" => le).set(cumulative_count as f64);
sum += bucket * (*value as f64);
}
// Add sum and count
metrics::gauge!(METRIC_SUM).set(sum);
metrics::gauge!(METRIC_COUNT).set(cumulative_count as f64);
// Add +Inf bucket
metrics::gauge!(METRIC_NAME, "le" => "+Inf").set(cumulative_count as f64);

let mean_poll_time = interval.mean_poll_duration.as_secs_f64();
metrics::gauge!("tycho_tokio_mean_poll_time").set(mean_poll_time);

let max_poll_time = interval.mean_poll_duration_worker_max.as_secs_f64();
metrics::gauge!("tycho_tokio_max_poll_time").set(max_poll_time);

let metrics = handle.metrics();
metrics::gauge!("tycho_tokio_num_alive_tasks").set(metrics.num_alive_tasks() as f64);

let global_queue_depth = metrics.global_queue_depth();
metrics::gauge!("tycho_tokio_global_queue_depth").set(global_queue_depth as f64);

let num_blocking_threads = metrics.num_blocking_threads();
metrics::gauge!("tycho_tokio_num_blocking_threads").set(num_blocking_threads as f64);

let spawned_tasks = metrics.spawned_tasks_count();
metrics::gauge!("tycho_tokio_spawned_tasks_count").set(spawned_tasks as f64);


metrics::gauge!("tycho_tokio_num_idle_blocking_threads")
.set(metrics.num_idle_blocking_threads() as f64);

metrics::gauge!("tycho_tokio_injection_queue_depth")
.set(metrics.global_queue_depth() as f64);

let blocking_queue_length = metrics.blocking_queue_depth();
metrics::gauge!("tycho_tokio_blocking_queue_depth").set(blocking_queue_length as f64);

for worker_id in 0..num_workers {
let park_count = metrics.worker_park_count(worker_id);
metrics::gauge!("tycho_tokio_worker_park_count", "worker_id" => format!("{worker_id}")).set(park_count as f64);

let worker_noop_count = metrics.worker_noop_count(worker_id);
metrics::gauge!("tycho_tokio_worker_noop_count", "worker_id" => format!("{worker_id}")).set(worker_noop_count as f64);

let worker_steal_count = metrics.worker_steal_count(worker_id);
metrics::gauge!("tycho_tokio_worker_steal_count", "worker_id" => format!("{worker_id}")).set(worker_steal_count as f64);

let worker_steal_operations = metrics.worker_steal_operations(worker_id);
metrics::gauge!("tycho_tokio_worker_steal_operations", "worker_id" => format!("{worker_id}")).set(worker_steal_operations as f64);

let worker_local_queue_depth = metrics.worker_local_queue_depth(worker_id);
metrics::gauge!("tycho_tokio_worker_local_queue_depth", "worker_id" => format!("{worker_id}")).set(worker_local_queue_depth as f64);

let worker_mean_poll_time = metrics.worker_mean_poll_time(worker_id).as_secs_f64();
metrics::gauge!("tycho_tokio_worker_mean_poll_time", "worker_id" => format!("{worker_id}")).set(worker_mean_poll_time);

let worker_busy_time = metrics.worker_total_busy_duration(worker_id).as_secs_f64();
metrics::gauge!("tycho_tokio_worker_busy_time", "worker_id" => format!("{worker_id}")).set(worker_busy_time);
}
metrics::gauge!("tycho_tokio_io_driver_fd_registered_count").set(metrics.io_driver_fd_registered_count() as f64);
metrics::gauge!("tycho_tokio_io_driver_fd_deregistered_count").set(metrics.io_driver_fd_deregistered_count() as f64);

metrics::gauge!("tycho_tokio_remote_schedule_count").set(metrics.remote_schedule_count() as f64);

metrics::gauge!("tycho_tokio_budget_forced_yield_count").set(metrics.budget_forced_yield_count() as f64);


tokio::time::sleep(Duration::from_millis(5000)).await;
}
});

Ok(rt)
}

fn init_metrics(config: &MetricsConfig) -> Result<()> {
use metrics_exporter_prometheus::Matcher;
const EXPONENTIAL_SECONDS: &[f64] = &[
Expand Down
1 change: 1 addition & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ check_format: install_fmt
# Clippy go brr.
lint:
#cargo clippy --all-targets --all-features --workspace # update when clippy is fixed
export RUSTFLAGS="-cfg tokio_unstable"
cargo clippy --all-targets --all-features -p tycho-block-util -p tycho-core -p tycho-network -p tycho-rpc -p tycho-storage -p tycho-consensus -p tycho-util -p tycho-collator -p tycho-control -p tycho-light-node -p tycho-cli -- -D warnings

# Generates cargo docs.
Expand Down
4 changes: 3 additions & 1 deletion scripts/check-metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def process_metric_arg(
if constant_value:
metric_names.add(constant_value)
else:
if not any(dir in file_path for dir in blacklisted_dirs):
if not any(
dir in file_path for dir in blacklisted_dirs
) and not arg.startswith("format!"):
print(f"Warning: Unresolved metric name '{arg}' in {file_path}")


Expand Down
Loading

0 comments on commit 7850757

Please sign in to comment.