Add response classification to proxy metrics (linkerd#639)
This PR adds a `classification` label to proxy response metrics, as @olix0r described in linkerd#634 (comment). The label is either "success" or "failure", depending on the following rules (restated as a standalone sketch after this list):
+ **if** the response had a gRPC status code, **then**
  - gRPC status code 0 is considered a success,
  - all others are considered failures;
+ **else if** the response had an HTTP status code, **then**
  - status codes < 500 are considered successes,
  - status codes >= 500 are considered failures;
+ **else if** the response stream failed, **then**
  - the response is a failure.
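
A minimal sketch of these rules in standalone Rust. The simplified signature (plain optional status codes plus a stream-failure flag) is hypothetical; the proxy's actual implementation works on its `ctx::http::Response` context and appears in the `prometheus.rs` diff below:

```rust
/// Simplified stand-in for the proxy's `Classification` type.
#[derive(Debug, PartialEq)]
enum Classification {
    Success,
    Failure,
}

/// Hypothetical signature: plain status codes and a stream-failure flag
/// instead of the proxy's response context.
fn classify(
    grpc_status: Option<u32>,
    http_status: Option<u16>,
    stream_failed: bool,
) -> Classification {
    if let Some(code) = grpc_status {
        // Rule 1: gRPC status code 0 (OK) is a success; all others fail.
        if code == 0 { Classification::Success } else { Classification::Failure }
    } else if let Some(status) = http_status {
        // Rule 2: HTTP status codes < 500 succeed; >= 500 fail.
        if status < 500 { Classification::Success } else { Classification::Failure }
    } else if stream_failed {
        // Rule 3: a failed stream with no status code is a failure.
        Classification::Failure
    } else {
        // Not covered by the rules above; treated as a success here only to
        // keep the sketch total.
        Classification::Success
    }
}

fn main() {
    assert_eq!(classify(Some(0), Some(200), false), Classification::Success);
    assert_eq!(classify(Some(2), Some(200), false), Classification::Failure);
    assert_eq!(classify(None, Some(404), false), Classification::Success);
    assert_eq!(classify(None, Some(502), false), Classification::Failure);
    assert_eq!(classify(None, None, true), Classification::Failure);
}
```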

I've also added end-to-end tests for the classification of HTTP responses (with some work towards classifying gRPC responses as well). Additionally, I've updated `doc/proxy-metrics.md` to reflect the added `classification` label.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw authored Mar 28, 2018
1 parent 1ed4a93 commit c688cf6
Showing 3 changed files with 218 additions and 26 deletions.
4 changes: 4 additions & 0 deletions doc/proxy-metrics.md
@@ -38,6 +38,10 @@ request headers are received to when the response stream has completed.

Each of these metrics has the following labels:

* `classification`: `success` if the response was successful, or `failure` if
a server error occurred. This classification is based on
the gRPC status code if one is present, and on the HTTP
status code otherwise. Only applicable to response metrics.
* `direction`: `inbound` if the request originated from outside of the pod,
`outbound` if the request originated from inside of the pod.
* `authority`: The value of the `:authority` (HTTP/2) or `Host` (HTTP/1.1)
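For reference, a `response_total` sample carrying the new label looks like this in the exposition format (the authority and direction values here are taken from the end-to-end tests below):

```
response_total{authority="tele.test.svc.cluster.local",direction="inbound",classification="success",status_code="200"} 1
```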
74 changes: 70 additions & 4 deletions proxy/src/telemetry/metrics/prometheus.rs
@@ -94,7 +94,13 @@ struct RequestLabels {
authority: String,
}

-#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
enum Classification {
Success,
Failure,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
struct ResponseLabels {

request_labels: RequestLabels,
@@ -105,6 +111,9 @@ struct ResponseLabels {
/// The value of the grpc-status trailer. Only applicable to response
/// metrics for gRPC responses.
grpc_status_code: Option<u32>,

/// Was the response a success or failure?
classification: Classification,
}

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
@@ -424,7 +433,7 @@ impl Aggregate {

Event::StreamResponseFail(ref res, ref fail) => {
// TODO: do we care about the failure's error code here?
-let labels = Arc::new(ResponseLabels::new(res, None));
+let labels = Arc::new(ResponseLabels::fail(res));
self.update(|metrics| {
*metrics.response_total(&labels).incr();
*metrics.response_duration(&labels) += fail.since_response_open;
@@ -561,21 +570,38 @@ impl fmt::Display for PodOwner {
// ===== impl ResponseLabels =====

impl ResponseLabels {
-fn new(rsp: &ctx::http::Response,grpc_status_code: Option<u32>) -> Self {
+fn new(rsp: &ctx::http::Response, grpc_status_code: Option<u32>) -> Self {
let request_labels = RequestLabels::new(&rsp.request);
let classification = Classification::classify(rsp, grpc_status_code);
ResponseLabels {
request_labels,
status_code: rsp.status.as_u16(),
grpc_status_code,
classification,
}
}

/// Called when the response stream has failed.
fn fail(rsp: &ctx::http::Response) -> Self {
let request_labels = RequestLabels::new(&rsp.request);
ResponseLabels {
request_labels,
// TODO: is it correct to always treat this as 500?
// Alternatively, the status_code field could be made optional...
status_code: 500,
grpc_status_code: None,
classification: Classification::Failure,
}
}
}

impl fmt::Display for ResponseLabels {

fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{},status_code=\"{}\"",
write!(f, "{},{},status_code=\"{}\"",
self.request_labels,
self.classification,
self.status_code
)?;
if let Some(ref status) = self.grpc_status_code {
@@ -586,3 +612,43 @@ impl fmt::Display for ResponseLabels {
}

}

// ===== impl Classification =====

impl Classification {

fn grpc_status(code: u32) -> Self {
if code == 0 {
// XXX: are gRPC status codes indicating client-side errors
// "successes" or "failures"?
Classification::Success
} else {
Classification::Failure
}
}

fn http_status(status: &http::StatusCode) -> Self {
if status.is_server_error() {
Classification::Failure
} else {
Classification::Success
}
}

fn classify(rsp: &ctx::http::Response, grpc_status: Option<u32>) -> Self {
grpc_status.map(Classification::grpc_status)
.unwrap_or_else(|| Classification::http_status(&rsp.status))
}

}
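
// Illustrative only, not part of the diff: how the helpers above resolve
// for a few sample inputs (`http::StatusCode` is from the `http` crate).
//
//   Classification::grpc_status(0) => Classification::Success
//   Classification::grpc_status(2) => Classification::Failure
//   Classification::http_status(&http::StatusCode::NOT_FOUND) => Classification::Success
//   Classification::http_status(&http::StatusCode::BAD_GATEWAY) => Classification::Failure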

impl fmt::Display for Classification {

fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
&Classification::Success => f.pad("classification=\"success\""),
&Classification::Failure => f.pad("classification=\"failure\""),
}
}

}
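
// Rendered, these two arms produce the label pairs `classification="success"`
// and `classification="failure"` in the exposition output.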
166 changes: 144 additions & 22 deletions proxy/tests/telemetry.rs
@@ -330,6 +330,128 @@ fn metrics_endpoint_outbound_request_count() {

}

mod response_classification {
use super::support::*;

const REQ_STATUS_HEADER: &'static str = "x-test-status-requested";
const REQ_GRPC_STATUS_HEADER: &'static str = "x-test-grpc-status-requested";

const STATUSES: [http::StatusCode; 6] = [
http::StatusCode::OK,
http::StatusCode::NOT_MODIFIED,
http::StatusCode::BAD_REQUEST,
http::StatusCode::IM_A_TEAPOT,
http::StatusCode::GATEWAY_TIMEOUT,
http::StatusCode::INTERNAL_SERVER_ERROR,
];


fn expected_metric(status: &http::StatusCode, direction: &str) -> String {
format!(
"response_total{{authority=\"tele.test.svc.cluster.local\",direction=\"{}\",classification=\"{}\",status_code=\"{}\"}} 1",
direction,
if status.is_server_error() { "failure" } else { "success" },
status.as_u16(),
)
}
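
// Illustrative example, not part of the diff: for `http::StatusCode::OK`
// and direction "inbound", expected_metric returns
// `response_total{authority="tele.test.svc.cluster.local",direction="inbound",classification="success",status_code="200"} 1`.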

fn make_test_server() -> server::Server {
fn parse_header(headers: &http::HeaderMap, which: &str)
-> Option<http::StatusCode>
{
headers.get(which)
.map(|val| {
val.to_str()
.expect("requested status should be ascii")
.parse::<http::StatusCode>()
.expect("requested status should be numbers")
})
}
info!("running test server");
server::new()
.route_fn("/", move |req| {
let headers = req.headers();
let status = parse_header(headers, REQ_STATUS_HEADER)
.unwrap_or(http::StatusCode::OK);
let grpc_status = parse_header(headers, REQ_GRPC_STATUS_HEADER);
let mut rsp = if let Some(_grpc_status) = grpc_status {
// TODO: tests for grpc statuses
unimplemented!()
} else {
Response::new("".into())
};
*rsp.status_mut() = status;
rsp
})
}
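
// Illustrative behavior of the route above, not part of the diff: a request
// carrying the header `x-test-status-requested: 500` is answered with a 500
// response; with neither test header set, the route answers 200 OK.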

// https://github.com/runconduit/conduit/issues/613
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn inbound_http() {
let _ = env_logger::try_init();
let srv = make_test_server().run();
let ctrl = controller::new();
let proxy = proxy::new()
.controller(ctrl.run())
.inbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");

for (i, status) in STATUSES.iter().enumerate() {
let request = client.request(
client.request_builder("/")
.header(REQ_STATUS_HEADER, status.as_str())
.method("GET")
);
assert_eq!(&request.status(), status);

let scrape = metrics.get("/metrics");
for status in &STATUSES[0..i] {
// assert that the counter for each previously requested status code is
// still exactly 1, i.e. that later requests have *not* incremented it.
assert_contains!(scrape, &expected_metric(status, "inbound"))
}
}
}

// https://github.com/runconduit/conduit/issues/613
#[test]
#[cfg_attr(not(feature = "flaky_tests"), ignore)]
fn outbound_http() {
let _ = env_logger::try_init();
let srv = make_test_server().run();
let ctrl = controller::new()
.destination("tele.test.svc.cluster.local", srv.addr)
.run();
let proxy = proxy::new()
.controller(ctrl)
.outbound(srv)
.metrics_flush_interval(Duration::from_millis(500))
.run();
let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
let metrics = client::http1(proxy.metrics, "localhost");

for (i, status) in STATUSES.iter().enumerate() {
let request = client.request(
client.request_builder("/")
.header(REQ_STATUS_HEADER, status.as_str())
.method("GET")
);
assert_eq!(&request.status(), status);

let scrape = metrics.get("/metrics");
for status in &STATUSES[0..i] {
// assert that the counter for each previously requested status code is
// still exactly 1, i.e. that later requests have *not* incremented it.
assert_contains!(scrape, &expected_metric(status, "outbound"))
}
}
}
}

// Ignore this test on CI, because our method of adding latency to requests
// (calling `thread::sleep`) is likely to be flakey on Travis.
// Eventually, we can add some kind of mock timer system for simulating latency
@@ -361,10 +483,10 @@ fn metrics_endpoint_inbound_response_latency() {
// assert the >=1000ms bucket is incremented by our request with 500ms
// extra latency.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"1000\"} 1");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 1");
// the histogram's count should be 1.
assert_contains!(scrape,
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 1");
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 1");
// TODO: we're not going to make any assertions about the
// response_latency_ms_sum stat, since its granularity depends on the actual
// observed latencies, which may vary a bit. we could make more reliable
@@ -378,43 +500,43 @@

// request with 40ms extra latency should fall into the 50ms bucket.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"50\"} 1");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 1");
// 1000ms bucket should be incremented as well, since it counts *all*
// observations less than or equal to 1000ms, even if they also increment
// other buckets.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"1000\"} 2");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 2");
// the histogram's total count should be 2.
assert_contains!(scrape,
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 2");
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 2");

info!("client.get(/hi)");
assert_eq!(client.get("/hi"), "good morning");

let scrape = metrics.get("/metrics");
// request with 40ms extra latency should fall into the 50ms bucket.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"50\"} 2");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
// 1000ms bucket should be incremented as well.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"1000\"} 3");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 3");
// the histogram's total count should be 3.
assert_contains!(scrape,
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 3");
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 3");

info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");

let scrape = metrics.get("/metrics");
// 50ms bucket should be un-changed by the request with 500ms latency.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"50\"} 2");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
// 1000ms bucket should be incremented.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"1000\"} 4");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 4");
// the histogram's total count should be 4.
assert_contains!(scrape,
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 4");
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 4");
}

// Ignore this test on CI, because our method of adding latency to requests
@@ -450,10 +572,10 @@ fn metrics_endpoint_outbound_response_latency() {
// assert the >=1000ms bucket is incremented by our request with 500ms
// extra latency.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"1000\"} 1");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 1");
// the histogram's count should be 1.
assert_contains!(scrape,
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 1");
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 1");
// TODO: we're not going to make any assertions about the
// response_latency_ms_sum stat, since its granularity depends on the actual
// observed latencies, which may vary a bit. we could make more reliable
@@ -467,43 +589,43 @@

// request with 40ms extra latency should fall into the 50ms bucket.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"50\"} 1");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 1");
// 1000ms bucket should be incremented as well, since it counts *all*
// observations less than or equal to 1000ms, even if they also increment
// other buckets.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"1000\"} 2");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 2");
// the histogram's total count should be 2.
assert_contains!(scrape,
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 2");
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 2");

info!("client.get(/hi)");
assert_eq!(client.get("/hi"), "good morning");

let scrape = metrics.get("/metrics");
// request with 40ms extra latency should fall into the 50ms bucket.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"50\"} 2");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
// 1000ms bucket should be incremented as well.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"1000\"} 3");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 3");
// the histogram's total count should be 3.
assert_contains!(scrape,
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 3");
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 3");

info!("client.get(/hey)");
assert_eq!(client.get("/hey"), "hello");

let scrape = metrics.get("/metrics");
// 50ms bucket should be un-changed by the request with 500ms latency.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"50\"} 2");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
// 1000ms bucket should be incremented.
assert_contains!(scrape,
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"1000\"} 4");
"response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 4");
// the histogram's total count should be 4.
assert_contains!(scrape,
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 4");
"response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 4");
}

// https://github.com/runconduit/conduit/issues/613
