From f70454a69cfb7c3354d1822972018e74798740e0 Mon Sep 17 00:00:00 2001 From: Cormac Relf Date: Tue, 3 Sep 2024 15:23:57 -0500 Subject: [PATCH] buck2_execute: implement OSS `upload_blob` for `local_only` cache uploads Forward-port of patch 4 in , providing a clear piece of missing functionality: when stdout or stderr was at least 50KiB of output while caching `local_only` actions, the previously unimplemented `upload_blob` path was taken, and so stdout/stderr would not be uploaded successfully to the cache. Co-authored-by: Austin Seipp --- app/buck2_execute/src/execute/output.rs | 15 +++++++++++--- app/buck2_execute/src/re/client.rs | 8 +++++--- app/buck2_execute/src/re/manager.rs | 2 +- .../src/executors/caching.rs | 2 +- remote_execution/oss/re_grpc/src/client.rs | 20 ++++++++++++++----- 5 files changed, 34 insertions(+), 13 deletions(-) diff --git a/app/buck2_execute/src/execute/output.rs b/app/buck2_execute/src/execute/output.rs index 2b156aba2a58..2ecbbbae900c 100644 --- a/app/buck2_execute/src/execute/output.rs +++ b/app/buck2_execute/src/execute/output.rs @@ -14,10 +14,12 @@ use anyhow::Context; use buck2_common::file_ops::FileDigest; use buck2_core::execution_types::executor_config::RemoteExecutorUseCase; use futures::future; +use remote_execution::InlinedBlobWithDigest; use remote_execution::TDigest; use crate::digest::CasDigestConversionResultExt; use crate::digest::CasDigestFromReExt; +use crate::digest::CasDigestToReExt; use crate::digest_config::DigestConfig; use crate::re::manager::ManagedRemoteExecutionClient; use crate::re::streams::RemoteCommandStdStreams; @@ -238,12 +240,13 @@ impl CommandStdStreams { self, client: &ManagedRemoteExecutionClient, use_case: RemoteExecutorUseCase, + digest_config: DigestConfig, ) -> anyhow::Result> { match self { Self::Local { stdout, stderr } => { let (stdout, stderr) = future::try_join( - maybe_upload_to_re(client, use_case, stdout), - maybe_upload_to_re(client, use_case, stderr), + maybe_upload_to_re(client, use_case, stdout, 
digest_config), + maybe_upload_to_re(client, use_case, stderr, digest_config), ) .await?; @@ -276,11 +279,17 @@ async fn maybe_upload_to_re( client: &ManagedRemoteExecutionClient, use_case: RemoteExecutorUseCase, bytes: Vec, + digest_config: DigestConfig, ) -> anyhow::Result { const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE if bytes.len() < MIN_STREAM_UPLOAD_SIZE { return Ok(ReStdStream::Raw(bytes)); } - let digest = client.upload_blob(bytes, use_case).await?; + let inline_blob = InlinedBlobWithDigest { + digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(), + blob: bytes, + ..Default::default() + }; + let digest = client.upload_blob(inline_blob, use_case).await?; Ok(ReStdStream::Digest(digest)) } diff --git a/app/buck2_execute/src/re/client.rs b/app/buck2_execute/src/re/client.rs index cb0f68ecb15a..051afb631968 100644 --- a/app/buck2_execute/src/re/client.rs +++ b/app/buck2_execute/src/re/client.rs @@ -318,7 +318,7 @@ impl RemoteExecutionClient { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> anyhow::Result { self.data @@ -1151,9 +1151,10 @@ impl RemoteExecutionClientImpl { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> anyhow::Result { + let digest = blob.digest.clone(); with_error_handler( "upload_blob", self.get_session_id(), @@ -1161,7 +1162,8 @@ impl RemoteExecutionClientImpl { .upload_blob(blob, use_case.metadata(None)) .await, ) - .await + .await?; + Ok(digest) } async fn materialize_files( diff --git a/app/buck2_execute/src/re/manager.rs b/app/buck2_execute/src/re/manager.rs index cda180c08d9f..c267b14867d9 100644 --- a/app/buck2_execute/src/re/manager.rs +++ b/app/buck2_execute/src/re/manager.rs @@ -460,7 +460,7 @@ impl ManagedRemoteExecutionClient { pub async fn upload_blob( &self, - blob: Vec, + blob: InlinedBlobWithDigest, use_case: RemoteExecutorUseCase, ) -> 
anyhow::Result { self.lock()?.get().await?.upload_blob(blob, use_case).await diff --git a/app/buck2_execute_impl/src/executors/caching.rs b/app/buck2_execute_impl/src/executors/caching.rs index 409cec25cd62..4319760bcb33 100644 --- a/app/buck2_execute_impl/src/executors/caching.rs +++ b/app/buck2_execute_impl/src/executors/caching.rs @@ -416,7 +416,7 @@ impl CacheUploader { .report .std_streams .clone() - .into_re(&self.re_client, self.re_use_case) + .into_re(&self.re_client, self.re_use_case, digest_config) .await .context("Error accessing std_streams") }; diff --git a/remote_execution/oss/re_grpc/src/client.rs b/remote_execution/oss/re_grpc/src/client.rs index 3c5b8a3b941c..9713c84ffb60 100644 --- a/remote_execution/oss/re_grpc/src/client.rs +++ b/remote_execution/oss/re_grpc/src/client.rs @@ -758,11 +758,21 @@ impl REClient { pub async fn upload_blob( &self, - _blob: Vec, - _metadata: RemoteExecutionMetadata, - ) -> anyhow::Result { - // TODO(aloiscochard) - Err(anyhow::anyhow!("Not implemented (RE upload_blob)")) + blob: InlinedBlobWithDigest, + metadata: RemoteExecutionMetadata, + ) -> anyhow::Result<()> { + self.upload( + metadata, + UploadRequest { + inlined_blobs_with_digest: Some(vec![blob]), + files_with_digest: None, + directories: None, + upload_only_missing: false, + ..Default::default() + }, + ) + .await?; + Ok(()) } pub async fn download(