Skip to content

Commit

Permalink
Add tracing layer list and refactor logging implementation
Browse files Browse the repository at this point in the history
Signed-off-by: trivernis <trivernis@protonmail.com>
  • Loading branch information
Trivernis committed Mar 9, 2022
1 parent a2aef10 commit aa772ea
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 48 deletions.
1 change: 1 addition & 0 deletions mediarepo-daemon/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mediarepo-daemon/mediarepo-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tracing = "0.1.31"
data-encoding = "2.3.2"
tokio-graceful-shutdown = "0.4.3"
thumbnailer = "0.4.0"
tracing-subscriber = "0.3.9"

[dependencies.sea-orm]
version = "0.6.0"
Expand Down
1 change: 1 addition & 0 deletions mediarepo-daemon/mediarepo-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ pub mod context;
pub mod error;
pub mod fs;
pub mod settings;
pub mod tracing_layer_list;
pub mod type_keys;
pub mod utils;
118 changes: 118 additions & 0 deletions mediarepo-daemon/mediarepo-core/src/tracing_layer_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::slice::{Iter, IterMut};
use tracing::level_filters::LevelFilter;
use tracing::span::{Attributes, Record};
use tracing::subscriber::Interest;
use tracing::{Event, Id, Metadata, Subscriber};
use tracing_subscriber::Layer;

pub struct DynLayerList<S>(Vec<Box<dyn Layer<S> + Send + Sync + 'static>>);

impl<S> DynLayerList<S> {
pub fn new() -> Self {
Self(Vec::new())
}

pub fn iter(&self) -> Iter<'_, Box<dyn Layer<S> + Send + Sync>> {
self.0.iter()
}

pub fn iter_mut(&mut self) -> IterMut<'_, Box<dyn Layer<S> + Send + Sync>> {
self.0.iter_mut()
}
}

impl<S> DynLayerList<S>
where
S: Subscriber,
{
pub fn add<L: Layer<S> + Send + Sync>(&mut self, layer: L) {
self.0.push(Box::new(layer));
}
}

impl<S> Layer<S> for DynLayerList<S>
where
S: Subscriber,
{
fn on_layer(&mut self, subscriber: &mut S) {
self.iter_mut().for_each(|l| l.on_layer(subscriber));
}

fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
// Return highest level of interest.
let mut interest = Interest::never();
for layer in &self.0 {
let new_interest = layer.register_callsite(metadata);
if (interest.is_sometimes() && new_interest.is_always())
|| (interest.is_never() && !new_interest.is_never())
{
interest = new_interest;
}
}
interest
}

fn enabled(
&self,
metadata: &Metadata<'_>,
ctx: tracing_subscriber::layer::Context<'_, S>,
) -> bool {
self.iter().any(|l| l.enabled(metadata, ctx.clone()))
}

fn on_new_span(
&self,
attrs: &Attributes<'_>,
id: &Id,
ctx: tracing_subscriber::layer::Context<'_, S>,
) {
self.iter()
.for_each(|l| l.on_new_span(attrs, id, ctx.clone()));
}

fn max_level_hint(&self) -> Option<LevelFilter> {
self.iter().filter_map(|l| l.max_level_hint()).max()
}

fn on_record(
&self,
span: &Id,
values: &Record<'_>,
ctx: tracing_subscriber::layer::Context<'_, S>,
) {
self.iter()
.for_each(|l| l.on_record(span, values, ctx.clone()));
}

fn on_follows_from(
&self,
span: &Id,
follows: &Id,
ctx: tracing_subscriber::layer::Context<'_, S>,
) {
self.iter()
.for_each(|l| l.on_follows_from(span, follows, ctx.clone()));
}

fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
self.iter().for_each(|l| l.on_event(event, ctx.clone()));
}

fn on_enter(&self, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
self.iter().for_each(|l| l.on_enter(id, ctx.clone()));
}

fn on_exit(&self, id: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
self.iter().for_each(|l| l.on_exit(id, ctx.clone()));
}

fn on_close(&self, id: Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
self.iter()
.for_each(|l| l.on_close(id.clone(), ctx.clone()));
}

fn on_id_change(&self, old: &Id, new: &Id, ctx: tracing_subscriber::layer::Context<'_, S>) {
self.iter()
.for_each(|l| l.on_id_change(old, new, ctx.clone()));
}
}
130 changes: 82 additions & 48 deletions mediarepo-daemon/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ use tracing::Level;
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_flame::FlameLayer;
use tracing_log::LogTracer;
use tracing_subscriber::{
fmt::{self},
Layer, Registry,
};
use tracing_subscriber::filter::{self, Targets};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{
fmt::{self},
Layer, Registry,
};

use mediarepo_core::settings::LoggingSettings;
use mediarepo_core::tracing_layer_list::DynLayerList;

#[allow(dyn_drop)]
pub type DropGuard = Box<dyn Drop>;
Expand All @@ -25,39 +26,60 @@ pub fn init_tracing(repo_path: &PathBuf, log_cfg: &LoggingSettings) -> Vec<DropG
LogTracer::init().expect("failed to subscribe to log entries");
let log_path = repo_path.join("logs");
let mut guards = Vec::new();
let mut layer_list = DynLayerList::new();

