Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[internal] grpc_util: add layer to capture network-level timing data (Cherry-pick of #15978) #15999

Merged
merged 1 commit into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/rust/engine/grpc_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ hyper = "0.14"
http = "0.2"
itertools = "0.10"
rustls-native-certs = "0.5"
lazy_static = "1"
pin-project = "1.0"
prost = "0.9"
rand = "0.8"
rustls = { version = "0.19", features = ["dangerous_configuration"] }
Expand All @@ -25,6 +27,7 @@ tower = { version = "0.4", features = ["limit"] }
tower-layer = "0.3"
tower-service = "0.3"
webpki = "0.21"
workunit_store = { path = "../workunit_store" }

[dev-dependencies]
# Pin async-trait due to https://github.com/dtolnay/async-trait/issues/144.
Expand Down
24 changes: 21 additions & 3 deletions src/rust/engine/grpc_util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,36 @@
#![allow(clippy::mutex_atomic)]

use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::iter::FromIterator;
use std::str::FromStr;
use std::sync::Arc;

use crate::headers::{SetRequestHeaders, SetRequestHeadersLayer};
use either::Either;
use http::header::{HeaderName, USER_AGENT};
use http::{HeaderMap, HeaderValue};
use itertools::Itertools;
use lazy_static::lazy_static;
use tokio_rustls::rustls::ClientConfig;
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
use tower::limit::ConcurrencyLimit;
use tower::ServiceBuilder;
use workunit_store::ObservationMetric;

use crate::headers::{SetRequestHeaders, SetRequestHeadersLayer};
use crate::metrics::{NetworkMetrics, NetworkMetricsLayer};

pub mod headers;
pub mod hyper;
pub mod metrics;
pub mod prost;
pub mod retry;
pub mod tls;

// NB: Rather than boxing our tower/tonic services, we define a type alias that fully defines the
// Service layers that we use universally. If this type becomes unwieldy, or our various Services
// diverge in which layers they use, we should instead use a Box<dyn Service<..>>.
pub type LayeredService = SetRequestHeaders<ConcurrencyLimit<Channel>>;
pub type LayeredService = SetRequestHeaders<ConcurrencyLimit<NetworkMetrics<Channel>>>;

pub fn layered_service(
channel: Channel,
Expand All @@ -59,9 +65,21 @@ pub fn layered_service(
ServiceBuilder::new()
.layer(SetRequestHeadersLayer::new(http_headers))
.concurrency_limit(concurrency_limit)
.layer(NetworkMetricsLayer::new(&METRIC_FOR_REAPI_PATH))
.service(channel)
}

lazy_static! {
static ref METRIC_FOR_REAPI_PATH: Arc<HashMap<String, ObservationMetric>> = {
let mut m = HashMap::new();
m.insert(
"/build.bazel.remote.execution.v2.ActionCache/GetActionResult".to_string(),
ObservationMetric::RemoteCacheGetActionResultNetworkTimeMicros,
);
Arc::new(m)
};
}

/// Create a Tonic `Endpoint` from a string containing a schema and IP address/name.
pub fn create_endpoint(
addr: &str,
Expand Down
177 changes: 177 additions & 0 deletions src/rust/engine/grpc_util/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).

use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use futures::ready;
use futures::Future;
use http::{Request, Response};
use pin_project::pin_project;
use tower_layer::Layer;
use tower_service::Service;
use workunit_store::{get_workunit_store_handle, ObservationMetric};

#[derive(Clone, Debug)]
pub struct NetworkMetricsLayer {
metric_for_path: Arc<HashMap<String, ObservationMetric>>,
}

impl<S> Layer<S> for NetworkMetricsLayer {
type Service = NetworkMetrics<S>;

fn layer(&self, inner: S) -> Self::Service {
NetworkMetrics::new(inner, self.metric_for_path.clone())
}
}

impl NetworkMetricsLayer {
pub fn new(metric_for_path: &Arc<HashMap<String, ObservationMetric>>) -> Self {
Self {
metric_for_path: Arc::clone(metric_for_path),
}
}
}

#[derive(Clone)]
pub struct NetworkMetrics<S> {
inner: S,
metric_for_path: Arc<HashMap<String, ObservationMetric>>,
}

impl<S> NetworkMetrics<S> {
pub fn new(inner: S, metric_for_path: Arc<HashMap<String, ObservationMetric>>) -> Self {
Self {
inner,
metric_for_path,
}
}
}

impl<S> fmt::Debug for NetworkMetrics<S>
where
S: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NetworkMetrics")
.field("inner", &self.inner)
.finish()
}
}

