Skip to content

Commit f532aae

Browse files
authored
Merge pull request #18 from async-profiler/pollcatch-v1
feat: add initial support for pollcatch
2 parents 5a565c6 + 7c1e836 commit f532aae

File tree

11 files changed

+295
-27
lines changed

11 files changed

+295
-27
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,16 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [unreleased]
6+
7+
### 🚀 Features
8+
9+
- Add initial support for pollcatch
10+
11+
### ⚙️ Miscellaneous Tasks
12+
13+
- Fix release-plz.toml
14+
515
## [0.1.0] - 2025-04-16
616

717
### ⚙️ Miscellaneous Tasks

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "async-profiler-agent"
3-
version = "0.1.0"
3+
version = "0.1.1"
44
description = "Rust agent for async-profiler"
55
license = "Apache-2.0"
66
repository = "https://github.com/async-profiler/rust-agent"
@@ -29,6 +29,7 @@ anyhow = "1"
2929
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
3030
tokio = { version = "1", features = ["test-util", "full"] }
3131
test-case = "3"
32+
rand = "0.9"
3233

3334
[[example]]
3435
name = 'simple'

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,19 @@ The metadata is not used by the agent directly, and only provided to the reporte
7171
[Fargate]: https://aws.amazon.com/fargate
7272
[IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
7373

74+
### PollCatch
75+
76+
If you want to find long poll times, and you have `RUSTFLAGS="--cfg tokio_unstable"`, you can
77+
emit `tokio.PollCatchV1` events this way:
78+
79+
```
80+
#[cfg(tokio_unstable)]
81+
{
82+
rt.on_before_task_poll(|_| async_profiler_agent::pollcatch::before_poll_hook())
83+
.on_after_task_poll(|_| async_profiler_agent::pollcatch::after_poll_hook());
84+
}
85+
```
86+
7487
## Security
7588

7689
See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information.

examples/simple.rs renamed to examples/simple/main.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use std::time::Duration;
1010
use aws_config::BehaviorVersion;
1111
use clap::Parser;
1212

13+
mod slow;
14+
1315
pub fn set_up_tracing() {
1416
use tracing_subscriber::{prelude::*, EnvFilter};
1517

@@ -34,8 +36,21 @@ struct Args {
3436
bucket: String,
3537
}
3638

37-
#[tokio::main]
38-
async fn main() -> Result<(), anyhow::Error> {
39+
#[allow(unexpected_cfgs)]
40+
pub fn main() -> anyhow::Result<()> {
41+
let mut rt: tokio::runtime::Builder = tokio::runtime::Builder::new_multi_thread();
42+
rt.enable_all();
43+
44+
#[cfg(tokio_unstable)]
45+
{
46+
rt.on_before_task_poll(|_| async_profiler_agent::pollcatch::before_poll_hook())
47+
.on_after_task_poll(|_| async_profiler_agent::pollcatch::after_poll_hook());
48+
}
49+
let rt = rt.build().unwrap();
50+
rt.block_on(main_internal())
51+
}
52+
53+
async fn main_internal() -> Result<(), anyhow::Error> {
3954
set_up_tracing();
4055
tracing::info!("main started");
4156

@@ -58,16 +73,6 @@ async fn main() -> Result<(), anyhow::Error> {
5873
profiler.spawn()?;
5974
tracing::info!("profiler started");
6075

61-
let sleep_secs = 6;
62-
let sleep_duration = Duration::from_secs(sleep_secs);
63-
let mut random_string: String = String::with_capacity(1);
64-
loop {
65-
random_string.push('a');
66-
67-
tracing::info!("inside loop: going to sleep {sleep_secs} seconds");
68-
tokio::time::sleep(sleep_duration).await;
69-
tracing::info!("application woke up");
70-
71-
random_string.pop();
72-
}
76+
slow::run().await;
77+
Ok(())
7378
}

examples/simple/slow.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#[inline(never)]
2+
#[allow(deprecated)]
3+
fn accidentally_slow() {
4+
std::thread::sleep_ms(10);
5+
std::hint::black_box(0);
6+
}
7+
8+
#[inline(never)]
9+
fn accidentally_slow_2() {
10+
accidentally_slow();
11+
std::hint::black_box(0);
12+
}
13+
14+
#[inline(never)]
15+
fn short_sleep() {
16+
std::thread::sleep(std::time::Duration::from_micros(100));
17+
std::hint::black_box(0);
18+
}
19+
20+
#[inline(never)]
21+
fn short_sleep_2() {
22+
short_sleep();
23+
std::hint::black_box(0);
24+
}
25+
26+
pub async fn run() {
27+
let mut ts: Vec<tokio::task::JoinHandle<()>> = vec![];
28+
for _ in 0..16 {
29+
ts.push(tokio::task::spawn(async move {
30+
loop {
31+
tokio::task::yield_now().await;
32+
// make sure most time is spent in `short_sleep_2`,
33+
// but `accidentally_slow_2` will cause long polls.
34+
if rand::random::<f64>() < 0.001 {
35+
accidentally_slow_2();
36+
} else {
37+
short_sleep_2();
38+
}
39+
}
40+
}));
41+
}
42+
for t in ts {
43+
t.await.ok();
44+
}
45+
}

src/asprof/mod.rs

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use std::{
55
ffi::{c_char, CStr, CString},
66
path::Path,
7+
ptr::{self, addr_of},
78
sync::Arc,
89
};
910

@@ -32,12 +33,65 @@ impl AsProfBuilder {
3233
}
3334
}
3435

36+
#[derive(Copy, Clone, Debug)]
37+
pub struct UserJfrKey {
38+
key: i32,
39+
}
40+
3541
pub struct AsProf {}
3642

3743
impl AsProf {
3844
pub fn builder() -> AsProfBuilder {
3945
AsProfBuilder::default()
4046
}
47+
48+
/// Return the async-profiler's sample counter
49+
pub fn get_sample_counter() -> Option<u64> {
50+
unsafe {
51+
let prof = raw::async_profiler().ok()?;
52+
let thread_local_data = prof.asprof_get_thread_local_data.as_ref()?();
53+
if thread_local_data.is_null() {
54+
None
55+
} else {
56+
Some(ptr::read_volatile(addr_of!(
57+
(*thread_local_data).sample_counter
58+
)))
59+
}
60+
}
61+
}
62+
63+
/// Create a user JFR key from a given key
64+
pub fn create_user_jfr_key(key: &CStr) -> Result<UserJfrKey, AsProfError> {
65+
unsafe {
66+
let prof = raw::async_profiler()?;
67+
let asprof_register_jfr_event =
68+
prof.asprof_register_jfr_event.as_ref().ok_or_else(|| {
69+
AsProfError::AsyncProfilerError(
70+
"async-profiler does not support user JFR events".into(),
71+
)
72+
})?;
73+
let res = asprof_register_jfr_event(key.as_ptr());
74+
if res < 0 {
75+
Err(AsProfError::AsyncProfilerError(
76+
"unable to register JFR event".into(),
77+
))
78+
} else {
79+
Ok(UserJfrKey { key: res })
80+
}
81+
}
82+
}
83+
84+
pub fn emit_user_jfr(key: UserJfrKey, jfr: &[u8]) -> Result<(), AsProfError> {
85+
unsafe {
86+
let prof = raw::async_profiler()?;
87+
let asprof_emit_jfr_event = prof.asprof_emit_jfr_event.as_ref().ok_or_else(|| {
88+
AsProfError::AsyncProfilerError(
89+
"async-profiler does not support user JFR events".into(),
90+
)
91+
})?;
92+
Self::asprof_error(asprof_emit_jfr_event(key.key, jfr.as_ptr(), jfr.len()))
93+
}
94+
}
4195
}
4296

4397
impl super::profiler::ProfilerEngine for AsProf {
@@ -69,6 +123,25 @@ impl super::profiler::ProfilerEngine for AsProf {
69123
}
70124

71125
impl AsProf {
126+
/// convert an asprof_error_t to a Result
127+
///
128+
/// SAFETY: response must be a valid asprof_error_t
129+
unsafe fn asprof_error(response: raw::asprof_error_t) -> Result<(), AsProfError> {
130+
if !response.is_null() {
131+
let response = (raw::async_profiler()?.asprof_error_str)(response);
132+
if response.is_null() {
133+
return Ok(());
134+
}
135+
let response = unsafe { CStr::from_ptr(response) };
136+
let response_str = response.to_string_lossy();
137+
tracing::error!("received error from async-profiler: {}", response_str);
138+
Err(AsProfError::AsyncProfilerError(response_str.to_string()))
139+
// TODO: stop the background thread in case there is an error
140+
} else {
141+
Ok(())
142+
}
143+
}
144+
72145
fn asprof_execute(args: &str) -> Result<(), AsProfError> {
73146
unsafe extern "C" fn callback(buf: *const c_char, size: usize) {
74147
unsafe {
@@ -85,17 +158,11 @@ impl AsProf {
85158
}
86159

87160
let args_compatible = CString::new(args).unwrap();
88-
let response = unsafe {
89-
(raw::async_profiler()?.asprof_execute)(args_compatible.as_ptr(), Some(callback))
90-
};
91-
if !response.is_null() {
92-
let response = unsafe { CStr::from_ptr(response) };
93-
let response_str = response.to_string_lossy();
94-
tracing::error!("received error from async-profiler: {}", response_str);
95-
Err(AsProfError::AsyncProfilerError(response_str.to_string()))
96-
// TODO: stop the background thread in case there is an error
97-
} else {
98-
Ok(())
161+
unsafe {
162+
Self::asprof_error((raw::async_profiler()?.asprof_execute)(
163+
args_compatible.as_ptr(),
164+
Some(callback),
165+
))
99166
}
100167
}
101168
}

src/asprof/raw.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@ pub type asprof_error_t = *const std::ffi::c_char;
1212
#[allow(non_camel_case_types)]
1313
pub type asprof_writer_t = Option<unsafe extern "C" fn(buf: *const std::ffi::c_char, size: usize)>;
1414

15+
#[allow(non_camel_case_types)]
16+
#[repr(C)]
17+
pub struct asprof_thread_local_data {
18+
pub sample_counter: u64,
19+
}
20+
21+
#[allow(non_camel_case_types)]
22+
pub type asprof_jfr_event_key = std::ffi::c_int;
23+
1524
pub(crate) struct AsyncProfiler {
1625
pub(crate) asprof_init: libloading::Symbol<'static, unsafe extern "C" fn()>,
1726
pub(crate) asprof_execute: libloading::Symbol<
@@ -21,6 +30,25 @@ pub(crate) struct AsyncProfiler {
2130
output_callback: asprof_writer_t,
2231
) -> asprof_error_t,
2332
>,
33+
pub(crate) asprof_error_str: libloading::Symbol<
34+
'static,
35+
unsafe extern "C" fn(asprof_error_t) -> *const std::ffi::c_char,
36+
>,
37+
pub(crate) asprof_get_thread_local_data: Option<
38+
libloading::Symbol<'static, unsafe extern "C" fn() -> *mut asprof_thread_local_data>,
39+
>,
40+
pub(crate) asprof_register_jfr_event: Option<
41+
libloading::Symbol<
42+
'static,
43+
unsafe extern "C" fn(*const std::ffi::c_char) -> asprof_jfr_event_key,
44+
>,
45+
>,
46+
pub(crate) asprof_emit_jfr_event: Option<
47+
libloading::Symbol<
48+
'static,
49+
unsafe extern "C" fn(asprof_jfr_event_key, *const u8, usize) -> asprof_error_t,
50+
>,
51+
>,
2452
}
2553

2654
// make sure libasyncProfiler.so is dlopen'd from a static, to avoid it being dlclose'd.
@@ -39,6 +67,10 @@ static ASYNC_PROFILER: LazyLock<Result<AsyncProfiler, Arc<libloading::Error>>> =
3967
Ok(AsyncProfiler {
4068
asprof_init: lib.get(b"asprof_init")?,
4169
asprof_execute: lib.get(b"asprof_execute")?,
70+
asprof_error_str: lib.get(b"asprof_error_str")?,
71+
asprof_get_thread_local_data: lib.get(b"asprof_get_thread_local_data").ok(),
72+
asprof_register_jfr_event: lib.get(b"asprof_register_jfr_event").ok(),
73+
asprof_emit_jfr_event: lib.get(b"asprof_emit_jfr_event").ok(),
4274
})
4375
}
4476
});

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33

44
mod asprof;
55
pub mod metadata;
6+
pub mod pollcatch;
67
pub mod profiler;
78
pub mod reporter;

0 commit comments

Comments
 (0)