Skip to content

Commit

Permalink
trace-core: Dispatchers unset themselves (#1033)
Browse files Browse the repository at this point in the history
This branch changes `dispatcher::get_default` to unset the thread's
current dispatcher while the reference to it is held by the closure.
This prevents infinite loops if the subscriber calls code paths which
emit events or construct spans. 

Note that this also means that nested calls to `get_default` inside of a
`get_default` closure will receive a `None` dispatcher rather than the
"actual" dispatcher. However, it was necessary to unset the default in
`get_default` rather than in dispatch methods such as `Dispatch::enter`,
as when those functions are called, the current state has already been
borrowed.

Before:
```
test enter_span              ... bench:           3 ns/iter (+/- 0)
test span_no_fields          ... bench:          51 ns/iter (+/- 12)
test span_repeatedly         ... bench:       5,073 ns/iter (+/- 1,528)
test span_with_fields        ... bench:          56 ns/iter (+/- 49)
test span_with_fields_record ... bench:         363 ns/iter (+/- 61)
```

After:
```
test enter_span              ... bench:           3 ns/iter (+/- 0)
test span_no_fields          ... bench:          35 ns/iter (+/- 12)
test span_repeatedly         ... bench:       4,165 ns/iter (+/- 298)
test span_with_fields        ... bench:          48 ns/iter (+/- 12)
test span_with_fields_record ... bench:         363 ns/iter (+/- 91)
```

Closes #1032 

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw authored Apr 16, 2019
1 parent 88b9426 commit 4bfa4ff
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 25 deletions.
48 changes: 48 additions & 0 deletions tokio-trace/tests/subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#[macro_use]
extern crate tokio_trace;
use tokio_trace::{
span,
subscriber::{with_default, Interest, Subscriber},
Event, Level, Metadata,
};

#[test]
fn event_macros_dont_infinite_loop() {
// This test ensures that an event macro within a subscriber
// won't cause an infinite loop of events.
struct TestSubscriber;
impl Subscriber for TestSubscriber {
fn register_callsite(&self, _: &Metadata) -> Interest {
// Always return sometimes so that `enabled` will be called
// (which can loop).
Interest::sometimes()
}

fn enabled(&self, meta: &Metadata) -> bool {
assert!(meta.fields().iter().any(|f| f.name() == "foo"));
event!(Level::TRACE, bar = false);
true
}

fn new_span(&self, _: &span::Attributes) -> span::Id {
span::Id::from_u64(0xAAAA)
}

fn record(&self, _: &span::Id, _: &span::Record) {}

fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}

fn event(&self, event: &Event) {
assert!(event.metadata().fields().iter().any(|f| f.name() == "foo"));
event!(Level::TRACE, baz = false);
}

fn enter(&self, _: &span::Id) {}

fn exit(&self, _: &span::Id) {}
}

with_default(TestSubscriber, || {
event!(Level::TRACE, foo = false);
})
}
224 changes: 199 additions & 25 deletions tokio-trace/tokio-trace-core/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Dispatches trace events to `Subscriber`s.
//! Dispatches trace events to `Subscriber`s.c
use {
callsite, span,
subscriber::{self, Subscriber},
Expand All @@ -7,7 +7,7 @@ use {

use std::{
any::Any,
cell::RefCell,
cell::{Cell, RefCell},
fmt,
sync::{Arc, Weak},
};
Expand All @@ -21,9 +21,30 @@ pub struct Dispatch {
}

thread_local! {
static CURRENT_DISPATCH: RefCell<Dispatch> = RefCell::new(Dispatch::none());
static CURRENT_STATE: State = State {
default: RefCell::new(Dispatch::none()),
can_enter: Cell::new(true),
};
}

/// The dispatch state of a thread.
struct State {
/// This thread's current default dispatcher.
default: RefCell<Dispatch>,
/// Whether or not we can currently begin dispatching a trace event.
///
/// This is set to `false` when functions such as `enter`, `exit`, `event`,
/// and `new_span` are called on this thread's default dispatcher, to
/// prevent further trace events triggered inside those functions from
/// creating an infinite recursion. When we finish handling a dispatch, this
/// is set back to `true`.
can_enter: Cell<bool>,
}

/// A guard that resets the current default dispatcher to the prior
/// default dispatcher when dropped.
struct ResetGuard(Option<Dispatch>);

/// Sets this dispatch as the default for the duration of a closure.
///
/// The default dispatcher is used when creating a new [span] or
Expand All @@ -35,41 +56,52 @@ thread_local! {
/// [`Subscriber`]: ../subscriber/trait.Subscriber.html
/// [`Event`]: ../event/struct.Event.html
pub fn with_default<T>(dispatcher: &Dispatch, f: impl FnOnce() -> T) -> T {
// A drop guard that resets CURRENT_DISPATCH to the prior dispatcher.
// Using this (rather than simply resetting after calling `f`) ensures
// that we always reset to the prior dispatcher even if `f` panics.
struct ResetGuard(Option<Dispatch>);
impl Drop for ResetGuard {
fn drop(&mut self) {
if let Some(dispatch) = self.0.take() {
let _ = CURRENT_DISPATCH.try_with(|current| {
*current.borrow_mut() = dispatch;
});
}
}
}

let dispatcher = dispatcher.clone();
let prior = CURRENT_DISPATCH.try_with(|current| current.replace(dispatcher));
let _guard = ResetGuard(prior.ok());
// When this guard is dropped, the default dispatcher will be reset to the
// prior default. Using this (rather than simply resetting after calling
// `f`) ensures that we always reset to the prior dispatcher even if `f`
// panics.
let _guard = State::set_default(dispatcher.clone());
f()
}
/// Executes a closure with a reference to this thread's current [dispatcher].
///
/// Note that calls to `get_default` should not be nested; if this function is
/// called while inside of another `get_default`, that closure will be provided
/// with `Dispatch::none` rather than the previously set dispatcher.
///
/// [dispatcher]: ../dispatcher/struct.Dispatch.html
pub fn get_default<T, F>(mut f: F) -> T
where
F: FnMut(&Dispatch) -> T,
{
CURRENT_DISPATCH
.try_with(|current| f(&*current.borrow()))
// While this guard is active, additional calls to subscriber functions on
// the default dispatcher will not be able to access the dispatch context.
// Dropping the guard will allow the dispatch context to be re-entered.
struct Entered<'a>(&'a Cell<bool>);
impl<'a> Drop for Entered<'a> {
#[inline]
fn drop(&mut self) {
self.0.set(true);
}
}

CURRENT_STATE
.try_with(|state| {
if state.can_enter.replace(false) {
let _guard = Entered(&state.can_enter);
f(&state.default.borrow())
} else {
f(&Dispatch::none())
}
})
.unwrap_or_else(|_| f(&Dispatch::none()))
}

pub(crate) struct Registrar(Weak<Subscriber + Send + Sync>);

impl Dispatch {
/// Returns a new `Dispatch` that discards events and spans.
#[inline]
pub fn none() -> Self {
Dispatch {
subscriber: Arc::new(NoSubscriber),
Expand Down Expand Up @@ -173,7 +205,7 @@ impl Dispatch {
self.subscriber.event(event)
}

/// Records that a span has been entered.
/// Records that a span has been can_enter.
///
/// This calls the [`enter`] function on the [`Subscriber`] that this
/// `Dispatch` forwards to.
Expand All @@ -182,7 +214,7 @@ impl Dispatch {
/// [`event`]: ../subscriber/trait.Subscriber.html#method.event
#[inline]
pub fn enter(&self, span: &span::Id) {
self.subscriber.enter(span)
self.subscriber.enter(span);
}

/// Records that a span has been exited.
Expand All @@ -191,7 +223,7 @@ impl Dispatch {
/// that this `Dispatch` forwards to.
#[inline]
pub fn exit(&self, span: &span::Id) {
self.subscriber.exit(span)
self.subscriber.exit(span);
}

/// Notifies the subscriber that a [span ID] has been cloned.
Expand Down Expand Up @@ -295,9 +327,50 @@ impl Registrar {
}
}

// ===== impl State =====

impl State {
/// Replaces the current default dispatcher on this thread with the provided
/// dispatcher.Any
///
/// Dropping the returned `ResetGuard` will reset the default dispatcher to
/// the previous value.
#[inline]
fn set_default(new_dispatch: Dispatch) -> ResetGuard {
let prior = CURRENT_STATE
.try_with(|state| {
state.can_enter.set(true);
state.default.replace(new_dispatch)
})
.ok();
ResetGuard(prior)
}
}

// ===== impl ResetGuard =====

impl Drop for ResetGuard {
#[inline]
fn drop(&mut self) {
if let Some(dispatch) = self.0.take() {
let _ = CURRENT_STATE.try_with(|state| {
*state.default.borrow_mut() = dispatch;
});
}
}
}

#[cfg(test)]
mod test {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use {
callsite::Callsite,
metadata::{Kind, Level, Metadata},
span,
subscriber::{Interest, Subscriber},
Event,
};

#[test]
fn dispatch_is() {
Expand All @@ -310,4 +383,105 @@ mod test {
let dispatcher = Dispatch::new(NoSubscriber);
assert!(dispatcher.downcast_ref::<NoSubscriber>().is_some());
}

struct TestCallsite;
static TEST_CALLSITE: TestCallsite = TestCallsite;
static TEST_META: Metadata<'static> = metadata! {
name: "test",
target: module_path!(),
level: Level::DEBUG,
fields: &[],
callsite: &TEST_CALLSITE,
kind: Kind::EVENT
};

impl Callsite for TestCallsite {
fn set_interest(&self, _: Interest) {}
fn metadata(&self) -> &Metadata {
&TEST_META
}
}

#[test]
fn events_dont_infinite_loop() {
// This test ensures that an event triggered within a subscriber
// won't cause an infinite loop of events.
struct TestSubscriber;
impl Subscriber for TestSubscriber {
fn enabled(&self, _: &Metadata) -> bool {
true
}

fn new_span(&self, _: &span::Attributes) -> span::Id {
span::Id::from_u64(0xAAAA)
}

fn record(&self, _: &span::Id, _: &span::Record) {}

fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}

fn event(&self, _: &Event) {
static EVENTS: AtomicUsize = AtomicUsize::new(0);
assert_eq!(
EVENTS.fetch_add(1, Ordering::Relaxed),
0,
"event method called twice!"
);
Event::dispatch(&TEST_META, &TEST_META.fields().value_set(&[]))
}

fn enter(&self, _: &span::Id) {}

fn exit(&self, _: &span::Id) {}
}

with_default(&Dispatch::new(TestSubscriber), || {
Event::dispatch(&TEST_META, &TEST_META.fields().value_set(&[]))
})
}

#[test]
fn spans_dont_infinite_loop() {
// This test ensures that a span created within a subscriber
// won't cause an infinite loop of new spans.

fn mk_span() {
get_default(|current| {
current.new_span(&span::Attributes::new(
&TEST_META,
&TEST_META.fields().value_set(&[]),
))
});
}

struct TestSubscriber;
impl Subscriber for TestSubscriber {
fn enabled(&self, _: &Metadata) -> bool {
true
}

fn new_span(&self, _: &span::Attributes) -> span::Id {
static NEW_SPANS: AtomicUsize = AtomicUsize::new(0);
assert_eq!(
NEW_SPANS.fetch_add(1, Ordering::Relaxed),
0,
"new_span method called twice!"
);
mk_span();
span::Id::from_u64(0xAAAA)
}

fn record(&self, _: &span::Id, _: &span::Record) {}

fn record_follows_from(&self, _: &span::Id, _: &span::Id) {}

fn event(&self, _: &Event) {}

fn enter(&self, _: &span::Id) {}

fn exit(&self, _: &span::Id) {}
}

with_default(&Dispatch::new(TestSubscriber), || mk_span())
}
}

0 comments on commit 4bfa4ff

Please sign in to comment.