Skip to content

Commit

Permalink
trace: Apache Common Log Format access logging (#1319)
Browse files Browse the repository at this point in the history
This branch builds on @tustvold's work in #601. The original PR
description from that branch:

> Access logging is very important functionality for my team as we wish
> to maintain feature-parity with our existing AWS ALB-based approach.
> This functionality was requested
> [here](linkerd/linkerd2#1913) and was marked
> as help wanted, so thought I'd take a stab at implementing it.
>
> Creating as a draft as still needs more testing and benchmarking, and
> I'm new to tower and so might have made some rookie errors. However, I
> wanted to create a draft as an opportunity to get some early feedback.
>
> The basic design consists of an AccessLogLayer that instruments both
> requests and responses that flow through it, in a similar manner to
> how handle_time is already computed. I'm not a massive fan of this,
> but it was the only way I could easily see to get accurate processing
> time metrics. I've tried to avoid any memory allocation on the hot
> path, although there are possibly more atomic increments than I would
> like. The performance impact with the feature disabled, i.e.
> LINKERD2_PROXY_ACCESS_LOG_FILE, not set should be minimal.
>
> The results of this instrumentation are then sent over a mpsc channel
> to an AccessLogCollector that writes them in a space-delimited format
> to a file specified as an environment variable. It buffers in memory
> and flushes on termination and on write if more than
> FLUSH_TIMEOUT_SECS since the last flush. This makes the access logging
> best effort much like AWS ALBs.
>
> An example deployment scenario using this functionality might deploy a
> fluent-bit sidecar to ship the logs, or write to /dev/stderr and use a
> log shipper deployed as a DaemonSet.

The additional changes in this branch are:

 - Updated against the latest state of the `main` branch.
 - Changed the `tracing` configuration to use per-layer filtering, so that
   the access log layer _only_ sees access log spans, while the stdout
   logging layer does not see the access log spans (although, it _could_
   if we wanted it to...)
 - Changed the format for outputting the access log to the Apache Common
   Log Format. Note that this format does *not* include all the data that
   the access log spans currently collect; I excluded that data so that
   the output is compatible with tools that ingest the Apache log format.
   In a follow-up PR, we can add the ability to control what format the
   access log is written in, and add an alternative format that includes
   all the access log data that Linkerd's spans can collect (I suggest
   newline-delimited JSON for this).

Of course, a huge thank you to @tustvold for all their work on this; I
only updated the branch with the latest changes and made some minor
improvements. :)

Co-authored-by: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
  • Loading branch information
hawkw and tustvold authored Jan 20, 2022
1 parent 8f7be6f commit f5e9cea
Show file tree
Hide file tree
Showing 12 changed files with 466 additions and 13 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440"