if !log_path.exists() {
fs::create_dir(&log_path).expect("failed to create directory for log files");
}
let (stdout_writer, guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(Box::new(guard) as DropGuard);

let stdout_layer = fmt::layer()
.with_thread_names(false)
.with_target(true)
.with_writer(stdout_writer)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(
std::env::var("RUST_LOG")
.unwrap_or(String::from("info,sqlx=warn"))
.parse::<filter::Targets>()
.unwrap_or(
filter::Targets::new()
.with_default(Level::INFO)
.with_target("sqlx", Level::WARN),
),
);
add_stdout_layer(&mut guards, &mut layer_list);
add_sql_layer(log_cfg, &log_path, &mut guards, &mut layer_list);
add_bromine_layer(log_cfg, &log_path, &mut guards, &mut layer_list);
add_app_log_layer(log_cfg, &log_path, &mut guards, &mut layer_list);

let (sql_writer, guard) = get_sql_log_writer(&log_path);
let tokio_console_enabled = std::env::var("TOKIO_CONSOLE")
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(false);

if tokio_console_enabled {
add_tokio_console_layer(&mut layer_list);
}

let registry = Registry::default().with(layer_list);
tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing");

guards
}

fn add_tokio_console_layer(layer_list: &mut DynLayerList<Registry>) {
let console_layer = ConsoleLayer::builder().with_default_env().spawn();
layer_list.add(console_layer);
}

fn add_app_log_layer(
log_cfg: &LoggingSettings,
log_path: &PathBuf,
guards: &mut Vec<DropGuard>,
layer_list: &mut DynLayerList<Registry>,
) {
let (app_log_writer, guard) = get_application_log_writer(&log_path);
guards.push(Box::new(guard) as DropGuard);

let sql_layer = fmt::layer()
.with_writer(sql_writer)
let app_log_layer = fmt::layer()
.with_writer(app_log_writer)
.pretty()
.with_ansi(false)
.with_span_events(FmtSpan::NONE)
.with_filter(get_sql_targets(log_cfg.trace_sql));
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(get_app_targets(log_cfg.level.clone().into()));
layer_list.add(app_log_layer);
}

fn add_bromine_layer(
log_cfg: &LoggingSettings,
log_path: &PathBuf,
guards: &mut Vec<DropGuard>,
layer_list: &mut DynLayerList<Registry>,
) {
let (bromine_writer, guard) = get_bromine_log_writer(&log_path);
guards.push(Box::new(guard) as DropGuard);

Expand All @@ -67,36 +89,48 @@ pub fn init_tracing(repo_path: &PathBuf, log_cfg: &LoggingSettings) -> Vec<DropG
.with_ansi(false)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(get_bromine_targets(log_cfg.trace_api_calls));
layer_list.add(bromine_layer);
}

let (app_log_writer, guard) = get_application_log_writer(&log_path);
fn add_sql_layer(
log_cfg: &LoggingSettings,
log_path: &PathBuf,
guards: &mut Vec<DropGuard>,
layer_list: &mut DynLayerList<Registry>,
) {
let (sql_writer, guard) = get_sql_log_writer(&log_path);
guards.push(Box::new(guard) as DropGuard);

let app_log_layer = fmt::layer()
.with_writer(app_log_writer)
let sql_layer = fmt::layer()
.with_writer(sql_writer)
.pretty()
.with_ansi(false)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(get_app_targets(log_cfg.level.clone().into()));

let registry = Registry::default()
.with(stdout_layer)
.with(sql_layer)
.with(bromine_layer)
.with(app_log_layer);
.with_span_events(FmtSpan::NONE)
.with_filter(get_sql_targets(log_cfg.trace_sql));

let tokio_console_enabled = std::env::var("TOKIO_CONSOLE")
.map(|v| v.eq_ignore_ascii_case("true"))
.unwrap_or(false);
layer_list.add(sql_layer);
}

if tokio_console_enabled {
let console_layer = ConsoleLayer::builder().with_default_env().spawn();
let registry = registry.with(console_layer);
tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing");
} else {
tracing::subscriber::set_global_default(registry).expect("Failed to initialize tracing");
}
fn add_stdout_layer(guards: &mut Vec<DropGuard>, layer_list: &mut DynLayerList<Registry>) {
let (stdout_writer, guard) = tracing_appender::non_blocking(std::io::stdout());
guards.push(Box::new(guard) as DropGuard);

guards
let stdout_layer = fmt::layer()
.with_thread_names(false)
.with_target(true)
.with_writer(stdout_writer)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(
std::env::var("RUST_LOG")
.unwrap_or(String::from("info,sqlx=warn"))
.parse::<filter::Targets>()
.unwrap_or(
filter::Targets::new()
.with_default(Level::INFO)
.with_target("sqlx", Level::WARN),
),
);
layer_list.add(stdout_layer);
}

fn get_sql_log_writer(log_path: &PathBuf) -> (NonBlocking, WorkerGuard) {
Expand Down

0 comments on commit aa772ea

Please sign in to comment.