diff --git a/CHANGELOG.md b/CHANGELOG.md index c709863..2f19bae 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,16 @@ All notable changes to this project will be documented in this file. +## [unreleased] + +### 🚀 Features + +- Add initial support for pollcatch + +### ⚙️ Miscellaneous Tasks + +- Fix release-plz.toml + ## [0.1.0] - 2025-04-16 ### ⚙️ Miscellaneous Tasks diff --git a/Cargo.lock b/Cargo.lock index df10df2..06067f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -114,7 +114,7 @@ dependencies = [ [[package]] name = "async-profiler-agent" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "async-trait", @@ -124,6 +124,7 @@ dependencies = [ "chrono", "clap", "libloading", + "rand", "reqwest", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 0341951..0f7dc63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-profiler-agent" -version = "0.1.0" +version = "0.1.1" description = "Rust agent for async-profiler" license = "Apache-2.0" repository = "https://github.com/async-profiler/rust-agent" @@ -29,6 +29,7 @@ anyhow = "1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tokio = { version = "1", features = ["test-util", "full"] } test-case = "3" +rand = "0.9" [[example]] name = 'simple' diff --git a/README.md b/README.md index 3685276..45de189 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,19 @@ The metadata is not used by the agent directly, and only provided to the reporte [Fargate]: https://aws.amazon.com/fargate [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html +### PollCatch + +If you want to find long poll times, and you have `RUSTFLAGS="--cfg tokio_unstable"`, you can +emit `tokio.PollCatchV1` events this way: + +``` + #[cfg(tokio_unstable)] + { + rt.on_before_task_poll(|_| async_profiler_agent::pollcatch::before_poll_hook()) + .on_after_task_poll(|_| async_profiler_agent::pollcatch::after_poll_hook()); + } +``` + ## Security See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information. diff --git a/examples/simple.rs b/examples/simple/main.rs similarity index 74% rename from examples/simple.rs rename to examples/simple/main.rs index 00a3596..3be4440 100644 --- a/examples/simple.rs +++ b/examples/simple/main.rs @@ -10,6 +10,8 @@ use std::time::Duration; use aws_config::BehaviorVersion; use clap::Parser; +mod slow; + pub fn set_up_tracing() { use tracing_subscriber::{prelude::*, EnvFilter}; @@ -34,8 +36,21 @@ struct Args { bucket: String, } -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { +#[allow(unexpected_cfgs)] +pub fn main() -> anyhow::Result<()> { + let mut rt: tokio::runtime::Builder = tokio::runtime::Builder::new_multi_thread(); + rt.enable_all(); + + #[cfg(tokio_unstable)] + { + rt.on_before_task_poll(|_| async_profiler_agent::pollcatch::before_poll_hook()) + .on_after_task_poll(|_| async_profiler_agent::pollcatch::after_poll_hook()); + } + let rt = rt.build().unwrap(); + rt.block_on(main_internal()) +} + +async fn main_internal() -> Result<(), anyhow::Error> { set_up_tracing(); tracing::info!("main started"); @@ -58,16 +73,6 @@ async fn main() -> Result<(), anyhow::Error> { profiler.spawn()?; tracing::info!("profiler started"); - let sleep_secs = 6; - let sleep_duration = Duration::from_secs(sleep_secs); - let mut random_string: String = String::with_capacity(1); - loop { - random_string.push('a'); - - tracing::info!("inside loop: going to sleep {sleep_secs} seconds"); - tokio::time::sleep(sleep_duration).await; - tracing::info!("application woke up"); - - random_string.pop(); - } + slow::run().await; + Ok(()) } diff --git a/examples/simple/slow.rs b/examples/simple/slow.rs new file mode 100644 index 0000000..4ce4995 --- /dev/null +++ b/examples/simple/slow.rs @@ -0,0 +1,45 @@ +#[inline(never)] +#[allow(deprecated)] +fn accidentally_slow() { + std::thread::sleep_ms(10); + std::hint::black_box(0); +} + +#[inline(never)] +fn accidentally_slow_2() { + accidentally_slow(); + std::hint::black_box(0); +} + +#[inline(never)] +fn short_sleep() { + std::thread::sleep(std::time::Duration::from_micros(100)); + std::hint::black_box(0); +} + +#[inline(never)] +fn short_sleep_2() { + short_sleep(); + std::hint::black_box(0); +} + +pub async fn run() { + let mut ts: Vec> = vec![]; + for _ in 0..16 { + ts.push(tokio::task::spawn(async move { + loop { + tokio::task::yield_now().await; + // make sure most time is spent in `short_sleep_2`, + // but `accidentally_slow_2` will cause long polls. + if rand::random::() < 0.001 { + accidentally_slow_2(); + } else { + short_sleep_2(); + } + } + })); + } + for t in ts { + t.await.ok(); + } +} diff --git a/src/asprof/mod.rs b/src/asprof/mod.rs index 71159d8..3009c29 100644 --- a/src/asprof/mod.rs +++ b/src/asprof/mod.rs @@ -4,6 +4,7 @@ use std::{ ffi::{c_char, CStr, CString}, path::Path, + ptr::{self, addr_of}, sync::Arc, }; @@ -32,12 +33,65 @@ impl AsProfBuilder { } } +#[derive(Copy, Clone, Debug)] +pub struct UserJfrKey { + key: i32, +} + pub struct AsProf {} impl AsProf { pub fn builder() -> AsProfBuilder { AsProfBuilder::default() } + + /// Return the async-profiler's sample counter + pub fn get_sample_counter() -> Option { + unsafe { + let prof = raw::async_profiler().ok()?; + let thread_local_data = prof.asprof_get_thread_local_data.as_ref()?(); + if thread_local_data.is_null() { + None + } else { + Some(ptr::read_volatile(addr_of!( + (*thread_local_data).sample_counter + ))) + } + } + } + + /// Create a user JFR key from a given key + pub fn create_user_jfr_key(key: &CStr) -> Result { + unsafe { + let prof = raw::async_profiler()?; + let asprof_register_jfr_event = + prof.asprof_register_jfr_event.as_ref().ok_or_else(|| { + AsProfError::AsyncProfilerError( + "async-profiler does not support user JFR events".into(), + ) + })?; + let res = asprof_register_jfr_event(key.as_ptr()); + if res < 0 { + Err(AsProfError::AsyncProfilerError( + "unable to register JFR event".into(), + )) + } else { + Ok(UserJfrKey { key: res }) + } + } + } + + pub fn emit_user_jfr(key: UserJfrKey, jfr: &[u8]) -> Result<(), AsProfError> { + unsafe { + let prof = raw::async_profiler()?; + let asprof_emit_jfr_event = prof.asprof_emit_jfr_event.as_ref().ok_or_else(|| { + AsProfError::AsyncProfilerError( + "async-profiler does not support user JFR events".into(), + ) + })?; + Self::asprof_error(asprof_emit_jfr_event(key.key, jfr.as_ptr(), jfr.len())) + } + } } impl super::profiler::ProfilerEngine for AsProf { @@ -69,6 +123,25 @@ impl super::profiler::ProfilerEngine for AsProf { } impl AsProf { + /// convert an asprof_error_t to a Result + /// + /// SAFETY: response must be a valid asprof_error_t + unsafe fn asprof_error(response: raw::asprof_error_t) -> Result<(), AsProfError> { + if !response.is_null() { + let response = (raw::async_profiler()?.asprof_error_str)(response); + if response.is_null() { + return Ok(()); + } + let response = unsafe { CStr::from_ptr(response) }; + let response_str = response.to_string_lossy(); + tracing::error!("received error from async-profiler: {}", response_str); + Err(AsProfError::AsyncProfilerError(response_str.to_string())) + // TODO: stop the background thread in case there is an error + } else { + Ok(()) + } + } + fn asprof_execute(args: &str) -> Result<(), AsProfError> { unsafe extern "C" fn callback(buf: *const c_char, size: usize) { unsafe { @@ -85,17 +158,11 @@ impl AsProf { } let args_compatible = CString::new(args).unwrap(); - let response = unsafe { - (raw::async_profiler()?.asprof_execute)(args_compatible.as_ptr(), Some(callback)) - }; - if !response.is_null() { - let response = unsafe { CStr::from_ptr(response) }; - let response_str = response.to_string_lossy(); - tracing::error!("received error from async-profiler: {}", response_str); - Err(AsProfError::AsyncProfilerError(response_str.to_string())) - // TODO: stop the background thread in case there is an error - } else { - Ok(()) + unsafe { + Self::asprof_error((raw::async_profiler()?.asprof_execute)( + args_compatible.as_ptr(), + Some(callback), + )) } } } diff --git a/src/asprof/raw.rs b/src/asprof/raw.rs index db200c9..ae500f2 100644 --- a/src/asprof/raw.rs +++ b/src/asprof/raw.rs @@ -12,6 +12,15 @@ pub type asprof_error_t = *const std::ffi::c_char; #[allow(non_camel_case_types)] pub type asprof_writer_t = Option; +#[allow(non_camel_case_types)] +#[repr(C)] +pub struct asprof_thread_local_data { + pub sample_counter: u64, +} + +#[allow(non_camel_case_types)] +pub type asprof_jfr_event_key = std::ffi::c_int; + pub(crate) struct AsyncProfiler { pub(crate) asprof_init: libloading::Symbol<'static, unsafe extern "C" fn()>, pub(crate) asprof_execute: libloading::Symbol< @@ -21,6 +30,25 @@ pub(crate) struct AsyncProfiler { output_callback: asprof_writer_t, ) -> asprof_error_t, >, + pub(crate) asprof_error_str: libloading::Symbol< + 'static, + unsafe extern "C" fn(asprof_error_t) -> *const std::ffi::c_char, + >, + pub(crate) asprof_get_thread_local_data: Option< + libloading::Symbol<'static, unsafe extern "C" fn() -> *mut asprof_thread_local_data>, + >, + pub(crate) asprof_register_jfr_event: Option< + libloading::Symbol< + 'static, + unsafe extern "C" fn(*const std::ffi::c_char) -> asprof_jfr_event_key, + >, + >, + pub(crate) asprof_emit_jfr_event: Option< + libloading::Symbol< + 'static, + unsafe extern "C" fn(asprof_jfr_event_key, *const u8, usize) -> asprof_error_t, + >, + >, } // make sure libasyncProfiler.so is dlopen'd from a static, to avoid it being dlclose'd. @@ -39,6 +67,10 @@ static ASYNC_PROFILER: LazyLock>> = Ok(AsyncProfiler { asprof_init: lib.get(b"asprof_init")?, asprof_execute: lib.get(b"asprof_execute")?, + asprof_error_str: lib.get(b"asprof_error_str")?, + asprof_get_thread_local_data: lib.get(b"asprof_get_thread_local_data").ok(), + asprof_register_jfr_event: lib.get(b"asprof_register_jfr_event").ok(), + asprof_emit_jfr_event: lib.get(b"asprof_emit_jfr_event").ok(), }) } }); diff --git a/src/lib.rs b/src/lib.rs index 2df66bb..1136463 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,5 +3,6 @@ mod asprof; pub mod metadata; +pub mod pollcatch; pub mod profiler; pub mod reporter; diff --git a/src/pollcatch/mod.rs b/src/pollcatch/mod.rs new file mode 100644 index 0000000..efb04e5 --- /dev/null +++ b/src/pollcatch/mod.rs @@ -0,0 +1,61 @@ +use std::{ + cell::Cell, + sync::{ + atomic::{self, AtomicBool}, + LazyLock, + }, +}; + +mod tsc; + +use crate::asprof::{self, AsProf}; + +static POLLCATCH_JFR_KEY: LazyLock> = LazyLock::new(|| { + // Pollcatch V1 event contains: + // 8 byte little-endian "before" tsc timestamp + // 8 byte little-endian "after" tsc timestamp + AsProf::create_user_jfr_key(c"tokio.PollcatchV1") + .map_err(|e| { + tracing::warn!(message="error creating jfr key", error=?e); + }) + .ok() +}); + +static EMITTED_JFR_ERROR: AtomicBool = AtomicBool::new(false); + +#[cold] +#[inline(never)] +fn write_timestamp(before: u64) { + if let Some(key) = *POLLCATCH_JFR_KEY { + let end = tsc::now(); + let mut buf = [0u8; 16]; + + buf[0..8].copy_from_slice(&before.to_le_bytes()[..]); + buf[8..16].copy_from_slice(&end.to_le_bytes()[..]); + if let Err(e) = AsProf::emit_user_jfr(key, &buf) { + if !EMITTED_JFR_ERROR.swap(true, atomic::Ordering::Relaxed) { + tracing::warn!(message="error emitting jfr", error=?e); + } + } + } +} + +thread_local! { + static BEFORE_POLL_TIMESTAMP: Cell = const { Cell::new(0) }; + static BEFORE_POLL_SAMPLE_COUNTER: Cell = const { Cell::new(0) }; +} + +/// Call this in the Tokio before task hook +pub fn before_poll_hook() { + let before = tsc::now(); + BEFORE_POLL_TIMESTAMP.set(before); + BEFORE_POLL_SAMPLE_COUNTER.set(AsProf::get_sample_counter().unwrap_or(0)); +} + +/// Call this in the Tokio after task hook +pub fn after_poll_hook() { + let sample_counter = AsProf::get_sample_counter().unwrap_or(0); + if sample_counter != BEFORE_POLL_SAMPLE_COUNTER.get() { + write_timestamp(BEFORE_POLL_TIMESTAMP.get()); + } +} diff --git a/src/pollcatch/tsc.rs b/src/pollcatch/tsc.rs new file mode 100644 index 0000000..1fd7878 --- /dev/null +++ b/src/pollcatch/tsc.rs @@ -0,0 +1,32 @@ +/// Current timestamp, async-signal-safe +#[inline] +pub fn now() -> u64 { + _now() +} + +#[cfg(all(target_arch = "x86_64", target_feature = "sse2"))] +#[inline] +fn _now() -> u64 { + unsafe { ::core::arch::x86_64::_rdtsc() } +} + +#[cfg(target_arch = "aarch64")] +#[inline] +fn _now() -> u64 { + let count: u64; + + unsafe { + ::core::arch::asm!("mrs {}, cntvct_el0", out(reg) count); + } + + count +} + +#[cfg(not(any( + all(target_arch = "x86_64", target_feature = "sse2"), + target_arch = "aarch64", +)))] +#[inline] +fn _now() -> u64 { + 0 +}