diff --git a/Cargo.lock b/Cargo.lock index b3931437e..e3175d520 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3361,6 +3361,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.16" @@ -3371,12 +3381,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index cce09973d..dd8d8f8e7 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -57,7 +57,7 @@ base64 = "0.13.0" metrics = "0.20.0" metrics-exporter-prometheus = "0.11.0" tracing = { version = "0.1.15", features = ["release_max_level_info"] } -tracing-subscriber = { version = "0.3.1", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.1", features = ["env-filter", "json"] } tracing-appender = "0.2.0" hyper = { version = "0.14.14", features = ["server"] } halfbrown = "0.1.11" diff --git a/shotover-proxy/src/lib.rs b/shotover-proxy/src/lib.rs index 3fafa3779..5fa16d016 100644 --- a/shotover-proxy/src/lib.rs +++ b/shotover-proxy/src/lib.rs @@ -18,6 +18,9 @@ //! * [`transforms::TransformsConfig`], the enum to register with (add a variant) for configuring your own transform. #![allow(clippy::derive_partial_eq_without_eq)] +// Accidentally printing would break json log output +#![deny(clippy::print_stdout)] +#![deny(clippy::print_stderr)] pub mod codec; pub mod config; diff --git a/shotover-proxy/src/observability/mod.rs b/shotover-proxy/src/observability/mod.rs index 5b5fa47d5..f128d457b 100644 --- a/shotover-proxy/src/observability/mod.rs +++ b/shotover-proxy/src/observability/mod.rs @@ -1,3 +1,4 @@ +use crate::runner::ReloadHandle; use anyhow::{anyhow, Result}; use bytes::Bytes; use hyper::{ @@ -9,27 +10,20 @@ use std::convert::Infallible; use std::str; use std::{net::SocketAddr, sync::Arc}; use tracing::{error, trace}; -use tracing_subscriber::reload::Handle; -use tracing_subscriber::EnvFilter; /// Exports metrics over HTTP. -pub struct LogFilterHttpExporter { +pub struct LogFilterHttpExporter { recorder_handle: PrometheusHandle, address: SocketAddr, - tracing_handle: Handle, + tracing_handle: ReloadHandle, } /// Sets the `tracing_suscriber` filter level to the value of `bytes` on `handle` -fn set_filter(bytes: Bytes, handle: &Handle) -> Result<(), String> -where - S: tracing::Subscriber + 'static, -{ - let body = str::from_utf8(bytes.as_ref()).map_err(|e| format!("{e}"))?; +fn set_filter(bytes: Bytes, handle: &ReloadHandle) -> Result<()> { + let body = str::from_utf8(bytes.as_ref())?; trace!(request.body = ?body); - let new_filter = body - .parse::() - .map_err(|e| format!("{e}"))?; - handle.reload(new_filter).map_err(|e| format!("{e}")) + let new_filter = body.parse::()?; + handle.reload(new_filter) } fn rsp(status: StatusCode, body: impl Into) -> Response { @@ -39,17 +33,14 @@ fn rsp(status: StatusCode, body: impl Into) -> Response { .expect("builder with known status code must not fail") } -impl LogFilterHttpExporter -where - S: tracing::Subscriber + 'static, -{ +impl LogFilterHttpExporter { /// Creates a new [`LogFilterHttpExporter`] that listens on the given `address`. /// /// Observers expose their output by being converted into strings. pub fn new( recorder_handle: PrometheusHandle, address: SocketAddr, - tracing_handle: Handle, + tracing_handle: ReloadHandle, ) -> Self { LogFilterHttpExporter { recorder_handle, @@ -89,8 +80,11 @@ where match hyper::body::to_bytes(req).await { Ok(body) => match set_filter(body, &tracing_handle) { Err(error) => { - error!(%error, "setting filter failed!"); - rsp(StatusCode::INTERNAL_SERVER_ERROR, error) + error!(?error, "setting filter failed!"); + rsp( + StatusCode::INTERNAL_SERVER_ERROR, + format!("{:?}", error), + ) } Ok(()) => rsp(StatusCode::NO_CONTENT, Body::empty()), }, diff --git a/shotover-proxy/src/runner.rs b/shotover-proxy/src/runner.rs index 3ab9bd236..889c140b1 100644 --- a/shotover-proxy/src/runner.rs +++ b/shotover-proxy/src/runner.rs @@ -15,7 +15,11 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info}; use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; use tracing_subscriber::filter::Directive; -use tracing_subscriber::fmt::format::{DefaultFields, Format}; +use tracing_subscriber::fmt::format::DefaultFields; +use tracing_subscriber::fmt::format::Format; +use tracing_subscriber::fmt::format::Full; +use tracing_subscriber::fmt::format::Json; +use tracing_subscriber::fmt::format::JsonFields; use tracing_subscriber::fmt::Layer; use tracing_subscriber::layer::Layered; use tracing_subscriber::reload::Handle; @@ -35,6 +39,15 @@ pub struct ConfigOpts { // 2,097,152 = 2 * 1024 * 1024 (2MiB) #[clap(long, default_value = "2097152")] pub stack_size: usize, + + #[arg(long, value_enum, default_value = "human")] + pub log_format: LogFormat, +} + +#[derive(clap::ValueEnum, Clone)] +pub enum LogFormat { + Human, + Json, } impl Default for ConfigOpts { @@ -44,6 +57,7 @@ impl Default for ConfigOpts { config_file: "config/config.yaml".into(), core_threads: 4, stack_size: 2097152, + log_format: LogFormat::Human, } } } @@ -61,7 +75,7 @@ impl Runner { let config = Config::from_file(params.config_file)?; let topology = Topology::from_file(params.topology_file)?; - let tracing = TracingState::new(config.main_log_level.as_str())?; + let tracing = TracingState::new(config.main_log_level.as_str(), params.log_format)?; let (runtime_handle, runtime) = Runner::get_runtime(params.stack_size, params.core_threads); @@ -150,13 +164,10 @@ impl Runner { } } -type TracingStateHandle = - Handle, Registry>>; - struct TracingState { /// Once this is dropped tracing logs are ignored guard: WorkerGuard, - handle: TracingStateHandle, + handle: ReloadHandle, } /// Returns a new `EnvFilter` by parsing each directive string, or an error if any directive is invalid. @@ -181,28 +192,62 @@ fn try_parse_log_directives(directives: &[Option<&str>]) -> Result { } impl TracingState { - fn new(log_level: &str) -> Result { + fn new(log_level: &str, format: LogFormat) -> Result { let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout()); - let builder = tracing_subscriber::fmt() - .with_writer(non_blocking) - .with_env_filter({ - // Load log directives from shotover config and then from the RUST_LOG env var, with the latter taking priority. - // In the future we might be able to simplify the implementation if work is done on tokio-rs/tracing#1466. - let overrides = env::var(EnvFilter::DEFAULT_ENV).ok(); - try_parse_log_directives(&[Some(log_level), overrides.as_deref()])? - }) - .with_filter_reloading(); - let handle = builder.reload_handle(); - - // To avoid unit tests that run in the same excutable from blowing up when they try to reinitialize tracing we ignore the result returned by try_init. - // Currently the implementation of try_init will only fail when it is called multiple times. - builder.try_init().ok(); + // Load log directives from shotover config and then from the RUST_LOG env var, with the latter taking priority. + // In the future we might be able to simplify the implementation if work is done on tokio-rs/tracing#1466. + let overrides = env::var(EnvFilter::DEFAULT_ENV).ok(); + let env_filter = try_parse_log_directives(&[Some(log_level), overrides.as_deref()])?; + + let handle = match format { + LogFormat::Json => { + let builder = tracing_subscriber::fmt() + .json() + .with_writer(non_blocking) + .with_env_filter(env_filter) + .with_filter_reloading(); + let handle = ReloadHandle::Json(builder.reload_handle()); + // To avoid unit tests that run in the same excutable from blowing up when they try to reinitialize tracing we ignore the result returned by try_init. + // Currently the implementation of try_init will only fail when it is called multiple times. + builder.try_init().ok(); + handle + } + LogFormat::Human => { + let builder = tracing_subscriber::fmt() + .with_writer(non_blocking) + .with_env_filter(env_filter) + .with_filter_reloading(); + let handle = ReloadHandle::Human(builder.reload_handle()); + builder.try_init().ok(); + handle + } + }; Ok(TracingState { guard, handle }) } } +type Formatter = Layered, NonBlocking>, Registry>; + +// TODO: We will be able to remove this and just directly use the handle once tracing 0.2 is released. See: +// * https://github.com/tokio-rs/tracing/pull/1035 +// * https://github.com/linkerd/linkerd2-proxy/blob/6c484f6dcdeebda18b68c800b4494263bf98fcdc/linkerd/app/core/src/trace.rs#L19-L36 +#[derive(Clone)] +pub enum ReloadHandle { + Json(Handle>), + Human(Handle>), +} + +impl ReloadHandle { + pub fn reload(&self, filter: EnvFilter) -> Result<()> { + match self { + ReloadHandle::Json(handle) => handle.reload(filter).map_err(|e| anyhow!(e)), + ReloadHandle::Human(handle) => handle.reload(filter).map_err(|e| anyhow!(e)), + } + } +} + pub struct RunnerSpawned { pub runtime: Option, pub runtime_handle: RuntimeHandle,