Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

All notable changes to this project will be documented in this file.

## [unreleased]

### 🚀 Features

- Add initial support for pollcatch

### ⚙️ Miscellaneous Tasks

- Fix release-plz.toml

## [0.1.0] - 2025-04-16

### ⚙️ Miscellaneous Tasks
Expand Down
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "async-profiler-agent"
version = "0.1.0"
version = "0.1.1"
description = "Rust agent for async-profiler"
license = "Apache-2.0"
repository = "https://github.com/async-profiler/rust-agent"
Expand Down Expand Up @@ -29,6 +29,7 @@ anyhow = "1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1", features = ["test-util", "full"] }
test-case = "3"
rand = "0.9"

[[example]]
name = 'simple'
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,19 @@ The metadata is not used by the agent directly, and only provided to the reporte
[Fargate]: https://aws.amazon.com/fargate
[IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html

### PollCatch

If you want to find long poll times, and you have `RUSTFLAGS="--cfg tokio_unstable"`, you can
emit `tokio.PollCatchV1` events this way:

```
#[cfg(tokio_unstable)]
{
rt.on_before_task_poll(|_| async_profiler_agent::pollcatch::before_poll_hook())
.on_after_task_poll(|_| async_profiler_agent::pollcatch::after_poll_hook());
Comment on lines +81 to +83
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has to be set in the builder right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea

}
```

## Security

See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information.
Expand Down
33 changes: 19 additions & 14 deletions examples/simple.rs → examples/simple/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use std::time::Duration;
use aws_config::BehaviorVersion;
use clap::Parser;

mod slow;

pub fn set_up_tracing() {
use tracing_subscriber::{prelude::*, EnvFilter};

Expand All @@ -34,8 +36,21 @@ struct Args {
bucket: String,
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
#[allow(unexpected_cfgs)]
pub fn main() -> anyhow::Result<()> {
let mut rt: tokio::runtime::Builder = tokio::runtime::Builder::new_multi_thread();
rt.enable_all();
Comment on lines +40 to +42
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you file an issue for turning an example into an integration test that we can run to validate future changes?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened #19


#[cfg(tokio_unstable)]
{
rt.on_before_task_poll(|_| async_profiler_agent::pollcatch::before_poll_hook())
.on_after_task_poll(|_| async_profiler_agent::pollcatch::after_poll_hook());
}
let rt = rt.build().unwrap();
rt.block_on(main_internal())
}

async fn main_internal() -> Result<(), anyhow::Error> {
set_up_tracing();
tracing::info!("main started");

Expand All @@ -58,16 +73,6 @@ async fn main() -> Result<(), anyhow::Error> {
profiler.spawn()?;
tracing::info!("profiler started");

let sleep_secs = 6;
let sleep_duration = Duration::from_secs(sleep_secs);
let mut random_string: String = String::with_capacity(1);
loop {
random_string.push('a');

tracing::info!("inside loop: going to sleep {sleep_secs} seconds");
tokio::time::sleep(sleep_duration).await;
tracing::info!("application woke up");

random_string.pop();
}
slow::run().await;
Ok(())
}
45 changes: 45 additions & 0 deletions examples/simple/slow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#[inline(never)]
#[allow(deprecated)]
fn accidentally_slow() {
std::thread::sleep_ms(10);
std::hint::black_box(0);
}

#[inline(never)]
fn accidentally_slow_2() {
accidentally_slow();
std::hint::black_box(0);
}

#[inline(never)]
fn short_sleep() {
std::thread::sleep(std::time::Duration::from_micros(100));
std::hint::black_box(0);
}

#[inline(never)]
fn short_sleep_2() {
short_sleep();
std::hint::black_box(0);
}

pub async fn run() {
let mut ts: Vec<tokio::task::JoinHandle<()>> = vec![];
for _ in 0..16 {
ts.push(tokio::task::spawn(async move {
loop {
tokio::task::yield_now().await;
// make sure most time is spent in `short_sleep_2`,
// but `accidentally_slow_2` will cause long polls.
if rand::random::<f64>() < 0.001 {
accidentally_slow_2();
} else {
short_sleep_2();
}
}
}));
}
for t in ts {
t.await.ok();
}
}
89 changes: 78 additions & 11 deletions src/asprof/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{
ffi::{c_char, CStr, CString},
path::Path,
ptr::{self, addr_of},
sync::Arc,
};

Expand Down Expand Up @@ -32,12 +33,65 @@ impl AsProfBuilder {
}
}

#[derive(Copy, Clone, Debug)]
pub struct UserJfrKey {
key: i32,
}

pub struct AsProf {}

impl AsProf {
pub fn builder() -> AsProfBuilder {
AsProfBuilder::default()
}

/// Return the async-profiler's sample counter
pub fn get_sample_counter() -> Option<u64> {
unsafe {
let prof = raw::async_profiler().ok()?;
let thread_local_data = prof.asprof_get_thread_local_data.as_ref()?();
if thread_local_data.is_null() {
None
} else {
Some(ptr::read_volatile(addr_of!(
(*thread_local_data).sample_counter
)))
}
}
}

/// Create a user JFR key from a given key
pub fn create_user_jfr_key(key: &CStr) -> Result<UserJfrKey, AsProfError> {
unsafe {
let prof = raw::async_profiler()?;
let asprof_register_jfr_event =
prof.asprof_register_jfr_event.as_ref().ok_or_else(|| {
AsProfError::AsyncProfilerError(
"async-profiler does not support user JFR events".into(),
)
})?;
let res = asprof_register_jfr_event(key.as_ptr());
if res < 0 {
Err(AsProfError::AsyncProfilerError(
"unable to register JFR event".into(),
))
} else {
Ok(UserJfrKey { key: res })
}
}
}

pub fn emit_user_jfr(key: UserJfrKey, jfr: &[u8]) -> Result<(), AsProfError> {
unsafe {
let prof = raw::async_profiler()?;
let asprof_emit_jfr_event = prof.asprof_emit_jfr_event.as_ref().ok_or_else(|| {
AsProfError::AsyncProfilerError(
"async-profiler does not support user JFR events".into(),
)
})?;
Self::asprof_error(asprof_emit_jfr_event(key.key, jfr.as_ptr(), jfr.len()))
}
}
}

impl super::profiler::ProfilerEngine for AsProf {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsProf — I assume this means AsyncProfiler but since As is an english word common in Rust this is a bit confusing. Consider renaming to AsyncProfiler

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was copied from the previous agent. I can rename.

Expand Down Expand Up @@ -69,6 +123,25 @@ impl super::profiler::ProfilerEngine for AsProf {
}

impl AsProf {
/// convert an asprof_error_t to a Result
///
/// SAFETY: response must be a valid asprof_error_t
unsafe fn asprof_error(response: raw::asprof_error_t) -> Result<(), AsProfError> {
if !response.is_null() {
let response = (raw::async_profiler()?.asprof_error_str)(response);
if response.is_null() {
return Ok(());
}
let response = unsafe { CStr::from_ptr(response) };
let response_str = response.to_string_lossy();
tracing::error!("received error from async-profiler: {}", response_str);
Err(AsProfError::AsyncProfilerError(response_str.to_string()))
// TODO: stop the background thread in case there is an error
} else {
Ok(())
}
}

fn asprof_execute(args: &str) -> Result<(), AsProfError> {
unsafe extern "C" fn callback(buf: *const c_char, size: usize) {
unsafe {
Expand All @@ -85,17 +158,11 @@ impl AsProf {
}

let args_compatible = CString::new(args).unwrap();
let response = unsafe {
(raw::async_profiler()?.asprof_execute)(args_compatible.as_ptr(), Some(callback))
};
if !response.is_null() {
let response = unsafe { CStr::from_ptr(response) };
let response_str = response.to_string_lossy();
tracing::error!("received error from async-profiler: {}", response_str);
Err(AsProfError::AsyncProfilerError(response_str.to_string()))
// TODO: stop the background thread in case there is an error
} else {
Ok(())
unsafe {
Self::asprof_error((raw::async_profiler()?.asprof_execute)(
args_compatible.as_ptr(),
Some(callback),
))
}
}
}
32 changes: 32 additions & 0 deletions src/asprof/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ pub type asprof_error_t = *const std::ffi::c_char;
#[allow(non_camel_case_types)]
pub type asprof_writer_t = Option<unsafe extern "C" fn(buf: *const std::ffi::c_char, size: usize)>;

#[allow(non_camel_case_types)]
#[repr(C)]
pub struct asprof_thread_local_data {
pub sample_counter: u64,
}

#[allow(non_camel_case_types)]
pub type asprof_jfr_event_key = std::ffi::c_int;

pub(crate) struct AsyncProfiler {
pub(crate) asprof_init: libloading::Symbol<'static, unsafe extern "C" fn()>,
pub(crate) asprof_execute: libloading::Symbol<
Expand All @@ -21,6 +30,25 @@ pub(crate) struct AsyncProfiler {
output_callback: asprof_writer_t,
) -> asprof_error_t,
>,
pub(crate) asprof_error_str: libloading::Symbol<
'static,
unsafe extern "C" fn(asprof_error_t) -> *const std::ffi::c_char,
>,
pub(crate) asprof_get_thread_local_data: Option<
libloading::Symbol<'static, unsafe extern "C" fn() -> *mut asprof_thread_local_data>,
>,
pub(crate) asprof_register_jfr_event: Option<
libloading::Symbol<
'static,
unsafe extern "C" fn(*const std::ffi::c_char) -> asprof_jfr_event_key,
>,
>,
pub(crate) asprof_emit_jfr_event: Option<
libloading::Symbol<
'static,
unsafe extern "C" fn(asprof_jfr_event_key, *const u8, usize) -> asprof_error_t,
>,
>,
}

// make sure libasyncProfiler.so is dlopen'd from a static, to avoid it being dlclose'd.
Expand All @@ -39,6 +67,10 @@ static ASYNC_PROFILER: LazyLock<Result<AsyncProfiler, Arc<libloading::Error>>> =
Ok(AsyncProfiler {
asprof_init: lib.get(b"asprof_init")?,
asprof_execute: lib.get(b"asprof_execute")?,
asprof_error_str: lib.get(b"asprof_error_str")?,
asprof_get_thread_local_data: lib.get(b"asprof_get_thread_local_data").ok(),
asprof_register_jfr_event: lib.get(b"asprof_register_jfr_event").ok(),
asprof_emit_jfr_event: lib.get(b"asprof_emit_jfr_event").ok(),
})
}
});
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@

mod asprof;
pub mod metadata;
pub mod pollcatch;
pub mod profiler;
pub mod reporter;
Loading
Loading