diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 70d3c08a529..65415033dfd 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -1080,9 +1080,11 @@ version = "0.0.1" dependencies = [ "async-trait", "bytes 1.0.1", + "either", "futures", "http", "hyper", + "itertools 0.10.1", "parking_lot", "prost", "prost-build", @@ -1095,6 +1097,8 @@ dependencies = [ "tonic", "tonic-build", "tower", + "tower-layer", + "tower-service", ] [[package]] @@ -1193,12 +1197,13 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994" +checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" dependencies = [ "bytes 1.0.1", "http", + "pin-project-lite", ] [[package]] @@ -1261,6 +1266,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "idna" version = "0.2.0" @@ -1405,6 +1422,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.6" @@ -2251,9 +2277,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e6984d2f1a23009bd270b8bb56d0926810a3d483f59c987d77969e9d8e840b2" +checksum = "de5e2533f59d08fcf364fd374ebda0692a70bd6d7e66ef97f306f45c6c5d8020" dependencies = [ "bytes 1.0.1", "prost-derive", @@ -2261,13 +2287,13 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d3ebd75ac2679c2af3a92246639f9fcc8a442ee420719cc4fe195b98dd5fa3" +checksum = "355f634b43cdd80724ee7848f95770e7e70eefa6dcf14fea676216573b8fd603" dependencies = [ "bytes 1.0.1", "heck", - "itertools 0.9.0", + "itertools 0.10.1", "log 0.4.11", "multimap", "petgraph", @@ -2279,12 +2305,12 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4" +checksum = "600d2f334aa05acb02a755e217ef1ab6dea4d51b58b7846588b747edec04efba" dependencies = [ "anyhow", - "itertools 0.9.0", + "itertools 0.10.1", "proc-macro2 1.0.24", "quote 1.0.8", "syn 1.0.67", @@ -2292,9 +2318,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b518d7cdd93dab1d1122cf07fa9a60771836c668dde9d9e2a139f957f0d9f1bb" +checksum = "603bbd6394701d13f3f25aada59c7de9d35a6a5887cfc156181234a44002771b" dependencies = [ "bytes 1.0.1", "prost", @@ -3047,6 +3073,8 @@ dependencies = [ "glob", "grpc_util", "hashing", + "http", + "http-body", "indexmap", "itertools 0.7.11", "lmdb", @@ -3068,6 +3096,7 @@ dependencies = [ "tokio", "tokio-rustls", "tonic", + "tower-service", "tryfuture", "uuid", "walkdir 2.3.1", @@ -3355,6 +3384,16 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "tokio-io-timeout" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90c49f106be240de154571dd31fbe48acb10ba6c6dd6f6517ad603abffa42de9" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.1.0" @@ -3413,9 +3452,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.4.3" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ac42cd97ac6bd2339af5bcabf105540e21e45636ec6fa6aae5e85d44db31be0" +checksum = "b584f064fdfc50017ec39162d5aebce49912f1eb16fd128e04b7f4ce4907c7e5" dependencies = [ "async-stream", "async-trait", @@ -3427,6 +3466,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-timeout", "percent-encoding", "pin-project 1.0.2", "prost", @@ -3437,6 +3477,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tower", + "tower-layer", "tower-service", "tracing", "tracing-futures", @@ -3444,9 +3485,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.4.2" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c695de27302f4697191dda1c7178131a8cb805463dda02864acb80fe1322fdcf" +checksum = "d12faebbe071b06f486be82cc9318350814fdd07fcb28f3690840cd770599283" dependencies = [ "proc-macro2 1.0.24", "prost-build", diff --git a/src/rust/engine/bazel_protos/Cargo.toml b/src/rust/engine/bazel_protos/Cargo.toml index 37f21dfd73e..60064cad5d1 100644 --- a/src/rust/engine/bazel_protos/Cargo.toml +++ b/src/rust/engine/bazel_protos/Cargo.toml @@ -8,15 +8,15 @@ publish = false [dependencies] bytes = "1.0" hashing = { path = "../hashing" } -prost = "0.7" -prost-build = "0.7" -prost-types = "0.7" -tonic = { version = "0.4", features = ["transport", "codegen", "tls", "tls-roots"] } +prost = "0.8" +prost-build = "0.8" +prost-types = "0.8" +tonic = { version = "0.5", features = ["transport", "codegen", "tls", "tls-roots"] } [build-dependencies] copy_dir = "0.1.2" dir-diff = "0.3.1" tempfile = "3" -prost-build = "0.7" -tonic-build = { version = "0.4", features = ["prost"] } +prost-build = "0.8" +tonic-build = { version = "0.5.1", features = ["prost"] } walkdir = "2" diff --git a/src/rust/engine/concrete_time/Cargo.toml b/src/rust/engine/concrete_time/Cargo.toml index 7be81a91888..e25e2e84d8d 100644 --- a/src/rust/engine/concrete_time/Cargo.toml +++ b/src/rust/engine/concrete_time/Cargo.toml @@ -6,8 +6,8 @@ name = "concrete_time" publish = false [dependencies] -prost = "0.7" -prost-types = "0.7" +prost = "0.8" +prost-types = "0.8" serde_derive = "1.0.98" serde = "1.0.98" log = "0.4" diff --git a/src/rust/engine/fs/fs_util/Cargo.toml b/src/rust/engine/fs/fs_util/Cargo.toml index 9508b95f9be..a087647709f 100644 --- a/src/rust/engine/fs/fs_util/Cargo.toml +++ b/src/rust/engine/fs/fs_util/Cargo.toml @@ -15,7 +15,7 @@ fs = { path = ".." } futures = "0.3" hashing = { path = "../../hashing" } parking_lot = "0.11" -prost = "0.7" +prost = "0.8" rand = "0.8" serde = "1.0" serde_json = "1.0" diff --git a/src/rust/engine/fs/store/Cargo.toml b/src/rust/engine/fs/store/Cargo.toml index 09f7925a02d..f3c63d937ae 100644 --- a/src/rust/engine/fs/store/Cargo.toml +++ b/src/rust/engine/fs/store/Cargo.toml @@ -15,6 +15,8 @@ fs = { path = ".." } futures = "0.3" glob = "0.2.11" hashing = { path = "../../hashing" } +http = "0.2" +http-body = "0.4" indexmap = "1.4" itertools = "0.7.2" lmdb = { git = "https://github.com/pantsbuild/lmdb-rs.git", rev = "06bdfbfc6348f6804127176e561843f214fc17f8" } @@ -22,15 +24,16 @@ log = "0.4" madvise = "0.1" memmap = "0.7" parking_lot = "0.11" -prost = "0.7" -prost-types = "0.7" +prost = "0.8" +prost-types = "0.8" serde = "1.0" serde_derive = "1.0" sharded_lmdb = { path = "../../sharded_lmdb" } task_executor = { path = "../../task_executor" } tempfile = "3" tokio-rustls = "0.22" -tonic = { version = "0.4", features = ["transport", "codegen", "tls", "tls-roots", "prost"] } +tonic = { version = "0.5", features = ["transport", "codegen", "tls", "tls-roots", "prost"] } +tower-service = "0.3" tryfuture = { path = "../../tryfuture" } uuid = { version = "0.7.1", features = ["v4"] } workunit_store = {path = "../../workunit_store" } diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 0c9dc990327..22dc8489db6 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -12,11 +12,11 @@ use bytes::{Bytes, BytesMut}; use futures::Future; use futures::StreamExt; use grpc_util::retry::{retry_call, status_is_retryable}; -use grpc_util::{headers_to_interceptor_fn, layered_service, status_to_str, LayeredService}; +use grpc_util::{headers_to_http_header_map, layered_service, status_to_str, LayeredService}; use hashing::Digest; use log::Level; use remexec::content_addressable_storage_client::ContentAddressableStorageClient; -use tonic::{Code, Interceptor, Request, Status}; +use tonic::{Code, Request, Status}; use workunit_store::{in_workunit, ObservationMetric, WorkunitMetadata}; #[derive(Clone)] @@ -25,7 +25,6 @@ pub struct ByteStore { chunk_size_bytes: usize, upload_timeout: Duration, rpc_attempts: usize, - interceptor: Option, byte_stream_client: Arc>, cas_client: Arc>, } @@ -78,34 +77,22 @@ impl ByteStore { let endpoint = grpc_util::create_endpoint(&cas_address, tls_client_config.as_ref(), &mut headers)?; + let http_headers = headers_to_http_header_map(&headers)?; let channel = layered_service( tonic::transport::Channel::balance_list(vec![endpoint].into_iter()), rpc_concurrency_limit, + http_headers, ); - let interceptor = if headers.is_empty() { - None - } else { - Some(Interceptor::new(headers_to_interceptor_fn(&headers)?)) - }; - let byte_stream_client = Arc::new(match interceptor.as_ref() { - Some(interceptor) => ByteStreamClient::with_interceptor(channel.clone(), interceptor.clone()), - None => ByteStreamClient::new(channel.clone()), - }); + let byte_stream_client = Arc::new(ByteStreamClient::new(channel.clone())); - let cas_client = Arc::new(match interceptor.as_ref() { - Some(interceptor) => { - ContentAddressableStorageClient::with_interceptor(channel, interceptor.clone()) - } - None => ContentAddressableStorageClient::new(channel), - }); + let cas_client = Arc::new(ContentAddressableStorageClient::new(channel)); Ok(ByteStore { instance_name, chunk_size_bytes, upload_timeout, rpc_attempts: rpc_retries + 1, - interceptor, byte_stream_client, cas_client, }) diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index aff6f76e276..6c4dc9cad4d 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -235,7 +235,7 @@ async fn write_connection_error() { .await .expect_err("Want error"); assert!( - error.contains("dns error: failed to lookup address information"), + error.contains("Unknown: \"transport error\""), "Bad error message, got: {}", error ); diff --git a/src/rust/engine/grpc_util/Cargo.toml b/src/rust/engine/grpc_util/Cargo.toml index ebc84540757..65d8dfb9d40 100644 --- a/src/rust/engine/grpc_util/Cargo.toml +++ b/src/rust/engine/grpc_util/Cargo.toml @@ -7,23 +7,27 @@ publish = false [dependencies] bytes = "1.0" +either = "1" futures = "0.3" hyper = "0.14" http = "0.2" +itertools = "0.10" rustls-native-certs = "0.5" -prost = "0.7" +prost = "0.8" rand = "0.8" tokio = { version = "1.4", features = ["net", "process", "rt-multi-thread", "sync", "time"] } tokio-rustls = "0.22" tokio-util = { version = "0.6", features = ["codec"] } -tonic = { version = "0.4", features = ["transport", "codegen", "tls", "tls-roots", "prost"] } +tonic = { version = "0.5", features = ["transport", "codegen", "tls", "tls-roots", "prost"] } tower = { version = "0.4", features = ["limit"] } +tower-layer = "0.3" +tower-service = "0.3" [dev-dependencies] async-trait = "0.1" parking_lot = "0.11" -prost-types = "0.7" +prost-types = "0.8" [build-dependencies] -prost-build = "0.7" -tonic-build = "0.4" +prost-build = "0.8" +tonic-build = "0.5.1" diff --git a/src/rust/engine/grpc_util/src/headers.rs b/src/rust/engine/grpc_util/src/headers.rs new file mode 100644 index 00000000000..7e8af38d003 --- /dev/null +++ b/src/rust/engine/grpc_util/src/headers.rs @@ -0,0 +1,81 @@ +// Copyright 2021 Pants project contributors (see CONTRIBUTORS.md). +// Licensed under the Apache License, Version 2.0 (see LICENSE). + +use std::fmt; +use std::task::{Context, Poll}; + +use http::header::HeaderMap; +use http::Request; +use tower_layer::Layer; +use tower_service::Service; + +#[derive(Debug)] +pub struct SetRequestHeadersLayer { + headers: HeaderMap, +} + +impl SetRequestHeadersLayer { + pub fn new(headers: HeaderMap) -> Self { + SetRequestHeadersLayer { headers } + } +} + +impl Layer for SetRequestHeadersLayer { + type Service = SetRequestHeaders; + + fn layer(&self, inner: S) -> Self::Service { + SetRequestHeaders { + inner, + headers: self.headers.clone(), + } + } +} + +#[derive(Clone)] +pub struct SetRequestHeaders { + inner: S, + headers: HeaderMap, +} + +impl SetRequestHeaders { + pub fn new(inner: S, headers: HeaderMap) -> Self { + SetRequestHeaders { inner, headers } + } +} + +impl fmt::Debug for SetRequestHeaders +where + S: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SetRequestHeaders") + .field("inner", &self.inner) + .field("headers", &self.headers) + .finish() + } +} + +impl Service> for SetRequestHeaders +where + S: Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + #[inline] + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: Request) -> Self::Future { + if !self.headers.is_empty() { + let headers = req.headers_mut(); + for (header_name, header_value) in &self.headers { + headers.insert(header_name, header_value.clone()); + } + } + + self.inner.call(req) + } +} diff --git a/src/rust/engine/grpc_util/src/lib.rs b/src/rust/engine/grpc_util/src/lib.rs index 4cf64640c06..c96605f1f10 100644 --- a/src/rust/engine/grpc_util/src/lib.rs +++ b/src/rust/engine/grpc_util/src/lib.rs @@ -30,15 +30,20 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::convert::TryFrom; +use std::iter::FromIterator; use std::str::FromStr; -use http::header::USER_AGENT; +use crate::headers::{SetRequestHeaders, SetRequestHeadersLayer}; +use either::Either; +use http::header::{HeaderName, USER_AGENT}; +use http::{HeaderMap, HeaderValue}; +use itertools::Itertools; use tokio_rustls::rustls::ClientConfig; -use tonic::metadata::{AsciiMetadataKey, AsciiMetadataValue, KeyAndValueRef, MetadataMap}; use tonic::transport::{Channel, ClientTlsConfig, Endpoint}; use tower::limit::ConcurrencyLimit; use tower::ServiceBuilder; +pub mod headers; pub mod hyper; pub mod prost; pub mod retry; @@ -46,10 +51,15 @@ pub mod retry; // NB: Rather than boxing our tower/tonic services, we define a type alias that fully defines the // Service layers that we use universally. If this type becomes unwieldy, or our various Services // diverge in which layers they use, we should instead use a Box>. -pub type LayeredService = ConcurrencyLimit; +pub type LayeredService = SetRequestHeaders>; -pub fn layered_service(channel: Channel, concurrency_limit: usize) -> LayeredService { +pub fn layered_service( + channel: Channel, + concurrency_limit: usize, + http_headers: HeaderMap, +) -> LayeredService { ServiceBuilder::new() + .layer(SetRequestHeadersLayer::new(http_headers)) .concurrency_limit(concurrency_limit) .service(channel) } @@ -124,47 +134,32 @@ pub fn create_tls_config(root_ca_certs: Option>) -> Result) -> Result { - let mut metadata_map = MetadataMap::with_capacity(headers.len()); - for (key, value) in headers { - let key_ascii = AsciiMetadataKey::from_str(key.as_str()).map_err(|_| { - format!( - "Header key `{}` must be an ASCII value (as required by gRPC).", - key - ) - })?; - let value_ascii = AsciiMetadataValue::from_str(value.as_str()).map_err(|_| { - format!( - "Header value `{}` for key `{}` must be an ASCII value (as required by gRPC).", - value, key - ) - })?; - metadata_map.insert(key_ascii, value_ascii); +pub fn headers_to_http_header_map(headers: &BTreeMap) -> Result { + let http_headers = headers + .iter() + .map(|(key, value)| { + let header_name = HeaderName::from_str(&key) + .map_err(|err| format!("Invalid header name {}: {}", key, err))?; + + let header_value = HeaderValue::from_str(&value) + .map_err(|err| format!("Invalid header value {}: {}", value, err))?; + + Ok((header_name, header_value)) + }) + .collect::>>(); + + let (http_headers, errors): (Vec<(HeaderName, HeaderValue)>, Vec) = http_headers + .into_iter() + .partition_map(|result| match result { + Ok(v) => Either::Left(v), + Err(err) => Either::Right(err), + }); + + if !errors.is_empty() { + return Err(format!("header conversion errors: {}", errors.join("; "))); } - Ok(metadata_map) -} -pub fn headers_to_interceptor_fn( - headers: &BTreeMap, -) -> Result< - impl Fn(tonic::Request<()>) -> Result, tonic::Status> + Send + Sync + 'static, - String, -> { - let metadata_map = headers_to_metadata_map(headers)?; - Ok(move |mut req: tonic::Request<()>| { - let req_metadata = req.metadata_mut(); - for kv_ref in metadata_map.iter() { - match kv_ref { - KeyAndValueRef::Ascii(key, value) => { - req_metadata.insert(key, value.clone()); - } - KeyAndValueRef::Binary(key, value) => { - req_metadata.insert_bin(key, value.clone()); - } - } - } - Ok(req) - }) + Ok(HeaderMap::from_iter(http_headers)) } pub fn status_to_str(status: tonic::Status) -> String { diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index 8cf1501dfe5..2c9861b18ec 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -40,11 +40,11 @@ serde = "1.0.104" bincode = "1.2.1" double-checked-cell-async = "2.0" rand = "0.8" -prost = "0.7" -prost-types = "0.7" +prost = "0.8" +prost-types = "0.8" strum = "0.20" strum_macros = "0.20" -tonic = { version = "0.4", features = ["transport", "codegen", "tls", "tls-roots", "prost"] } +tonic = { version = "0.5", features = ["transport", "codegen", "tls", "tls-roots", "prost"] } tryfuture = { path = "../tryfuture" } [dev-dependencies] diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 68a93c77e7d..5c0f5b9d53b 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -19,8 +19,10 @@ use fs::{self, File, PathStat}; use futures::future::{self, BoxFuture, TryFutureExt}; use futures::FutureExt; use futures::{Stream, StreamExt}; +use grpc_util::headers_to_http_header_map; use grpc_util::prost::MessageExt; -use grpc_util::{headers_to_interceptor_fn, layered_service, status_to_str, LayeredService}; +use grpc_util::retry::{retry_call, status_is_retryable}; +use grpc_util::{layered_service, status_to_str, LayeredService}; use hashing::{Digest, Fingerprint}; use log::{debug, trace, warn, Level}; use prost::Message; @@ -32,7 +34,7 @@ use remexec::{ }; use store::{Snapshot, SnapshotOps, Store, StoreFileByDigest}; use tonic::metadata::BinaryMetadataValue; -use tonic::{Code, Interceptor, Request, Status}; +use tonic::{Code, Request, Status}; use tryfuture::try_future; use uuid::Uuid; use workunit_store::{ @@ -43,7 +45,6 @@ use crate::{ Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, Platform, Process, ProcessCacheScope, ProcessMetadata, ProcessResultMetadata, }; -use grpc_util::retry::{retry_call, status_is_retryable}; // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an Process, and may be populated only by the @@ -119,7 +120,7 @@ impl CommandRunner { store_address: &str, metadata: ProcessMetadata, root_ca_certs: Option>, - mut headers: BTreeMap, + headers: BTreeMap, store: Store, platform: Platform, overall_deadline: Duration, @@ -136,49 +137,36 @@ impl CommandRunner { None }; - let interceptor = if headers.is_empty() { - None - } else { - Some(Interceptor::new(headers_to_interceptor_fn(&headers)?)) - }; - + let mut execution_headers = headers.clone(); let execution_endpoint = grpc_util::create_endpoint( &execution_address, tls_client_config.as_ref().filter(|_| execution_use_tls), - &mut headers, + &mut execution_headers, )?; + let execution_http_headers = headers_to_http_header_map(&execution_headers)?; let execution_channel = layered_service( tonic::transport::Channel::balance_list(vec![execution_endpoint].into_iter()), execution_concurrency_limit, + execution_http_headers, ); - let execution_client = Arc::new(match interceptor.as_ref() { - Some(interceptor) => { - ExecutionClient::with_interceptor(execution_channel.clone(), interceptor.clone()) - } - None => ExecutionClient::new(execution_channel.clone()), - }); + let execution_client = Arc::new(ExecutionClient::new(execution_channel.clone())); + let mut store_headers = headers.clone(); let store_endpoint = grpc_util::create_endpoint( &store_address, tls_client_config.as_ref().filter(|_| execution_use_tls), - &mut headers, + &mut store_headers, )?; + let store_http_headers = headers_to_http_header_map(&store_headers)?; let store_channel = layered_service( tonic::transport::Channel::balance_list(vec![store_endpoint].into_iter()), cache_concurrency_limit, + store_http_headers, ); - let action_cache_client = Arc::new(match interceptor.as_ref() { - Some(interceptor) => ActionCacheClient::with_interceptor(store_channel, interceptor.clone()), - None => ActionCacheClient::new(store_channel), - }); + let action_cache_client = Arc::new(ActionCacheClient::new(store_channel)); - let capabilities_client = Arc::new(match interceptor.as_ref() { - Some(interceptor) => { - CapabilitiesClient::with_interceptor(execution_channel, interceptor.clone()) - } - None => CapabilitiesClient::new(execution_channel), - }); + let capabilities_client = Arc::new(CapabilitiesClient::new(execution_channel)); let command_runner = CommandRunner { metadata, diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 0395361daea..304b645149b 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -11,7 +11,7 @@ use fs::RelativePath; use futures::future::BoxFuture; use futures::FutureExt; use grpc_util::{ - headers_to_interceptor_fn, layered_service, retry::retry_call, status_to_str, LayeredService, + headers_to_http_header_map, layered_service, retry::retry_call, status_to_str, LayeredService, }; use hashing::Digest; use parking_lot::Mutex; @@ -81,15 +81,13 @@ impl CommandRunner { tls_client_config.as_ref(), &mut headers, )?; + let http_headers = headers_to_http_header_map(&headers)?; let channel = layered_service( tonic::transport::Channel::balance_list(vec![endpoint].into_iter()), concurrency_limit, + http_headers, ); - let action_cache_client = Arc::new(if headers.is_empty() { - ActionCacheClient::new(channel) - } else { - ActionCacheClient::with_interceptor(channel, headers_to_interceptor_fn(&headers)?) - }); + let action_cache_client = Arc::new(ActionCacheClient::new(channel)); Ok(CommandRunner { underlying, diff --git a/src/rust/engine/process_executor/Cargo.toml b/src/rust/engine/process_executor/Cargo.toml index 6c82368e377..452fc4f6cc0 100644 --- a/src/rust/engine/process_executor/Cargo.toml +++ b/src/rust/engine/process_executor/Cargo.toml @@ -15,7 +15,7 @@ futures = "0.3" hashing = { path = "../hashing" } log = "0.4" process_execution = { path = "../process_execution" } -prost = "0.7" +prost = "0.8" shlex = "0.1.1" store = { path = "../fs/store" } structopt = "0.3.20" diff --git a/src/rust/engine/testutil/Cargo.toml b/src/rust/engine/testutil/Cargo.toml index 08d74108dfd..7d9d3b70fdc 100644 --- a/src/rust/engine/testutil/Cargo.toml +++ b/src/rust/engine/testutil/Cargo.toml @@ -12,4 +12,4 @@ bytes = "1.0" grpc_util = { path = "../grpc_util" } fs = { path = "../fs" } hashing = { path = "../hashing" } -prost = "0.7" +prost = "0.8" diff --git a/src/rust/engine/testutil/mock/Cargo.toml b/src/rust/engine/testutil/mock/Cargo.toml index 1335bfaaeef..0168f7aa86f 100644 --- a/src/rust/engine/testutil/mock/Cargo.toml +++ b/src/rust/engine/testutil/mock/Cargo.toml @@ -15,8 +15,8 @@ hashing = { path = "../../hashing" } hyper = { version = "0.14", features = ["stream", "tcp"] } log = "0.4" parking_lot = "0.11" -prost = "0.7" -prost-types = "0.7" +prost = "0.8" +prost-types = "0.8" testutil = { path = ".." } tokio = { version = "1.4", features = ["time"] } -tonic = { version = "0.4" } +tonic = { version = "0.5" } diff --git a/src/rust/engine/testutil/mock/src/execution_server.rs b/src/rust/engine/testutil/mock/src/execution_server.rs index a1d0a4fb7ba..ecd614d47f4 100644 --- a/src/rust/engine/testutil/mock/src/execution_server.rs +++ b/src/rust/engine/testutil/mock/src/execution_server.rs @@ -36,7 +36,7 @@ use tonic::{Request, Response, Status}; /// Represents an expected API call from the REv2 client. The data carried by each enum /// variant are the parameters to verify and the results to return to the client. /// -#[derive(Clone, Debug)] +#[derive(Debug)] #[allow(clippy::large_enum_variant)] // GetActionResult variant is larger than others pub enum ExpectedAPICall { Execute { @@ -64,7 +64,7 @@ pub enum ExpectedAPICall { /// client. If the duration is not None, it represents a delay before either responding or /// canceling for the operation. /// -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct MockOperation { pub op: Result, Status>, pub duration: Option, @@ -182,7 +182,7 @@ impl Drop for TestServer { .mock_execution .expected_api_calls .lock() - .clone(), + .deref(), )), MockResponder::display_all(&self.mock_responder.received_messages.deref().lock()) );