Skip to content

Commit

Permalink
[internal] grpc_util: add layer to capture network-level timing data …
Browse files Browse the repository at this point in the history
…(Cherry-pick of pantsbuild#15978) (pantsbuild#15999)

Our REAPI client uses `concurrency_limit` to ensure only a certain maximum number of client calls run concurrently. This causes any timing metrics computed outside of that concurrency limit to include the queuing time in addition to the actual network RPC time.

This PR defines a layer below the concurrency limit that will capture just the network RPC time. Timings are captured based on the URI path which maps to a specific observation metric. Units are microseconds.
  • Loading branch information
Tom Dyas authored Jun 29, 2022
1 parent 0857d57 commit d28041a
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/rust/engine/grpc_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ hyper = "0.14"
http = "0.2"
itertools = "0.10"
rustls-native-certs = "0.5"
lazy_static = "1"
pin-project = "1.0"
prost = "0.9"
rand = "0.8"
rustls = { version = "0.19", features = ["dangerous_configuration"] }
Expand All @@ -25,6 +27,7 @@ tower = { version = "0.4", features = ["limit"] }
tower-layer = "0.3"
tower-service = "0.3"
webpki = "0.21"
workunit_store = { path = "../workunit_store" }

[dev-dependencies]
# Pin async-trait due to https://github.com/dtolnay/async-trait/issues/144.
Expand Down
24 changes: 21 additions & 3 deletions src/rust/engine/grpc_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,36 @@
#![allow(clippy::mutex_atomic)]

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::iter::FromIterator;
use std::str::FromStr;
use std::sync::Arc;

use crate::headers::{SetRequestHeaders, SetRequestHeadersLayer};
use either::Either;
use http::header::{HeaderName, USER_AGENT};
use http::{HeaderMap, HeaderValue};
use itertools::Itertools;
use lazy_static::lazy_static;
use tokio_rustls::rustls::ClientConfig;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
use tower::limit::ConcurrencyLimit;
use tower::ServiceBuilder;
use workunit_store::ObservationMetric;

use crate::headers::{SetRequestHeaders, SetRequestHeadersLayer};
use crate::metrics::{NetworkMetrics, NetworkMetricsLayer};

pub mod headers;
pub mod hyper;
pub mod metrics;
pub mod prost;
pub mod retry;
pub mod tls;

// NB: Rather than boxing our tower/tonic services, we define a type alias that fully defines the
// Service layers that we use universally. If this type becomes unwieldy, or our various Services
// diverge in which layers they use, we should instead use a Box<dyn Service<..>>.
pub type LayeredService = SetRequestHeaders<ConcurrencyLimit<Channel>>;
pub type LayeredService = SetRequestHeaders<ConcurrencyLimit<NetworkMetrics<Channel>>>;

pub fn layered_service(
channel: Channel,
Expand All @@ -59,9 +65,21 @@ pub fn layered_service(
ServiceBuilder::new()
.layer(SetRequestHeadersLayer::new(http_headers))
.concurrency_limit(concurrency_limit)
.layer(NetworkMetricsLayer::new(&METRIC_FOR_REAPI_PATH))
.service(channel)
}

lazy_static! {
static ref METRIC_FOR_REAPI_PATH: Arc<HashMap<String, ObservationMetric>> = {
let mut m = HashMap::new();
m.insert(
"/build.bazel.remote.execution.v2.ActionCache/GetActionResult".to_string(),
ObservationMetric::RemoteCacheGetActionResultNetworkTimeMicros,
);
Arc::new(m)
};
}

/// Create a Tonic `Endpoint` from a string containing a schema and IP address/name.
pub fn create_endpoint(
addr: &str,
Expand Down
177 changes: 177 additions & 0 deletions src/rust/engine/grpc_util/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::ready;
use futures::Future;
use http::{Request, Response};
use pin_project::pin_project;
use tower_layer::Layer;
use tower_service::Service;
use workunit_store::{get_workunit_store_handle, ObservationMetric};

#[derive(Clone, Debug)]
pub struct NetworkMetricsLayer {
metric_for_path: Arc<HashMap<String, ObservationMetric>>,
}

impl<S> Layer<S> for NetworkMetricsLayer {
type Service = NetworkMetrics<S>;

fn layer(&self, inner: S) -> Self::Service {
NetworkMetrics::new(inner, self.metric_for_path.clone())
}
}

impl NetworkMetricsLayer {
pub fn new(metric_for_path: &Arc<HashMap<String, ObservationMetric>>) -> Self {
Self {
metric_for_path: Arc::clone(metric_for_path),
}
}
}

#[derive(Clone)]
pub struct NetworkMetrics<S> {
inner: S,
metric_for_path: Arc<HashMap<String, ObservationMetric>>,
}

impl<S> NetworkMetrics<S> {
pub fn new(inner: S, metric_for_path: Arc<HashMap<String, ObservationMetric>>) -> Self {
Self {
inner,
metric_for_path,
}
}
}

impl<S> fmt::Debug for NetworkMetrics<S>
where
S: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NetworkMetrics")
.field("inner", &self.inner)
.finish()
}
}

#[pin_project]
pub struct NetworkMetricsFuture<F> {
#[pin]
inner: F,
metric_data: Option<(ObservationMetric, Instant)>,
}

impl<F, B, E> Future for NetworkMetricsFuture<F>
where
F: Future<Output = Result<Response<B>, E>>,
{
type Output = Result<Response<B>, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let metric_data = self.metric_data;
let this = self.project();
let result = ready!(this.inner.poll(cx));
if let Some((metric, start)) = metric_data {
let workunit_store_handle = get_workunit_store_handle();
if let Some(workunit_store_handle) = workunit_store_handle {
workunit_store_handle
.store
.record_observation(metric, start.elapsed().as_micros() as u64)
}
}
Poll::Ready(result)
}
}

impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for NetworkMetrics<S>
where
S: Service<Request<ReqBody>, Response = Response<ResBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = NetworkMetricsFuture<S::Future>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
let metric_data = self
.metric_for_path
.get(req.uri().path())
.cloned()
.map(|metric| (metric, Instant::now()));
NetworkMetricsFuture {
inner: self.inner.call(req),
metric_data,
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;

use hyper::{Body, Request, Response};
use tower::{ServiceBuilder, ServiceExt};
use workunit_store::{Level, ObservationMetric, WorkunitStore};

use super::NetworkMetricsLayer;

async fn handler(_: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::empty()))
}

#[tokio::test]
async fn collects_network_metrics() {
let ws = WorkunitStore::new(true, Level::Debug);
ws.init_thread_state(None);

let metric_for_path: Arc<HashMap<String, ObservationMetric>> = {
let mut m = HashMap::new();
m.insert(
"/this-is-a-metric-path".to_string(),
ObservationMetric::TestObservation,
);
Arc::new(m)
};

let svc = ServiceBuilder::new()
.layer(NetworkMetricsLayer::new(&metric_for_path))
.service_fn(handler);

let req = Request::builder()
.uri("/not-a-metric-path")
.body(Body::empty())
.unwrap();

let _ = svc.clone().oneshot(req).await.unwrap();
let observations = ws.encode_observations().unwrap();
assert_eq!(observations.len(), 0); // there should be no observations for `/not-a-metric-path`

let req = Request::builder()
.uri("/this-is-a-metric-path")
.body(Body::empty())
.unwrap();

let _ = svc.clone().oneshot(req).await.unwrap();
let observations = ws.encode_observations().unwrap();
assert_eq!(observations.len(), 1); // there should be an observation for `/this-is-a-metric-path`
assert_eq!(
observations.into_keys().collect::<Vec<_>>(),
vec!["test_observation"]
);
}
}
5 changes: 4 additions & 1 deletion src/rust/engine/workunit_store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ pub enum ObservationMetric {
/// The time saved (in milliseconds) thanks to a remote cache hit instead of running the process
/// directly.
RemoteCacheTimeSavedMs,
/// Remote cache timing (in microseconds) for GetActionResult calls.
/// Remote cache timing (in microseconds) for GetActionResult calls. Includes client-side
/// queuing due to concurrency limits.
RemoteCacheGetActionResultTimeMicros,
/// Remote cache timing (in microseconds) for GetActionResult calls (network timing only).
RemoteCacheGetActionResultNetworkTimeMicros,
}

0 comments on commit d28041a

Please sign in to comment.