#[pin_project]
pub struct NetworkMetricsFuture<F> {
#[pin]
inner: F,
metric_data: Option<(ObservationMetric, Instant)>,
}

impl<F, B, E> Future for NetworkMetricsFuture<F>
where
F: Future<Output = Result<Response<B>, E>>,
{
type Output = Result<Response<B>, E>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let metric_data = self.metric_data;
let this = self.project();
let result = ready!(this.inner.poll(cx));
if let Some((metric, start)) = metric_data {
let workunit_store_handle = get_workunit_store_handle();
if let Some(workunit_store_handle) = workunit_store_handle {
workunit_store_handle
.store
.record_observation(metric, start.elapsed().as_micros() as u64)
}
}
Poll::Ready(result)
}
}

impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for NetworkMetrics<S>
where
S: Service<Request<ReqBody>, Response = Response<ResBody>>,
{
type Response = S::Response;
type Error = S::Error;
type Future = NetworkMetricsFuture<S::Future>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
let metric_data = self
.metric_for_path
.get(req.uri().path())
.cloned()
.map(|metric| (metric, Instant::now()));
NetworkMetricsFuture {
inner: self.inner.call(req),
metric_data,
}
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;

use hyper::{Body, Request, Response};
use tower::{ServiceBuilder, ServiceExt};
use workunit_store::{Level, ObservationMetric, WorkunitStore};

use super::NetworkMetricsLayer;

async fn handler(_: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(Response::new(Body::empty()))
}

#[tokio::test]
async fn collects_network_metrics() {
let ws = WorkunitStore::new(true, Level::Debug);
ws.init_thread_state(None);

let metric_for_path: Arc<HashMap<String, ObservationMetric>> = {
let mut m = HashMap::new();
m.insert(
"/this-is-a-metric-path".to_string(),
ObservationMetric::TestObservation,
);
Arc::new(m)
};

let svc = ServiceBuilder::new()
.layer(NetworkMetricsLayer::new(&metric_for_path))
.service_fn(handler);

let req = Request::builder()
.uri("/not-a-metric-path")
.body(Body::empty())
.unwrap();

let _ = svc.clone().oneshot(req).await.unwrap();
let observations = ws.encode_observations().unwrap();
assert_eq!(observations.len(), 0); // there should be no observations for `/not-a-metric-path`

let req = Request::builder()
.uri("/this-is-a-metric-path")
.body(Body::empty())
.unwrap();

let _ = svc.clone().oneshot(req).await.unwrap();
let observations = ws.encode_observations().unwrap();
assert_eq!(observations.len(), 1); // there should be an observation for `/this-is-a-metric-path`
assert_eq!(
observations.into_keys().collect::<Vec<_>>(),
vec!["test_observation"]
);
}
}
5 changes: 4 additions & 1 deletion src/rust/engine/workunit_store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ pub enum ObservationMetric {
/// The time saved (in milliseconds) thanks to a remote cache hit instead of running the process
/// directly.
RemoteCacheTimeSavedMs,
/// Remote cache timing (in microseconds) for GetActionResult calls.
/// Remote cache timing (in microseconds) for GetActionResult calls. Includes client-side
/// queuing due to concurrency limits.
RemoteCacheGetActionResultTimeMicros,
/// Remote cache timing (in microseconds) for GetActionResult calls (network timing only).
RemoteCacheGetActionResultNetworkTimeMicros,
}