Skip to content

Commit

Permalink
bug(tracing): register_callsite doesn't get called
Browse files Browse the repository at this point in the history
This branch contains code to reproduce an issue where 2 thread local
subscribers interfer so that when the first one takes too long to
complete `register_callsite()`, the second subscriber doesn't have
`register_callsite()` called at all.

It also contains an attempt at a fix by removing a race condition in
`DefaultCallsite::register`. This fix may be necessary anyway, but it
isn't sufficient.

The issue for this problem is #2743.
  • Loading branch information
hds committed Oct 10, 2023
1 parent 7f0cc09 commit 5f7c1b0
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 25 deletions.
54 changes: 29 additions & 25 deletions tracing-core/src/callsite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,31 +310,35 @@ impl DefaultCallsite {
// This only happens once (or if the cached interest value was corrupted).
#[cold]
pub fn register(&'static self) -> Interest {
// Attempt to advance the registration state to `REGISTERING`...
match self.registration.compare_exchange(
Self::UNREGISTERED,
Self::REGISTERING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Okay, we advanced the state, try to register the callsite.
rebuild_callsite_interest(self, &DISPATCHERS.rebuilder());
CALLSITES.push_default(self);
self.registration.store(Self::REGISTERED, Ordering::Release);
}
// Great, the callsite is already registered! Just load its
// previous cached interest.
Err(Self::REGISTERED) => {}
// Someone else is registering...
Err(_state) => {
debug_assert_eq!(
_state,
Self::REGISTERING,
"weird callsite registration state"
);
// Just hit `enabled` this time.
return Interest::sometimes();
loop {
// Attempt to advance the registration state to `REGISTERING`...
match self.registration.compare_exchange(
Self::UNREGISTERED,
Self::REGISTERING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Okay, we advanced the state, try to register the callsite.
rebuild_callsite_interest(self, &DISPATCHERS.rebuilder());
CALLSITES.push_default(self);
self.registration.store(Self::REGISTERED, Ordering::Release);
break;
}
// Great, the callsite is already registered! Just load its
// previous cached interest.
Err(Self::REGISTERED) => break,
// Someone else is registering...
Err(_state) => {
debug_assert_eq!(
_state,
Self::REGISTERING,
"weird callsite registration state: {_state}"

Check failure on line 336 in tracing-core/src/callsite.rs

View workflow job for this annotation

GitHub Actions / cargo check (+MSRV -Zminimal-versions) (tracing-attributes, stable)

there is no argument named `_state`

Check failure on line 336 in tracing-core/src/callsite.rs

View workflow job for this annotation

GitHub Actions / cargo check (+MSRV -Zminimal-versions) (tracing-core, stable)

there is no argument named `_state`

Check failure on line 336 in tracing-core/src/callsite.rs

View workflow job for this annotation

GitHub Actions / cargo check (+MSRV -Zminimal-versions) (tracing-futures, stable)

there is no argument named `_state`

Check failure on line 336 in tracing-core/src/callsite.rs

View workflow job for this annotation

GitHub Actions / cargo check (+MSRV -Zminimal-versions) (tracing, stable)

there is no argument named `_state`

Check failure on line 336 in tracing-core/src/callsite.rs

View workflow job for this annotation

GitHub Actions / cargo check (+MSRV -Zminimal-versions) (tracing-log, 1.56.0)

there is no argument named `_state`
);
// The callsite is being registered. We have to wait until
// registration is finished, otherwise
continue;
}
}
}

Expand Down
98 changes: 98 additions & 0 deletions tracing/tests/missed_register_callsite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
use std::{
ptr,
sync::atomic::{AtomicPtr, Ordering},
thread::{self, JoinHandle},
time::Duration,
};

use tracing::Subscriber;
use tracing_core::{span, Metadata};

struct TestSubscriber {
creator_thread: String,
sleep: Duration,
callsite: AtomicPtr<Metadata<'static>>,
}

impl TestSubscriber {
fn new(sleep_micros: u64) -> Self {
let creator_thread = thread::current()
.name()
.unwrap_or("<unknown thread>")
.to_owned();
Self {
creator_thread,
sleep: Duration::from_micros(sleep_micros),
callsite: AtomicPtr::new(ptr::null_mut()),
}
}
}

impl Subscriber for TestSubscriber {
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest {
if !self.sleep.is_zero() {
thread::sleep(self.sleep);
}

self.callsite
.store(metadata as *const _ as *mut _, Ordering::SeqCst);
println!(
"{creator}: register_callsite: {callsite:#?}",
creator = self.creator_thread,
callsite = metadata as *const _,
);
tracing_core::Interest::always()
}

fn event(&self, event: &tracing_core::Event<'_>) {
let stored_callsite = self.callsite.load(Ordering::SeqCst);
let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _;

println!(
"{creator}: event (with callsite): {event_callsite:#?} (stored callsite: {stored_callsite:#?})",
creator = self.creator_thread,
);

// This assert is the actual test.
// assert_eq!(
// stored_callsite, event_callsite,
// "stored callsite: {stored_callsite:#?} does not match event \
// callsite: {event_callsite:#?}. Was `event` called before \
// `register_callsite`?"
// );
}

fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
true
}
fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id {
span::Id::from_u64(0)
}
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {}
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {}
fn enter(&self, _span: &tracing_core::span::Id) {}
fn exit(&self, _span: &tracing_core::span::Id) {}
}

#[test]
fn event_before_register() {
fn subscriber_thread(idx: usize, sleep_micros: u64) -> JoinHandle<()> {
thread::Builder::new()
.name(format!("subscriber-{idx}"))
.spawn(move || {
let subscriber = TestSubscriber::new(sleep_micros);
let _subscriber_guard = tracing::subscriber::set_default(subscriber);

tracing::info!("event-from-{idx}", idx = idx);
thread::sleep(Duration::from_millis(100));
})
.expect("failed to spawn thread")
}

let register_sleep_micros = 50;
let jh1 = subscriber_thread(1, register_sleep_micros);
let jh2 = subscriber_thread(2, 0);

jh1.join().expect("failed to join thread");
jh2.join().expect("failed to join thread");
}

0 comments on commit 5f7c1b0

Please sign in to comment.