Skip to content

Commit

Permalink
subscriber: implement FIlter for reload::Subscriber (#2159)
Browse files Browse the repository at this point in the history
## Motivation

The `reload` subscriber doesn't (and can't) implement downcasting correctly,
which breaks certain subscribers like the opentelemetry one.

## Solution

Most uses of the `reload` module (including mine) are just to change the
filter. Therefore, this PR implements `Filter` for `reload::Subscriber`
to allow users to not need to wrap the whole layer trait. Another
advantage of this is that the common-case critical sections are
shortened.
  • Loading branch information
guswynn authored Jun 15, 2022
1 parent 758df19 commit 3046981
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 23 deletions.
119 changes: 96 additions & 23 deletions tracing-subscriber/src/reload.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
//! Wrapper for a `Collect` or `Subscribe` to allow it to be dynamically reloaded.
//!
//! This module provides a type implementing [`Subscribe`] which wraps another type implementing
//! the [`Subscribe`] trait, allowing the wrapped type to be replaced with another
//! This module provides a type implementing [`Subscribe`] and [`Filter`]
//! which wraps another type implementing the corresponding trait. This
//! allows the wrapped type to be replaced with another
//! instance of that type at runtime.
//!
//! This can be used in cases where a subset of `Collect` functionality
//! This can be used in cases where a subset of `Collect` or `Filter` functionality
//! should be dynamically reconfigured, such as when filtering directives may
//! change at runtime. Note that this subscriber introduces a (relatively small)
//! amount of overhead, and should thus only be used as needed.
//!
//! ## Note
//!
//! //! The [`Subscribe`] implementation is unable to implement downcasting functionality,
//! so certain `Subscribers` will fail to reload if wrapped in a `reload::Subscriber`.
//!
//! If you only want to be able to dynamically change the
//! `Filter` on your layer, prefer wrapping that `Filter` in the `reload::Subscriber`.
//!
//! [`Subscribe`]: crate::Subscribe
//! [`Filter`]: crate::subscribe::Filter
use crate::subscribe;
use crate::sync::RwLock;

Expand All @@ -23,7 +33,10 @@ use tracing_core::{
span, Event, Metadata,
};

/// Wraps a `Collect` or `Subscribe`, allowing it to be reloaded dynamically at runtime.
/// Wraps a `Filter` or `Subscribe`, allowing it to be reloaded dynamically at runtime.
///
/// [`Filter`]: crate::subscribe::Filter
/// [`Subscribe`]: crate::Subscribe
#[derive(Debug)]
pub struct Subscriber<S> {
// TODO(eliza): this once used a `crossbeam_util::ShardedRwLock`. We may
Expand Down Expand Up @@ -119,10 +132,67 @@ where
}
}

