Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --log-format json #948

Merged
merged 4 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ base64 = "0.13.0"
metrics = "0.20.0"
metrics-exporter-prometheus = "0.11.0"
tracing = { version = "0.1.15", features = ["release_max_level_info"] }
tracing-subscriber = { version = "0.3.1", features = ["env-filter"] }
tracing-subscriber = { version = "0.3.1", features = ["env-filter", "json"] }
tracing-appender = "0.2.0"
hyper = { version = "0.14.14", features = ["server"] }
halfbrown = "0.1.11"
Expand Down
3 changes: 3 additions & 0 deletions shotover-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
//! * [`transforms::TransformsConfig`], the enum to register with (add a variant) for configuring your own transform.

#![allow(clippy::derive_partial_eq_without_eq)]
// Accidentally printing would break json log output
#![deny(clippy::print_stdout)]
#![deny(clippy::print_stderr)]

pub mod codec;
pub mod config;
Expand Down
34 changes: 14 additions & 20 deletions shotover-proxy/src/observability/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::runner::ReloadHandle;
use anyhow::{anyhow, Result};
use bytes::Bytes;
use hyper::{
Expand All @@ -9,27 +10,20 @@ use std::convert::Infallible;
use std::str;
use std::{net::SocketAddr, sync::Arc};
use tracing::{error, trace};
use tracing_subscriber::reload::Handle;
use tracing_subscriber::EnvFilter;

/// Exports metrics over HTTP.
pub struct LogFilterHttpExporter<S> {
pub struct LogFilterHttpExporter {
recorder_handle: PrometheusHandle,
address: SocketAddr,
tracing_handle: Handle<EnvFilter, S>,
tracing_handle: ReloadHandle,
}

/// Sets the `tracing_suscriber` filter level to the value of `bytes` on `handle`
fn set_filter<S>(bytes: Bytes, handle: &Handle<EnvFilter, S>) -> Result<(), String>
where
S: tracing::Subscriber + 'static,
{
let body = str::from_utf8(bytes.as_ref()).map_err(|e| format!("{e}"))?;
fn set_filter(bytes: Bytes, handle: &ReloadHandle) -> Result<()> {
let body = str::from_utf8(bytes.as_ref())?;
trace!(request.body = ?body);
let new_filter = body
.parse::<tracing_subscriber::filter::EnvFilter>()
.map_err(|e| format!("{e}"))?;
handle.reload(new_filter).map_err(|e| format!("{e}"))
let new_filter = body.parse::<tracing_subscriber::filter::EnvFilter>()?;
handle.reload(new_filter)
}

fn rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body> {
Expand All @@ -39,17 +33,14 @@ fn rsp(status: StatusCode, body: impl Into<Body>) -> Response<Body> {
.expect("builder with known status code must not fail")
}

impl<S> LogFilterHttpExporter<S>
where
S: tracing::Subscriber + 'static,
{
impl LogFilterHttpExporter {
/// Creates a new [`LogFilterHttpExporter`] that listens on the given `address`.
///
/// Observers expose their output by being converted into strings.
pub fn new(
recorder_handle: PrometheusHandle,
address: SocketAddr,
tracing_handle: Handle<EnvFilter, S>,
tracing_handle: ReloadHandle,
) -> Self {
LogFilterHttpExporter {
recorder_handle,
Expand Down Expand Up @@ -89,8 +80,11 @@ where
match hyper::body::to_bytes(req).await {
Ok(body) => match set_filter(body, &tracing_handle) {
Err(error) => {
error!(%error, "setting filter failed!");
rsp(StatusCode::INTERNAL_SERVER_ERROR, error)
error!(?error, "setting filter failed!");
rsp(
StatusCode::INTERNAL_SERVER_ERROR,
format!("{:?}", error),
)
}
Ok(()) => rsp(StatusCode::NO_CONTENT, Body::empty()),
},
Expand Down
87 changes: 66 additions & 21 deletions shotover-proxy/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::fmt::format::DefaultFields;
use tracing_subscriber::fmt::format::Format;
use tracing_subscriber::fmt::format::Full;
use tracing_subscriber::fmt::format::Json;
use tracing_subscriber::fmt::format::JsonFields;
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::layer::Layered;
use tracing_subscriber::reload::Handle;
Expand All @@ -35,6 +39,15 @@ pub struct ConfigOpts {
// 2,097,152 = 2 * 1024 * 1024 (2MiB)
#[clap(long, default_value = "2097152")]
pub stack_size: usize,

#[arg(long, value_enum, default_value = "human")]
pub log_format: LogFormat,
}

#[derive(clap::ValueEnum, Clone)]
pub enum LogFormat {
Human,
Json,
}

impl Default for ConfigOpts {
Expand All @@ -44,6 +57,7 @@ impl Default for ConfigOpts {
config_file: "config/config.yaml".into(),
core_threads: 4,
stack_size: 2097152,
log_format: LogFormat::Human,
}
}
}
Expand All @@ -61,7 +75,7 @@ impl Runner {
let config = Config::from_file(params.config_file)?;
let topology = Topology::from_file(params.topology_file)?;

let tracing = TracingState::new(config.main_log_level.as_str())?;
let tracing = TracingState::new(config.main_log_level.as_str(), params.log_format)?;

let (runtime_handle, runtime) = Runner::get_runtime(params.stack_size, params.core_threads);

Expand Down Expand Up @@ -150,13 +164,10 @@ impl Runner {
}
}

type TracingStateHandle =
Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>;

struct TracingState {
/// Once this is dropped tracing logs are ignored
guard: WorkerGuard,
handle: TracingStateHandle,
handle: ReloadHandle,
}

/// Returns a new `EnvFilter` by parsing each directive string, or an error if any directive is invalid.
Expand All @@ -181,28 +192,62 @@ fn try_parse_log_directives(directives: &[Option<&str>]) -> Result<EnvFilter> {
}

impl TracingState {
fn new(log_level: &str) -> Result<Self> {
fn new(log_level: &str, format: LogFormat) -> Result<Self> {
let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());

let builder = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter({
// Load log directives from shotover config and then from the RUST_LOG env var, with the latter taking priority.
// In the future we might be able to simplify the implementation if work is done on tokio-rs/tracing#1466.
let overrides = env::var(EnvFilter::DEFAULT_ENV).ok();
try_parse_log_directives(&[Some(log_level), overrides.as_deref()])?
})
.with_filter_reloading();
let handle = builder.reload_handle();

// To avoid unit tests that run in the same excutable from blowing up when they try to reinitialize tracing we ignore the result returned by try_init.
// Currently the implementation of try_init will only fail when it is called multiple times.
builder.try_init().ok();
// Load log directives from shotover config and then from the RUST_LOG env var, with the latter taking priority.
// In the future we might be able to simplify the implementation if work is done on tokio-rs/tracing#1466.
let overrides = env::var(EnvFilter::DEFAULT_ENV).ok();
let env_filter = try_parse_log_directives(&[Some(log_level), overrides.as_deref()])?;

let handle = match format {
LogFormat::Json => {
let builder = tracing_subscriber::fmt()
.json()
.with_writer(non_blocking)
.with_env_filter(env_filter)
.with_filter_reloading();
let handle = ReloadHandle::Json(builder.reload_handle());
// To avoid unit tests that run in the same excutable from blowing up when they try to reinitialize tracing we ignore the result returned by try_init.
// Currently the implementation of try_init will only fail when it is called multiple times.
builder.try_init().ok();
handle
}
LogFormat::Human => {
let builder = tracing_subscriber::fmt()
.with_writer(non_blocking)
.with_env_filter(env_filter)
.with_filter_reloading();
let handle = ReloadHandle::Human(builder.reload_handle());
builder.try_init().ok();
handle
}
};

Ok(TracingState { guard, handle })
}
}

type Formatter<A, B> = Layered<Layer<Registry, A, Format<B>, NonBlocking>, Registry>;

// TODO: We will be able to remove this and just directly use the handle once tracing 0.2 is released. See:
// * https://github.com/tokio-rs/tracing/pull/1035
// * https://github.com/linkerd/linkerd2-proxy/blob/6c484f6dcdeebda18b68c800b4494263bf98fcdc/linkerd/app/core/src/trace.rs#L19-L36
#[derive(Clone)]
pub enum ReloadHandle {
Json(Handle<EnvFilter, Formatter<JsonFields, Json>>),
Human(Handle<EnvFilter, Formatter<DefaultFields, Full>>),
}

impl ReloadHandle {
pub fn reload(&self, filter: EnvFilter) -> Result<()> {
match self {
ReloadHandle::Json(handle) => handle.reload(filter).map_err(|e| anyhow!(e)),
ReloadHandle::Human(handle) => handle.reload(filter).map_err(|e| anyhow!(e)),
}
}
}

pub struct RunnerSpawned {
pub runtime: Option<Runtime>,
pub runtime_handle: RuntimeHandle,
Expand Down