Skip to content

Commit

Permalink
Expose proxy errors as metrics
Browse files Browse the repository at this point in the history
The proxy handles errors by synthesizing responses; however, depending
on where such an error occurs, it may not be recorded by a metrics
layer.

This change introduces an error_total counter that records the
number/kind of errors encountered. This is immediately helpful for
debugging and diagnostics, but could also be helpful to view in the UI.
We may want to revisit this metric, especialy its labels, before relying
on it more prominently. But in the meantime it's extraordinarily useful
as it is.
  • Loading branch information
olix0r committed Feb 18, 2020
1 parent b2d3ed8 commit 6f5ac1a
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 5 deletions.
17 changes: 15 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ checksum = "b486ce3ccf7ffd79fdeb678eac06a9e6c09fc88d33836340becb8fffe87c5e33"

[[package]]
name = "chrono"
version = "0.4.10"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31850b4a4d6bae316f7a09e691c944c28299298837edc0a03f755618c23cbc01"
checksum = "e8493056968583b0193c1bb04d6f7684586f3726992d6c573261941a895dbd68"
dependencies = [
"libc",
"num-integer",
"num-traits 0.2.6",
"time",
Expand Down Expand Up @@ -628,6 +629,7 @@ dependencies = [
"linkerd2-drain",
"linkerd2-duplex",
"linkerd2-error",
"linkerd2-error-metrics",
"linkerd2-error-respond",
"linkerd2-exp-backoff",
"linkerd2-fallback",
Expand Down Expand Up @@ -786,6 +788,17 @@ dependencies = [
"futures",
]

[[package]]
name = "linkerd2-error-metrics"
version = "0.1.0"
dependencies = [
"futures",
"indexmap",
"linkerd2-error",
"linkerd2-metrics",
"tower",
]

[[package]]
name = "linkerd2-error-respond"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ linkerd2-dns = { path = "../../dns" }
linkerd2-drain = { path = "../../drain" }
linkerd2-duplex = { path = "../../duplex" }
linkerd2-error = { path = "../../error" }
linkerd2-error-metrics = { path = "../../error-metrics" }
linkerd2-error-respond = { path = "../../error-respond" }
linkerd2-exp-backoff = { path = "../../exp-backoff" }
linkerd2-fallback = { path = "../../fallback" }
Expand Down
75 changes: 74 additions & 1 deletion linkerd/app/core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::proxy::{buffer, identity};
use http::{header::HeaderValue, StatusCode};
use linkerd2_error::Error;
use linkerd2_error_metrics as metrics;
use linkerd2_error_respond as respond;
use linkerd2_proxy_http::HasH2Reason;
use linkerd2_router::error as router;
Expand All @@ -12,6 +13,26 @@ pub fn layer<B: Default>() -> respond::RespondLayer<NewRespond<B>> {
respond::RespondLayer::new(NewRespond(std::marker::PhantomData))
}

#[derive(Clone, Default)]
pub struct Metrics(metrics::Registry<Label>);

pub type MetricsLayer = metrics::RecordErrorLayer<LabelError, Label>;

/// Error metric labels.
#[derive(Copy, Clone, Debug)]
pub struct LabelError(super::metric_labels::Direction);

pub type Label = (super::metric_labels::Direction, Reason);

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Reason {
CacheFull,
DispatchTimeout,
IdentityRequired,
LoadShed,
Unexpected,
}

#[derive(Debug)]
pub struct NewRespond<B>(std::marker::PhantomData<fn() -> B>);

Expand Down Expand Up @@ -68,7 +89,7 @@ impl<B: Default> respond::Respond for Respond<B> {
debug!(?code, "Handling error with gRPC status");
return Ok(rsp);
}
};
}

