From 776965ae9d6ee818595197288d5cca379c564368 Mon Sep 17 00:00:00 2001 From: zhuliquan Date: Fri, 25 Oct 2024 09:46:29 +0800 Subject: [PATCH] feat: support record cpu profile (#761) --- crates/arroyo-server-common/Cargo.toml | 5 ++ crates/arroyo-server-common/src/lib.rs | 3 ++ crates/arroyo-server-common/src/profile.rs | 59 ++++++++++++++++++++++ 3 files changed, 67 insertions(+) create mode 100644 crates/arroyo-server-common/src/profile.rs diff --git a/crates/arroyo-server-common/Cargo.toml b/crates/arroyo-server-common/Cargo.toml index 021d0a9d7..a27ddc0e2 100644 --- a/crates/arroyo-server-common/Cargo.toml +++ b/crates/arroyo-server-common/Cargo.toml @@ -36,5 +36,10 @@ toml = "0.8.13" dirs = "5.0.1" uuid = { version = "1.8.0", features = ["v4"] } +# profile +flate2 = "1.0.30" +pprof = { version = "0.13.0", features = ["flamegraph", "protobuf-codec"] } +serde = { version = "1.0.96", features = ["derive"] } + [build-dependencies] vergen = { version = "8.0.0", features = ["build", "cargo", "git", "gitcl"] } diff --git a/crates/arroyo-server-common/src/lib.rs b/crates/arroyo-server-common/src/lib.rs index 7677cd829..1f9b6cdc5 100644 --- a/crates/arroyo-server-common/src/lib.rs +++ b/crates/arroyo-server-common/src/lib.rs @@ -1,5 +1,6 @@ #![allow(clippy::type_complexity)] +mod profile; pub mod shutdown; use anyhow::anyhow; @@ -13,6 +14,7 @@ use axum::Router; use hyper::Body; use lazy_static::lazy_static; use once_cell::sync::OnceCell; +use profile::handle_get_profile; use prometheus::{register_int_counter, Encoder, IntCounter, ProtobufEncoder, TextEncoder}; use reqwest::Client; use serde_json::{json, Value}; @@ -292,6 +294,7 @@ pub async fn start_admin_server(service: &str) -> anyhow::Result<()> { .route("/details", get(details)) .route("/config", get(config_route)) .route("/debug/pprof/heap", get(handle_get_heap)) + .route("/debug/pprof/profile", get(handle_get_profile)) .with_state(state); let addr = SocketAddr::new(addr, port); diff --git 
a/crates/arroyo-server-common/src/profile.rs b/crates/arroyo-server-common/src/profile.rs
new file mode 100644
index 000000000..fae2d784f
--- /dev/null
+++ b/crates/arroyo-server-common/src/profile.rs
@@ -0,0 +1,80 @@
+use axum::{
+    extract::Query,
+    http::StatusCode,
+    response::{IntoResponse, Response},
+};
+use flate2::write::GzEncoder;
+use flate2::Compression;
+use pprof::{protos::Message, ProfilerGuardBuilder};
+use std::time::Duration;
+use tokio::time::sleep;
+
+/// Admin handler for `GET /debug/pprof/profile`.
+///
+/// Samples the CPU for `duration` seconds at `frequency` Hz (defaults: 30 s,
+/// 3000 Hz) and replies with the gzip-compressed pprof protobuf as a
+/// `profile.pb.gz` attachment.
+///
+/// # Errors
+///
+/// * `400 Bad Request` if `frequency` is not a positive number.
+/// * `500 Internal Server Error` if the profile cannot be collected or encoded.
+pub async fn handle_get_profile(
+    Query(params): Query<ProfileParams>,
+) -> Result<Response, StatusCode> {
+    let frequency = params.frequency.unwrap_or(3000);
+    let duration = params.duration.unwrap_or(30);
+
+    // A sampling rate has to be positive; reject anything else up front
+    // instead of handing a zero or negative value to the profiler.
+    if frequency <= 0 {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    match generate_profile(frequency, duration).await {
+        Ok(body) => Ok((
+            StatusCode::OK,
+            [("Content-Type", "application/octet-stream")],
+            [(
+                "Content-Disposition",
+                "attachment; filename=\"profile.pb.gz\"",
+            )],
+            body,
+        )
+            .into_response()),
+        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
+    }
+}
+
+/// Query-string parameters accepted by [`handle_get_profile`].
+#[derive(serde::Deserialize)]
+pub struct ProfileParams {
+    /// CPU profile collecting frequency, unit: Hz
+    pub frequency: Option<i32>,
+    /// CPU profile collecting duration, unit: second
+    pub duration: Option<u64>,
+}
+
+/// Runs the CPU profiler for `duration` seconds at `frequency` Hz and returns
+/// the report as gzip-compressed pprof protobuf bytes.
+async fn generate_profile(
+    frequency: i32,
+    duration: u64,
+) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
+    let guard = ProfilerGuardBuilder::default()
+        .frequency(frequency)
+        .blocklist(&["libc", "libgcc", "pthread", "vdso"])
+        .build()?;
+
+    // The guard keeps the profiler sampling; wait out the requested window.
+    sleep(Duration::from_secs(duration)).await;
+
+    let profile = guard.report().build()?.pprof()?;
+
+    let mut body = Vec::new();
+    let mut encoder = GzEncoder::new(&mut body, Compression::default());
+
+    profile.write_to_writer(&mut encoder)?;
+    encoder.finish()?;
+    Ok(body)
+}