Skip to content

Commit

Permalink
Add filters for emitting Tracing diagnostics (#656)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkw authored Jul 17, 2020
1 parent 2947680 commit 8377282
Show file tree
Hide file tree
Showing 7 changed files with 425 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ tokio-rustls = { version = "0.13.1", optional = true }

[dev-dependencies]
pretty_env_logger = "0.4"
tracing-subscriber = "0.2.7"
tracing-log = "0.1"
serde_derive = "1.0"
handlebars = "3.0.0"
tokio = { version = "0.2", features = ["macros"] }
Expand Down
4 changes: 4 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ Hooray! `warp` also includes built-in support for WebSockets
### Autoreloading

- [`autoreload.rs`](./autoreload.rs) - Change some code and watch the server reload automatically!

### Debugging

- [`tracing.rs`](./tracing.rs) - Warp has built-in support for rich diagnostics with [`tracing`](https://docs.rs/tracing)!
59 changes: 59 additions & 0 deletions examples/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! [`tracing`] is a framework for instrumenting Rust programs to
//! collect scoped, structured, and async-aware diagnostics. This example
//! demonstrates how the `warp::trace` module can be used to instrument `warp`
//! applications with `tracing`.
//!
//! [`tracing`]: https://crates.io/tracing
#![deny(warnings)]
use tracing_subscriber::fmt::format::FmtSpan;
use warp::Filter;

#[tokio::main]
async fn main() {
// Filter traces based on the RUST_LOG env var, or, if it's not set,
// default to show the output of the example.
let filter = std::env::var("RUST_LOG").unwrap_or_else(|_| "tracing=info,warp=debug".to_owned());

// Configure the default `tracing` subscriber.
// The `fmt` subscriber from the `tracing-subscriber` crate logs `tracing`
// events to stdout. Other subscribers are available for integrating with
// distributed tracing systems such as OpenTelemetry.
tracing_subscriber::fmt()
// Use the filter we built above to determine which traces to record.
.with_env_filter(filter)
// Record an event when each span closes. This can be used to time our
// routes' durations!
.with_span_events(FmtSpan::CLOSE)
.init();

let hello = warp::path("hello")
.and(warp::get())
// When the `hello` route is called, emit a `tracing` event.
.map(|| {
tracing::info!("saying hello...");
"Hello, World!"
})
// Wrap the route in a `tracing` span to add the route's name as context
// to any events that occur inside it.
.with(warp::trace::named("hello"));

let goodbye = warp::path("goodbye")
.and(warp::get())
.map(|| {
tracing::info!("saying goodbye...");
"So long and thanks for all the fish!"
})
// We can also provide our own custom `tracing` spans to wrap a route.
.with(warp::trace(|info| {
// Construct our own custom span for this route.
tracing::info_span!("goodbye", req.path = ?info.path())
}));

let routes = hello
.or(goodbye)
// Wrap all the routes with a filter that creates a `tracing` span for
// each request we receive, including data about the request.
.with(warp::trace::request());

warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
1 change: 1 addition & 0 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod path;
pub mod query;
pub mod reply;
pub mod sse;
pub mod trace;
#[cfg(feature = "websocket")]
pub mod ws;

Expand Down
308 changes: 308 additions & 0 deletions src/filters/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
//! [`tracing`] filters.
//!
//! [`tracing`] is a framework for instrumenting Rust programs to
//! collect scoped, structured, and async-aware diagnostics. This module
//! provides a set of filters for instrumenting Warp applications with `tracing`
//! spans. [`Spans`] can be used to associate individual events with a request,
//! and track contexts through the application.
//!
//! [`tracing`]: https://crates.io/tracing
//! [`Spans`]: https://docs.rs/tracing/latest/tracing/#spans
use tracing::Span;

use std::net::SocketAddr;

use http::{self, header};

use crate::filter::{Filter, WrapSealed};
use crate::reject::IsReject;
use crate::reply::Reply;
use crate::route::Route;

use self::internal::WithTrace;

/// Create a wrapping filter that instruments every request with a `tracing`
/// [`Span`] at the [`INFO`] level, containing a summary of the request.
/// Additionally, if the [`DEBUG`] level is enabled, the span will contain an
/// event recording the request's headers.
///
/// # Example
///
/// ```
/// use warp::Filter;
///
/// let route = warp::any()
/// .map(warp::reply)
/// .with(warp::trace::request());
/// ```
///
/// [`Span`]: https://docs.rs/tracing/latest/tracing/#spans
/// [`INFO`]: https://docs.rs/tracing/0.1.16/tracing/struct.Level.html#associatedconstant.INFO
/// [`DEBUG`]: https://docs.rs/tracing/0.1.16/tracing/struct.Level.html#associatedconstant.DEBUG
pub fn request() -> Trace<impl Fn(Info) -> Span + Clone> {
use tracing::field::{display, Empty};
trace(|info: Info| {
let span = tracing::info_span!(
"request",
remote.addr = Empty,
method = %info.method(),
path = %info.path(),
version = ?info.route.version(),
referer = Empty,
);

// Record optional fields.
if let Some(remote_addr) = info.remote_addr() {
span.record("remote.addr", &display(remote_addr));
}

if let Some(referer) = info.referer() {
span.record("referer", &display(referer));
}

// The the headers are, potentially, quite long, so let's record them in
// an event within the generated span, rather than including them as
// context on *every* request.
tracing::debug!(parent: &span, headers = ?info.headers(), "received request");

span
})
}

/// Create a wrapping filter that instruments every request with a custom
/// `tracing` [`Span`] provided by a function.
///
///
/// # Example
///
/// ```
/// use warp::Filter;
///
/// let route = warp::any()
/// .map(warp::reply)
/// .with(warp::trace(|info| {
/// // Create a span using tracing macros
/// tracing::info_span!(
/// "request",
/// method = %info.method(),
/// path = %info.path(),
/// )
/// }));
/// ```
///
/// [`Span`]: https://docs.rs/tracing/latest/tracing/#spans
pub fn trace<F>(func: F) -> Trace<F>
where
F: Fn(Info) -> Span + Clone,
{
Trace { func }
}

/// Create a wrapping filter that instruments every request with a `tracing`
/// [`Span`] at the [`DEBUG`] level representing a named context.
///
/// This can be used to instrument multiple routes with their own sub-spans in a
/// per-request trace.
///
/// # Example
///
/// ```
/// use warp::Filter;
///
/// let hello = warp::path("hello")
/// .map(warp::reply)
/// .with(warp::trace::named("hello"));
///
/// let goodbye = warp::path("goodbye")
/// .map(warp::reply)
/// .with(warp::trace::named("goodbye"));
///
/// let routes = hello.or(goodbye);
/// ```
///
/// [`Span`]: https://docs.rs/tracing/latest/tracing/#spans
/// [`DEBUG`]: https://docs.rs/tracing/0.1.16/tracing/struct.Level.html#associatedconstant.DEBUG
pub fn named(name: &'static str) -> Trace<impl Fn(Info<'_>) -> Span + Copy> {
trace(move |_| tracing::debug_span!("context", "{}", name,))
}

/// Decorates a [`Filter`](crate::Filter) to create a [`tracing`] [span] for
/// requests and responses.
///
/// [`tracing`]: https://crates.io/tracing
/// [span]: https://docs.rs/tracing/latest/tracing/#spans
#[derive(Clone, Copy, Debug)]
pub struct Trace<F> {
func: F,
}

/// Information about the request/response that can be used to prepare log lines.
#[allow(missing_debug_implementations)]
pub struct Info<'a> {
route: &'a Route,
}

impl<FN, F> WrapSealed<F> for Trace<FN>
where
FN: Fn(Info) -> Span + Clone + Send,
F: Filter + Clone + Send,
F::Extract: Reply,
F::Error: IsReject,
{
type Wrapped = WithTrace<FN, F>;

fn wrap(&self, filter: F) -> Self::Wrapped {
WithTrace {
filter,
trace: self.clone(),
}
}
}

impl<'a> Info<'a> {
/// View the remote `SocketAddr` of the request.
pub fn remote_addr(&self) -> Option<SocketAddr> {
self.route.remote_addr()
}

/// View the `http::Method` of the request.
pub fn method(&self) -> &http::Method {
self.route.method()
}

/// View the URI path of the request.
pub fn path(&self) -> &str {
self.route.full_path()
}

/// View the `http::Version` of the request.
pub fn version(&self) -> http::Version {
self.route.version()
}

/// View the referer of the request.
pub fn referer(&self) -> Option<&str> {
self.route
.headers()
.get(header::REFERER)
.and_then(|v| v.to_str().ok())
}

/// View the user agent of the request.
pub fn user_agent(&self) -> Option<&str> {
self.route
.headers()
.get(header::USER_AGENT)
.and_then(|v| v.to_str().ok())
}

/// View the host of the request
pub fn host(&self) -> Option<&str> {
self.route
.headers()
.get(header::HOST)
.and_then(|v| v.to_str().ok())
}

/// View the request headers.
pub fn headers(&self) -> &http::HeaderMap {
self.route.headers()
}
}

mod internal {
use futures::{future::Inspect, future::MapOk, FutureExt, TryFutureExt};

use super::{Info, Trace};
use crate::filter::{Filter, FilterBase, Internal};
use crate::reject::IsReject;
use crate::reply::Reply;
use crate::reply::Response;
use crate::route;

#[allow(missing_debug_implementations)]
pub struct Traced(pub(super) Response);

impl Reply for Traced {
#[inline]
fn into_response(self) -> Response {
self.0
}
}

#[allow(missing_debug_implementations)]
#[derive(Clone, Copy)]
pub struct WithTrace<FN, F> {
pub(super) filter: F,
pub(super) trace: Trace<FN>,
}

use tracing::Span;
use tracing_futures::{Instrument, Instrumented};

fn finished_logger<E: IsReject>(reply: &Result<(Traced,), E>) {
match reply {
Ok((Traced(resp),)) => {
tracing::info!(target: "warp::filters::trace", status = resp.status().as_u16(), "finished processing with success");
}
Err(e) if e.status().is_server_error() => {
tracing::error!(
target: "warp::filters::trace",
status = e.status().as_u16(),
error = ?e,
"unable to process request (internal error)"
);
}
Err(e) if e.status().is_client_error() => {
tracing::warn!(
target: "warp::filters::trace",
status = e.status().as_u16(),
error = ?e,
"unable to serve request (client error)"
);
}
Err(e) => {
// Either informational or redirect
tracing::info!(
target: "warp::filters::trace",
status = e.status().as_u16(),
result = ?e,
"finished processing with status"
);
}
}
}

fn convert_reply<R: Reply>(reply: R) -> (Traced,) {
(Traced(reply.into_response()),)
}

impl<FN, F> FilterBase for WithTrace<FN, F>
where
FN: Fn(Info) -> Span + Clone + Send,
F: Filter + Clone + Send,
F::Extract: Reply,
F::Error: IsReject,
{
type Extract = (Traced,);
type Error = F::Error;
type Future = Instrumented<
Inspect<
MapOk<F::Future, fn(F::Extract) -> Self::Extract>,
fn(&Result<Self::Extract, F::Error>),
>,
>;

fn filter(&self, _: Internal) -> Self::Future {
let span = route::with(|route| (self.trace.func)(Info { route }));
let _entered = span.enter();

tracing::info!(target: "warp::filters::trace", "processing request");
self.filter
.filter(Internal)
.map_ok(convert_reply as fn(F::Extract) -> Self::Extract)
.inspect(finished_logger as fn(&Result<Self::Extract, F::Error>))
.instrument(span.clone())
}
}
}
Loading

0 comments on commit 8377282

Please sign in to comment.