[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"

[[package]]
name = "hyper"
version = "0.14.16"
Expand Down Expand Up @@ -867,6 +873,7 @@ dependencies = [
"libfuzzer-sys",
"linkerd-app-core",
"linkerd-app-test",
"linkerd-http-access-log",
"linkerd-io",
"linkerd-meshtls",
"linkerd-meshtls-rustls",
Expand Down Expand Up @@ -1061,6 +1068,22 @@ dependencies = [
"tokio",
]

[[package]]
name = "linkerd-http-access-log"
version = "0.1.0"
dependencies = [
"futures-core",
"http",
"humantime",
"linkerd-identity",
"linkerd-proxy-transport",
"linkerd-stack",
"linkerd-tls",
"linkerd-tracing",
"pin-project",
"tracing",
]

[[package]]
name = "linkerd-http-box"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ members = [
"linkerd/errno",
"linkerd/error-respond",
"linkerd/exp-backoff",
"linkerd/http-access-log",
"linkerd/http-box",
"linkerd/http-classify",
"linkerd/http-metrics",
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/inbound/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bytes = "1"
http = "0.2"
futures = { version = "0.3", default-features = false }
linkerd-app-core = { path = "../core" }
linkerd-http-access-log = { path = "../../http-access-log" }
linkerd-server-policy = { path = "../../server-policy" }
linkerd-tonic-watch = { path = "../../tonic-watch" }
linkerd2-proxy-api = { version = "0.3", features = ["client", "inbound"] }
Expand Down
7 changes: 5 additions & 2 deletions linkerd/app/inbound/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use linkerd_app_core::{
proxy::http,
svc::{self, ExtractParam, Param},
tls,
transport::OrigDstAddr,
transport::{ClientAddr, OrigDstAddr, Remote},
Error, Result,
};
use linkerd_http_access_log::NewAccessLog;
use tracing::debug_span;

#[derive(Copy, Clone, Debug)]
Expand All @@ -26,7 +27,8 @@ impl<H> Inbound<H> {
+ Param<http::normalize_uri::DefaultAuthority>
+ Param<tls::ConditionalServerTls>
+ Param<ServerLabel>
+ Param<OrigDstAddr>,
+ Param<OrigDstAddr>
+ Param<Remote<ClientAddr>>,
T: Clone + Send + Unpin + 'static,
I: io::AsyncRead + io::AsyncWrite + io::PeerAddr + Send + Unpin + 'static,
H: svc::NewService<T, Service = HSvc> + Clone + Send + Sync + Unpin + 'static,
Expand Down Expand Up @@ -79,6 +81,7 @@ impl<H> Inbound<H> {
.push(http::BoxResponse::layer()),
)
.check_new_service::<T, http::Request<_>>()
.push(NewAccessLog::layer())
.instrument(|t: &T| debug_span!("http", v = %Param::<Version>::param(t)))
.push(http::NewServeHttp::layer(h2_settings, rt.drain.clone()))
.push_on_service(svc::BoxService::layer())
Expand Down
19 changes: 19 additions & 0 deletions linkerd/http-access-log/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "linkerd-http-access-log"
version = "0.1.0"
authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
license = "Apache-2.0"
edition = "2018"
publish = false

[dependencies]
futures-core = "0.3"
http = "0.2"
humantime = "2"
pin-project = "1"
linkerd-stack = { path = "../stack" }
linkerd-identity = { path = "../identity" }
linkerd-tls = { path = "../tls" }
linkerd-proxy-transport = { path = "../proxy/transport" }
linkerd-tracing = { path = "../tracing" }
tracing = "0.1.19"
207 changes: 207 additions & 0 deletions linkerd/http-access-log/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#![deny(warnings, rust_2018_idioms)]
#![forbid(unsafe_code)]

use futures_core::TryFuture;
use linkerd_identity as identity;
use linkerd_proxy_transport::{ClientAddr, Remote};
use linkerd_stack as svc;
use linkerd_tls as tls;
use linkerd_tracing::access_log::TRACE_TARGET;
use pin_project::pin_project;
use std::{
future::Future,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant, SystemTime},
};
use svc::{NewService, Param};
use tracing::{field, span, Level, Span};

#[derive(Clone, Debug)]
pub struct NewAccessLog<N> {
inner: N,
}

#[derive(Clone, Debug)]
pub struct AccessLogContext<S> {
inner: S,
client_addr: SocketAddr,
client_id: Option<identity::Name>,
}

struct ResponseFutureInner {
span: Span,
start: Instant,
processing: Duration,
}

#[pin_project]
pub struct AccessLogFuture<F> {
data: Option<ResponseFutureInner>,

#[pin]
inner: F,
}

impl<N> NewAccessLog<N> {
/// Returns a new `NewAccessLog` layer that wraps an inner service with
/// access logging middleware.
///
/// The access log is recorded by adding a `tracing` span to the service's
/// future. If access logging is not enabled by the `tracing` subscriber,
/// this span will never be enabled, and it can be skipped cheaply. When
/// access logging *is* enabled, additional data will be recorded when the
/// response future completes.
///
/// Recording the access log will introduce additional overhead in the
/// request path, but this is largely avoided when access logging is not
/// enabled.
#[inline]
pub fn layer() -> impl svc::layer::Layer<N, Service = Self> {
svc::layer::mk(|inner| NewAccessLog { inner })
}
}

impl<N, T> NewService<T> for NewAccessLog<N>
where
T: Param<tls::ConditionalServerTls> + Param<Remote<ClientAddr>>,
N: NewService<T>,
{
type Service = AccessLogContext<N::Service>;

fn new_service(&self, target: T) -> Self::Service {
let Remote(ClientAddr(client_addr)) = target.param();
let tls: tls::ConditionalServerTls = target.param();
let client_id = tls
.value()
.and_then(|tls| tls.client_id().map(|tls::ClientId(name)| name.clone()));
let inner = self.inner.new_service(target);
AccessLogContext {
inner,
client_addr,
client_id,
}
}
}

impl<S, B1, B2> svc::Service<http::Request<B1>> for AccessLogContext<S>
where
S: svc::Service<http::Request<B1>, Response = http::Response<B2>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = AccessLogFuture<S::Future>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, request: http::Request<B1>) -> Self::Future {
let get_header = |name: http::header::HeaderName| {
request
.headers()
.get(name)
.and_then(|x| x.to_str().ok())
.unwrap_or_default()
};

let trace_id = || {
let headers = request.headers();
headers
.get("x-b3-traceid")
.or_else(|| headers.get("x-request-id"))
.or_else(|| headers.get("x-amzn-trace-id"))
.and_then(|x| x.to_str().ok())
.unwrap_or_default()
};

let span = span!(target: TRACE_TARGET, Level::INFO, "http",
client.addr = %self.client_addr,
client.id = self.client_id.as_ref().map(|n| n.as_str()).unwrap_or("-"),
timestamp = %now(),
method = request.method().as_str(),
uri = %request.uri(),
version = ?request.version(),
trace_id = trace_id(),
request_bytes = get_header(http::header::CONTENT_LENGTH),
status = field::Empty,
response_bytes = field::Empty,
total_ns = field::Empty,
processing_ns = field::Empty,
user_agent = get_header(http::header::USER_AGENT),
host = get_header(http::header::HOST),
);

// The access log span is only enabled by the `tracing` subscriber if
// access logs are being recorded. If it's disabled, we can skip
// recording additional data in the response future.
if span.is_disabled() {
return AccessLogFuture {
data: None,
inner: self.inner.call(request),
};
}

AccessLogFuture {
data: Some(ResponseFutureInner {
span,
start: Instant::now(),
processing: Duration::from_secs(0),
}),
inner: self.inner.call(request),
}
}
}

impl<F, B2> Future for AccessLogFuture<F>
where
F: TryFuture<Ok = http::Response<B2>>,
{
type Output = Result<F::Ok, F::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

let data: &mut ResponseFutureInner = match &mut this.data {
Some(data) => data,
None => return this.inner.try_poll(cx),
};

let _enter = data.span.enter();
let poll_start = Instant::now();

let response: http::Response<B2> = match this.inner.try_poll(cx) {
Poll::Pending => {
data.processing += Instant::now().duration_since(poll_start);
return Poll::Pending;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(Ok(response)) => response,
};

let now = Instant::now();
let total_ns = now.duration_since(data.start).as_nanos();
let processing_ns = (now.duration_since(poll_start) + data.processing).as_nanos();

let span = &data.span;

response
.headers()
.get(http::header::CONTENT_LENGTH)
.and_then(|x| x.to_str().ok())
.map(|x| span.record("response_bytes", &x));

span.record("status", &response.status().as_u16());
span.record("total_ns", &field::display(total_ns));
span.record("processing_ns", &field::display(processing_ns));

Poll::Ready(Ok(response))
}
}

#[inline]
fn now() -> humantime::Rfc3339Timestamp {
humantime::format_rfc3339(SystemTime::now())
}
2 changes: 1 addition & 1 deletion linkerd/http-retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ mod tests {
tx: Tx(tx),
initial,
replay,
_trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug"),
_trace: linkerd_tracing::test::with_default_filter("linkerd_http_retry=debug").0,
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions linkerd/tls/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,17 @@ impl fmt::Display for NoServerTls {
}
}

// === impl ServerTls ===

impl ServerTls {
pub fn client_id(&self) -> Option<&ClientId> {
match self {
ServerTls::Established { ref client_id, .. } => client_id.as_ref(),
_ => None,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
3 changes: 2 additions & 1 deletion linkerd/tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ tracing-log = "0.1.2"

[dependencies.tracing-subscriber]
version = "0.3"
features = ["env-filter","smallvec", "tracing-log", "json", "parking_lot"]
default-features = false
features = ["env-filter", "fmt", "smallvec", "tracing-log", "json", "parking_lot", "registry"]

Loading

0 comments on commit f5e9cea

Please sign in to comment.