let version = match self {
Respond::Http1(ref version, _) => version.clone(),
Expand Down Expand Up @@ -193,3 +214,55 @@ impl std::fmt::Display for IdentityRequired {
}

impl std::error::Error for IdentityRequired {}

impl metrics::LabelError<Error> for LabelError {
type Labels = Label;

fn label_error(&self, err: &Error) -> Self::Labels {
let reason = if err.is::<router::NoCapacity>() {
Reason::CacheFull
} else if err.is::<shed::Overloaded>() {
Reason::LoadShed
} else if err.is::<buffer::Aborted>() {
Reason::DispatchTimeout
} else if err.is::<IdentityRequired>() {
Reason::IdentityRequired
} else {
Reason::Unexpected
};

(self.0, reason)
}
}

impl metrics::FmtLabels for Reason {
fn fmt_labels(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"message=\"{}\"",
match self {
Reason::CacheFull => "router full",
Reason::LoadShed => "load shed",
Reason::DispatchTimeout => "dispatch timeout",
Reason::IdentityRequired => "identity required",
Reason::Unexpected => "unexpected",
}
)
}
}

impl Metrics {
pub fn inbound(&self) -> MetricsLayer {
self.0
.layer(LabelError(super::metric_labels::Direction::In))
}

pub fn outbound(&self) -> MetricsLayer {
self.0
.layer(LabelError(super::metric_labels::Direction::Out))
}

pub fn report(&self) -> metrics::Registry<Label> {
self.0.clone()
}
}
1 change: 1 addition & 0 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,6 @@ pub struct ProxyMetrics {
pub http_route_actual: HttpRouteMetrics,
pub http_route_retry: HttpRouteRetry,
pub http_endpoint: HttpEndpointMetrics,
pub http_errors: errors::MetricsLayer,
pub transport: transport::Metrics,
}
1 change: 1 addition & 0 deletions linkerd/app/inbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ impl<A: OrigDstAddr> Config<A> {
.push(insert::layer(move || {
DispatchDeadline::after(buffer.dispatch_timeout)
}))
.push_per_make(metrics.http_errors)
.push_per_make(errors::layer())
.push(trace::layer(|src: &tls::accept::Meta| {
info_span!(
Expand Down
1 change: 1 addition & 0 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl<A: OrigDstAddr> Config<A> {
DispatchDeadline::after(buffer.dispatch_timeout)
}))
.push(http::insert::target::layer())
.push_per_make(metrics.http_errors)
.push_per_make(errors::layer())
.push(trace::layer(
|src: &tls::accept::Meta| info_span!("source", target.addr = %src.addrs.target_addr()),
Expand Down
9 changes: 7 additions & 2 deletions linkerd/app/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub use linkerd2_app_core::{
classify::Class,
handle_time, http_metrics as metrics,
errors, handle_time, http_metrics as metrics,
metric_labels::{ControlLabels, EndpointLabels, RouteLabels},
metrics::FmtMetrics,
opencensus, proxy, telemetry, transport, ControlHttpMetrics, ProxyMetrics,
Expand Down Expand Up @@ -51,6 +51,8 @@ impl Metrics {
(m, r)
};

let http_errors = errors::Metrics::default();

let handle_time_report = handle_time::Metrics::new();
let inbound_handle_time = handle_time_report.inbound();
let outbound_handle_time = handle_time_report.outbound();
Expand All @@ -66,6 +68,7 @@ impl Metrics {
http_route: http_route.clone(),
http_route_actual: http_route_actual.clone(),
http_route_retry: http_route_retry.clone(),
http_errors: http_errors.inbound(),
transport: transport.clone(),
},
outbound: ProxyMetrics {
Expand All @@ -74,13 +77,15 @@ impl Metrics {
http_route,
http_route_retry,
http_route_actual,
http_errors: http_errors.outbound(),
transport,
},
control,
opencensus,
};

let report = endpoint_report
let report = (http_errors.report())
.and_then(endpoint_report)
.and_then(route_report)
.and_then(retry_report)
.and_then(actual_report)
Expand Down
14 changes: 14 additions & 0 deletions linkerd/error-metrics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "linkerd2-error-metrics"
version = "0.1.0"
authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
edition = "2018"
publish = false


[dependencies]
futures = "0.1"
indexmap = "1.0"
linkerd2-error = { path = "../error" }
linkerd2-metrics = { path = "../metrics" }
tower = "0.1"
170 changes: 170 additions & 0 deletions linkerd/error-metrics/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use futures::{Future, Poll};
use indexmap::IndexMap;
pub use linkerd2_metrics::FmtLabels;
use linkerd2_metrics::{metrics, Counter, FmtMetrics};
use std::fmt;
use std::hash::Hash;
use std::sync::{Arc, Mutex};

metrics! {
request_errors_total: Counter {
"The total number of HTTP requests that could not be processed due to a proxy error."
}
}

pub trait LabelError<E> {
type Labels: FmtLabels + Hash + Eq;

fn label_error(&self, error: &E) -> Self::Labels;
}

#[derive(Debug)]
pub struct Registry<K: Hash + Eq> {
errors: Arc<Mutex<IndexMap<K, Counter>>>,
}

pub struct RecordErrorLayer<L, K: Hash + Eq> {
label: L,
errors: Arc<Mutex<IndexMap<K, Counter>>>,
}

pub struct RecordError<L, K: Hash + Eq, S> {
label: L,
errors: Arc<Mutex<IndexMap<K, Counter>>>,
inner: S,
}

impl<K: Hash + Eq> Registry<K> {
pub fn layer<L>(&self, label: L) -> RecordErrorLayer<L, K> {
RecordErrorLayer {
label,
errors: self.errors.clone(),
}
}
}

impl<K: Hash + Eq> Default for Registry<K> {
fn default() -> Self {
Self {
errors: Default::default(),
}
}
}

impl<K: Hash + Eq> Clone for Registry<K> {
fn clone(&self) -> Self {
Self {
errors: self.errors.clone(),
}
}
}

impl<L: Clone, K: Hash + Eq, S> tower::layer::Layer<S> for RecordErrorLayer<L, K> {
type Service = RecordError<L, K, S>;

fn layer(&self, inner: S) -> Self::Service {
Self::Service {
inner,
label: self.label.clone(),
errors: self.errors.clone(),
}
}
}

impl<L: Clone, K: Hash + Eq> Clone for RecordErrorLayer<L, K> {
fn clone(&self) -> Self {
Self {
errors: self.errors.clone(),
label: self.label.clone(),
}
}
}

impl<L: Clone, K: Hash + Eq, S: Clone> Clone for RecordError<L, K, S> {
fn clone(&self) -> Self {
Self {
errors: self.errors.clone(),
label: self.label.clone(),
inner: self.inner.clone(),
}
}
}

impl<L, K: FmtLabels + Hash + Eq, S> RecordError<L, K, S> {
fn record<E>(&self, err: &E)
where
L: LabelError<E, Labels = K> + Clone,
{
let labels = self.label.label_error(&err);
if let Ok(mut errors) = self.errors.lock() {
errors
.entry(labels)
.or_insert_with(|| Default::default())
.incr();
}
}
}

impl<Req, L, S> tower::Service<Req> for RecordError<L, L::Labels, S>
where
S: tower::Service<Req>,
L: LabelError<S::Error> + Clone,
{
type Response = S::Response;
type Error = S::Error;
type Future = RecordError<L, L::Labels, S::Future>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
match self.inner.poll_ready() {
Ok(ready) => Ok(ready),
Err(err) => {
self.record(&err);
return Err(err);
}
}
}

fn call(&mut self, req: Req) -> Self::Future {
RecordError {
inner: self.inner.call(req),
errors: self.errors.clone(),
label: self.label.clone(),
}
}
}

impl<L, F> Future for RecordError<L, L::Labels, F>
where
F: Future,
L: LabelError<F::Error> + Clone,
{
type Item = F::Item;
type Error = F::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(ready) => Ok(ready),
Err(err) => {
self.record(&err);
return Err(err);
}
}
}
}

impl<K: FmtLabels + Hash + Eq> FmtMetrics for Registry<K> {
fn fmt_metrics(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let errors = match self.errors.lock() {
Ok(errors) => errors,
Err(_) => return Ok(()),
};
if errors.is_empty() {
return Ok(());
}

request_errors_total.fmt_help(f)?;
request_errors_total.fmt_scopes(f, errors.iter(), |c| &c)?;

Ok(())
}
}

0 comments on commit 6f5ac1a

Please sign in to comment.