From 51d7d9e17fc9210afeb28988cfd54d2a04c3b843 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 16 Apr 2024 15:43:49 +0200 Subject: [PATCH] core: fix missed `register_callsite` error There are 2 triggers which will cause a subscriber to receive a call to `Subscriber::register_callsite` for a specific callsite. 1. The first time the event or span at that callsite is executed. 2. When a new subscriber is added or removed (for example, calls to `set_default` or `with_default`) It is trigger (2) that will cause a new subscriber to receive `Subscriber::register_callsite` for all the callsites which had already been registered before it became active. When a callsite is registered for trigger (1), the callsite starts in state `UNREGISTERED`. The first thread to encounter the callsite will transition it to `REGISTERING` and determine the overall interest for the callsite by registering with all known dispatchers (which will call into `Subscriber::register_callsite` for each subscriber). Once that is complete, the callsite is added to the global list of all known callsites and its state is transitioned to `REGISTERED`. While the callsite interest is being rebuilt, other threads that encounter the callsite will be given `Interest::sometimes()` until the registration is complete. However, if a new subscriber is added during this window, the interest for all callsites will be rebuilt, but the new callsite (in state `REGISTERING`) won't be included because it isn't yet in the global list of callsites. This can cause a case where that new subscriber being added won't receive `Subscriber::register_callsite` before it receives the subsequent call to `Subscriber::event` or `Subscriber::new_span`. 
The documentation on [Registering Callsites] is not very explicit on this point, but it does suggest that `Subscriber::register_callsite` will be called before the call to either `Subscriber::event` or `Subscriber::new_span`, and the current behavior can break this implicit contract. [Registering Callsites]: https://docs.rs/tracing-core/0.1.32/tracing_core/callsite/index.html#registering-callsites This change swaps the order of rebuilding the callsite interest and adding the callsite to the global list so that the callsite gets pushed first, avoiding this window in which a subscriber won't get a call to `register_callsite`. As such, a callsite may have its interest read before it is set. In this case, the existing implementation will return `Interest::sometimes()` for the `DefaultCallsite` implementation. Other implementations (outside of the `tracing` project) may perform this differently, but in this case, there is no documented guarantee regarding the ordering. A regression test is included which provokes the race condition 100% of the time before the changes in this fix. Fixes: #2743 --- tracing-core/src/callsite.rs | 9 +- tracing-core/src/dispatcher.rs | 1 + .../tests/missed_register_callsite.rs | 125 ++++++++++++++++++ tracing/tests/missed_register_callsite.rs | 109 +++++++++++++++ 4 files changed, 239 insertions(+), 5 deletions(-) create mode 100644 tracing-core/tests/missed_register_callsite.rs create mode 100644 tracing/tests/missed_register_callsite.rs diff --git a/tracing-core/src/callsite.rs b/tracing-core/src/callsite.rs index 62fa8c4af6..68b4ecb488 100644 --- a/tracing-core/src/callsite.rs +++ b/tracing-core/src/callsite.rs @@ -235,8 +235,6 @@ pub fn rebuild_interest_cache() { /// [`Callsite`]: crate::callsite::Callsite /// [reg-docs]: crate::callsite#registering-callsites pub fn register(callsite: &'static dyn Callsite) { - rebuild_callsite_interest(callsite, &DISPATCHERS.rebuilder()); - // Is this a `DefaultCallsite`? 
If so, use the fancy linked list! if callsite.private_type_id(private::Private(())).0 == TypeId::of::() { let callsite = unsafe { @@ -248,10 +246,11 @@ pub fn register(callsite: &'static dyn Callsite) { &*(callsite as *const dyn Callsite as *const DefaultCallsite) }; CALLSITES.push_default(callsite); - return; + } else { + CALLSITES.push_dyn(callsite); } - CALLSITES.push_dyn(callsite); + rebuild_callsite_interest(callsite, &DISPATCHERS.rebuilder()); } static CALLSITES: Callsites = Callsites { @@ -317,8 +316,8 @@ impl DefaultCallsite { ) { Ok(_) => { // Okay, we advanced the state, try to register the callsite. - rebuild_callsite_interest(self, &DISPATCHERS.rebuilder()); CALLSITES.push_default(self); + rebuild_callsite_interest(self, &DISPATCHERS.rebuilder()); self.registration.store(Self::REGISTERED, Ordering::Release); } // Great, the callsite is already registered! Just load its diff --git a/tracing-core/src/dispatcher.rs b/tracing-core/src/dispatcher.rs index de02afb790..babce49a88 100644 --- a/tracing-core/src/dispatcher.rs +++ b/tracing-core/src/dispatcher.rs @@ -455,6 +455,7 @@ fn get_global() -> &'static Dispatch { unsafe { // This is safe given the invariant that setting the global dispatcher // also sets `GLOBAL_INIT` to `INITIALIZED`. 
+ #[allow(static_mut_refs)] &GLOBAL_DISPATCH } } diff --git a/tracing-core/tests/missed_register_callsite.rs b/tracing-core/tests/missed_register_callsite.rs new file mode 100644 index 0000000000..1532082d57 --- /dev/null +++ b/tracing-core/tests/missed_register_callsite.rs @@ -0,0 +1,125 @@ +use std::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, + thread::{self, JoinHandle}, + time::Duration, +}; + +use tracing_core::{ + callsite::{Callsite as _, DefaultCallsite}, + dispatcher::set_default, + field::{FieldSet, Value}, + span, Dispatch, Event, Kind, Level, Metadata, Subscriber, +}; + +struct TestSubscriber { + sleep: Duration, + callsite: AtomicPtr>, +} + +impl TestSubscriber { + fn new(sleep_micros: u64) -> Self { + Self { + sleep: Duration::from_micros(sleep_micros), + callsite: AtomicPtr::new(ptr::null_mut()), + } + } +} + +impl Subscriber for TestSubscriber { + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { + if !self.sleep.is_zero() { + thread::sleep(self.sleep); + } + + self.callsite + .store(metadata as *const _ as *mut _, Ordering::SeqCst); + + tracing_core::Interest::always() + } + + fn event(&self, event: &tracing_core::Event<'_>) { + let stored_callsite = self.callsite.load(Ordering::SeqCst); + let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; + + // This assert is the actual test. + assert_eq!( + stored_callsite, event_callsite, + "stored callsite: {stored_callsite:#?} does not match event \ + callsite: {event_callsite:#?}. Was `event` called before \ + `register_callsite`?" 
+ ); + } + + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { + span::Id::from_u64(0) + } + fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} + fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} + fn enter(&self, _span: &tracing_core::span::Id) {} + fn exit(&self, _span: &tracing_core::span::Id) {} +} + +fn subscriber_thread(idx: usize, register_sleep_micros: u64) -> JoinHandle<()> { + thread::Builder::new() + .name(format!("subscriber-{idx}")) + .spawn(move || { + // We use a sleep to ensure the starting order of the 2 threads. + let subscriber = TestSubscriber::new(register_sleep_micros); + let _dispatch_guard = set_default(&Dispatch::new(subscriber)); + + static CALLSITE: DefaultCallsite = { + // The values of the metadata are unimportant + static META: Metadata<'static> = Metadata::new( + "event ", + "module::path", + Level::INFO, + None, + None, + None, + FieldSet::new(&["message"], tracing_core::callsite::Identifier(&CALLSITE)), + Kind::EVENT, + ); + DefaultCallsite::new(&META) + }; + let _interest = CALLSITE.interest(); + + let meta = CALLSITE.metadata(); + let field = meta.fields().field("message").unwrap(); + let message = format!("event-from-{idx}", idx = idx); + let values = [(&field, Some(&message as &dyn Value))]; + let value_set = CALLSITE.metadata().fields().value_set(&values); + + Event::dispatch(meta, &value_set); + + // Wait a bit for everything to end (we don't want to remove the subscriber + // immediately because that will influence the test). + thread::sleep(Duration::from_millis(10)); + }) + .expect("failed to spawn thread") +} + +/// Regression test for missing register_callsite call (#2743) +/// +/// This test provokes the race condition which causes the second subscriber to not receive a +/// call to `register_callsite` before it receives a call to `event`. 
+/// +/// Because the test depends on the interaction of multiple dispatchers in different threads, +/// it needs to be in a test file by itself. +#[test] +fn event_before_register() { + let subscriber_1_register_sleep_micros = 100; + let subscriber_2_register_sleep_micros = 0; + + let jh1 = subscriber_thread(1, subscriber_1_register_sleep_micros); + + // This delay ensures that the event callsite has interest() called first. + thread::sleep(Duration::from_micros(50)); + let jh2 = subscriber_thread(2, subscriber_2_register_sleep_micros); + + jh1.join().expect("failed to join thread"); + jh2.join().expect("failed to join thread"); +} diff --git a/tracing/tests/missed_register_callsite.rs b/tracing/tests/missed_register_callsite.rs new file mode 100644 index 0000000000..d14db125a6 --- /dev/null +++ b/tracing/tests/missed_register_callsite.rs @@ -0,0 +1,109 @@ +use std::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, + thread::{self, JoinHandle}, + time::Duration, +}; + +use tracing::Subscriber; +use tracing_core::{span, Metadata}; + +struct TestSubscriber { + creator_thread: String, + sleep: Duration, + callsite: AtomicPtr>, +} + +impl TestSubscriber { + fn new(sleep_micros: u64) -> Self { + let creator_thread = thread::current() + .name() + .unwrap_or("") + .to_owned(); + Self { + creator_thread, + sleep: Duration::from_micros(sleep_micros), + callsite: AtomicPtr::new(ptr::null_mut()), + } + } +} + +impl Subscriber for TestSubscriber { + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { + if !self.sleep.is_zero() { + thread::sleep(self.sleep); + } + + self.callsite + .store(metadata as *const _ as *mut _, Ordering::SeqCst); + println!( + "{creator} from {thread:?}: register_callsite: {callsite:#?}", + creator = self.creator_thread, + callsite = metadata as *const _, + thread = thread::current().name(), + ); + tracing_core::Interest::always() + } + + fn event(&self, event: &tracing_core::Event<'_>) { + let stored_callsite = 
self.callsite.load(Ordering::SeqCst); + let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; + + println!( + "{creator} from {thread:?}: event (with callsite): {event_callsite:#?} (stored callsite: {stored_callsite:#?})", + creator = self.creator_thread, + thread = thread::current().name(), + ); + + // This assert is the actual test. + assert_eq!( + stored_callsite, event_callsite, + "stored callsite: {stored_callsite:#?} does not match event \ + callsite: {event_callsite:#?}. Was `event` called before \ + `register_callsite`?" + ); + } + + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { + span::Id::from_u64(0) + } + fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} + fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} + fn enter(&self, _span: &tracing_core::span::Id) {} + fn exit(&self, _span: &tracing_core::span::Id) {} +} + +fn subscriber_thread(idx: usize, register_sleep_micros: u64) -> JoinHandle<()> { + thread::Builder::new() + .name(format!("subscriber-{idx}")) + .spawn(move || { + // We use a sleep to ensure the starting order of the 2 threads. + let subscriber = TestSubscriber::new(register_sleep_micros); + let _subscriber_guard = tracing::subscriber::set_default(subscriber); + + tracing::info!("event-from-{idx}", idx = idx); + + // Wait a bit for everything to end (we don't want to remove the subscriber + // immediately because that will mix up the test). + thread::sleep(Duration::from_millis(100)); + }) + .expect("failed to spawn thread") +} + +#[test] +fn event_before_register() { + let subscriber_1_register_sleep_micros = 100; + let subscriber_2_register_sleep_micros = 0; + + let jh1 = subscriber_thread(1, subscriber_1_register_sleep_micros); + + // This delay ensures that the event!() in the first thread is executed first. 
+ thread::sleep(Duration::from_micros(50)); + let jh2 = subscriber_thread(2, subscriber_2_register_sleep_micros); + + jh1.join().expect("failed to join thread"); + jh2.join().expect("failed to join thread"); +}