From d28041a78e4d4df0782e020e946071a193d0b0a2 Mon Sep 17 00:00:00 2001 From: Tom Dyas Date: Wed, 29 Jun 2022 13:22:25 -0400 Subject: [PATCH] [internal] grpc_util: add layer to capture network-level timing data (Cherry-pick of #15978) (#15999) Our REAPI client uses `concurrency_limit` to ensure only a certain maximum number of client calls run concurrently. This causes any timing metrics computed outside of that concurrency limit to include the queuing time in addition to the actual network RPC time. This PR defines a layer below the concurrency limit that will capture just the network RPC time. Timings are captured based on the URI path which maps to a specific observation metric. Units are microseconds. --- src/rust/engine/Cargo.lock | 3 + src/rust/engine/grpc_util/Cargo.toml | 3 + src/rust/engine/grpc_util/src/lib.rs | 24 ++- src/rust/engine/grpc_util/src/metrics.rs | 177 ++++++++++++++++++ src/rust/engine/workunit_store/src/metrics.rs | 5 +- 5 files changed, 208 insertions(+), 4 deletions(-) create mode 100644 src/rust/engine/grpc_util/src/metrics.rs diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index b278fd437f5..ecddee5219b 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -1085,7 +1085,9 @@ dependencies = [ "http", "hyper", "itertools", + "lazy_static", "parking_lot 0.11.2", + "pin-project 1.0.8", "prost", "prost-build", "prost-types", @@ -1102,6 +1104,7 @@ dependencies = [ "tower-layer", "tower-service", "webpki 0.21.4", + "workunit_store", ] [[package]] diff --git a/src/rust/engine/grpc_util/Cargo.toml b/src/rust/engine/grpc_util/Cargo.toml index 7bf08c329b0..78a2c711ca6 100644 --- a/src/rust/engine/grpc_util/Cargo.toml +++ b/src/rust/engine/grpc_util/Cargo.toml @@ -13,6 +13,8 @@ hyper = "0.14" http = "0.2" itertools = "0.10" rustls-native-certs = "0.5" +lazy_static = "1" +pin-project = "1.0" prost = "0.9" rand = "0.8" rustls = { version = "0.19", features = ["dangerous_configuration"] } @@ -25,6 +27,7 @@ 
tower = { version = "0.4", features = ["limit"] } tower-layer = "0.3" tower-service = "0.3" webpki = "0.21" +workunit_store = { path = "../workunit_store" } [dev-dependencies] # Pin async-trait due to https://github.com/dtolnay/async-trait/issues/144. diff --git a/src/rust/engine/grpc_util/src/lib.rs b/src/rust/engine/grpc_util/src/lib.rs index 524b29c999a..686a2792e52 100644 --- a/src/rust/engine/grpc_util/src/lib.rs +++ b/src/rust/engine/grpc_util/src/lib.rs @@ -26,22 +26,28 @@ #![allow(clippy::mutex_atomic)] use std::collections::btree_map::Entry; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::iter::FromIterator; use std::str::FromStr; +use std::sync::Arc; -use crate::headers::{SetRequestHeaders, SetRequestHeadersLayer}; use either::Either; use http::header::{HeaderName, USER_AGENT}; use http::{HeaderMap, HeaderValue}; use itertools::Itertools; +use lazy_static::lazy_static; use tokio_rustls::rustls::ClientConfig; use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; use tower::limit::ConcurrencyLimit; use tower::ServiceBuilder; +use workunit_store::ObservationMetric; + +use crate::headers::{SetRequestHeaders, SetRequestHeadersLayer}; +use crate::metrics::{NetworkMetrics, NetworkMetricsLayer}; pub mod headers; pub mod hyper; +pub mod metrics; pub mod prost; pub mod retry; pub mod tls; @@ -49,7 +55,7 @@ pub mod tls; // NB: Rather than boxing our tower/tonic services, we define a type alias that fully defines the // Service layers that we use universally. If this type becomes unwieldy, or our various Services // diverge in which layers they use, we should instead use a Box<dyn Service<..>>. 
-pub type LayeredService = SetRequestHeaders<ConcurrencyLimit<Channel>>; +pub type LayeredService = SetRequestHeaders<ConcurrencyLimit<NetworkMetrics<Channel>>>; pub fn layered_service( channel: Channel, @@ -59,9 +65,21 @@ pub fn layered_service( ServiceBuilder::new() .layer(SetRequestHeadersLayer::new(http_headers)) .concurrency_limit(concurrency_limit) + .layer(NetworkMetricsLayer::new(&METRIC_FOR_REAPI_PATH)) .service(channel) } +lazy_static! { + static ref METRIC_FOR_REAPI_PATH: Arc<HashMap<String, ObservationMetric>> = { + let mut m = HashMap::new(); + m.insert( + "/build.bazel.remote.execution.v2.ActionCache/GetActionResult".to_string(), + ObservationMetric::RemoteCacheGetActionResultNetworkTimeMicros, + ); + Arc::new(m) + }; +} + /// Create a Tonic `Endpoint` from a string containing a schema and IP address/name. pub fn create_endpoint( addr: &str, diff --git a/src/rust/engine/grpc_util/src/metrics.rs b/src/rust/engine/grpc_util/src/metrics.rs new file mode 100644 index 00000000000..a8274d8d81b --- /dev/null +++ b/src/rust/engine/grpc_util/src/metrics.rs @@ -0,0 +1,177 @@ +// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). 
+ +use std::collections::HashMap; +use std::fmt; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Instant; + +use futures::ready; +use futures::Future; +use http::{Request, Response}; +use pin_project::pin_project; +use tower_layer::Layer; +use tower_service::Service; +use workunit_store::{get_workunit_store_handle, ObservationMetric}; + +#[derive(Clone, Debug)] +pub struct NetworkMetricsLayer { + metric_for_path: Arc<HashMap<String, ObservationMetric>>, +} + +impl<S> Layer<S> for NetworkMetricsLayer { + type Service = NetworkMetrics<S>; + + fn layer(&self, inner: S) -> Self::Service { + NetworkMetrics::new(inner, self.metric_for_path.clone()) + } +} + +impl NetworkMetricsLayer { + pub fn new(metric_for_path: &Arc<HashMap<String, ObservationMetric>>) -> Self { + Self { + metric_for_path: Arc::clone(metric_for_path), + } + } +} + +#[derive(Clone)] +pub struct NetworkMetrics<S> { + inner: S, + metric_for_path: Arc<HashMap<String, ObservationMetric>>, +} + +impl<S> NetworkMetrics<S> { + pub fn new(inner: S, metric_for_path: Arc<HashMap<String, ObservationMetric>>) -> Self { + Self { + inner, + metric_for_path, + } + } +} + +impl<S> fmt::Debug for NetworkMetrics<S> +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NetworkMetrics") + .field("inner", &self.inner) + .finish() + } +} + +#[pin_project] +pub struct NetworkMetricsFuture<F> { + #[pin] + inner: F, + metric_data: Option<(ObservationMetric, Instant)>, +} + +impl<F, B, E> Future for NetworkMetricsFuture<F> +where + F: Future<Output = Result<Response<B>, E>>, +{ + type Output = Result<Response<B>, E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let metric_data = self.metric_data; + let this = self.project(); + let result = ready!(this.inner.poll(cx)); + if let Some((metric, start)) = metric_data { + let workunit_store_handle = get_workunit_store_handle(); + if let Some(workunit_store_handle) = workunit_store_handle { + workunit_store_handle + .store + .record_observation(metric, start.elapsed().as_micros() as u64) + } + } + Poll::Ready(result) + } +} + +impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for NetworkMetrics<S> +where + S: Service<Request<ReqBody>, 
Response = Response<ResBody>>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = NetworkMetricsFuture<S::Future>; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request<ReqBody>) -> Self::Future { + let metric_data = self + .metric_for_path + .get(req.uri().path()) + .cloned() + .map(|metric| (metric, Instant::now())); + NetworkMetricsFuture { + inner: self.inner.call(req), + metric_data, + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::convert::Infallible; + use std::sync::Arc; + + use hyper::{Body, Request, Response}; + use tower::{ServiceBuilder, ServiceExt}; + use workunit_store::{Level, ObservationMetric, WorkunitStore}; + + use super::NetworkMetricsLayer; + + async fn handler(_: Request<Body>) -> Result<Response<Body>, Infallible> { + Ok(Response::new(Body::empty())) + } + + #[tokio::test] + async fn collects_network_metrics() { + let ws = WorkunitStore::new(true, Level::Debug); + ws.init_thread_state(None); + + let metric_for_path: Arc<HashMap<String, ObservationMetric>> = { + let mut m = HashMap::new(); + m.insert( + "/this-is-a-metric-path".to_string(), + ObservationMetric::TestObservation, + ); + Arc::new(m) + }; + + let svc = ServiceBuilder::new() + .layer(NetworkMetricsLayer::new(&metric_for_path)) + .service_fn(handler); + + let req = Request::builder() + .uri("/not-a-metric-path") + .body(Body::empty()) + .unwrap(); + + let _ = svc.clone().oneshot(req).await.unwrap(); + let observations = ws.encode_observations().unwrap(); + assert_eq!(observations.len(), 0); // there should be no observations for `/not-a-metric-path` + + let req = Request::builder() + .uri("/this-is-a-metric-path") + .body(Body::empty()) + .unwrap(); + + let _ = svc.clone().oneshot(req).await.unwrap(); + let observations = ws.encode_observations().unwrap(); + assert_eq!(observations.len(), 1); // there should be an observation for `/this-is-a-metric-path` + assert_eq!( + observations.into_keys().collect::<Vec<_>>(), 
vec!["test_observation"] + ); + } +} diff --git a/src/rust/engine/workunit_store/src/metrics.rs b/src/rust/engine/workunit_store/src/metrics.rs index a2032635460..abaa10055b2 100644 --- a/src/rust/engine/workunit_store/src/metrics.rs +++ b/src/rust/engine/workunit_store/src/metrics.rs @@ -80,6 +80,9 @@ pub enum ObservationMetric { /// The time saved (in milliseconds) thanks to a remote cache hit instead of running the process /// directly. RemoteCacheTimeSavedMs, - /// Remote cache timing (in microseconds) for GetActionResult calls. + /// Remote cache timing (in microseconds) for GetActionResult calls. Includes client-side + /// queuing due to concurrency limits. RemoteCacheGetActionResultTimeMicros, + /// Remote cache timing (in microseconds) for GetActionResult calls (network timing only). + RemoteCacheGetActionResultNetworkTimeMicros, }