From f9867db17de45126e33789ae37d0a1863051980c Mon Sep 17 00:00:00 2001 From: Daniel Wagner-Hall Date: Mon, 18 Feb 2019 21:12:30 +0000 Subject: [PATCH] Revert remote execution from tower to grpcio (#7256) We're seeing weird broken connection errors with tower. We'll probably just chuck some retries in and be happy, but for now, let's get back to a more stable time. * Revert "Remove unused operation wrapper (#7194)" This reverts commit 9400024680dba6e68f4e85dc95e321ef063be42b. * Revert "Switch operation getting to tower (#7108)" This reverts commit 0375b3041a33b0f731c6d88f8f5ab782bdb2690d. * Revert "Remote execution uses tower-grpc to start executions (#7049)" This reverts commit 28683c770afb43462fcff7f03334a1a09a24e739. --- src/rust/engine/Cargo.lock | 14 +- src/rust/engine/process_execution/Cargo.toml | 16 +- .../process_execution/bazel_protos/Cargo.toml | 1 - .../process_execution/bazel_protos/build.rs | 8 +- .../bazel_protos/src/conversions.rs | 102 +- .../process_execution/bazel_protos/src/lib.rs | 1 - .../engine/process_execution/src/remote.rs | 1363 ++++++++--------- src/rust/engine/process_executor/Cargo.toml | 1 - src/rust/engine/process_executor/src/main.rs | 47 +- src/rust/engine/src/context.rs | 24 +- .../testutil/mock/src/execution_server.rs | 17 +- 11 files changed, 707 insertions(+), 887 deletions(-) diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock index 4c13841e6d2..2648f2b9e20 100644 --- a/src/rust/engine/Cargo.lock +++ b/src/rust/engine/Cargo.lock @@ -98,7 +98,6 @@ dependencies = [ "grpcio 0.3.0 (git+https://github.com/pantsbuild/grpc-rs.git?rev=4dfafe9355dc996d7d0702e7386a6fedcd9734c0)", "grpcio-compiler 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "hashing 0.0.1", - "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-derive 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "prost-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1280,28 +1279,18 @@ dependencies = [ "fs 0.0.1", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "futures-timer 0.1.1 (git+https://github.com/pantsbuild/futures-timer?rev=0b747e565309a58537807ab43c674d8951f9e5a0)", - "h2 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", + "grpcio 0.3.0 (git+https://github.com/pantsbuild/grpc-rs.git?rev=4dfafe9355dc996d7d0702e7386a6fedcd9734c0)", "hashing 0.0.1", - "http 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "mock 0.0.1", - "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", - "prost 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "prost-types 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "resettable 0.0.1", "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "testutil 0.0.1", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-connect 0.1.0 (git+https://github.com/pantsbuild/tokio-connect?rev=f7ad1ca437973d6e24037ac6f7d5ef1013833c0b)", "tokio-process 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", - "tower-grpc 0.1.0 (git+https://github.com/pantsbuild/tower-grpc.git?rev=ef19f2e1715f415ecb699e8f17f5845ad2b45daf)", - "tower-h2 0.1.0 (git+https://github.com/pantsbuild/tower-h2?rev=44b0efb4983b769283efd5b2a3bc3decbf7c33de)", - "tower-http 0.1.0 (git+https://github.com/pantsbuild/tower-http?rev=56049ee7f31d4f6c549f5d1d5fbbfd7937df3d00)", - "tower-util 0.1.0 (git+https://github.com/pantsbuild/tower?rev=7b61c1fc1992c1df684fd3f179644ef0ca9bfa4c)", ] [[package]] @@ -1316,7 +1305,6 @@ dependencies = [ "hashing 0.0.1", "process_execution 0.0.1", "resettable 0.0.1", - "tokio 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/src/rust/engine/process_execution/Cargo.toml b/src/rust/engine/process_execution/Cargo.toml index affa75eeae1..2bbef270ffb 100644 --- a/src/rust/engine/process_execution/Cargo.toml +++ b/src/rust/engine/process_execution/Cargo.toml @@ -13,28 +13,18 @@ bytes = "0.4.5" digest = "0.8" fs = { path = "../fs" } futures = "^0.1.16" -# TODO: Switch to a release once https://github.com/alexcrichton/futures-timer/pull/11 and https://github.com/alexcrichton/futures-timer/pull/12 merge -futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" } -h2 = "0.1.13" +grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "4dfafe9355dc996d7d0702e7386a6fedcd9734c0", default_features = false, features = ["protobuf-codec"] } hashing = { path = "../hashing" } -http = "0.1" log = "0.4" -parking_lot = "0.6" -prost = "0.4" -prost-types = "0.4" protobuf = { version = "2.0.4", features = ["with-bytes"] } resettable = { path = "../resettable" } sha2 = "0.8" tempfile = "3" +# TODO: Switch to a release once https://github.com/alexcrichton/futures-timer/pull/11 and https://github.com/alexcrichton/futures-timer/pull/12 merge +futures-timer = { git = "https://github.com/pantsbuild/futures-timer", rev = "0b747e565309a58537807ab43c674d8951f9e5a0" } time = "0.1.40" -tokio = "0.1.14" tokio-codec = "0.1" -tokio-connect = { git = "https://github.com/pantsbuild/tokio-connect.git", rev = "f7ad1ca437973d6e24037ac6f7d5ef1013833c0b" } tokio-process = "0.2.1" -tower-grpc = { git = "https://github.com/pantsbuild/tower-grpc.git", rev = "ef19f2e1715f415ecb699e8f17f5845ad2b45daf" } -tower-h2 = { git = "https://github.com/pantsbuild/tower-h2.git", rev = "44b0efb4983b769283efd5b2a3bc3decbf7c33de" } -tower-http = { git = "https://github.com/pantsbuild/tower-http.git", rev = "56049ee7f31d4f6c549f5d1d5fbbfd7937df3d00" } -tower-util = { git = "https://github.com/pantsbuild/tower.git", rev = "7b61c1fc1992c1df684fd3f179644ef0ca9bfa4c" } [dev-dependencies] mock = { path = "../testutil/mock" } diff --git a/src/rust/engine/process_execution/bazel_protos/Cargo.toml b/src/rust/engine/process_execution/bazel_protos/Cargo.toml index 6890cef290f..7cd044c7471 100644 --- a/src/rust/engine/process_execution/bazel_protos/Cargo.toml +++ b/src/rust/engine/process_execution/bazel_protos/Cargo.toml @@ -10,7 +10,6 @@ bytes = "0.4.5" futures = "^0.1.16" grpcio = { git = "https://github.com/pantsbuild/grpc-rs.git", rev = "4dfafe9355dc996d7d0702e7386a6fedcd9734c0", default_features = false, features = ["protobuf-codec"] } hashing = { path = "../../hashing" } -log = "0.4" prost = "0.4" prost-derive = "0.4" prost-types = "0.4" diff --git a/src/rust/engine/process_execution/bazel_protos/build.rs b/src/rust/engine/process_execution/bazel_protos/build.rs index 23c441961f3..0c9e136ad9a 100644 --- a/src/rust/engine/process_execution/bazel_protos/build.rs +++ b/src/rust/engine/process_execution/bazel_protos/build.rs @@ -176,11 +176,9 @@ fn generate_for_tower(thirdpartyprotobuf: &Path, out_dir: PathBuf) { .enable_server(true) .enable_client(true) .build( - &[ - PathBuf::from("build/bazel/remote/execution/v2/remote_execution.proto"), - PathBuf::from("google/rpc/code.proto"), - PathBuf::from("google/rpc/error_details.proto"), - ], + &[PathBuf::from( + "build/bazel/remote/execution/v2/remote_execution.proto", + )], &std::fs::read_dir(&thirdpartyprotobuf) .unwrap() .into_iter() diff --git a/src/rust/engine/process_execution/bazel_protos/src/conversions.rs b/src/rust/engine/process_execution/bazel_protos/src/conversions.rs index e46767fb1e6..f017612f321 100644 --- a/src/rust/engine/process_execution/bazel_protos/src/conversions.rs +++ b/src/rust/engine/process_execution/bazel_protos/src/conversions.rs @@ -1,7 +1,4 @@ -use bytes::BytesMut; use hashing; -use log::error; -use prost::Message; impl<'a> From<&'a hashing::Digest> for crate::remote_execution::Digest { fn from(d: &hashing::Digest) -> Self { @@ -21,31 +18,19 @@ impl<'a> From<&'a hashing::Digest> for crate::build::bazel::remote::execution::v } } -impl<'a> From<&'a crate::remote_execution::Digest> for Result { - fn from(d: &crate::remote_execution::Digest) -> Self { +impl<'a> From<&'a super::remote_execution::Digest> for Result { + fn from(d: &super::remote_execution::Digest) -> Self { hashing::Fingerprint::from_hex_string(d.get_hash()) .map_err(|err| format!("Bad fingerprint in Digest {:?}: {:?}", d.get_hash(), err)) .map(|fingerprint| hashing::Digest(fingerprint, d.get_size_bytes() as usize)) } } -impl<'a> From<&'a crate::build::bazel::remote::execution::v2::Digest> - for Result -{ - fn from(d: &crate::build::bazel::remote::execution::v2::Digest) -> Self { - hashing::Fingerprint::from_hex_string(&d.hash) - .map_err(|err| format!("Bad fingerprint in Digest {:?}: {:?}", d.hash, err)) - .map(|fingerprint| hashing::Digest(fingerprint, d.size_bytes as usize)) - } -} - impl From for crate::operations::Operation { fn from(op: crate::google::longrunning::Operation) -> Self { let mut dst = Self::new(); dst.set_name(op.name); - if let Some(metadata) = op.metadata { - dst.set_metadata(prost_any_to_gcprio_any(metadata)); - } + dst.set_metadata(prost_any_to_gcprio_any(op.metadata.unwrap())); dst.set_done(op.done); match op.result { Some(crate::google::longrunning::operation::Result::Response(response)) => { @@ -60,87 +45,6 @@ impl From for crate::operations::Operatio } } -// This should only be used in test contexts. It should be deleted when the mock systems use tower. -impl From - for crate::build::bazel::remote::execution::v2::ExecuteRequest -{ - fn from(req: crate::remote_execution::ExecuteRequest) -> Self { - if req.has_execution_policy() || req.has_results_cache_policy() { - panic!("Can't convert ExecuteRequest protos with execution policy or results cache policy"); - } - let digest: Result = req.get_action_digest().into(); - Self { - action_digest: Some((&digest.expect("Bad digest converting ExecuteRequest proto")).into()), - instance_name: req.instance_name, - execution_policy: None, - results_cache_policy: None, - skip_cache_lookup: req.skip_cache_lookup, - } - } -} - -// This should only be used in test contexts. It should be deleted when the mock systems use tower. -impl From - for crate::remote_execution::ExecuteRequest -{ - fn from(req: crate::build::bazel::remote::execution::v2::ExecuteRequest) -> Self { - if req.execution_policy.is_some() || req.results_cache_policy.is_some() { - panic!("Can't convert ExecuteRequest protos with execution policy or results cache policy"); - } - let digest: Result = (&req - .action_digest - .expect("Missing digest converting ExecuteRequest proto")) - .into(); - - let mut ret = Self::new(); - ret.set_action_digest((&digest.expect("Bad digest converting ExecuteRequest proto")).into()); - ret.set_instance_name(req.instance_name); - ret.set_skip_cache_lookup(req.skip_cache_lookup); - ret - } -} - -// This should only be used in test contexts. It should be deleted when the mock systems use tower. -impl Into for crate::google::rpc::Status { - fn into(self) -> grpcio::RpcStatus { - let mut buf = BytesMut::with_capacity(self.encoded_len()); - self.encode(&mut buf).unwrap(); - grpcio::RpcStatus { - status: self.code.into(), - details: None, - status_proto_bytes: Some(buf.to_vec()), - } - } -} - -// TODO: Use num_enum or similar here when TryInto is stable. -pub fn code_from_i32(i: i32) -> crate::google::rpc::Code { - use crate::google::rpc::Code::*; - match i { - 0 => Ok, - 1 => Cancelled, - 2 => Unknown, - 3 => InvalidArgument, - 4 => DeadlineExceeded, - 5 => NotFound, - 6 => AlreadyExists, - 7 => PermissionDenied, - 8 => ResourceExhausted, - 9 => FailedPrecondition, - 10 => Aborted, - 11 => OutOfRange, - 12 => Unimplemented, - 13 => Internal, - 14 => Unavailable, - 15 => DataLoss, - 16 => Unauthenticated, - _ => { - error!("Unknown grpc error code: {}, default to Unknown", i); - Unknown - } - } -} - pub fn prost_any_to_gcprio_any(any: prost_types::Any) -> protobuf::well_known_types::Any { let prost_types::Any { type_url, value } = any; let mut dst = protobuf::well_known_types::Any::new(); diff --git a/src/rust/engine/process_execution/bazel_protos/src/lib.rs b/src/rust/engine/process_execution/bazel_protos/src/lib.rs index df86e9d656a..0bfd0d1bcae 100644 --- a/src/rust/engine/process_execution/bazel_protos/src/lib.rs +++ b/src/rust/engine/process_execution/bazel_protos/src/lib.rs @@ -13,6 +13,5 @@ mod gen_for_tower; pub use crate::gen_for_tower::*; mod conversions; -pub use crate::conversions::code_from_i32; mod verification; pub use crate::verification::verify_directory_canonical; diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index b0272cc5828..bebf1ee61f0 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use std::mem::drop; use std::path::PathBuf; +use std::sync::Arc; use std::time::{Duration, Instant}; use bazel_protos; @@ -9,11 +11,10 @@ use digest::{Digest as DigestTrait, FixedOutput}; use fs::{self, File, PathStat, Store}; use futures::{future, Future, Stream}; use futures_timer::Delay; +use grpcio; use hashing::{Digest, Fingerprint}; use log::{debug, trace, warn}; -use parking_lot::Mutex; -use prost::Message; -use protobuf::{self, Message as GrpcioMessage, ProtobufEnum}; +use protobuf::{self, Message, ProtobufEnum}; use sha2::Sha256; use time; @@ -21,36 +22,26 @@ use super::{ExecuteProcessRequest, ExecutionStats, FallibleExecuteProcessResult} use std; use std::cmp::min; -use std::net::SocketAddr; -use std::net::ToSocketAddrs; -use tokio::executor::DefaultExecutor; -use tokio::net::tcp::{ConnectFuture, TcpStream}; -use tower_grpc::Request; -use tower_h2::client; -use tower_util::MakeService; - // Environment variable which is exclusively used for cache key invalidation. // This may be not specified in an ExecuteProcessRequest, and may be populated only by the // CommandRunner. const CACHE_KEY_GEN_VERSION_ENV_VAR_NAME: &str = "PANTS_CACHE_KEY_GEN_VERSION"; -type Connection = tower_http::add_origin::AddOrigin< - tower_h2::client::Connection, ->; - -struct Clients { - execution_client: - Mutex>, - operations_client: Mutex>, +#[derive(Debug)] +enum OperationOrStatus { + Operation(bazel_protos::operations::Operation), + Status(bazel_protos::status::Status), } #[derive(Clone)] -#[allow(clippy::type_complexity)] pub struct CommandRunner { cache_key_gen_version: Option, instance_name: Option, authorization_header: Option, - clients: futures::future::Shared>, + channel: grpcio::Channel, + env: Arc, + execution_client: Arc, + operations_client: Arc, store: Store, futures_timer_thread: resettable::Resettable, } @@ -81,36 +72,35 @@ impl CommandRunner { // behavior. fn oneshot_execute( &self, - execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, - ) -> impl Future { - let command_runner = self.clone(); - self - .clients - .clone() - .map_err(|err| format!("Error getting execution_client: {}", err)) - .and_then(move |clients| { - clients - .execution_client - .lock() - .execute(command_runner.make_request(execute_request)) - .map_err(towergrpcerror_to_string) - .and_then(|response_stream| { - response_stream - .into_inner() - .take(1) - .into_future() - .map_err(|err| { - format!( - "Error getting response from remote process execution {:?}", - err - ) - }) - .and_then(|(resp, stream)| { - std::mem::drop(stream); - resp.ok_or_else(|| "Didn't get response from remote process execution".to_owned()) - }) - }) + execute_request: &Arc, + ) -> BoxFuture { + let stream = try_future!(self + .execution_client + .execute_opt(&execute_request, self.call_option()) + .map_err(rpcerror_to_string)); + stream + .take(1) + .into_future() + // If there was a response, drop the _stream to disconnect so that the server doesn't keep + // the connection alive and continue sending on it. + .map(|(maybe_operation, stream)| { + drop(stream); + maybe_operation + }) + // If there was an error, drop the _stream to disconnect so that the server doesn't keep the + // connection alive and continue sending on it. + .map_err(|(error, stream)| { + drop(stream); + error + }) + .then(|maybe_operation_result| match maybe_operation_result { + Ok(Some(operation)) => Ok(OperationOrStatus::Operation(operation)), + Ok(None) => { + Err("Didn't get proper stream response from server during remote execution".to_owned()) + } + Err(err) => rpcerror_to_status_or_string(err).map(OperationOrStatus::Status), }) + .to_boxed() } } @@ -135,7 +125,7 @@ impl super::CommandRunner for CommandRunner { /// TODO: Request jdk_home be created if set. /// fn run(&self, req: ExecuteProcessRequest) -> BoxFuture { - let clients = self.clients.clone(); + let operations_client = self.operations_client.clone(); let store = self.store.clone(); let execute_request_result = @@ -154,6 +144,8 @@ impl super::CommandRunner for CommandRunner { Ok((action, command, execute_request)) => { let command_runner = self.clone(); let command_runner2 = self.clone(); + let command_runner3 = self.clone(); + let execute_request = Arc::new(execute_request); let execute_request2 = execute_request.clone(); let futures_timer_thread = self.futures_timer_thread.clone(); @@ -174,7 +166,7 @@ impl super::CommandRunner for CommandRunner { command ); command_runner - .oneshot_execute(execute_request) + .oneshot_execute(&execute_request) .join(future::ok(history)) }) .and_then(move |(operation, history)| { @@ -187,9 +179,9 @@ impl super::CommandRunner for CommandRunner { let execute_request2 = execute_request2.clone(); let store = store.clone(); - let clients = clients.clone(); + let operations_client = operations_client.clone(); let command_runner2 = command_runner2.clone(); - let command_runner3 = command_runner2.clone(); + let command_runner3 = command_runner3.clone(); let futures_timer_thread = futures_timer_thread.clone(); let f = command_runner2.extract_execute_response(operation, &mut history); f.map(future::Loop::Break).or_else(move |value| { @@ -220,7 +212,7 @@ impl super::CommandRunner for CommandRunner { let mut history = history; history.current_attempt += summary; command_runner2 - .oneshot_execute(execute_request) + .oneshot_execute(&execute_request) .join(future::ok(history)) }) // Reset `iter_num` on `MissingDigests` @@ -228,11 +220,9 @@ impl super::CommandRunner for CommandRunner { .to_boxed() } ExecutionError::NotFinished(operation_name) => { - let operation_name2 = operation_name.clone(); - let operation_request = - bazel_protos::google::longrunning::GetOperationRequest { - name: operation_name.clone(), - }; + let mut operation_request = + bazel_protos::operations::GetOperationRequest::new(); + operation_request.set_name(operation_name.clone()); let backoff_period = min( CommandRunner::BACKOFF_MAX_WAIT_MILLIS, @@ -261,23 +251,19 @@ impl super::CommandRunner for CommandRunner { ) }) .and_then(move |_| { - clients - .map_err(|err| format!("{}", err)) - .and_then(move |clients| { - clients - .operations_client - .lock() - .get_operation(command_runner3.make_request(operation_request)) - .map(|r| r.into_inner()) - .or_else(move |err| { - rpcerror_recover_cancelled(operation_name2, err) - }) - .map_err(towergrpcerror_to_string) - }) - .map(move |operation| { - future::Loop::Continue((history, operation, iter_num + 1)) - }) - .to_boxed() + future::done( + operations_client + .get_operation_opt(&operation_request, command_runner3.call_option()) + .or_else(move |err| { + rpcerror_recover_cancelled(operation_request.take_name(), err) + }) + .map(OperationOrStatus::Operation) + .map_err(rpcerror_to_string), + ) + .map(move |operation| { + future::Loop::Continue((history, operation, iter_num + 1)) + }) + .to_boxed() }) .to_boxed() } @@ -315,84 +301,57 @@ impl CommandRunner { address: &str, cache_key_gen_version: Option, instance_name: Option, + root_ca_certs: Option>, oauth_bearer_token: Option, + thread_count: usize, store: Store, futures_timer_thread: resettable::Resettable, - ) -> Result { - struct Dst(SocketAddr); - - impl tokio_connect::Connect for Dst { - type Connected = TcpStream; - type Error = ::std::io::Error; - type Future = ConnectFuture; - - fn connect(&self) -> Self::Future { - TcpStream::connect(&self.0) + ) -> CommandRunner { + let env = Arc::new(grpcio::Environment::new(thread_count)); + let channel = { + let builder = grpcio::ChannelBuilder::new(env.clone()); + if let Some(_root_ca_certs) = root_ca_certs { + panic!("Sorry, we dropped secure grpc support until we can either make openssl link properly, or switch to tower"); + /* + let creds = grpcio::ChannelCredentialsBuilder::new() + .root_cert(root_ca_certs) + .build(); + builder.secure_connect(address, creds) + */ + } else { + builder.connect(address) } - } - - // TODO: Support https - let uri: http::Uri = format!("http://{}", address) - .parse() - .map_err(|err| format!("Failed to parse remote server address URL: {}", err))?; - let socket_addr = address - .to_socket_addrs() - .map_err(|err| format!("Failed to resolve remote socket address URL: {}", err))? - .next() - .ok_or_else(|| "Remote server address resolved to no addresses".to_owned())?; - let conn = client::Connect::new( - Dst(socket_addr), - h2::client::Builder::default(), - DefaultExecutor::current(), - ) - .make_service(()) - .map_err(|err| format!("Error connecting to remote execution server: {}", err)) - .and_then(move |conn| { - tower_http::add_origin::Builder::new() - .uri(uri) - .build(conn) - .map_err(|err| { - format!( - "Failed to add origin for remote execution server: {:?}", - err - ) - }) - .map(Mutex::new) - }); - let clients = conn - .map(|conn| { - let conn = conn.lock(); - let execution_client = Mutex::new( - bazel_protos::build::bazel::remote::execution::v2::client::Execution::new(conn.clone()), - ); - let operations_client = Mutex::new( - bazel_protos::google::longrunning::client::Operations::new(conn.clone()), - ); - Clients { - execution_client, - operations_client, - } - }) - .to_boxed() - .shared(); - Ok(CommandRunner { + }; + let execution_client = Arc::new(bazel_protos::remote_execution_grpc::ExecutionClient::new( + channel.clone(), + )); + let operations_client = Arc::new(bazel_protos::operations_grpc::OperationsClient::new( + channel.clone(), + )); + + CommandRunner { cache_key_gen_version, instance_name, authorization_header: oauth_bearer_token.map(|t| format!("Bearer {}", t)), - clients, + channel, + env, + execution_client, + operations_client, store, futures_timer_thread, - }) + } } - fn make_request(&self, message: T) -> Request { - let mut request = Request::new(message); + fn call_option(&self) -> grpcio::CallOption { + let mut call_option = grpcio::CallOption::default(); if let Some(ref authorization_header) = self.authorization_header { - request - .metadata_mut() - .insert("authorization", authorization_header.parse().unwrap()); + let mut builder = grpcio::MetadataBuilder::with_capacity(1); + builder + .add_str("authorization", &authorization_header) + .unwrap(); + call_option = call_option.headers(builder.build()); } - request + call_option } fn store_proto_locally( @@ -411,113 +370,102 @@ impl CommandRunner { fn extract_execute_response( &self, - operation: bazel_protos::google::longrunning::Operation, + operation_or_status: OperationOrStatus, attempts: &mut ExecutionHistory, ) -> BoxFuture { - trace!("Got operation response: {:?}", operation); - - if !operation.done { - return future::err(ExecutionError::NotFinished(operation.name)).to_boxed(); - } - let execute_response = if let Some(result) = operation.result { - match result { - bazel_protos::google::longrunning::operation::Result::Error(ref status) => { - return future::err(ExecutionError::Fatal(format_error(status))).to_boxed(); - } - bazel_protos::google::longrunning::operation::Result::Response(ref any) => try_future!( - bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse::decode(&any.value) - .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e))) - ), - } - } else { - return future::err(ExecutionError::Fatal( - "Operation finished but no response supplied".to_string(), - )) - .to_boxed(); - }; + trace!("Got operation response: {:?}", operation_or_status); - trace!("Got (nested) execute response: {:?}", execute_response); - - if let Some(ref result) = execute_response.result { - if let Some(ref metadata) = result.execution_metadata { - let enqueued = timespec_from(&metadata.queued_timestamp); - let worker_start = timespec_from(&metadata.worker_start_timestamp); - let input_fetch_start = timespec_from(&metadata.input_fetch_start_timestamp); - let input_fetch_completed = timespec_from(&metadata.input_fetch_completed_timestamp); - let execution_start = timespec_from(&metadata.execution_start_timestamp); - let execution_completed = timespec_from(&metadata.execution_completed_timestamp); - let output_upload_start = timespec_from(&metadata.output_upload_start_timestamp); - let output_upload_completed = timespec_from(&metadata.output_upload_completed_timestamp); - - match (worker_start - enqueued).to_std() { - Ok(duration) => attempts.current_attempt.remote_queue = Some(duration), - Err(err) => warn!("Got negative remote queue time: {}", err), - } - match (input_fetch_completed - input_fetch_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration), - Err(err) => warn!("Got negative remote input fetch time: {}", err), + let status = match operation_or_status { + OperationOrStatus::Operation(mut operation) => { + if !operation.get_done() { + return future::err(ExecutionError::NotFinished(operation.take_name())).to_boxed(); } - match (execution_completed - execution_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_execution = Some(duration), - Err(err) => warn!("Got negative remote execution time: {}", err), + if operation.has_error() { + return future::err(ExecutionError::Fatal(format_error(&operation.get_error()))) + .to_boxed(); } - match (output_upload_completed - output_upload_start).to_std() { - Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration), - Err(err) => warn!("Got negative remote output store time: {}", err), + if !operation.has_response() { + return future::err(ExecutionError::Fatal( + "Operation finished but no response supplied".to_string(), + )) + .to_boxed(); } - attempts.current_attempt.was_cache_hit = execute_response.cached_result; - } - } - let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]); - execution_attempts.push(attempts.current_attempt); - - let maybe_result = execute_response.result; + let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); + try_future!(execute_response + .merge_from_bytes(operation.get_response().get_value()) + .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))); + trace!("Got (nested) execute response: {:?}", execute_response); + + if execute_response.get_result().has_execution_metadata() { + let metadata = execute_response.get_result().get_execution_metadata(); + let enqueued = timespec_from(metadata.get_queued_timestamp()); + let worker_start = timespec_from(metadata.get_worker_start_timestamp()); + let input_fetch_start = timespec_from(metadata.get_input_fetch_start_timestamp()); + let input_fetch_completed = timespec_from(metadata.get_input_fetch_completed_timestamp()); + let execution_start = timespec_from(metadata.get_execution_start_timestamp()); + let execution_completed = timespec_from(metadata.get_execution_completed_timestamp()); + let output_upload_start = timespec_from(metadata.get_output_upload_start_timestamp()); + let output_upload_completed = + timespec_from(metadata.get_output_upload_completed_timestamp()); + + match (worker_start - enqueued).to_std() { + Ok(duration) => attempts.current_attempt.remote_queue = Some(duration), + Err(err) => warn!("Got negative remote queue time: {}", err), + } + match (input_fetch_completed - input_fetch_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_input_fetch = Some(duration), + Err(err) => warn!("Got negative remote input fetch time: {}", err), + } + match (execution_completed - execution_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_execution = Some(duration), + Err(err) => warn!("Got negative remote execution time: {}", err), + } + match (output_upload_completed - output_upload_start).to_std() { + Ok(duration) => attempts.current_attempt.remote_output_store = Some(duration), + Err(err) => warn!("Got negative remote output store time: {}", err), + } + attempts.current_attempt.was_cache_hit = execute_response.cached_result; + } - let status = execute_response - .status - .unwrap_or_else(|| bazel_protos::google::rpc::Status { - code: bazel_protos::google::rpc::Code::Ok.into(), - message: String::new(), - details: vec![], - }); - if status.code == bazel_protos::google::rpc::Code::Ok.into() { - if let Some(result) = maybe_result { - return self - .extract_stdout(&result) - .join(self.extract_stderr(&result)) - .join(self.extract_output_files(&result)) - .and_then(move |((stdout, stderr), output_directory)| { - Ok(FallibleExecuteProcessResult { - stdout: stdout, - stderr: stderr, - exit_code: result.exit_code, - output_directory: output_directory, - execution_attempts: execution_attempts, + let mut execution_attempts = std::mem::replace(&mut attempts.attempts, vec![]); + execution_attempts.push(attempts.current_attempt); + + let status = execute_response.take_status(); + if grpcio::RpcStatusCode::from(status.get_code()) == grpcio::RpcStatusCode::Ok { + return self + .extract_stdout(&execute_response) + .join(self.extract_stderr(&execute_response)) + .join(self.extract_output_files(&execute_response)) + .and_then(move |((stdout, stderr), output_directory)| { + Ok(FallibleExecuteProcessResult { + stdout: stdout, + stderr: stderr, + exit_code: execute_response.get_result().get_exit_code(), + output_directory: output_directory, + execution_attempts: execution_attempts, + }) }) - }) - .to_boxed(); - } else { - return futures::future::err(ExecutionError::Fatal( - "No result found on ExecuteResponse".to_owned(), - )) - .to_boxed(); + .to_boxed(); + } + status } - } + OperationOrStatus::Status(status) => status, + }; - match bazel_protos::code_from_i32(status.code) { - bazel_protos::google::rpc::Code::Ok => unreachable!(), - bazel_protos::google::rpc::Code::FailedPrecondition => { - if status.details.len() != 1 { + match grpcio::RpcStatusCode::from(status.get_code()) { + grpcio::RpcStatusCode::Ok => unreachable!(), + grpcio::RpcStatusCode::FailedPrecondition => { + if status.get_details().len() != 1 { return future::err(ExecutionError::Fatal(format!( "Received multiple details in FailedPrecondition ExecuteResponse's status field: {:?}", - status.details + status.get_details() ))) .to_boxed(); } - let details = &status.details[0]; + let details = status.get_details().get(0).unwrap(); let mut precondition_failure = bazel_protos::error_details::PreconditionFailure::new(); - if details.type_url + if details.get_type_url() != format!( "type.googleapis.com/{}", precondition_failure.descriptor().full_name() @@ -526,12 +474,13 @@ impl CommandRunner { return future::err(ExecutionError::Fatal(format!( "Received FailedPrecondition, but didn't know how to resolve it: {},\ protobuf type {}", - status.message, details.type_url + status.get_message(), + details.get_type_url() ))) .to_boxed(); } try_future!(precondition_failure - .merge_from_bytes(&details.value) + .merge_from_bytes(details.get_value()) .map_err(|e| ExecutionError::Fatal(format!( "Error deserializing FailedPrecondition proto: {:?}", e @@ -579,7 +528,8 @@ impl CommandRunner { } code => future::err(ExecutionError::Fatal(format!( "Error from remote execution: {:?}: {:?}", - code, status.message + code, + status.get_message() ))) .to_boxed(), } @@ -588,10 +538,11 @@ impl CommandRunner { fn extract_stdout( &self, - result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + execute_response: &bazel_protos::remote_execution::ExecuteResponse, ) -> BoxFuture { - if let Some(ref stdout_digest) = result.stdout_digest { - let stdout_digest_result: Result = stdout_digest.into(); + if execute_response.get_result().has_stdout_digest() { + let stdout_digest_result: Result = + execute_response.get_result().get_stdout_digest().into(); let stdout_digest = try_future!(stdout_digest_result .map_err(|err| ExecutionError::Fatal(format!("Error extracting stdout: {}", err)))); self @@ -613,7 +564,7 @@ impl CommandRunner { }) .to_boxed() } else { - let stdout_raw = Bytes::from(result.stdout_raw.as_slice()); + let stdout_raw = Bytes::from(execute_response.get_result().get_stdout_raw()); let stdout_copy = stdout_raw.clone(); self .store @@ -628,10 +579,11 @@ impl CommandRunner { fn extract_stderr( &self, - result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + execute_response: &bazel_protos::remote_execution::ExecuteResponse, ) -> BoxFuture { - if let Some(ref stderr_digest) = result.stderr_digest { - let stderr_digest_result: Result = stderr_digest.into(); + if execute_response.get_result().has_stderr_digest() { + let stderr_digest_result: Result = + execute_response.get_result().get_stderr_digest().into(); let stderr_digest = try_future!(stderr_digest_result .map_err(|err| ExecutionError::Fatal(format!("Error extracting stderr: {}", err)))); self @@ -653,7 +605,7 @@ impl CommandRunner { }) .to_boxed() } else { - let stderr_raw = Bytes::from(result.stderr_raw.as_slice()); + let stderr_raw = Bytes::from(execute_response.get_result().get_stderr_raw()); let stderr_copy = stderr_raw.clone(); self .store @@ -668,16 +620,21 @@ impl CommandRunner { fn extract_output_files( &self, - result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + execute_response: &bazel_protos::remote_execution::ExecuteResponse, ) -> BoxFuture { // Get Digests of output Directories. // Then we'll make a Directory for the output files, and merge them. - let output_directories = result.output_directories.clone(); - let mut directory_digests = Vec::with_capacity(output_directories.len() + 1); + let mut directory_digests = + Vec::with_capacity(execute_response.get_result().get_output_directories().len() + 1); + // TODO: Maybe take rather than clone + let output_directories = execute_response + .get_result() + .get_output_directories() + .to_owned(); for dir in output_directories { - let digest_result: Result = (&dir.tree_digest.unwrap()).into(); + let digest_result: Result = dir.get_tree_digest().into(); let mut digest = future::done(digest_result).to_boxed(); - for component in dir.path.rsplit('/') { + for component in dir.get_path().rsplit('/') { let component = component.to_owned(); let store = self.store.clone(); digest = digest @@ -700,21 +657,19 @@ impl CommandRunner { // Make a directory for the files let mut path_map = HashMap::new(); - let output_files = result.output_files.clone(); - let path_stats_result: Result, String> = output_files - .into_iter() + let path_stats_result: Result, String> = execute_response + .get_result() + .get_output_files() + .iter() .map(|output_file| { - let output_file_path_buf = PathBuf::from(output_file.path); - let digest = output_file - .digest - .ok_or_else(|| "No digest on remote execution output file".to_string())?; - let digest: Result = (&digest).into(); + let output_file_path_buf = PathBuf::from(output_file.get_path()); + let digest: Result = output_file.get_digest().into(); path_map.insert(output_file_path_buf.clone(), digest?); Ok(PathStat::file( output_file_path_buf.clone(), File { path: output_file_path_buf, - is_executable: output_file.is_executable, + is_executable: output_file.get_is_executable(), }, )) }) @@ -782,7 +737,7 @@ fn make_execute_request( ( bazel_protos::remote_execution::Action, bazel_protos::remote_execution::Command, - bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, + bazel_protos::remote_execution::ExecuteRequest, ), String, > { @@ -851,43 +806,38 @@ fn make_execute_request( action.set_command_digest((&digest(&command)?).into()); action.set_input_root_digest((&req.input_files).into()); - let execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some((&digest(&action)?).into()), - skip_cache_lookup: false, - instance_name: instance_name.clone().unwrap_or_default(), - execution_policy: None, - results_cache_policy: None, - }; + let mut execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); + if let Some(instance_name) = instance_name { + execute_request.set_instance_name(instance_name.clone()); + } + execute_request.set_action_digest((&digest(&action)?).into()); Ok((action, command, execute_request)) } -fn format_error(error: &bazel_protos::google::rpc::Status) -> String { - let error_code_enum = bazel_protos::code::Code::from_i32(error.code); +fn format_error(error: &bazel_protos::status::Status) -> String { + let error_code_enum = bazel_protos::code::Code::from_i32(error.get_code()); let error_code = match error_code_enum { Some(x) => format!("{:?}", x), - None => format!("{:?}", error.code), + None => format!("{:?}", error.get_code()), }; - format!("{}: {}", error_code, error.message) + format!("{}: {}", error_code, error.get_message()) } /// /// If the given operation represents a cancelled request, recover it into /// ExecutionError::NotFinished. /// -fn rpcerror_recover_cancelled( +fn rpcerror_recover_cancelled( operation_name: String, - err: tower_grpc::Error, -) -> Result> { + err: grpcio::Error, +) -> Result { // If the error represented cancellation, return an Operation for the given Operation name. match &err { - &tower_grpc::Error::Grpc(ref status) if status.code() == tower_grpc::Code::Cancelled => { - return Ok(bazel_protos::google::longrunning::Operation { - name: operation_name, - done: false, - metadata: None, - result: None, - }); + &grpcio::Error::RpcFailure(ref rs) if rs.status == grpcio::RpcStatusCode::Cancelled => { + let mut next_operation = bazel_protos::operations::Operation::new(); + next_operation.set_name(operation_name); + return Ok(next_operation); } _ => {} } @@ -895,21 +845,41 @@ fn rpcerror_recover_cancelled( Err(err) } -fn towergrpcerror_to_string(error: tower_grpc::Error) -> String { +fn rpcerror_to_status_or_string( + error: grpcio::Error, +) -> Result { match error { - tower_grpc::Error::Grpc(status) => { - let error_message = if status.error_message() == "" { - "[no message]" - } else { - &status.error_message() - }; - format!("{:?}: {}", status.code(), error_message) + grpcio::Error::RpcFailure(grpcio::RpcStatus { + status_proto_bytes: Some(status_proto_bytes), + .. + }) => { + let mut status_proto = bazel_protos::status::Status::new(); + status_proto.merge_from_bytes(&status_proto_bytes).unwrap(); + Ok(status_proto) } - tower_grpc::Error::Inner(v) => format!("{:?}", v), + grpcio::Error::RpcFailure(grpcio::RpcStatus { + status, details, .. + }) => Err(format!( + "{:?}: {:?}", + status, + details.unwrap_or_else(|| "[no message]".to_string()) + )), + err => Err(format!("{:?}", err)), + } +} + +fn rpcerror_to_string(error: grpcio::Error) -> String { + match error { + grpcio::Error::RpcFailure(status) => format!( + "{:?}: {:?}", + status.status, + status.details.unwrap_or_else(|| "[no message]".to_string()) + ), + err => format!("{:?}", err), } } -fn digest(message: &dyn GrpcioMessage) -> Result { +fn digest(message: &dyn Message) -> Result { let bytes = message.write_to_bytes().map_err(|e| format!("{:?}", e))?; let mut hasher = Sha256::default(); @@ -921,25 +891,20 @@ fn digest(message: &dyn GrpcioMessage) -> Result { )) } -fn timespec_from(timestamp: &Option) -> time::Timespec { - if let Some(timestamp) = timestamp { - time::Timespec::new(timestamp.seconds, timestamp.nanos) - } else { - time::Timespec::new(0, 0) - } +fn timespec_from(timestamp: &protobuf::well_known_types::Timestamp) -> time::Timespec { + time::Timespec::new(timestamp.seconds, timestamp.nanos) } #[cfg(test)] mod tests { use bazel_protos; - use bytes::{Bytes, BytesMut}; + use bytes::Bytes; use fs; use futures::Future; + use grpcio; use hashing::{Digest, Fingerprint}; use mock; - use prost::Message; - use prost_types; - use protobuf::{self, ProtobufEnum}; + use protobuf::{self, Message, ProtobufEnum}; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory}; use testutil::{as_bytes, owned_string_vec}; @@ -1024,19 +989,17 @@ mod tests { ); want_action.set_input_root_digest((&input_directory.digest()).into()); - let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some( - (&Digest( - Fingerprint::from_hex_string( - "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", - ) - .unwrap(), - 140, - )) - .into(), - ), - ..Default::default() - }; + let mut want_execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); + want_execute_request.set_action_digest( + (&Digest( + Fingerprint::from_hex_string( + "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", + ) + .unwrap(), + 140, + )) + .into(), + ); assert_eq!( super::make_execute_request(&req, &None, &None), @@ -1112,21 +1075,6 @@ mod tests { .into(), ); - let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some( - (&Digest( - Fingerprint::from_hex_string( - "844c929423444f3392e0dcc89ebf1febbfdf3a2e2fcab7567cc474705a5385e4", - ) - .unwrap(), - 140, - )) - .into(), - ), - instance_name: "dark-tower".to_owned(), - ..Default::default() - }; - assert_eq!( super::make_execute_request(&req, &Some("dark-tower".to_owned()), &None), Ok((want_action, want_command, want_execute_request)) @@ -1194,19 +1142,17 @@ mod tests { ); want_action.set_input_root_digest((&input_directory.digest()).into()); - let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some( - (&Digest( - Fingerprint::from_hex_string( - "0ee5d4c8ac12513a87c8d949c6883ac533a264d30215126af71a9028c4ab6edf", - ) - .unwrap(), - 140, - )) - .into(), - ), - ..Default::default() - }; + let mut want_execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); + want_execute_request.set_action_digest( + (&Digest( + Fingerprint::from_hex_string( + "0ee5d4c8ac12513a87c8d949c6883ac533a264d30215126af71a9028c4ab6edf", + ) + .unwrap(), + 140, + )) + .into(), + ); assert_eq!( super::make_execute_request(&req, &None, &Some("meep".to_owned())), @@ -1251,19 +1197,17 @@ mod tests { ); want_action.set_input_root_digest((&input_directory.digest()).into()); - let want_execute_request = bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest { - action_digest: Some( - (&Digest( - Fingerprint::from_hex_string( - "b1fb7179ce496995a4e3636544ec000dca1b951f1f6216493f6c7608dc4dd910", - ) - .unwrap(), - 140, - )) - .into(), - ), - ..Default::default() - }; + let mut want_execute_request = bazel_protos::remote_execution::ExecuteRequest::new(); + want_execute_request.set_action_digest( + (&Digest( + Fingerprint::from_hex_string( + "b1fb7179ce496995a4e3636544ec000dca1b951f1f6216493f6c7608dc4dd910", + ) + .unwrap(), + 140, + )) + .into(), + ); assert_eq!( super::make_execute_request(&req, &None, &None), @@ -1301,7 +1245,7 @@ mod tests { let error = run_command_remote(mock_server.address(), execute_request).expect_err("Want Err"); assert_eq!( error, - "InvalidArgument: Did not expect this request".to_string() + "InvalidArgument: \"Did not expect this request\"".to_string() ); } @@ -1444,19 +1388,17 @@ mod tests { ) .expect("Failed to make store"); - let mut rt = tokio::runtime::Runtime::new().unwrap(); - let cmd_runner = CommandRunner::new( &mock_server.address(), None, None, None, + None, + 1, store, timer_thread, - ) - .unwrap(); - let result = rt.block_on(cmd_runner.run(echo_roland_request())).unwrap(); - rt.shutdown_now().wait().unwrap(); + ); + let result = cmd_runner.run(echo_roland_request()).wait().unwrap(); assert_eq!( result.without_execution_attempts(), FallibleExecuteProcessResult { @@ -1621,17 +1563,21 @@ mod tests { vec![ make_incomplete_operation(&op_name), MockOperation::new({ - bazel_protos::google::longrunning::Operation { - name: op_name.clone(), - done: true, - result: Some( - bazel_protos::google::longrunning::operation::Result::Response(prost_types::Any { - type_url: "build.bazel.remote.execution.v2.ExecuteResponse".to_string(), - value: vec![0x00, 0x00, 0x00], - }), - ), - ..Default::default() - } + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(op_name.clone()); + op.set_done(true); + op.set_response({ + let mut response_wrapper = protobuf::well_known_types::Any::new(); + response_wrapper.set_type_url(format!( + "type.googleapis.com/{}", + bazel_protos::remote_execution::ExecuteResponse::new() + .descriptor() + .full_name() + )); + response_wrapper.set_value(vec![0x00, 0x00, 0x00]); + response_wrapper + }); + op }), ], )) @@ -1652,20 +1598,18 @@ mod tests { super::make_execute_request(&execute_request, &None, &None) .unwrap() .2, - vec![MockOperation::new( - bazel_protos::google::longrunning::Operation { - name: op_name.clone(), - done: true, - result: Some(bazel_protos::google::longrunning::operation::Result::Error( - bazel_protos::google::rpc::Status { - code: bazel_protos::code::Code::INTERNAL.value(), - message: "Something went wrong".to_string(), - details: vec![], - }, - )), - ..Default::default() - }, - )], + vec![MockOperation::new({ + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(op_name.to_string()); + op.set_done(true); + op.set_error({ + let mut error = bazel_protos::status::Status::new(); + error.set_code(bazel_protos::code::Code::INTERNAL.value()); + error.set_message("Something went wrong".to_string()); + error + }); + op + })], )) }; @@ -1688,17 +1632,17 @@ mod tests { .2, vec![ make_incomplete_operation(&op_name), - MockOperation::new(bazel_protos::google::longrunning::Operation { - name: op_name.clone(), - done: true, - result: Some(bazel_protos::google::longrunning::operation::Result::Error( - bazel_protos::google::rpc::Status { - code: bazel_protos::code::Code::INTERNAL.value(), - message: "Something went wrong".to_string(), - details: vec![], - }, - )), - ..Default::default() + MockOperation::new({ + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(op_name.to_string()); + op.set_done(true); + op.set_error({ + let mut error = bazel_protos::status::Status::new(); + error.set_code(bazel_protos::code::Code::INTERNAL.value()); + error.set_message("Something went wrong".to_string()); + error + }); + op }), ], )) @@ -1721,14 +1665,12 @@ mod tests { super::make_execute_request(&execute_request, &None, &None) .unwrap() .2, - vec![MockOperation::new( - bazel_protos::google::longrunning::Operation { - name: op_name.clone(), - done: true, - result: None, - ..Default::default() - }, - )], + vec![MockOperation::new({ + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(op_name.to_string()); + op.set_done(true); + op + })], )) }; @@ -1751,11 +1693,11 @@ mod tests { .2, vec![ make_incomplete_operation(&op_name), - MockOperation::new(bazel_protos::google::longrunning::Operation { - name: op_name.clone(), - done: true, - result: None, - ..Default::default() + MockOperation::new({ + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(op_name.to_string()); + op.set_done(true); + op }), ], )) @@ -1822,23 +1764,21 @@ mod tests { .wait() .expect("Saving directory bytes to store"); - let mut rt = tokio::runtime::Runtime::new().unwrap(); - - let result = rt.block_on( - CommandRunner::new( - &mock_server.address(), - None, - None, - None, - store, - timer_thread, - ) - .unwrap() - .run(cat_roland_request()), - ); - rt.shutdown_now().wait().unwrap(); + let result = CommandRunner::new( + &mock_server.address(), + None, + None, + None, + None, + 1, + store, + timer_thread, + ) + .run(cat_roland_request()) + .wait() + .unwrap(); assert_eq!( - result.unwrap().without_execution_attempts(), + result.without_execution_attempts(), FallibleExecuteProcessResult { stdout: roland.bytes(), stderr: Bytes::from(""), @@ -1862,9 +1802,17 @@ mod tests { let mock_server = { let op_name = "cat".to_owned(); - let status = make_precondition_failure_status(vec![missing_preconditionfailure_violation( - &roland.digest(), - )]); + let status = grpcio::RpcStatus { + status: grpcio::RpcStatusCode::FailedPrecondition, + details: None, + status_proto_bytes: Some( + make_precondition_failure_status(vec![missing_preconditionfailure_violation( + &roland.digest(), + )]) + .write_to_bytes() + .unwrap(), + ), + }; mock::execution_server::TestServer::new(mock::execution_server::MockExecution::new( op_name.clone(), @@ -1912,19 +1860,18 @@ mod tests { .wait() .expect("Saving file bytes to store"); - let mut rt = tokio::runtime::Runtime::new().unwrap(); - let result = rt.block_on( - CommandRunner::new( - &mock_server.address(), - None, - None, - None, - store, - timer_thread, - ) - .unwrap() - .run(cat_roland_request()), - ); + let result = CommandRunner::new( + &mock_server.address(), + None, + None, + None, + None, + 1, + store, + timer_thread, + ) + .run(cat_roland_request()) + .wait(); assert_eq!( result, Ok(FallibleExecuteProcessResult { @@ -1981,31 +1928,27 @@ mod tests { ) .expect("Failed to make store"); - let mut rt = tokio::runtime::Runtime::new().unwrap(); - let result = rt.block_on( - CommandRunner::new( - &mock_server.address(), - None, - None, - None, - store, - timer_thread, - ) - .unwrap() - .run(cat_roland_request()), - ); - rt.shutdown_now().wait().unwrap(); - let error = result.expect_err("Want error"); + let error = CommandRunner::new( + &mock_server.address(), + None, + None, + None, + None, + 1, + store, + timer_thread, + ) + .run(cat_roland_request()) + .wait() + .expect_err("Want error"); assert_contains(&error, &format!("{}", missing_digest.0)); } #[test] fn format_error_complete() { - let error = bazel_protos::google::rpc::Status { - code: bazel_protos::code::Code::CANCELLED.value(), - message: "Oops, oh well!".to_string(), - details: vec![], - }; + let mut error = bazel_protos::status::Status::new(); + error.set_code(bazel_protos::code::Code::CANCELLED.value()); + error.set_message("Oops, oh well!".to_string()); assert_eq!( super::format_error(&error), "CANCELLED: Oops, oh well!".to_string() @@ -2014,11 +1957,9 @@ mod tests { #[test] fn extract_execute_response_unknown_code() { - let error = bazel_protos::google::rpc::Status { - code: 555, - message: "Oops, oh well!".to_string(), - details: vec![], - }; + let mut error = bazel_protos::status::Status::new(); + error.set_code(555); + error.set_message("Oops, oh well!".to_string()); assert_eq!( super::format_error(&error), "555: Oops, oh well!".to_string() @@ -2035,35 +1976,28 @@ mod tests { execution_attempts: vec![], }; - let response = bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { - result: Some( - bazel_protos::build::bazel::remote::execution::v2::ActionResult { - exit_code: want_result.exit_code, - stdout_raw: want_result.stdout.to_vec(), - stderr_raw: want_result.stderr.to_vec(), - output_files: vec![ - bazel_protos::build::bazel::remote::execution::v2::OutputFile { - path: "cats/roland".to_string(), - digest: Some((&TestData::roland().digest()).into()), - is_executable: false, - }, - ], - ..Default::default() - }, - ), - ..Default::default() - }; - - let operation = bazel_protos::google::longrunning::Operation { - name: "cat".to_owned(), - done: true, - result: Some( - bazel_protos::google::longrunning::operation::Result::Response( - make_any_prost_executeresponse(&response), - ), - ), - ..Default::default() - }; + let mut output_file = bazel_protos::remote_execution::OutputFile::new(); + output_file.set_path("cats/roland".into()); + output_file.set_digest((&TestData::roland().digest()).into()); + output_file.set_is_executable(false); + let mut output_files = protobuf::RepeatedField::new(); + output_files.push(output_file); + + let mut operation = bazel_protos::operations::Operation::new(); + operation.set_name("cat".to_owned()); + operation.set_done(true); + operation.set_response(make_any_proto(&{ + let mut response = bazel_protos::remote_execution::ExecuteResponse::new(); + response.set_result({ + let mut result = bazel_protos::remote_execution::ActionResult::new(); + result.set_exit_code(want_result.exit_code); + result.set_stdout_raw(Bytes::from(want_result.stdout.clone())); + result.set_stderr_raw(Bytes::from(want_result.stderr.clone())); + result.set_output_files(output_files); + result + }); + response + })); assert_eq!( extract_execute_response(operation) @@ -2076,11 +2010,9 @@ mod tests { #[test] fn extract_execute_response_pending() { let operation_name = "cat".to_owned(); - let operation = bazel_protos::google::longrunning::Operation { - name: operation_name.clone(), - done: false, - ..Default::default() - }; + let mut operation = bazel_protos::operations::Operation::new(); + operation.set_name(operation_name.clone()); + operation.set_done(false); assert_eq!( extract_execute_response(operation), @@ -2115,10 +2047,11 @@ mod tests { fn extract_execute_response_missing_other_things() { let missing = vec![ missing_preconditionfailure_violation(&TestData::roland().digest()), - bazel_protos::google::rpc::precondition_failure::Violation { - type_: "MISSING".to_string(), - subject: "monkeys".to_string(), - description: "".to_string(), + { + let mut violation = bazel_protos::error_details::PreconditionFailure_Violation::new(); + violation.set_field_type("MISSING".to_owned()); + violation.set_subject("monkeys".to_owned()); + violation }, ]; @@ -2135,9 +2068,10 @@ mod tests { #[test] fn extract_execute_response_other_failed_precondition() { - let missing = vec![bazel_protos::google::rpc::precondition_failure::Violation { - type_: "OUT_OF_CAPACITY".to_string(), - ..Default::default() + let missing = vec![{ + let mut violation = bazel_protos::error_details::PreconditionFailure_Violation::new(); + violation.set_field_type("OUT_OF_CAPACITY".to_owned()); + violation }]; let operation = make_precondition_failure_operation(missing) @@ -2168,24 +2102,18 @@ mod tests { #[test] fn extract_execute_response_other_status() { - let operation = bazel_protos::google::longrunning::Operation { - name: "cat".to_owned(), - done: true, - result: Some( - bazel_protos::google::longrunning::operation::Result::Response( - make_any_prost_executeresponse( - &bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { - status: Some(bazel_protos::google::rpc::Status { - code: bazel_protos::google::rpc::Code::PermissionDenied.into(), - ..Default::default() - }), - ..Default::default() - }, - ), - ), - ), - ..Default::default() - }; + let mut operation = bazel_protos::operations::Operation::new(); + operation.set_name("cat".to_owned()); + operation.set_done(true); + operation.set_response(make_any_proto(&{ + let mut response = bazel_protos::remote_execution::ExecuteResponse::new(); + response.set_status({ + let mut status = bazel_protos::status::Status::new(); + status.set_code(grpcio::RpcStatusCode::PermissionDenied as i32); + status + }); + response + })); match extract_execute_response(operation) { Err(ExecutionError::Fatal(err)) => assert_contains(&err, "PermissionDenied"), @@ -2314,90 +2242,103 @@ mod tests { #[test] fn extract_output_files_from_response_one_file() { - let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { - exit_code: 0, - output_files: vec![ - bazel_protos::build::bazel::remote::execution::v2::OutputFile { - path: "roland".to_string(), - digest: Some((&TestData::roland().digest()).into()), - is_executable: false, - }, - ], - ..Default::default() - }; + let mut output_file = bazel_protos::remote_execution::OutputFile::new(); + output_file.set_path("roland".into()); + output_file.set_digest((&TestData::roland().digest()).into()); + output_file.set_is_executable(false); + let mut output_files = protobuf::RepeatedField::new(); + output_files.push(output_file); + + let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); + execute_response.set_result({ + let mut result = bazel_protos::remote_execution::ActionResult::new(); + result.set_exit_code(0); + result.set_output_files(output_files); + result + }); + assert_eq!( - extract_output_files_from_response(&result), + extract_output_files_from_response(&execute_response), Ok(TestDirectory::containing_roland().digest()) ) } #[test] fn extract_output_files_from_response_two_files_not_nested() { - let output_files = vec![ - bazel_protos::build::bazel::remote::execution::v2::OutputFile { - path: "roland".to_string(), - digest: Some((&TestData::roland().digest()).into()), - is_executable: false, - }, - bazel_protos::build::bazel::remote::execution::v2::OutputFile { - path: "treats".to_string(), - digest: Some((&TestData::catnip().digest()).into()), - is_executable: false, - }, - ]; - - let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { - output_files, - ..Default::default() - }; + let mut output_file_1 = bazel_protos::remote_execution::OutputFile::new(); + output_file_1.set_path("roland".into()); + output_file_1.set_digest((&TestData::roland().digest()).into()); + output_file_1.set_is_executable(false); + + let mut output_file_2 = bazel_protos::remote_execution::OutputFile::new(); + output_file_2.set_path("treats".into()); + output_file_2.set_digest((&TestData::catnip().digest()).into()); + output_file_2.set_is_executable(false); + let mut output_files = protobuf::RepeatedField::new(); + output_files.push(output_file_1); + output_files.push(output_file_2); + + let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); + execute_response.set_result({ + let mut result = bazel_protos::remote_execution::ActionResult::new(); + result.set_exit_code(0); + result.set_output_files(output_files); + result + }); assert_eq!( - extract_output_files_from_response(&result), + extract_output_files_from_response(&execute_response), Ok(TestDirectory::containing_roland_and_treats().digest()) ) } #[test] fn extract_output_files_from_response_two_files_nested() { - let output_files = vec![ - bazel_protos::build::bazel::remote::execution::v2::OutputFile { - path: "cats/roland".to_string(), - digest: Some((&TestData::roland().digest()).into()), - is_executable: false, - }, - bazel_protos::build::bazel::remote::execution::v2::OutputFile { - path: "treats".to_string(), - digest: Some((&TestData::catnip().digest()).into()), - is_executable: false, - }, - ]; - - let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { - output_files, - ..Default::default() - }; + let mut output_file_1 = bazel_protos::remote_execution::OutputFile::new(); + output_file_1.set_path("cats/roland".into()); + output_file_1.set_digest((&TestData::roland().digest()).into()); + output_file_1.set_is_executable(false); + + let mut output_file_2 = bazel_protos::remote_execution::OutputFile::new(); + output_file_2.set_path("treats".into()); + output_file_2.set_digest((&TestData::catnip().digest()).into()); + output_file_2.set_is_executable(false); + let mut output_files = protobuf::RepeatedField::new(); + output_files.push(output_file_1); + output_files.push(output_file_2); + + let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); + execute_response.set_result({ + let mut result = bazel_protos::remote_execution::ActionResult::new(); + result.set_exit_code(0); + result.set_output_files(output_files); + result + }); assert_eq!( - extract_output_files_from_response(&result), + extract_output_files_from_response(&execute_response), Ok(TestDirectory::recursive().digest()) ) } #[test] fn extract_output_files_from_response_just_directory() { - let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { - exit_code: 0, - output_directories: vec![ - bazel_protos::build::bazel::remote::execution::v2::OutputDirectory { - path: "cats".to_owned(), - tree_digest: Some((&TestDirectory::containing_roland().digest()).into()), - }, - ], - ..Default::default() - }; + let mut output_directory = bazel_protos::remote_execution::OutputDirectory::new(); + output_directory.set_path("cats".into()); + output_directory.set_tree_digest((&TestDirectory::containing_roland().digest()).into()); + let mut output_directories = protobuf::RepeatedField::new(); + output_directories.push(output_directory); + + let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); + execute_response.set_result({ + let mut result = bazel_protos::remote_execution::ActionResult::new(); + result.set_exit_code(0); + result.set_output_directories(output_directories); + result + }); assert_eq!( - extract_output_files_from_response(&result), + extract_output_files_from_response(&execute_response), Ok(TestDirectory::nested().digest()) ) } @@ -2408,29 +2349,40 @@ mod tests { // /pets/cats/roland // /pets/dogs/robin - let result = bazel_protos::build::bazel::remote::execution::v2::ActionResult { - output_files: vec![ - bazel_protos::build::bazel::remote::execution::v2::OutputFile { - path: "treats".to_owned(), - digest: Some((&TestData::catnip().digest()).into()), - is_executable: false, - }, - ], - output_directories: vec![ - bazel_protos::build::bazel::remote::execution::v2::OutputDirectory { - path: "pets/cats".to_owned(), - tree_digest: Some((&TestDirectory::containing_roland().digest()).into()), - }, - bazel_protos::build::bazel::remote::execution::v2::OutputDirectory { - path: "pets/dogs".to_owned(), - tree_digest: Some((&TestDirectory::containing_robin().digest()).into()), - }, - ], - ..Default::default() - }; + let mut output_directories = protobuf::RepeatedField::new(); + output_directories.push({ + let mut output_directory = bazel_protos::remote_execution::OutputDirectory::new(); + output_directory.set_path("pets/cats".into()); + output_directory.set_tree_digest((&TestDirectory::containing_roland().digest()).into()); + output_directory + }); + output_directories.push({ + let mut output_directory = bazel_protos::remote_execution::OutputDirectory::new(); + output_directory.set_path("pets/dogs".into()); + output_directory.set_tree_digest((&TestDirectory::containing_robin().digest()).into()); + output_directory + }); + + let mut execute_response = bazel_protos::remote_execution::ExecuteResponse::new(); + execute_response.set_result({ + let mut result = bazel_protos::remote_execution::ActionResult::new(); + result.set_exit_code(0); + result.set_output_directories(output_directories); + result.set_output_files({ + let mut output_files = protobuf::RepeatedField::new(); + output_files.push({ + let mut output_file = bazel_protos::remote_execution::OutputFile::new(); + output_file.set_path("treats".into()); + output_file.set_digest((&TestData::catnip().digest()).into()); + output_file + }); + output_files + }); + result + }); assert_eq!( - extract_output_files_from_response(&result), + extract_output_files_from_response(&execute_response), Ok(Digest( Fingerprint::from_hex_string( "639b4b84bb58a9353d49df8122e7987baf038efe54ed035e67910846c865b1e2" @@ -2462,19 +2414,16 @@ mod tests { } fn make_incomplete_operation(operation_name: &str) -> MockOperation { - MockOperation::new(bazel_protos::google::longrunning::Operation { - name: operation_name.to_string(), - done: false, - ..Default::default() - }) + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(operation_name.to_string()); + op.set_done(false); + MockOperation::new(op) } fn make_delayed_incomplete_operation(operation_name: &str, delay: Duration) -> MockOperation { - let op = bazel_protos::google::longrunning::Operation { - name: operation_name.to_string(), - done: false, - ..Default::default() - }; + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(operation_name.to_string()); + op.set_done(false); MockOperation { op: Ok(Some(op)), duration: Some(delay), @@ -2487,74 +2436,72 @@ mod tests { stderr: StderrType, exit_code: i32, ) -> MockOperation { - let (stdout_raw, stdout_digest) = match stdout { - StdoutType::Raw(stdout_raw) => (stdout_raw.as_bytes().to_vec(), None), - StdoutType::Digest(stdout_digest) => (vec![], Some((&stdout_digest).into())), - }; - - let (stderr_raw, stderr_digest) = match stderr { - StderrType::Raw(stderr_raw) => (stderr_raw.as_bytes().to_vec(), None), - StderrType::Digest(stderr_digest) => (vec![], Some((&stderr_digest).into())), - }; - - let response_proto = bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { - result: Some( - bazel_protos::build::bazel::remote::execution::v2::ActionResult { - stdout_raw, - stdout_digest, - stderr_raw, - stderr_digest, - exit_code, - ..Default::default() - }, - ), - ..Default::default() - }; + let mut op = bazel_protos::operations::Operation::new(); + op.set_name(operation_name.to_string()); + op.set_done(true); + op.set_response({ + let mut response_proto = bazel_protos::remote_execution::ExecuteResponse::new(); + response_proto.set_result({ + let mut action_result = bazel_protos::remote_execution::ActionResult::new(); + match stdout { + StdoutType::Raw(stdout_raw) => { + action_result.set_stdout_raw(Bytes::from(stdout_raw)); + } + StdoutType::Digest(stdout_digest) => { + action_result.set_stdout_digest((&stdout_digest).into()); + } + } + match stderr { + StderrType::Raw(stderr_raw) => { + action_result.set_stderr_raw(Bytes::from(stderr_raw)); + } + StderrType::Digest(stderr_digest) => { + action_result.set_stderr_digest((&stderr_digest).into()); + } + } + action_result.set_exit_code(exit_code); + action_result + }); - let op = bazel_protos::google::longrunning::Operation { - name: operation_name.to_string(), - done: true, - result: Some( - bazel_protos::google::longrunning::operation::Result::Response( - make_any_prost_executeresponse(&response_proto), - ), - ), - ..Default::default() - }; + let mut response_wrapper = protobuf::well_known_types::Any::new(); + response_wrapper.set_type_url(format!( + "type.googleapis.com/{}", + response_proto.descriptor().full_name() + )); + let response_proto_bytes = response_proto.write_to_bytes().unwrap(); + response_wrapper.set_value(response_proto_bytes); + response_wrapper + }); MockOperation::new(op) } fn make_precondition_failure_operation( - violations: Vec, + violations: Vec, ) -> MockOperation { - let response = bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse { - status: Some(make_precondition_failure_status(violations)), - ..Default::default() - }; - let operation = bazel_protos::google::longrunning::Operation { - name: "cat".to_string(), - done: true, - result: Some( - bazel_protos::google::longrunning::operation::Result::Response( - make_any_prost_executeresponse(&response), - ), - ), - ..Default::default() - }; + let mut operation = bazel_protos::operations::Operation::new(); + operation.set_name("cat".to_owned()); + operation.set_done(true); + operation.set_response(make_any_proto(&{ + let mut response = bazel_protos::remote_execution::ExecuteResponse::new(); + response.set_status(make_precondition_failure_status(violations)); + response + })); MockOperation::new(operation) } fn make_precondition_failure_status( - violations: Vec, - ) -> bazel_protos::google::rpc::Status { - bazel_protos::google::rpc::Status { - code: bazel_protos::google::rpc::Code::FailedPrecondition.into(), - details: vec![make_any_prost_proto( - "google.rpc.PreconditionFailure", - &bazel_protos::google::rpc::PreconditionFailure { violations }, - )], - ..Default::default() - } + violations: Vec, + ) -> bazel_protos::status::Status { + let mut status = bazel_protos::status::Status::new(); + status.set_code(grpcio::RpcStatusCode::FailedPrecondition as i32); + status.mut_details().push(make_any_proto(&{ + let mut precondition_failure = bazel_protos::error_details::PreconditionFailure::new(); + for violation in violations.into_iter() { + precondition_failure.mut_violations().push(violation); + } + precondition_failure + })); + status } fn run_command_remote( @@ -2565,11 +2512,8 @@ mod tests { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let mut runtime = tokio::runtime::Runtime::new().unwrap(); let command_runner = create_command_runner(address, &cas); - let result = runtime.block_on(command_runner.run(request)); - runtime.shutdown_now().wait().unwrap(); - result + command_runner.run(request).wait() } fn create_command_runner(address: String, cas: &mock::StubCAS) -> CommandRunner { @@ -2591,8 +2535,7 @@ mod tests { ) .expect("Failed to make store"); - CommandRunner::new(&address, None, None, None, store, timer_thread) - .expect("Failed to make command runner") + CommandRunner::new(&address, None, None, None, None, 1, store, timer_thread) } fn timer_thread() -> resettable::Resettable { @@ -2600,62 +2543,52 @@ mod tests { } fn extract_execute_response( - operation: bazel_protos::google::longrunning::Operation, + operation: bazel_protos::operations::Operation, ) -> Result { let cas = mock::StubCAS::builder() .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let mut runtime = tokio::runtime::Runtime::new().unwrap(); - let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas); - let result = runtime.block_on( - command_runner.extract_execute_response(operation, &mut ExecutionHistory::default()), - ); - - runtime.shutdown_now().wait().unwrap(); - result + let command_runner = create_command_runner("".to_owned(), &cas); + command_runner + .extract_execute_response( + super::OperationOrStatus::Operation(operation), + &mut ExecutionHistory::default(), + ) + .wait() } fn extract_output_files_from_response( - result: &bazel_protos::build::bazel::remote::execution::v2::ActionResult, + execute_response: &bazel_protos::remote_execution::ExecuteResponse, ) -> Result { let cas = mock::StubCAS::builder() .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - - let mut runtime = tokio::runtime::Runtime::new().unwrap(); - let command_runner = create_command_runner("127.0.0.1:0".to_owned(), &cas); - let result = runtime.block_on(command_runner.extract_output_files(result)); - runtime.shutdown_now().wait().unwrap(); - result - } - - fn make_any_prost_executeresponse( - message: &bazel_protos::build::bazel::remote::execution::v2::ExecuteResponse, - ) -> prost_types::Any { - make_any_prost_proto("build.bazel.remote.execution.v2.ExecuteResponse", message) + let command_runner = create_command_runner("".to_owned(), &cas); + command_runner + .extract_output_files(&execute_response) + .wait() } - fn make_any_prost_proto(message_name: &str, message: &M) -> prost_types::Any { - let size = message.encoded_len(); - let mut value = BytesMut::with_capacity(size); - message.encode(&mut value).expect("Error serializing proto"); - prost_types::Any { - type_url: format!("type.googleapis.com/{}", message_name), - value: value.to_vec(), - } + fn make_any_proto(message: &dyn Message) -> protobuf::well_known_types::Any { + let mut any = protobuf::well_known_types::Any::new(); + any.set_type_url(format!( + "type.googleapis.com/{}", + message.descriptor().full_name() + )); + any.set_value(message.write_to_bytes().expect("Error serializing proto")); + any } fn missing_preconditionfailure_violation( digest: &Digest, - ) -> bazel_protos::google::rpc::precondition_failure::Violation { + ) -> bazel_protos::error_details::PreconditionFailure_Violation { { - bazel_protos::google::rpc::precondition_failure::Violation { - type_: "MISSING".to_owned(), - subject: format!("blobs/{}/{}", digest.0, digest.1), - ..Default::default() - } + let mut violation = bazel_protos::error_details::PreconditionFailure_Violation::new(); + violation.set_field_type("MISSING".to_owned()); + violation.set_subject(format!("blobs/{}/{}", digest.0, digest.1)); + violation } } diff --git a/src/rust/engine/process_executor/Cargo.toml b/src/rust/engine/process_executor/Cargo.toml index 87453d8b797..d4c45ad05ec 100644 --- a/src/rust/engine/process_executor/Cargo.toml +++ b/src/rust/engine/process_executor/Cargo.toml @@ -15,4 +15,3 @@ hashing = { path = "../hashing" } futures = "^0.1.16" process_execution = { path = "../process_execution" } resettable = { path = "../resettable" } -tokio = "0.1.14" diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 247b40872f1..4946381b05f 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -92,6 +92,13 @@ fn main() { If unspecified, local execution will be performed.", ), ) + .arg( + Arg::with_name("execution-root-ca-cert-file") + .help("Path to file containing root certificate authority certificates for the execution server. If not set, TLS will not be used when connecting to the execution server.") + .takes_value(true) + .long("execution-root-ca-cert-file") + .required(false) + ) .arg( Arg::with_name("execution-oauth-bearer-token-path") .help("Path to file containing oauth bearer token for communication with the execution server. If not set, no authorization will be provided to remote servers.") @@ -283,6 +290,12 @@ fn main() { let runner: Box = match server_arg { Some(address) => { + let root_ca_certs = if let Some(path) = args.value_of("execution-root-ca-cert-file") { + Some(std::fs::read(path).expect("Error reading root CA certs file")) + } else { + None + }; + let oauth_bearer_token = if let Some(path) = args.value_of("execution-oauth-bearer-token-path") { Some(std::fs::read_to_string(path).expect("Error reading oauth bearer token file")) @@ -290,17 +303,16 @@ fn main() { None }; - Box::new( - process_execution::remote::CommandRunner::new( - address, - args.value_of("cache-key-gen-version").map(str::to_owned), - remote_instance_arg, - oauth_bearer_token, - store.clone(), - timer_thread, - ) - .expect("Could not initialize remote execution client"), - ) as Box + Box::new(process_execution::remote::CommandRunner::new( + address, + args.value_of("cache-key-gen-version").map(str::to_owned), + remote_instance_arg, + root_ca_certs, + oauth_bearer_token, + 1, + store.clone(), + timer_thread, + )) as Box } None => Box::new(process_execution::local::CommandRunner::new( store.clone(), @@ -309,18 +321,17 @@ fn main() { true, )) as Box, }; - let mut rt = tokio::runtime::Runtime::new().unwrap(); - let result = rt.block_on(runner.run(request)).unwrap(); + + let result = runner.run(request).wait().expect("Error executing"); if let Some(output) = args.value_of("materialize-output-to").map(PathBuf::from) { - rt.block_on(store.materialize_directory(output, result.output_directory)) + store + .materialize_directory(output, result.output_directory) + .wait() .unwrap(); - }; + } print!("{}", String::from_utf8(result.stdout.to_vec()).unwrap()); eprint!("{}", String::from_utf8(result.stderr.to_vec()).unwrap()); - - rt.shutdown_now().wait().unwrap(); - exit(result.exit_code); } diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 1fac707784c..024f9845c3e 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -130,23 +130,23 @@ impl Core { .unwrap_or_else(|e| panic!("Could not initialize Store: {:?}", e)); let underlying_command_runner: Box = match &remote_execution_server { - Some(ref address) => Box::new( - process_execution::remote::CommandRunner::new( - address, - remote_execution_process_cache_namespace.clone(), - remote_instance_name.clone(), - oauth_bearer_token.clone(), - store.clone(), - futures_timer_thread2.clone(), - ) - .expect("Could not initialize remote execution client"), - ) as Box, + Some(ref address) => Box::new(process_execution::remote::CommandRunner::new( + address, + remote_execution_process_cache_namespace.clone(), + remote_instance_name.clone(), + root_ca_certs.clone(), + oauth_bearer_token.clone(), + // Allow for some overhead for bookkeeping threads (if any). + process_execution_parallelism + 2, + store.clone(), + futures_timer_thread2.clone(), + )), None => Box::new(process_execution::local::CommandRunner::new( store.clone(), fs_pool2.clone(), work_dir.clone(), process_execution_cleanup_local_dirs, - )) as Box, + )), }; let command_runner = diff --git a/src/rust/engine/testutil/mock/src/execution_server.rs b/src/rust/engine/testutil/mock/src/execution_server.rs index d61c5c89ce5..f4a7973aeb9 100644 --- a/src/rust/engine/testutil/mock/src/execution_server.rs +++ b/src/rust/engine/testutil/mock/src/execution_server.rs @@ -22,13 +22,12 @@ use protobuf; /// #[derive(Clone, Debug)] pub struct MockOperation { - pub op: - Result, bazel_protos::google::rpc::Status>, + pub op: Result, grpcio::RpcStatus>, pub duration: Option, } impl MockOperation { - pub fn new(op: bazel_protos::google::longrunning::Operation) -> MockOperation { + pub fn new(op: bazel_protos::operations::Operation) -> MockOperation { MockOperation { op: Ok(Some(op)), duration: None, @@ -54,12 +53,12 @@ impl MockExecution { /// pub fn new( name: String, - execute_request: bazel_protos::build::bazel::remote::execution::v2::ExecuteRequest, + execute_request: bazel_protos::remote_execution::ExecuteRequest, operation_responses: Vec, ) -> MockExecution { MockExecution { name: name, - execute_request: execute_request.into(), + execute_request: execute_request, operation_responses: Arc::new(Mutex::new(VecDeque::from(operation_responses))), } } @@ -199,9 +198,9 @@ impl MockResponder { } if let Ok(Some(op)) = op { // Complete the channel with the op. - sink.success(op.clone().into()); + sink.success(op.clone()); } else if let Err(status) = op { - sink.fail(status.into()); + sink.fail(status); } else { // Cancel the request by dropping the sink. drop(sink); @@ -227,13 +226,13 @@ impl MockResponder { if let Ok(Some(op)) = op { ctx.spawn( sink - .send((op.clone().into(), grpcio::WriteFlags::default())) + .send((op.clone(), grpcio::WriteFlags::default())) .map(|mut stream| stream.close()) .map(|_| ()) .map_err(|_| ()), ) } else if let Err(status) = op { - sink.fail(status.into()); + sink.fail(status); } else { // Cancel the request by dropping the sink. drop(sink)