impl<S> Subscriber<S> {
/// Wraps the given `Subscribe`, returning a subscriber and a `Handle` that allows
#[cfg(all(feature = "registry", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "registry", feature = "std"))))]
impl<S, C> crate::subscribe::Filter<C> for Subscriber<S>
where
S: crate::subscribe::Filter<C> + 'static,
C: Collect,
{
#[inline]
fn callsite_enabled(&self, metadata: &'static Metadata<'static>) -> Interest {
try_lock!(self.inner.read(), else return Interest::sometimes()).callsite_enabled(metadata)
}

#[inline]
fn enabled(&self, metadata: &Metadata<'_>, ctx: &subscribe::Context<'_, C>) -> bool {
try_lock!(self.inner.read(), else return false).enabled(metadata, ctx)
}

#[inline]
fn on_new_span(
&self,
attrs: &span::Attributes<'_>,
id: &span::Id,
ctx: subscribe::Context<'_, C>,
) {
try_lock!(self.inner.read()).on_new_span(attrs, id, ctx)
}

#[inline]
fn on_record(
&self,
span: &span::Id,
values: &span::Record<'_>,
ctx: subscribe::Context<'_, C>,
) {
try_lock!(self.inner.read()).on_record(span, values, ctx)
}

#[inline]
fn on_enter(&self, id: &span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_enter(id, ctx)
}

#[inline]
fn on_exit(&self, id: &span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_exit(id, ctx)
}

#[inline]
fn on_close(&self, id: span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_close(id, ctx)
}
}

impl<T> Subscriber<T> {
/// Wraps the given `Subscribe` or `Filter`,
/// returning a subscriber or filter and a `Handle` that allows
/// the inner type to be modified at runtime.
pub fn new(inner: S) -> (Self, Handle<S>) {
///
/// [`Filter`]: crate::subscribe::Filter
/// [`Subscribe`]: crate::Subscribe
pub fn new(inner: T) -> (Self, Handle<T>) {
let this = Self {
inner: Arc::new(RwLock::new(inner)),
};
Expand All @@ -131,7 +201,7 @@ impl<S> Subscriber<S> {
}

/// Returns a `Handle` that can be used to reload the wrapped `Subscribe`.
pub fn handle(&self) -> Handle<S> {
pub fn handle(&self) -> Handle<T> {
Handle {
inner: Arc::downgrade(&self.inner),
}
Expand All @@ -140,22 +210,25 @@ impl<S> Subscriber<S> {

// ===== impl Handle =====

impl<S> Handle<S> {
/// Replace the current subscriber with the provided `new_subscriber`.
impl<T> Handle<T> {
/// Replace the current subscriber or filter with the provided `new_value`.
///
/// [`Handle::reload`] cannot be used with the [`Filtered`](crate::filter::Filtered)
/// subscriber; use [`Handle::modify`] instead (see [this issue] for additional details).
///
/// However, if the _only_ the [`Filter`](crate::subscribe::Filter) needs to be modified,
/// use `reload::Subscriber` to wrap the `Filter` directly.
///
/// **Warning:** The [`Filtered`](crate::filter::Filtered) type currently can't be changed
/// at runtime via the [`Handle::reload`] method.
/// Use the [`Handle::modify`] method to change the filter instead.
/// (see <https://github.com/tokio-rs/tracing/issues/1629>)
pub fn reload(&self, new_subscriber: impl Into<S>) -> Result<(), Error> {
self.modify(|subscriber| {
*subscriber = new_subscriber.into();
/// [this issue]: https://github.com/tokio-rs/tracing/issues/1629
pub fn reload(&self, new_value: impl Into<T>) -> Result<(), Error> {
self.modify(|object| {
*object = new_value.into();
})
}

/// Invokes a closure with a mutable reference to the current subscriber,
/// allowing it to be modified in place.
pub fn modify(&self, f: impl FnOnce(&mut S)) -> Result<(), Error> {
pub fn modify(&self, f: impl FnOnce(&mut T)) -> Result<(), Error> {
let inner = self.inner.upgrade().ok_or(Error {
kind: ErrorKind::CollectorGone,
})?;
Expand All @@ -182,16 +255,16 @@ impl<S> Handle<S> {

/// Returns a clone of the subscriber's current value if it still exists.
/// Otherwise, if the collector has been dropped, returns `None`.
pub fn clone_current(&self) -> Option<S>
pub fn clone_current(&self) -> Option<T>
where
S: Clone,
T: Clone,
{
self.with_current(S::clone).ok()
self.with_current(T::clone).ok()
}

/// Invokes a closure with a borrowed reference to the current subscriber,
/// returning the result (or an error if the collector no longer exists).
pub fn with_current<T>(&self, f: impl FnOnce(&S) -> T) -> Result<T, Error> {
pub fn with_current<T2>(&self, f: impl FnOnce(&T) -> T2) -> Result<T2, Error> {
let inner = self.inner.upgrade().ok_or(Error {
kind: ErrorKind::CollectorGone,
})?;
Expand All @@ -200,7 +273,7 @@ impl<S> Handle<S> {
}
}

impl<S> Clone for Handle<S> {
impl<T> Clone for Handle<T> {
fn clone(&self) -> Self {
Handle {
inner: self.inner.clone(),
Expand Down
59 changes: 59 additions & 0 deletions tracing-subscriber/tests/reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ impl Collect for NopCollector {
}
}

pub struct NopSubscriber;
impl<S: Collect> tracing_subscriber::Subscribe<S> for NopSubscriber {
fn register_callsite(&self, _m: &Metadata<'_>) -> Interest {
Interest::sometimes()
}

fn enabled(&self, _m: &Metadata<'_>, _: subscribe::Context<'_, S>) -> bool {
true
}
}

#[test]
fn reload_handle() {
static FILTER1_CALLS: AtomicUsize = AtomicUsize::new(0);
Expand Down Expand Up @@ -82,3 +93,51 @@ fn reload_handle() {
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 1);
})
}

#[test]
fn reload_filter() {
static FILTER1_CALLS: AtomicUsize = AtomicUsize::new(0);
static FILTER2_CALLS: AtomicUsize = AtomicUsize::new(0);

enum Filter {
One,
Two,
}

impl<S: Collect> tracing_subscriber::subscribe::Filter<S> for Filter {
fn enabled(&self, m: &Metadata<'_>, _: &subscribe::Context<'_, S>) -> bool {
println!("ENABLED: {:?}", m);
match self {
Filter::One => FILTER1_CALLS.fetch_add(1, Ordering::SeqCst),
Filter::Two => FILTER2_CALLS.fetch_add(1, Ordering::SeqCst),
};
true
}
}
fn event() {
tracing::trace!("my event");
}

let (filter, handle) = Subscriber::new(Filter::One);

let dispatcher = tracing_core::dispatch::Dispatch::new(
tracing_subscriber::registry().with(NopSubscriber.with_filter(filter)),
);

tracing_core::dispatch::with_default(&dispatcher, || {
assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 0);
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 0);

event();

assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 1);
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 0);

handle.reload(Filter::Two).expect("should reload");

event();

assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 1);
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 1);
})
}

0 comments on commit 3046981

Please sign in to comment.