Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement filter for the reload layer/subscriber #2159

Merged
merged 2 commits into from
Jun 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 96 additions & 23 deletions tracing-subscriber/src/reload.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
//! Wrapper for a `Collect` or `Subscribe` to allow it to be dynamically reloaded.
//!
//! This module provides a type implementing [`Subscribe`] which wraps another type implementing
//! the [`Subscribe`] trait, allowing the wrapped type to be replaced with another
//! This module provides a type implementing [`Subscribe`] and [`Filter`]
//! which wraps another type implementing the corresponding trait. This
//! allows the wrapped type to be replaced with another
//! instance of that type at runtime.
//!
//! This can be used in cases where a subset of `Collect` functionality
//! This can be used in cases where a subset of `Collect` or `Filter` functionality
//! should be dynamically reconfigured, such as when filtering directives may
//! change at runtime. Note that this subscriber introduces a (relatively small)
//! amount of overhead, and should thus only be used as needed.
//!
//! ## Note
//!
//! //! The [`Subscribe`] implementation is unable to implement downcasting functionality,
//! so certain `Subscribers` will fail to reload if wrapped in a `reload::Subscriber`.
//!
//! If you only want to be able to dynamically change the
//! `Filter` on your layer, prefer wrapping that `Filter` in the `reload::Subscriber`.
guswynn marked this conversation as resolved.
Show resolved Hide resolved
//!
//! [`Subscribe`]: crate::Subscribe
//! [`Filter`]: crate::subscribe::Filter
use crate::subscribe;
use crate::sync::RwLock;

Expand All @@ -23,7 +33,10 @@ use tracing_core::{
span, Event, Metadata,
};

/// Wraps a `Collect` or `Subscribe`, allowing it to be reloaded dynamically at runtime.
/// Wraps a `Filter` or `Subscribe`, allowing it to be reloaded dynamically at runtime.
///
/// [`Filter`]: crate::subscribe::Filter
/// [`Subscribe`]: crate::Subscribe
#[derive(Debug)]
pub struct Subscriber<S> {
// TODO(eliza): this once used a `crossbeam_util::ShardedRwLock`. We may
Expand Down Expand Up @@ -119,10 +132,67 @@ where
}
}

