diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index d00acc766ad..e4db49cc353 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -18,8 +18,8 @@ use protos::gen::build::bazel::remote::execution::v2 as remexec; use protos::gen::google::bytestream::byte_stream_client::ByteStreamClient; use remexec::{ capabilities_client::CapabilitiesClient, - content_addressable_storage_client::ContentAddressableStorageClient, BatchReadBlobsRequest, - BatchUpdateBlobsRequest, ServerCapabilities, + content_addressable_storage_client::ContentAddressableStorageClient, BatchUpdateBlobsRequest, + ServerCapabilities, }; use tonic::{Code, Request, Status}; use workunit_store::{in_workunit, ObservationMetric}; @@ -203,7 +203,16 @@ impl ByteStore { }) } - async fn len_is_allowed_for_batch_api(&self, len: usize) -> Result { + async fn store_bytes_source( + &self, + digest: Digest, + bytes: ByteSource, + ) -> Result<(), ByteStoreError> + where + ByteSource: Fn(Range) -> Bytes + Send + Sync + 'static, + { + let len = digest.size_bytes; + let max_batch_total_size_bytes = { let capabilities = self.get_capabilities().await?; @@ -217,18 +226,7 @@ impl ByteStore { let batch_api_allowed_by_local_config = len <= self.batch_api_size_limit; let batch_api_allowed_by_server_config = max_batch_total_size_bytes == 0 || len < max_batch_total_size_bytes; - Ok(batch_api_allowed_by_local_config && batch_api_allowed_by_server_config) - } - - async fn store_bytes_source( - &self, - digest: Digest, - bytes: ByteSource, - ) -> Result<(), ByteStoreError> - where - ByteSource: Fn(Range) -> Bytes + Send + Sync + 'static, - { - if self.len_is_allowed_for_batch_api(digest.size_bytes).await? { + if batch_api_allowed_by_local_config && batch_api_allowed_by_server_config { self.store_bytes_source_batch(digest, bytes).await } else { self.store_bytes_source_stream(digest, bytes).await @@ -341,62 +339,6 @@ impl ByteStore { &self, digest: Digest, f: F, - ) -> Result, ByteStoreError> { - if self.len_is_allowed_for_batch_api(digest.size_bytes).await? { - self.load_bytes_with_batch(digest, f).await - } else { - self.load_bytes_with_stream(digest, f).await - } - } - - async fn load_bytes_with_batch< - T: Send + 'static, - F: Fn(Bytes) -> Result + Send + Sync + Clone + 'static, - >( - &self, - digest: Digest, - f: F, - ) -> Result, ByteStoreError> { - let request = BatchReadBlobsRequest { - instance_name: self.instance_name.clone().unwrap_or_default(), - digests: vec![digest.into()], - }; - let mut client = self.cas_client.as_ref().clone(); - let response = client - .batch_read_blobs(request) - .await - .map_err(ByteStoreError::Grpc)?; - - let response = response.into_inner(); - if response.responses.len() != 1 { - return Err(ByteStoreError::Other( - format!( - "Response from remote store for BatchReadBlobs API had inconsistent number of responses (got {}, expected 1)", - response.responses.len() - ) - )); - } - - let blob_response = response.responses.into_iter().next().unwrap(); - let rpc_status = blob_response.status.unwrap_or_default(); - let status = Status::from(rpc_status); - match status.code() { - Code::Ok => { - let result = f(blob_response.data); - result.map(Some).map_err(ByteStoreError::Other) - } - Code::NotFound => Ok(None), - _ => Err(ByteStoreError::Grpc(status)), - } - } - - async fn load_bytes_with_stream< - T: Send + 'static, - F: Fn(Bytes) -> Result + Send + Sync + Clone + 'static, - >( - &self, - digest: Digest, - f: F, ) -> Result, ByteStoreError> { let start = Instant::now(); let store = self.clone(); diff --git a/src/rust/engine/protos/src/conversions.rs b/src/rust/engine/protos/src/conversions.rs index c7fb43da86e..1e385d8cf00 100644 --- a/src/rust/engine/protos/src/conversions.rs +++ b/src/rust/engine/protos/src/conversions.rs @@ -1,5 +1,3 @@ -use tonic::{Code, Status}; - impl<'a> From<&'a hashing::Digest> for crate::gen::build::bazel::remote::execution::v2::Digest { fn from(d: &'a hashing::Digest) -> Self { Self { @@ -53,9 +51,3 @@ pub fn require_digest< None => Err("Protocol violation: Digest missing from a Remote Execution API protobuf.".into()), } } - -impl From for Status { - fn from(rpc_status: crate::gen::google::rpc::Status) -> Self { - Status::new(Code::from_i32(rpc_status.code), rpc_status.message) - } -} diff --git a/src/rust/engine/testutil/mock/src/cas_service.rs b/src/rust/engine/testutil/mock/src/cas_service.rs index 8a95f41fbe2..de855a1ae36 100644 --- a/src/rust/engine/testutil/mock/src/cas_service.rs +++ b/src/rust/engine/testutil/mock/src/cas_service.rs @@ -446,11 +446,6 @@ impl ContentAddressableStorage for StubCASResponder { &self, request: Request, ) -> Result, Status> { - { - let mut request_count = self.read_request_count.lock(); - *request_count += 1; - } - check_auth!(self, request); if self.always_errors {