Skip to content

Commit

Permalink
feat: support record cpu profile (#761)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuliquan authored Oct 25, 2024
1 parent e5395fb commit 776965a
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 0 deletions.
5 changes: 5 additions & 0 deletions crates/arroyo-server-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,10 @@ toml = "0.8.13"
dirs = "5.0.1"
uuid = { version = "1.8.0", features = ["v4"] }

# profile
flate2 = "1.0.30"
pprof = { version = "0.13.0", features = ["flamegraph", "protobuf-codec"] }
serde = { version = "1.0.96", features = ["derive"] }

[build-dependencies]
vergen = { version = "8.0.0", features = ["build", "cargo", "git", "gitcl"] }
3 changes: 3 additions & 0 deletions crates/arroyo-server-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(clippy::type_complexity)]

mod profile;
pub mod shutdown;

use anyhow::anyhow;
Expand All @@ -13,6 +14,7 @@ use axum::Router;
use hyper::Body;
use lazy_static::lazy_static;
use once_cell::sync::OnceCell;
use profile::handle_get_profile;
use prometheus::{register_int_counter, Encoder, IntCounter, ProtobufEncoder, TextEncoder};
use reqwest::Client;
use serde_json::{json, Value};
Expand Down Expand Up @@ -292,6 +294,7 @@ pub async fn start_admin_server(service: &str) -> anyhow::Result<()> {
.route("/details", get(details))
.route("/config", get(config_route))
.route("/debug/pprof/heap", get(handle_get_heap))
.route("/debug/pprof/profile", get(handle_get_profile))
.with_state(state);

let addr = SocketAddr::new(addr, port);
Expand Down
59 changes: 59 additions & 0 deletions crates/arroyo-server-common/src/profile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use axum::{
extract::Query,
http::StatusCode,
response::{IntoResponse, Response},
};
use flate2::write::GzEncoder;
use flate2::Compression;
use pprof::{protos::Message, ProfilerGuardBuilder};
use std::time::Duration;
use tokio::time::sleep;

pub async fn handle_get_profile(
Query(params): Query<ProfileParams>,
) -> Result<Response, StatusCode> {
let frequency = params.frequency.unwrap_or(3000);
let duration = params.duration.unwrap_or(30);
match generate_profile(frequency, duration).await {
Ok(body) => Ok((
StatusCode::OK,
[("Content-Type", "application/octet-stream")],
[(
"Content-Disposition",
"attachment; filename=\"profile.pb.gz\"",
)],
body,
)
.into_response()),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}

#[derive(serde::Deserialize)]
pub struct ProfileParams {
/// CPU profile collecting frequency, unit: Hz
pub frequency: Option<i32>,
/// CPU profile collecting duration, unit: second
pub duration: Option<u64>,
}

async fn generate_profile(
frequency: i32,
duration: u64,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let guard = ProfilerGuardBuilder::default()
.frequency(frequency)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()?;

sleep(Duration::from_secs(duration)).await;

let profile = guard.report().build()?.pprof()?;

let mut body = Vec::new();
let mut encoder = GzEncoder::new(&mut body, Compression::default());

profile.write_to_writer(&mut encoder)?;
encoder.finish()?;
Ok(body)
}

0 comments on commit 776965a

Please sign in to comment.