impl<S> Subscriber<S> {
/// Wraps the given `Subscribe`, returning a subscriber and a `Handle` that allows
#[cfg(all(feature = "registry", feature = "std"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "registry", feature = "std"))))]
impl<S, C> crate::subscribe::Filter<C> for Subscriber<S>
where
S: crate::subscribe::Filter<C> + 'static,
C: Collect,
{
#[inline]
fn callsite_enabled(&self, metadata: &'static Metadata<'static>) -> Interest {
try_lock!(self.inner.read(), else return Interest::sometimes()).callsite_enabled(metadata)
}

#[inline]
fn enabled(&self, metadata: &Metadata<'_>, ctx: &subscribe::Context<'_, C>) -> bool {
try_lock!(self.inner.read(), else return false).enabled(metadata, ctx)
}

#[inline]
fn on_new_span(
&self,
attrs: &span::Attributes<'_>,
id: &span::Id,
ctx: subscribe::Context<'_, C>,
) {
try_lock!(self.inner.read()).on_new_span(attrs, id, ctx)
}

#[inline]
fn on_record(
&self,
span: &span::Id,
values: &span::Record<'_>,
ctx: subscribe::Context<'_, C>,
) {
try_lock!(self.inner.read()).on_record(span, values, ctx)
}

#[inline]
fn on_enter(&self, id: &span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_enter(id, ctx)
}

#[inline]
fn on_exit(&self, id: &span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_exit(id, ctx)
}

#[inline]
fn on_close(&self, id: span::Id, ctx: subscribe::Context<'_, C>) {
try_lock!(self.inner.read()).on_close(id, ctx)
}
}

impl<T> Subscriber<T> {
/// Wraps the given `Subscribe` or `Filter`,
guswynn marked this conversation as resolved.
Show resolved Hide resolved
/// returning a subscriber or filter and a `Handle` that allows
/// the inner type to be modified at runtime.
pub fn new(inner: S) -> (Self, Handle<S>) {
///
/// [`Filter`]: crate::subscribe::Filter
/// [`Subscribe`]: crate::Subscribe
pub fn new(inner: T) -> (Self, Handle<T>) {
let this = Self {
inner: Arc::new(RwLock::new(inner)),
};
Expand All @@ -131,7 +201,7 @@ impl<S> Subscriber<S> {
}

/// Returns a `Handle` that can be used to reload the wrapped `Subscribe`.
pub fn handle(&self) -> Handle<S> {
pub fn handle(&self) -> Handle<T> {
Handle {
inner: Arc::downgrade(&self.inner),
}
Expand All @@ -140,22 +210,25 @@ impl<S> Subscriber<S> {

// ===== impl Handle =====

impl<S> Handle<S> {
/// Replace the current subscriber with the provided `new_subscriber`.
impl<T> Handle<T> {
/// Replace the current subscriber or filter with the provided `new_value`.
///
/// [`Handle::reload`] cannot be used with the [`Filtered`](crate::filter::Filtered)
/// subscriber; use [`Handle::modify`] instead (see [this issue] for additional details).
///
/// However, if the _only_ the [`Filter`](crate::subscribe::Filter) needs to be modified,
/// use `reload::Subscriber` to wrap the `Filter` directly.
///
/// **Warning:** The [`Filtered`](crate::filter::Filtered) type currently can't be changed
/// at runtime via the [`Handle::reload`] method.
/// Use the [`Handle::modify`] method to change the filter instead.
/// (see <https://github.com/tokio-rs/tracing/issues/1629>)
pub fn reload(&self, new_subscriber: impl Into<S>) -> Result<(), Error> {
self.modify(|subscriber| {
*subscriber = new_subscriber.into();
/// [this issue]: https://github.com/tokio-rs/tracing/issues/1629
pub fn reload(&self, new_value: impl Into<T>) -> Result<(), Error> {
self.modify(|object| {
*object = new_value.into();
})
}

/// Invokes a closure with a mutable reference to the current subscriber,
/// allowing it to be modified in place.
pub fn modify(&self, f: impl FnOnce(&mut S)) -> Result<(), Error> {
pub fn modify(&self, f: impl FnOnce(&mut T)) -> Result<(), Error> {
let inner = self.inner.upgrade().ok_or(Error {
kind: ErrorKind::CollectorGone,
})?;
Expand All @@ -182,16 +255,16 @@ impl<S> Handle<S> {

/// Returns a clone of the subscriber's current value if it still exists.
/// Otherwise, if the collector has been dropped, returns `None`.
pub fn clone_current(&self) -> Option<S>
pub fn clone_current(&self) -> Option<T>
where
S: Clone,
T: Clone,
{
self.with_current(S::clone).ok()
self.with_current(T::clone).ok()
}

/// Invokes a closure with a borrowed reference to the current subscriber,
/// returning the result (or an error if the collector no longer exists).
pub fn with_current<T>(&self, f: impl FnOnce(&S) -> T) -> Result<T, Error> {
pub fn with_current<T2>(&self, f: impl FnOnce(&T) -> T2) -> Result<T2, Error> {
let inner = self.inner.upgrade().ok_or(Error {
kind: ErrorKind::CollectorGone,
})?;
Expand All @@ -200,7 +273,7 @@ impl<S> Handle<S> {
}
}

impl<S> Clone for Handle<S> {
impl<T> Clone for Handle<T> {
fn clone(&self) -> Self {
Handle {
inner: self.inner.clone(),
Expand Down
59 changes: 59 additions & 0 deletions tracing-subscriber/tests/reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ impl Collect for NopCollector {
}
}

pub struct NopSubscriber;
impl<S: Collect> tracing_subscriber::Subscribe<S> for NopSubscriber {
fn register_callsite(&self, _m: &Metadata<'_>) -> Interest {
Interest::sometimes()
}

fn enabled(&self, _m: &Metadata<'_>, _: subscribe::Context<'_, S>) -> bool {
true
}
}

#[test]
fn reload_handle() {
static FILTER1_CALLS: AtomicUsize = AtomicUsize::new(0);
Expand Down Expand Up @@ -82,3 +93,51 @@ fn reload_handle() {
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 1);
})
}

#[test]
fn reload_filter() {
static FILTER1_CALLS: AtomicUsize = AtomicUsize::new(0);
static FILTER2_CALLS: AtomicUsize = AtomicUsize::new(0);

enum Filter {
One,
Two,
}

impl<S: Collect> tracing_subscriber::subscribe::Filter<S> for Filter {
fn enabled(&self, m: &Metadata<'_>, _: &subscribe::Context<'_, S>) -> bool {
println!("ENABLED: {:?}", m);
match self {
Filter::One => FILTER1_CALLS.fetch_add(1, Ordering::SeqCst),
Filter::Two => FILTER2_CALLS.fetch_add(1, Ordering::SeqCst),
};
true
}
}
fn event() {
tracing::trace!("my event");
}

let (filter, handle) = Subscriber::new(Filter::One);

let dispatcher = tracing_core::dispatch::Dispatch::new(
tracing_subscriber::registry().with(NopSubscriber.with_filter(filter)),
);

tracing_core::dispatch::with_default(&dispatcher, || {
assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 0);
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 0);

event();

assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 1);
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 0);

handle.reload(Filter::Two).expect("should reload");

event();

assert_eq!(FILTER1_CALLS.load(Ordering::SeqCst), 1);
assert_eq!(FILTER2_CALLS.load(Ordering::SeqCst), 1);
})
}