diff --git a/doc/proxy-metrics.md b/doc/proxy-metrics.md
index 6992145085cdf..8cabfb9493042 100644
--- a/doc/proxy-metrics.md
+++ b/doc/proxy-metrics.md
@@ -38,6 +38,10 @@ request headers are received to when the response stream has completed.
 
 Each of these metrics has the following labels:
 
+* `classification`: `success` if the response was successful, or `failure` if
+  a server error occurred. This classification is based on
+  the gRPC status code if one is present, and on the HTTP
+  status code otherwise. Only applicable to response metrics.
 * `direction`: `inbound` if the request originated from outside of the pod,
   `outbound` if the request originated from inside of the pod.
 * `authority`: The value of the `:authority` (HTTP/2) or `Host` (HTTP/1.1)
diff --git a/proxy/src/telemetry/metrics/prometheus.rs b/proxy/src/telemetry/metrics/prometheus.rs
index 6fb75b09af87f..df07f2d72eeeb 100644
--- a/proxy/src/telemetry/metrics/prometheus.rs
+++ b/proxy/src/telemetry/metrics/prometheus.rs
@@ -94,7 +94,13 @@ struct RequestLabels {
     authority: String,
 }
 
-#[derive(Clone, Debug, Default, Eq, PartialEq, Hash)]
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
+enum Classification {
+    Success,
+    Failure,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq, Hash)]
 struct ResponseLabels {
 
     request_labels: RequestLabels,
@@ -105,6 +111,9 @@ struct ResponseLabels {
     /// The value of the grpc-status trailer. Only applicable to response
     /// metrics for gRPC responses.
     grpc_status_code: Option<u32>,
+
+    /// Was the response a success or failure?
+    classification: Classification,
 }
 
 #[derive(Clone, Debug, Eq, PartialEq, Hash)]
@@ -424,7 +433,7 @@ impl Aggregate {
 
             Event::StreamResponseFail(ref res, ref fail) => {
                 // TODO: do we care about the failure's error code here?
-                let labels = Arc::new(ResponseLabels::new(res, None));
+                let labels = Arc::new(ResponseLabels::fail(res));
                 self.update(|metrics| {
                     *metrics.response_total(&labels).incr();
                     *metrics.response_duration(&labels) += fail.since_response_open;
@@ -561,12 +570,28 @@ impl fmt::Display for PodOwner {
 
 // ===== impl ResponseLabels =====
 
 impl ResponseLabels {
-    fn new(rsp: &ctx::http::Response,grpc_status_code: Option<u32>) -> Self {
+
+    fn new(rsp: &ctx::http::Response, grpc_status_code: Option<u32>) -> Self {
         let request_labels = RequestLabels::new(&rsp.request);
+        let classification = Classification::classify(rsp, grpc_status_code);
         ResponseLabels {
             request_labels,
             status_code: rsp.status.as_u16(),
             grpc_status_code,
+            classification,
+        }
+    }
+
+    /// Called when the response stream has failed.
+    fn fail(rsp: &ctx::http::Response) -> Self {
+        let request_labels = RequestLabels::new(&rsp.request);
+        ResponseLabels {
+            request_labels,
+            // TODO: is it correct to always treat this as 500?
+            // Alternatively, the status_code field could be made optional...
+            status_code: 500,
+            grpc_status_code: None,
+            classification: Classification::Failure,
         }
     }
 }
@@ -574,8 +599,9 @@ impl ResponseLabels {
 
 impl fmt::Display for ResponseLabels {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{},status_code=\"{}\"",
+        write!(f, "{},{},status_code=\"{}\"",
             self.request_labels,
+            self.classification,
             self.status_code
         )?;
 
         if let Some(ref status) = self.grpc_status_code {
@@ -586,3 +612,43 @@
 
     }
 }
+
+// ===== impl Classification =====
+
+impl Classification {
+
+    fn grpc_status(code: u32) -> Self {
+        if code == 0 {
+            // XXX: are gRPC status codes indicating client side errors
+            // "successes" or "failures"?
+            Classification::Success
+        } else {
+            Classification::Failure
+        }
+    }
+
+    fn http_status(status: &http::StatusCode) -> Self {
+        if status.is_server_error() {
+            Classification::Failure
+        } else {
+            Classification::Success
+        }
+    }
+
+    fn classify(rsp: &ctx::http::Response, grpc_status: Option<u32>) -> Self {
+        grpc_status.map(Classification::grpc_status)
+            .unwrap_or_else(|| Classification::http_status(&rsp.status))
+    }
+
+}
+
+impl fmt::Display for Classification {
+
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match self {
+            &Classification::Success => f.pad("classification=\"success\""),
+            &Classification::Failure => f.pad("classification=\"failure\""),
+        }
+    }
+
+}
diff --git a/proxy/tests/telemetry.rs b/proxy/tests/telemetry.rs
index 2b846f4ddedec..da4e4f6eaf97a 100644
--- a/proxy/tests/telemetry.rs
+++ b/proxy/tests/telemetry.rs
@@ -330,6 +330,128 @@ fn metrics_endpoint_outbound_request_count() {
 }
 
+mod response_classification {
+    use super::support::*;
+
+    const REQ_STATUS_HEADER: &'static str = "x-test-status-requested";
+    const REQ_GRPC_STATUS_HEADER: &'static str = "x-test-grpc-status-requested";
+
+    const STATUSES: [http::StatusCode; 6] = [
+        http::StatusCode::OK,
+        http::StatusCode::NOT_MODIFIED,
+        http::StatusCode::BAD_REQUEST,
+        http::StatusCode::IM_A_TEAPOT,
+        http::StatusCode::GATEWAY_TIMEOUT,
+        http::StatusCode::INTERNAL_SERVER_ERROR,
+    ];
+
+
+    fn expected_metric(status: &http::StatusCode, direction: &str) -> String {
+        format!(
+            "response_total{{authority=\"tele.test.svc.cluster.local\",direction=\"{}\",classification=\"{}\",status_code=\"{}\"}} 1",
+            direction,
+            if status.is_server_error() { "failure" } else { "success" },
+            status.as_u16(),
+        )
+    }
+
+    fn make_test_server() -> server::Server {
+        fn parse_header(headers: &http::HeaderMap, which: &str)
+            -> Option<http::StatusCode>
+        {
+            headers.get(which)
+                .map(|val| {
+                    val.to_str()
+                        .expect("requested status should be ascii")
+                        .parse::<http::StatusCode>()
+                        .expect("requested status should be numbers")
+                })
+        }
+        info!("running test server");
+        server::new()
+            .route_fn("/", move |req| {
+                let headers = req.headers();
+                let status = parse_header(headers, REQ_STATUS_HEADER)
+                    .unwrap_or(http::StatusCode::OK);
+                let grpc_status = parse_header(headers, REQ_GRPC_STATUS_HEADER);
+                let mut rsp = if let Some(_grpc_status) = grpc_status {
+                    // TODO: tests for grpc statuses
+                    unimplemented!()
+                } else {
+                    Response::new("".into())
+                };
+                *rsp.status_mut() = status;
+                rsp
+            })
+    }
+
+    // https://github.com/runconduit/conduit/issues/613
+    #[test]
+    #[cfg_attr(not(feature = "flaky_tests"), ignore)]
+    fn inbound_http() {
+        let _ = env_logger::try_init();
+        let srv = make_test_server().run();
+        let ctrl = controller::new();
+        let proxy = proxy::new()
+            .controller(ctrl.run())
+            .inbound(srv)
+            .metrics_flush_interval(Duration::from_millis(500))
+            .run();
+        let client = client::new(proxy.inbound, "tele.test.svc.cluster.local");
+        let metrics = client::http1(proxy.metrics, "localhost");
+
+        for (i, status) in STATUSES.iter().enumerate() {
+            let request = client.request(
+                client.request_builder("/")
+                    .header(REQ_STATUS_HEADER, status.as_str())
+                    .method("GET")
+            );
+            assert_eq!(&request.status(), status);
+
+            let scrape = metrics.get("/metrics");
+            for status in &STATUSES[0..i] {
+                // assert that the current status code is incremented, *and* that
+                // all previous requests are *not* incremented.
+                assert_contains!(scrape, &expected_metric(status, "inbound"))
+            }
+        }
+    }
+
+    // https://github.com/runconduit/conduit/issues/613
+    #[test]
+    #[cfg_attr(not(feature = "flaky_tests"), ignore)]
+    fn outbound_http() {
+        let _ = env_logger::try_init();
+        let srv = make_test_server().run();
+        let ctrl = controller::new()
+            .destination("tele.test.svc.cluster.local", srv.addr)
+            .run();
+        let proxy = proxy::new()
+            .controller(ctrl)
+            .outbound(srv)
+            .metrics_flush_interval(Duration::from_millis(500))
+            .run();
+        let client = client::new(proxy.outbound, "tele.test.svc.cluster.local");
+        let metrics = client::http1(proxy.metrics, "localhost");
+
+        for (i, status) in STATUSES.iter().enumerate() {
+            let request = client.request(
+                client.request_builder("/")
+                    .header(REQ_STATUS_HEADER, status.as_str())
+                    .method("GET")
+            );
+            assert_eq!(&request.status(), status);
+
+            let scrape = metrics.get("/metrics");
+            for status in &STATUSES[0..i] {
+                // assert that the current status code is incremented, *and* that
+                // all previous requests are *not* incremented.
+                assert_contains!(scrape, &expected_metric(status, "outbound"))
+            }
+        }
+    }
+}
+
 // Ignore this test on CI, because our method of adding latency to requests
 // (calling `thread::sleep`) is likely to be flakey on Travis.
 // Eventually, we can add some kind of mock timer system for simulating latency
@@ -361,10 +483,10 @@ fn metrics_endpoint_inbound_response_latency() {
     // assert the >=1000ms bucket is incremented by our request with 500ms
     // extra latency.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"1000\"} 1");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 1");
     // the histogram's count should be 1.
     assert_contains!(scrape,
-        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 1");
+        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 1");
     // TODO: we're not going to make any assertions about the
     // response_latency_ms_sum stat, since its granularity depends on the actual
     // observed latencies, which may vary a bit. we could make more reliable
@@ -378,15 +500,15 @@ fn metrics_endpoint_inbound_response_latency() {
     // request with 40ms extra latency should fall into the 50ms bucket.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"50\"} 1");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 1");
     // 1000ms bucket should be incremented as well, since it counts *all*
     // observations less than or equal to 1000ms, even if they also increment
     // other buckets.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"1000\"} 2");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 2");
     // the histogram's total count should be 2.
     assert_contains!(scrape,
-        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 2");
+        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 2");
 
     info!("client.get(/hi)");
     assert_eq!(client.get("/hi"), "good morning");
@@ -394,13 +516,13 @@ fn metrics_endpoint_inbound_response_latency() {
     let scrape = metrics.get("/metrics");
     // request with 40ms extra latency should fall into the 50ms bucket.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"50\"} 2");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
     // 1000ms bucket should be incremented as well.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"1000\"} 3");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 3");
     // the histogram's total count should be 3.
     assert_contains!(scrape,
-        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 3");
+        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 3");
 
     info!("client.get(/hey)");
     assert_eq!(client.get("/hey"), "hello");
@@ -408,13 +530,13 @@ fn metrics_endpoint_inbound_response_latency() {
     let scrape = metrics.get("/metrics");
     // 50ms bucket should be un-changed by the request with 500ms latency.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"50\"} 2");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
     // 1000ms bucket should be incremented.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\",le=\"1000\"} 4");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 4");
     // the histogram's total count should be 4.
     assert_contains!(scrape,
-        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",status_code=\"200\"} 4");
+        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"inbound\",classification=\"success\",status_code=\"200\"} 4");
 }
 
 // Ignore this test on CI, because our method of adding latency to requests
@@ -450,10 +572,10 @@ fn metrics_endpoint_outbound_response_latency() {
     // assert the >=1000ms bucket is incremented by our request with 500ms
     // extra latency.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"1000\"} 1");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 1");
     // the histogram's count should be 1.
     assert_contains!(scrape,
-        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 1");
+        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 1");
     // TODO: we're not going to make any assertions about the
     // response_latency_ms_sum stat, since its granularity depends on the actual
     // observed latencies, which may vary a bit. we could make more reliable
@@ -467,15 +589,15 @@ fn metrics_endpoint_outbound_response_latency() {
     // request with 40ms extra latency should fall into the 50ms bucket.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"50\"} 1");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 1");
     // 1000ms bucket should be incremented as well, since it counts *all*
     // observations less than or equal to 1000ms, even if they also increment
     // other buckets.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"1000\"} 2");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 2");
     // the histogram's total count should be 2.
     assert_contains!(scrape,
-        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 2");
+        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 2");
 
     info!("client.get(/hi)");
     assert_eq!(client.get("/hi"), "good morning");
@@ -483,13 +605,13 @@ fn metrics_endpoint_outbound_response_latency() {
     let scrape = metrics.get("/metrics");
     // request with 40ms extra latency should fall into the 50ms bucket.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"50\"} 2");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
     // 1000ms bucket should be incremented as well.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"1000\"} 3");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 3");
     // the histogram's total count should be 3.
     assert_contains!(scrape,
-        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 3");
+        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 3");
 
     info!("client.get(/hey)");
     assert_eq!(client.get("/hey"), "hello");
@@ -497,13 +619,13 @@ fn metrics_endpoint_outbound_response_latency() {
     let scrape = metrics.get("/metrics");
     // 50ms bucket should be un-changed by the request with 500ms latency.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"50\"} 2");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"50\"} 2");
     // 1000ms bucket should be incremented.
     assert_contains!(scrape,
-        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\",le=\"1000\"} 4");
+        "response_latency_ms_bucket{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\",le=\"1000\"} 4");
     // the histogram's total count should be 4.
     assert_contains!(scrape,
-        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",status_code=\"200\"} 4");
+        "response_latency_ms_count{authority=\"tele.test.svc.cluster.local\",direction=\"outbound\",classification=\"success\",status_code=\"200\"} 4");
 }
 
 // https://github.com/runconduit/conduit/issues/613
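
After this change, each response_total sample in a proxy metrics scrape carries the new classification label. For illustration only (the counts and authority are hypothetical; the label order matches the format asserted by the expected_metric helper in the tests above):

    response_total{authority="tele.test.svc.cluster.local",direction="inbound",classification="success",status_code="200"} 1
    response_total{authority="tele.test.svc.cluster.local",direction="inbound",classification="failure",status_code="500"} 1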