diff --git a/tokio-trace/tests/subscriber.rs b/tokio-trace/tests/subscriber.rs new file mode 100644 index 00000000000..99635b8aaba --- /dev/null +++ b/tokio-trace/tests/subscriber.rs @@ -0,0 +1,48 @@ +#[macro_use] +extern crate tokio_trace; +use tokio_trace::{ + span, + subscriber::{with_default, Interest, Subscriber}, + Event, Level, Metadata, +}; + +#[test] +fn event_macros_dont_infinite_loop() { + // This test ensures that an event macro within a subscriber + // won't cause an infinite loop of events. + struct TestSubscriber; + impl Subscriber for TestSubscriber { + fn register_callsite(&self, _: &Metadata) -> Interest { + // Always return sometimes so that `enabled` will be called + // (which can loop). + Interest::sometimes() + } + + fn enabled(&self, meta: &Metadata) -> bool { + assert!(meta.fields().iter().any(|f| f.name() == "foo")); + event!(Level::TRACE, bar = false); + true + } + + fn new_span(&self, _: &span::Attributes) -> span::Id { + span::Id::from_u64(0xAAAA) + } + + fn record(&self, _: &span::Id, _: &span::Record) {} + + fn record_follows_from(&self, _: &span::Id, _: &span::Id) {} + + fn event(&self, event: &Event) { + assert!(event.metadata().fields().iter().any(|f| f.name() == "foo")); + event!(Level::TRACE, baz = false); + } + + fn enter(&self, _: &span::Id) {} + + fn exit(&self, _: &span::Id) {} + } + + with_default(TestSubscriber, || { + event!(Level::TRACE, foo = false); + }) +} diff --git a/tokio-trace/tokio-trace-core/src/dispatcher.rs b/tokio-trace/tokio-trace-core/src/dispatcher.rs index d436aed87ba..9ae3cac4e24 100644 --- a/tokio-trace/tokio-trace-core/src/dispatcher.rs +++ b/tokio-trace/tokio-trace-core/src/dispatcher.rs @@ -1,4 +1,4 @@ -//! Dispatches trace events to `Subscriber`s. +//! Dispatches trace events to `Subscriber`s.c use { callsite, span, subscriber::{self, Subscriber}, @@ -7,7 +7,7 @@ use { use std::{ any::Any, - cell::RefCell, + cell::{Cell, RefCell}, fmt, sync::{Arc, Weak}, }; @@ -21,9 +21,30 @@ pub struct Dispatch { } thread_local! { - static CURRENT_DISPATCH: RefCell = RefCell::new(Dispatch::none()); + static CURRENT_STATE: State = State { + default: RefCell::new(Dispatch::none()), + can_enter: Cell::new(true), + }; } +/// The dispatch state of a thread. +struct State { + /// This thread's current default dispatcher. + default: RefCell, + /// Whether or not we can currently begin dispatching a trace event. + /// + /// This is set to `false` when functions such as `enter`, `exit`, `event`, + /// and `new_span` are called on this thread's default dispatcher, to + /// prevent further trace events triggered inside those functions from + /// creating an infinite recursion. When we finish handling a dispatch, this + /// is set back to `true`. + can_enter: Cell, +} + +/// A guard that resets the current default dispatcher to the prior +/// default dispatcher when dropped. +struct ResetGuard(Option); + /// Sets this dispatch as the default for the duration of a closure. /// /// The default dispatcher is used when creating a new [span] or @@ -35,34 +56,44 @@ thread_local! { /// [`Subscriber`]: ../subscriber/trait.Subscriber.html /// [`Event`]: ../event/struct.Event.html pub fn with_default(dispatcher: &Dispatch, f: impl FnOnce() -> T) -> T { - // A drop guard that resets CURRENT_DISPATCH to the prior dispatcher. - // Using this (rather than simply resetting after calling `f`) ensures - // that we always reset to the prior dispatcher even if `f` panics. - struct ResetGuard(Option); - impl Drop for ResetGuard { - fn drop(&mut self) { - if let Some(dispatch) = self.0.take() { - let _ = CURRENT_DISPATCH.try_with(|current| { - *current.borrow_mut() = dispatch; - }); - } - } - } - - let dispatcher = dispatcher.clone(); - let prior = CURRENT_DISPATCH.try_with(|current| current.replace(dispatcher)); - let _guard = ResetGuard(prior.ok()); + // When this guard is dropped, the default dispatcher will be reset to the + // prior default. Using this (rather than simply resetting after calling + // `f`) ensures that we always reset to the prior dispatcher even if `f` + // panics. + let _guard = State::set_default(dispatcher.clone()); f() } /// Executes a closure with a reference to this thread's current [dispatcher]. /// +/// Note that calls to `get_default` should not be nested; if this function is +/// called while inside of another `get_default`, that closure will be provided +/// with `Dispatch::none` rather than the previously set dispatcher. +/// /// [dispatcher]: ../dispatcher/struct.Dispatch.html pub fn get_default(mut f: F) -> T where F: FnMut(&Dispatch) -> T, { - CURRENT_DISPATCH - .try_with(|current| f(&*current.borrow())) + // While this guard is active, additional calls to subscriber functions on + // the default dispatcher will not be able to access the dispatch context. + // Dropping the guard will allow the dispatch context to be re-entered. + struct Entered<'a>(&'a Cell); + impl<'a> Drop for Entered<'a> { + #[inline] + fn drop(&mut self) { + self.0.set(true); + } + } + + CURRENT_STATE + .try_with(|state| { + if state.can_enter.replace(false) { + let _guard = Entered(&state.can_enter); + f(&state.default.borrow()) + } else { + f(&Dispatch::none()) + } + }) .unwrap_or_else(|_| f(&Dispatch::none())) } @@ -70,6 +101,7 @@ pub(crate) struct Registrar(Weak); impl Dispatch { /// Returns a new `Dispatch` that discards events and spans. + #[inline] pub fn none() -> Self { Dispatch { subscriber: Arc::new(NoSubscriber), @@ -173,7 +205,7 @@ impl Dispatch { self.subscriber.event(event) } - /// Records that a span has been entered. + /// Records that a span has been can_enter. /// /// This calls the [`enter`] function on the [`Subscriber`] that this /// `Dispatch` forwards to. @@ -182,7 +214,7 @@ impl Dispatch { /// [`event`]: ../subscriber/trait.Subscriber.html#method.event #[inline] pub fn enter(&self, span: &span::Id) { - self.subscriber.enter(span) + self.subscriber.enter(span); } /// Records that a span has been exited. @@ -191,7 +223,7 @@ impl Dispatch { /// that this `Dispatch` forwards to. #[inline] pub fn exit(&self, span: &span::Id) { - self.subscriber.exit(span) + self.subscriber.exit(span); } /// Notifies the subscriber that a [span ID] has been cloned. @@ -295,9 +327,50 @@ impl Registrar { } } +// ===== impl State ===== + +impl State { + /// Replaces the current default dispatcher on this thread with the provided + /// dispatcher.Any + /// + /// Dropping the returned `ResetGuard` will reset the default dispatcher to + /// the previous value. + #[inline] + fn set_default(new_dispatch: Dispatch) -> ResetGuard { + let prior = CURRENT_STATE + .try_with(|state| { + state.can_enter.set(true); + state.default.replace(new_dispatch) + }) + .ok(); + ResetGuard(prior) + } +} + +// ===== impl ResetGuard ===== + +impl Drop for ResetGuard { + #[inline] + fn drop(&mut self) { + if let Some(dispatch) = self.0.take() { + let _ = CURRENT_STATE.try_with(|state| { + *state.default.borrow_mut() = dispatch; + }); + } + } +} + #[cfg(test)] mod test { use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + use { + callsite::Callsite, + metadata::{Kind, Level, Metadata}, + span, + subscriber::{Interest, Subscriber}, + Event, + }; #[test] fn dispatch_is() { @@ -310,4 +383,105 @@ mod test { let dispatcher = Dispatch::new(NoSubscriber); assert!(dispatcher.downcast_ref::().is_some()); } + + struct TestCallsite; + static TEST_CALLSITE: TestCallsite = TestCallsite; + static TEST_META: Metadata<'static> = metadata! { + name: "test", + target: module_path!(), + level: Level::DEBUG, + fields: &[], + callsite: &TEST_CALLSITE, + kind: Kind::EVENT + }; + + impl Callsite for TestCallsite { + fn set_interest(&self, _: Interest) {} + fn metadata(&self) -> &Metadata { + &TEST_META + } + } + + #[test] + fn events_dont_infinite_loop() { + // This test ensures that an event triggered within a subscriber + // won't cause an infinite loop of events. + struct TestSubscriber; + impl Subscriber for TestSubscriber { + fn enabled(&self, _: &Metadata) -> bool { + true + } + + fn new_span(&self, _: &span::Attributes) -> span::Id { + span::Id::from_u64(0xAAAA) + } + + fn record(&self, _: &span::Id, _: &span::Record) {} + + fn record_follows_from(&self, _: &span::Id, _: &span::Id) {} + + fn event(&self, _: &Event) { + static EVENTS: AtomicUsize = AtomicUsize::new(0); + assert_eq!( + EVENTS.fetch_add(1, Ordering::Relaxed), + 0, + "event method called twice!" + ); + Event::dispatch(&TEST_META, &TEST_META.fields().value_set(&[])) + } + + fn enter(&self, _: &span::Id) {} + + fn exit(&self, _: &span::Id) {} + } + + with_default(&Dispatch::new(TestSubscriber), || { + Event::dispatch(&TEST_META, &TEST_META.fields().value_set(&[])) + }) + } + + #[test] + fn spans_dont_infinite_loop() { + // This test ensures that a span created within a subscriber + // won't cause an infinite loop of new spans. + + fn mk_span() { + get_default(|current| { + current.new_span(&span::Attributes::new( + &TEST_META, + &TEST_META.fields().value_set(&[]), + )) + }); + } + + struct TestSubscriber; + impl Subscriber for TestSubscriber { + fn enabled(&self, _: &Metadata) -> bool { + true + } + + fn new_span(&self, _: &span::Attributes) -> span::Id { + static NEW_SPANS: AtomicUsize = AtomicUsize::new(0); + assert_eq!( + NEW_SPANS.fetch_add(1, Ordering::Relaxed), + 0, + "new_span method called twice!" + ); + mk_span(); + span::Id::from_u64(0xAAAA) + } + + fn record(&self, _: &span::Id, _: &span::Record) {} + + fn record_follows_from(&self, _: &span::Id, _: &span::Id) {} + + fn event(&self, _: &Event) {} + + fn enter(&self, _: &span::Id) {} + + fn exit(&self, _: &span::Id) {} + } + + with_default(&Dispatch::new(TestSubscriber), || mk_span()) + } }