From 5f7c1b07e4e35cdaa261ae8d7dcb7730edbe949f Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 10 Oct 2023 16:14:05 +0200 Subject: [PATCH] bug(tracing): `register_callsite` doesn't get called This branch contains code to reproduce an issue where 2 thread local subscribers interfer so that when the first one takes too long to complete `register_callsite()`, the second subscriber doesn't have `register_callsite()` called at all. It also contains an attempt at a fix by removing a race condition in `DefaultCallsite::register`. This fix may be necessary anyway, but it isn't sufficient. The issue for this problem is #2743. --- tracing-core/src/callsite.rs | 54 +++++++------ tracing/tests/missed_register_callsite.rs | 98 +++++++++++++++++++++++ 2 files changed, 127 insertions(+), 25 deletions(-) create mode 100644 tracing/tests/missed_register_callsite.rs diff --git a/tracing-core/src/callsite.rs b/tracing-core/src/callsite.rs index f887132364..57b6256a56 100644 --- a/tracing-core/src/callsite.rs +++ b/tracing-core/src/callsite.rs @@ -310,31 +310,35 @@ impl DefaultCallsite { // This only happens once (or if the cached interest value was corrupted). #[cold] pub fn register(&'static self) -> Interest { - // Attempt to advance the registration state to `REGISTERING`... - match self.registration.compare_exchange( - Self::UNREGISTERED, - Self::REGISTERING, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Okay, we advanced the state, try to register the callsite. - rebuild_callsite_interest(self, &DISPATCHERS.rebuilder()); - CALLSITES.push_default(self); - self.registration.store(Self::REGISTERED, Ordering::Release); - } - // Great, the callsite is already registered! Just load its - // previous cached interest. - Err(Self::REGISTERED) => {} - // Someone else is registering... - Err(_state) => { - debug_assert_eq!( - _state, - Self::REGISTERING, - "weird callsite registration state" - ); - // Just hit `enabled` this time. - return Interest::sometimes(); + loop { + // Attempt to advance the registration state to `REGISTERING`... + match self.registration.compare_exchange( + Self::UNREGISTERED, + Self::REGISTERING, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Okay, we advanced the state, try to register the callsite. + rebuild_callsite_interest(self, &DISPATCHERS.rebuilder()); + CALLSITES.push_default(self); + self.registration.store(Self::REGISTERED, Ordering::Release); + break; + } + // Great, the callsite is already registered! Just load its + // previous cached interest. + Err(Self::REGISTERED) => break, + // Someone else is registering... + Err(_state) => { + debug_assert_eq!( + _state, + Self::REGISTERING, + "weird callsite registration state: {_state}" + ); + // The callsite is being registered. We have to wait until + // registration is finished, otherwise + continue; + } } } diff --git a/tracing/tests/missed_register_callsite.rs b/tracing/tests/missed_register_callsite.rs new file mode 100644 index 0000000000..552e9ea747 --- /dev/null +++ b/tracing/tests/missed_register_callsite.rs @@ -0,0 +1,98 @@ +use std::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, + thread::{self, JoinHandle}, + time::Duration, +}; + +use tracing::Subscriber; +use tracing_core::{span, Metadata}; + +struct TestSubscriber { + creator_thread: String, + sleep: Duration, + callsite: AtomicPtr>, +} + +impl TestSubscriber { + fn new(sleep_micros: u64) -> Self { + let creator_thread = thread::current() + .name() + .unwrap_or("") + .to_owned(); + Self { + creator_thread, + sleep: Duration::from_micros(sleep_micros), + callsite: AtomicPtr::new(ptr::null_mut()), + } + } +} + +impl Subscriber for TestSubscriber { + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { + if !self.sleep.is_zero() { + thread::sleep(self.sleep); + } + + self.callsite + .store(metadata as *const _ as *mut _, Ordering::SeqCst); + println!( + "{creator}: register_callsite: {callsite:#?}", + creator = self.creator_thread, + callsite = metadata as *const _, + ); + tracing_core::Interest::always() + } + + fn event(&self, event: &tracing_core::Event<'_>) { + let stored_callsite = self.callsite.load(Ordering::SeqCst); + let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; + + println!( + "{creator}: event (with callsite): {event_callsite:#?} (stored callsite: {stored_callsite:#?})", + creator = self.creator_thread, + ); + + // This assert is the actual test. + // assert_eq!( + // stored_callsite, event_callsite, + // "stored callsite: {stored_callsite:#?} does not match event \ + // callsite: {event_callsite:#?}. Was `event` called before \ + // `register_callsite`?" + // ); + } + + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { + span::Id::from_u64(0) + } + fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} + fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} + fn enter(&self, _span: &tracing_core::span::Id) {} + fn exit(&self, _span: &tracing_core::span::Id) {} +} + +#[test] +fn event_before_register() { + fn subscriber_thread(idx: usize, sleep_micros: u64) -> JoinHandle<()> { + thread::Builder::new() + .name(format!("subscriber-{idx}")) + .spawn(move || { + let subscriber = TestSubscriber::new(sleep_micros); + let _subscriber_guard = tracing::subscriber::set_default(subscriber); + + tracing::info!("event-from-{idx}", idx = idx); + thread::sleep(Duration::from_millis(100)); + }) + .expect("failed to spawn thread") + } + + let register_sleep_micros = 50; + let jh1 = subscriber_thread(1, register_sleep_micros); + let jh2 = subscriber_thread(2, 0); + + jh1.join().expect("failed to join thread"); + jh2.join().expect("failed to join thread"); +}