diff --git a/src/rust/engine/fs/brfs/src/main.rs b/src/rust/engine/fs/brfs/src/main.rs index 9d4f708b247..02e811edc48 100644 --- a/src/rust/engine/fs/brfs/src/main.rs +++ b/src/rust/engine/fs/brfs/src/main.rs @@ -42,7 +42,7 @@ use log::{debug, error, warn}; use parking_lot::Mutex; use protos::gen::build::bazel::remote::execution::v2 as remexec; use protos::require_digest; -use store::Store; +use store::{Store, StoreError}; use tokio::signal::unix::{signal, SignalKind}; use tokio::task; use tokio_stream::wrappers::SignalStream; @@ -196,7 +196,7 @@ impl BuildResultFS { .runtime .block_on(async move { store.load_file_bytes_with(digest, |_| ()).await }) { - Ok(Some(())) => { + Ok(()) => { let executable_inode = self.next_inode; self.next_inode += 1; let non_executable_inode = self.next_inode; @@ -224,8 +224,8 @@ impl BuildResultFS { non_executable_inode })) } - Ok(None) => Ok(None), - Err(err) => Err(err), + Err(StoreError::MissingDigest { .. }) => Ok(None), + Err(err) => Err(err.to_string()), } } } @@ -240,7 +240,7 @@ impl BuildResultFS { .runtime .block_on(async move { store.load_directory(digest).await }) { - Ok(Some(_)) => { + Ok(_) => { // TODO: Kick off some background futures to pre-load the contents of this Directory into // an in-memory cache. Keep a background CPU pool driving those Futures. let inode = self.next_inode; @@ -256,8 +256,8 @@ impl BuildResultFS { ); Ok(Some(inode)) } - Ok(None) => Ok(None), - Err(err) => Err(err), + Err(StoreError::MissingDigest { .. }) => Ok(None), + Err(err) => Err(err.to_string()), } } } @@ -335,7 +335,7 @@ impl BuildResultFS { .block_on(async move { store.load_directory(digest).await }); match maybe_directory { - Ok(Some(directory)) => { + Ok(directory) => { let mut entries = vec![ ReaddirEntry { inode: inode, @@ -396,7 +396,7 @@ impl BuildResultFS { Ok(entries) } - Ok(None) => Err(libc::ENOENT), + Err(StoreError::MissingDigest { .. }) => Err(libc::ENOENT), Err(err) => { error!("Error loading directory {:?}: {}", digest, err); Err(libc::EINVAL) @@ -477,14 +477,18 @@ impl fuser::Filesystem for BuildResultFS { .and_then(|cache_entry| { let store = self.store.clone(); let parent_digest = cache_entry.digest; - self + let directory = self .runtime .block_on(async move { store.load_directory(parent_digest).await }) - .map_err(|err| { - error!("Error reading directory {:?}: {}", parent_digest, err); - libc::EINVAL - })? - .and_then(|directory| self.node_for_digest(&directory, filename)) + .map_err(|err| match err { + StoreError::MissingDigest { .. } => libc::ENOENT, + err => { + error!("Error reading directory {:?}: {}", parent_digest, err); + libc::EINVAL + } + })?; + self + .node_for_digest(&directory, filename) .ok_or(libc::ENOENT) }) .and_then(|node| match node { @@ -583,19 +587,20 @@ impl fuser::Filesystem for BuildResultFS { }) .await }) - .map(|v| { - if v.is_none() { - let maybe_reply = reply2.lock().take(); - if let Some(reply) = maybe_reply { - reply.error(libc::ENOENT); - } - } - }) .or_else(|err| { - error!("Error loading bytes for {:?}: {}", digest, err); let maybe_reply = reply2.lock().take(); - if let Some(reply) = maybe_reply { - reply.error(libc::EINVAL); + match err { + StoreError::MissingDigest { .. 
} => { + if let Some(reply) = maybe_reply { + reply.error(libc::ENOENT); + } + } + err => { + error!("Error loading bytes for {:?}: {}", digest, err); + if let Some(reply) = maybe_reply { + reply.error(libc::EINVAL); + } + } } Ok(()) }); diff --git a/src/rust/engine/fs/fs_util/src/main.rs b/src/rust/engine/fs/fs_util/src/main.rs index db8126a438e..4f23a17ae58 100644 --- a/src/rust/engine/fs/fs_util/src/main.rs +++ b/src/rust/engine/fs/fs_util/src/main.rs @@ -47,7 +47,9 @@ use parking_lot::Mutex; use protos::require_digest; use serde_derive::Serialize; use std::collections::BTreeMap; -use store::{Snapshot, SnapshotOps, Store, StoreFileByDigest, SubsetParams, UploadSummary}; +use store::{ + Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest, SubsetParams, UploadSummary, +}; #[derive(Debug)] enum ExitCode { @@ -58,6 +60,15 @@ enum ExitCode { #[derive(Debug)] struct ExitError(String, ExitCode); +impl From for ExitError { + fn from(s: StoreError) -> Self { + match s { + md @ StoreError::MissingDigest { .. } => ExitError(md.to_string(), ExitCode::NotFound), + StoreError::Unclassified(s) => ExitError(s, ExitCode::UnknownError), + } + } +} + impl From for ExitError { fn from(s: String) -> Self { ExitError(s, ExitCode::UnknownError) @@ -455,15 +466,11 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { .parse::() .expect("size_bytes must be a non-negative number"); let digest = Digest::new(fingerprint, size_bytes); - let write_result = store - .load_file_bytes_with(digest, |bytes| io::stdout().write_all(bytes).unwrap()) - .await?; - write_result.ok_or_else(|| { - ExitError( - format!("File with digest {:?} not found", digest), - ExitCode::NotFound, - ) - }) + Ok( + store + .load_file_bytes_with(digest, |bytes| io::stdout().write_all(bytes).unwrap()) + .await?, + ) } ("save", args) => { let path = PathBuf::from(args.value_of("path").unwrap()); @@ -523,16 +530,11 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { .expect("protocol error"); let output_digest = output_digest_opt.ok_or_else(|| ExitError("not found".into(), ExitCode::NotFound))?; - store - .materialize_directory(destination, output_digest, Permissions::Writable) - .await - .map_err(|err| { - if err.contains("not found") { - ExitError(err, ExitCode::NotFound) - } else { - err.into() - } - }) + Ok( + store + .materialize_directory(destination, output_digest, Permissions::Writable) + .await?, + ) } (_, _) => unimplemented!(), }, @@ -546,16 +548,11 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { .parse::() .expect("size_bytes must be a non-negative number"); let digest = DirectoryDigest::from_persisted_digest(Digest::new(fingerprint, size_bytes)); - store - .materialize_directory(destination, digest, Permissions::Writable) - .await - .map_err(|err| { - if err.contains("not found") { - ExitError(err, ExitCode::NotFound) - } else { - err.into() - } - }) + Ok( + store + .materialize_directory(destination, digest, Permissions::Writable) + .await?, + ) } ("save", args) => { let posix_fs = Arc::new(make_posix_fs( @@ -629,53 +626,36 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { .await?; } - let proto_bytes: Option> = match args.value_of("output-format").unwrap() { - "binary" => { - let maybe_directory = store.load_directory(digest.as_digest()).await?; - maybe_directory.map(|d| d.to_bytes().to_vec()) - } - "text" => { - let maybe_p = store.load_directory(digest.as_digest()).await?; - maybe_p.map(|p| format!("{:?}\n", 
p).as_bytes().to_vec()) - } - "recursive-file-list" => { - let maybe_v = expand_files(store, digest.as_digest()).await?; - maybe_v - .map(|v| { - v.into_iter() - .map(|(name, _digest)| format!("{}\n", name)) - .collect::>() - .join("") - }) - .map(String::into_bytes) - } - "recursive-file-list-with-digests" => { - let maybe_v = expand_files(store, digest.as_digest()).await?; - maybe_v - .map(|v| { - v.into_iter() - .map(|(name, digest)| { - format!("{} {:<16} {}\n", digest.hash, digest.size_bytes, name) - }) - .collect::>() - .join("") - }) - .map(String::into_bytes) - } + let proto_bytes: Vec = match args.value_of("output-format").unwrap() { + "binary" => store + .load_directory(digest.as_digest()) + .await? + .to_bytes() + .to_vec(), + "text" => format!("{:?}\n", store.load_directory(digest.as_digest()).await?) + .as_bytes() + .to_vec(), + "recursive-file-list" => expand_files(store, digest.as_digest()) + .await? + .into_iter() + .map(|(name, _digest)| format!("{}\n", name)) + .collect::>() + .join("") + .into_bytes(), + "recursive-file-list-with-digests" => expand_files(store, digest.as_digest()) + .await? + .into_iter() + .map(|(name, digest)| format!("{} {:<16} {}\n", digest.hash, digest.size_bytes, name)) + .collect::>() + .join("") + .into_bytes(), format => { return Err(format!("Unexpected value of --output-format arg: {}", format).into()) } }; - match proto_bytes { - Some(bytes) => { - io::stdout().write_all(&bytes).unwrap(); - Ok(()) - } - None => Err(ExitError( - format!("Directory with digest {:?} not found", digest), - ExitCode::NotFound, - )), - } + + io::stdout().write_all(&proto_bytes).unwrap(); + Ok(()) } (_, _) => unimplemented!(), }, @@ -687,26 +667,17 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> { .parse::() .expect("size_bytes must be a non-negative number"); let digest = Digest::new(fingerprint, size_bytes); - let v = match store + let bytes = match store .load_file_bytes_with(digest, Bytes::copy_from_slice) - .await? + .await { - None => { - let maybe_dir = store.load_directory(digest).await?; - maybe_dir.map(|dir| dir.to_bytes()) - } - Some(bytes) => Some(bytes), + Err(StoreError::MissingDigest { .. }) => store.load_directory(digest).await?.to_bytes(), + Err(e) => return Err(e.into()), + Ok(bytes) => bytes, }; - match v { - Some(bytes) => { - io::stdout().write_all(&bytes).unwrap(); - Ok(()) - } - None => Err(ExitError( - format!("Digest {:?} not found", digest), - ExitCode::NotFound, - )), - } + + io::stdout().write_all(&bytes).unwrap(); + Ok(()) } ("directories", sub_match) => match expect_subcommand(sub_match) { ("list", _) => { @@ -738,17 +709,13 @@ fn expect_subcommand(matches: &clap::ArgMatches) -> (&str, &clap::ArgMatches) { .unwrap_or_else(|| panic!("Expected subcommand. 
See `--help`.")) } -async fn expand_files( - store: Store, - digest: Digest, -) -> Result>, String> { +async fn expand_files(store: Store, digest: Digest) -> Result, StoreError> { let files = Arc::new(Mutex::new(Vec::new())); - let vec_opt = expand_files_helper(store, digest, String::new(), files.clone()).await?; - Ok(vec_opt.map(|_| { - let mut v = Arc::try_unwrap(files).unwrap().into_inner(); - v.sort_by(|(l, _), (r, _)| l.cmp(r)); - v - })) + expand_files_helper(store, digest, String::new(), files.clone()).await?; + + let mut v = Arc::try_unwrap(files).unwrap().into_inner(); + v.sort_by(|(l, _), (r, _)| l.cmp(r)); + Ok(v) } fn expand_files_helper( @@ -756,44 +723,39 @@ fn expand_files_helper( digest: Digest, prefix: String, files: Arc>>, -) -> BoxFuture<'static, Result, String>> { +) -> BoxFuture<'static, Result<(), StoreError>> { async move { - let maybe_dir = store.load_directory(digest).await?; - match maybe_dir { - Some(dir) => { - { - let mut files_unlocked = files.lock(); - for file in &dir.files { - let file_digest = require_digest(file.digest.as_ref())?; - files_unlocked.push((format!("{}{}", prefix, file.name), file_digest)); - } - } - let subdirs_and_digests = dir - .directories - .iter() - .map(move |subdir| { - let digest = require_digest(subdir.digest.as_ref()); - digest.map(|digest| (subdir, digest)) - }) - .collect::, _>>()?; - future::try_join_all( - subdirs_and_digests - .into_iter() - .map(move |(subdir, digest)| { - expand_files_helper( - store.clone(), - digest, - format!("{}{}/", prefix, subdir.name), - files.clone(), - ) - }) - .collect::>(), - ) - .await - .map(|_| Some(())) + let dir = store.load_directory(digest).await?; + { + let mut files_unlocked = files.lock(); + for file in &dir.files { + let file_digest = require_digest(file.digest.as_ref())?; + files_unlocked.push((format!("{}{}", prefix, file.name), file_digest)); } - None => Ok(None), } + let subdirs_and_digests = dir + .directories + .iter() + .map(move |subdir| { + let digest = require_digest(subdir.digest.as_ref()); + digest.map(|digest| (subdir, digest)) + }) + .collect::, _>>()?; + future::try_join_all( + subdirs_and_digests + .into_iter() + .map(move |(subdir, digest)| { + expand_files_helper( + store.clone(), + digest, + format!("{}{}/", prefix, subdir.name), + files.clone(), + ) + }) + .collect::>(), + ) + .await + .map(|_| ()) } .boxed() } @@ -812,7 +774,7 @@ async fn ensure_uploaded_to_remote( store: &Store, store_has_remote: bool, digest: Digest, -) -> Result { +) -> Result { let summary = if store_has_remote { store .ensure_remote_has_recursive(vec![digest]) diff --git a/src/rust/engine/fs/store/benches/store.rs b/src/rust/engine/fs/store/benches/store.rs index 821261488fe..e834711f157 100644 --- a/src/rust/engine/fs/store/benches/store.rs +++ b/src/rust/engine/fs/store/benches/store.rs @@ -166,7 +166,6 @@ pub fn criterion_benchmark_merge(c: &mut Criterion) { .unwrap(); let directory = executor .block_on(store.load_directory(digest.as_digest())) - .unwrap() .unwrap(); let mut all_file_nodes = directory.files.to_vec(); let mut file_nodes_to_modify = all_file_nodes.split_off(all_file_nodes.len() / 2); diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index ad58c834b69..36dafed6030 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -36,7 +36,7 @@ mod snapshot_tests; pub use crate::snapshot_ops::{SnapshotOps, SubsetParams}; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::fmt::Debug; +use 
std::fmt::{self, Debug, Display}; use std::fs::OpenOptions; use std::io::{self, Read, Write}; use std::os::unix::fs::{OpenOptionsExt, PermissionsExt}; @@ -101,6 +101,40 @@ impl Default for LocalOptions { } } +#[derive(Debug, PartialEq)] +pub enum StoreError { + /// A Digest was not present in either of the local or remote Stores. + MissingDigest(String, Digest), + /// All other error types. + Unclassified(String), +} + +impl StoreError { + pub fn enrich(self, prefix: &str) -> Self { + match self { + Self::MissingDigest(s, d) => Self::MissingDigest(format!("{prefix}: {s}"), d), + Self::Unclassified(s) => Self::Unclassified(format!("{prefix}: {s}")), + } + } +} + +impl Display for StoreError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::MissingDigest(s, d) => { + write!(f, "{s}: {d:?}") + } + Self::Unclassified(s) => write!(f, "{s}"), + } + } +} + +impl From for StoreError { + fn from(err: String) -> Self { + Self::Unclassified(err) + } +} + // Summary of the files and directories uploaded with an operation // ingested_file_{count, bytes}: Number and combined size of processed files // uploaded_file_{count, bytes}: Number and combined size of files uploaded to the remote @@ -357,7 +391,7 @@ impl Store { &self, digest: Digest, f: F, - ) -> Result, String> { + ) -> Result { // No transformation or verification is needed for files, so we pass in a pair of functions // which always succeed, whether the underlying bytes are coming from a local or remote store. // Unfortunately, we need to be a little verbose to do this. @@ -434,7 +468,7 @@ impl Store { /// TODO: Add a native implementation that skips creating PathStats and directly produces /// a DigestTrie. /// - pub async fn load_digest_trie(&self, digest: DirectoryDigest) -> Result { + pub async fn load_digest_trie(&self, digest: DirectoryDigest) -> Result { if let Some(tree) = digest.tree { // The DigestTrie is already loaded. return Ok(tree); @@ -472,11 +506,14 @@ impl Store { let tree = DigestTrie::from_path_stats(path_stats, &file_digests)?; let computed_digest = tree.compute_root_digest(); if digest.as_digest() != computed_digest { - return Err(format!( - "Computed digest for Snapshot loaded from store mismatched: {:?} vs {:?}", - digest.as_digest(), - computed_digest - )); + return Err( + format!( + "Computed digest for Snapshot loaded from store mismatched: {:?} vs {:?}", + digest.as_digest(), + computed_digest + ) + .into(), + ); } Ok(tree) @@ -489,7 +526,7 @@ impl Store { /// In general, DirectoryDigests should be consumed lazily to avoid fetching from a remote /// store unnecessarily, so this method is primarily useful for tests and benchmarks. /// - pub async fn load_directory_digest(&self, digest: Digest) -> Result { + pub async fn load_directory_digest(&self, digest: Digest) -> Result { Ok(DirectoryDigest::new( digest, self @@ -501,11 +538,11 @@ impl Store { /// /// Loads a directory proto from the local store, back-filling from remote if necessary. /// - /// Guarantees that if an Ok Some value is returned, it is valid, and canonical, and its - /// fingerprint exactly matches that which is requested. Will return an Err if it would return a - /// non-canonical Directory. + /// Guarantees that if an Ok value is returned, it is valid, and canonical, and its fingerprint + /// exactly matches that which is requested. Will return an Err if it would return a non-canonical + /// Directory. 
/// - pub async fn load_directory(&self, digest: Digest) -> Result, String> { + pub async fn load_directory(&self, digest: Digest) -> Result { self .load_bytes_with( EntryType::Directory, @@ -549,7 +586,7 @@ impl Store { pub async fn ensure_directory_digest_persisted( &self, digest: DirectoryDigest, - ) -> Result<(), String> { + ) -> Result<(), StoreError> { let tree = self.load_digest_trie(digest).await?; let _ = self.record_digest_trie(tree, true).await?; Ok(()) @@ -570,49 +607,56 @@ impl Store { digest: Digest, f_local: FLocal, f_remote: FRemote, - ) -> Result, String> { + ) -> Result { let local = self.local.clone(); let maybe_remote = self.remote.clone(); - let maybe_local_value = self + + if let Some(bytes_res) = self .local .load_bytes_with(entry_type, digest, f_local) - .await?; + .await? + { + return Ok(bytes_res?); + } + + let remote = maybe_remote + .ok_or_else(|| { + StoreError::MissingDigest("Was not present in the local store".to_owned(), digest) + })? + .store; + + let bytes = retry_call( + remote, + |remote| async move { remote.load_bytes_with(digest, Ok).await }, + |err| match err { + ByteStoreError::Grpc(status) => status_is_retryable(status), + _ => false, + }, + ) + .await + .map_err(|err| match err { + ByteStoreError::Grpc(status) => status_to_str(status), + ByteStoreError::Other(msg) => msg, + })? + .ok_or_else(|| { + StoreError::MissingDigest( + "Was not present in either the local or remote store".to_owned(), + digest, + ) + })?; - match (maybe_local_value, maybe_remote) { - (Some(value_result), _) => value_result.map(Some), - (None, None) => Ok(None), - (None, Some(remote_store)) => { - let remote = remote_store.store.clone(); - let maybe_bytes = retry_call( - remote, - |remote| async move { remote.load_bytes_with(digest, Ok).await }, - |err| match err { - ByteStoreError::Grpc(status) => status_is_retryable(status), - _ => false, - }, + let value = f_remote(bytes.clone())?; + let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?; + if digest == stored_digest { + Ok(value) + } else { + Err( + format!( + "CAS gave wrong digest: expected {:?}, got {:?}", + digest, stored_digest ) - .await - .map_err(|err| match err { - ByteStoreError::Grpc(status) => status_to_str(status), - ByteStoreError::Other(msg) => msg, - })?; - - match maybe_bytes { - Some(bytes) => { - let value = f_remote(bytes.clone())?; - let stored_digest = local.store_bytes(entry_type, None, bytes, true).await?; - if digest == stored_digest { - Ok(Some(value)) - } else { - Err(format!( - "CAS gave wrong digest: expected {:?}, got {:?}", - digest, stored_digest - )) - } - } - None => Ok(None), - } - } + .into(), + ) } } @@ -628,14 +672,18 @@ impl Store { pub fn ensure_remote_has_recursive( &self, digests: Vec, - ) -> BoxFuture<'static, Result> { + ) -> BoxFuture<'static, Result> { let start_time = Instant::now(); let remote_store = if let Some(ref remote) = self.remote { remote.clone() } else { - return futures::future::err("Cannot ensure remote has blobs without a remote".to_owned()) - .boxed(); + return futures::future::err( + "Cannot ensure remote has blobs without a remote" + .to_owned() + .into(), + ) + .boxed(); }; let store = self.clone(); @@ -708,7 +756,7 @@ impl Store { remote: remote::ByteStore, entry_type: EntryType, digest: Digest, - ) -> Result<(), String> { + ) -> Result<(), StoreError> { // We need to copy the bytes into memory so that they may be used safely in an async // future. 
While this unfortunately increases memory consumption, we prioritize // being able to run `remote.store_bytes()` as async. @@ -721,11 +769,13 @@ impl Store { }) .await?; match maybe_bytes { - Some(bytes) => remote.store_bytes(bytes).await, - None => Err(format!( - "Failed to upload {entry_type:?} {digest:?}: Not found in local store.", - entry_type = entry_type, - digest = digest + Some(bytes) => Ok(remote.store_bytes(bytes).await?), + None => Err(StoreError::MissingDigest( + format!( + "Failed to upload {entry_type:?}: Not found in local store", + entry_type = entry_type, + ), + digest, )), } } @@ -735,7 +785,7 @@ impl Store { remote: remote::ByteStore, entry_type: EntryType, digest: Digest, - ) -> Result<(), String> { + ) -> Result<(), StoreError> { remote .store_buffered(digest, |mut buffer| async { let result = local @@ -751,12 +801,14 @@ impl Store { }) .await?; match result { - None => Err(format!( - "Failed to upload {entry_type:?} {digest:?}: Not found in local store.", - entry_type = entry_type, - digest = digest + None => Err(StoreError::MissingDigest( + format!( + "Failed to upload {entry_type:?}: Not found in local store", + entry_type = entry_type, + ), + digest, )), - Some(Err(err)) => Err(err), + Some(Err(err)) => Err(err.into()), Some(Ok(())) => Ok(()), } }) @@ -770,7 +822,7 @@ impl Store { pub async fn ensure_local_has_recursive_directory( &self, dir_digest: DirectoryDigest, - ) -> Result<(), String> { + ) -> Result<(), StoreError> { let mut file_digests = Vec::new(); self .load_digest_trie(dir_digest) @@ -792,24 +844,23 @@ impl Store { /// Ensure that a file is locally loadable, which will download it from the Remote store as /// a side effect (if one is configured). Called only with the Digest of a File. - pub async fn ensure_local_has_file(&self, file_digest: Digest) -> Result<(), String> { - let result = self + pub async fn ensure_local_has_file(&self, file_digest: Digest) -> Result<(), StoreError> { + if let Err(e) = self .load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(())) - .await?; - match result { - Some(_) => Ok(()), - None => { - log::debug!("Missing file digest from remote store: {:?}", file_digest); - in_workunit!( - "missing_file_counter", - Level::Trace, - |workunit| async move { - workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1); - }, - ) - .await; - Err("File did not exist in the remote store.".to_owned()) - } + .await + { + log::debug!("Missing file digest from remote store: {:?}", file_digest); + in_workunit!( + "missing_file_counter", + Level::Trace, + |workunit| async move { + workunit.increment_counter(Metric::RemoteStoreMissingDigest, 1); + }, + ) + .await; + Err(e) + } else { + Ok(()) } } @@ -867,14 +918,15 @@ impl Store { pub async fn lease_all_recursively<'a, Ds: Iterator>( &self, digests: Ds, - ) -> Result<(), String> { + ) -> Result<(), StoreError> { let reachable_digests_and_types = self .expand_digests(digests, LocalMissingBehavior::Ignore) .await?; self .local .lease_all(reachable_digests_and_types.into_iter()) - .await + .await?; + Ok(()) } pub fn garbage_collect( @@ -931,7 +983,7 @@ impl Store { &self, digests: Ds, missing_behavior: LocalMissingBehavior, - ) -> Result, String> { + ) -> Result, StoreError> { // Expand each digest into either a single file digest, or a collection of recursive digests // below a directory. 
let expanded_digests = future::try_join_all( @@ -939,7 +991,7 @@ impl Store { .map(|digest| { let store = self.clone(); async move { - match store.local.entry_type(digest.hash).await { + let res: Result<_, StoreError> = match store.local.entry_type(digest.hash).await { Ok(Some(EntryType::File)) => Ok(Either::Left(*digest)), Ok(Some(EntryType::Directory)) => { let store_for_expanding = match missing_behavior { @@ -953,12 +1005,13 @@ impl Store { } Ok(None) => match missing_behavior { LocalMissingBehavior::Ignore => Ok(Either::Right(HashMap::new())), - LocalMissingBehavior::Fetch | LocalMissingBehavior::Error => { - Err(format!("Failed to expand digest {:?}: Not found", digest)) - } + LocalMissingBehavior::Fetch | LocalMissingBehavior::Error => Err( + StoreError::MissingDigest("Failed to expand digest".to_owned(), *digest), + ), }, - Err(err) => Err(format!("Failed to expand digest {:?}: {:?}", digest, err)), - } + Err(err) => Err(format!("Failed to expand digest {:?}: {:?}", digest, err).into()), + }; + res } }) .collect::>(), @@ -982,7 +1035,7 @@ impl Store { pub fn expand_directory( &self, digest: Digest, - ) -> BoxFuture<'static, Result, String>> { + ) -> BoxFuture<'static, Result, StoreError>> { self .walk(digest, |_, _, digest, directory| { let mut digest_types = vec![(digest, EntryType::Directory)]; @@ -1015,7 +1068,7 @@ impl Store { destination: PathBuf, digest: DirectoryDigest, perms: Permissions, - ) -> Result<(), String> { + ) -> Result<(), StoreError> { // Load the DigestTrie for the digest, and convert it into a mapping between a fully qualified // parent path and its children. let mut parent_to_child = HashMap::new(); @@ -1042,7 +1095,7 @@ impl Store { is_root: bool, parent_to_child: &'a HashMap>, perms: Permissions, - ) -> BoxFuture<'a, Result<(), String>> { + ) -> BoxFuture<'a, Result<(), StoreError>> { let store = self.clone(); async move { let destination2 = destination.clone(); @@ -1113,8 +1166,8 @@ impl Store { destination: PathBuf, digest: Digest, mode: u32, - ) -> Result<(), String> { - let write_result = self + ) -> Result<(), StoreError> { + self .load_file_bytes_with(digest, move |bytes| { let mut f = OpenOptions::new() .create(true) @@ -1133,12 +1186,7 @@ impl Store { .map_err(|e| format!("Error writing file {}: {:?}", destination.display(), e))?; Ok(()) }) - .await?; - match write_result { - Some(Ok(())) => Ok(()), - Some(Err(e)) => Err(e), - None => Err(format!("File with digest {:?} not found", digest)), - } + .await? 
} /// @@ -1147,7 +1195,7 @@ impl Store { pub async fn contents_for_directory( &self, digest: DirectoryDigest, - ) -> Result, String> { + ) -> Result, StoreError> { let mut files = Vec::new(); self .load_digest_trie(digest) @@ -1160,16 +1208,15 @@ impl Store { future::try_join_all(files.into_iter().map(|(path, digest, is_executable)| { let store = self.clone(); async move { - let maybe_bytes = store + let content = store .load_file_bytes_with(digest, Bytes::copy_from_slice) - .await?; - maybe_bytes - .ok_or_else(|| format!("Couldn't find file contents for {:?}", path)) - .map(|content| FileContent { - path, - content, - is_executable, - }) + .await + .map_err(|e| e.enrich(&format!("Couldn't find file contents for {:?}", path)))?; + Ok(FileContent { + path, + content, + is_executable, + }) } })) .await @@ -1181,7 +1228,7 @@ impl Store { pub async fn entries_for_directory( &self, digest: DirectoryDigest, - ) -> Result, String> { + ) -> Result, StoreError> { if digest == *EMPTY_DIRECTORY_DIGEST { return Ok(vec![]); } @@ -1232,7 +1279,7 @@ impl Store { &self, digest: Digest, f: F, - ) -> BoxFuture<'static, Result, String>> { + ) -> BoxFuture<'static, Result, StoreError>> { let f = Arc::new(f); let accumulator = Arc::new(Mutex::new(Vec::new())); self @@ -1264,35 +1311,32 @@ impl Store { path_so_far: PathBuf, f: Arc, accumulator: Arc>>, - ) -> BoxFuture<'static, Result<(), String>> { + ) -> BoxFuture<'static, Result<(), StoreError>> { let store = self.clone(); let res = async move { - let maybe_directory = store.load_directory(digest).await?; - match maybe_directory { - Some(directory) => { - let result_for_directory = f(&store, &path_so_far, digest, &directory).await?; - { - let mut accumulator = accumulator.lock(); - accumulator.push(result_for_directory); - } - future::try_join_all( - directory - .directories - .iter() - .map(move |dir_node| { - let subdir_digest = try_future!(require_digest(dir_node.digest.as_ref())); - let path = path_so_far.join(dir_node.name.clone()); - store.walk_helper(subdir_digest, path, f.clone(), accumulator.clone()) - }) - .collect::>(), - ) - .await?; - Ok(()) - } - None => Err(format!( - "Could not walk unknown directory at {path_so_far:?}: {digest:?}" - )), + let directory = store.load_directory(digest).await.map_err(|e| { + e.enrich(&format!( + "Could not walk unknown directory at {path_so_far:?}" + )) + })?; + let result_for_directory = f(&store, &path_so_far, digest, &directory).await?; + { + let mut accumulator = accumulator.lock(); + accumulator.push(result_for_directory); } + future::try_join_all( + directory + .directories + .iter() + .map(move |dir_node| { + let subdir_digest = try_future!(require_digest(dir_node.digest.as_ref())); + let path = path_so_far.join(dir_node.name.clone()); + store.walk_helper(subdir_digest, path, f.clone(), accumulator.clone()) + }) + .collect::>(), + ) + .await?; + Ok(()) }; res.boxed() } @@ -1315,33 +1359,19 @@ pub enum LocalMissingBehavior { #[async_trait] impl SnapshotOps for Store { + type Error = StoreError; + async fn load_file_bytes_with T + Send + Sync + 'static>( &self, digest: Digest, f: F, - ) -> Result, String> { + ) -> Result { Store::load_file_bytes_with(self, digest, f).await } - async fn load_digest_trie(&self, digest: DirectoryDigest) -> Result { + async fn load_digest_trie(&self, digest: DirectoryDigest) -> Result { Store::load_digest_trie(self, digest).await } - - async fn load_directory(&self, digest: Digest) -> Result, String> { - Store::load_directory(self, digest).await - } - - async fn 
load_directory_or_err(&self, digest: Digest) -> Result { - Snapshot::get_directory_or_err(self.clone(), digest).await - } - - async fn record_digest_trie(&self, tree: DigestTrie) -> Result { - Store::record_digest_trie(self, tree, true).await - } - - async fn record_directory(&self, directory: &remexec::Directory) -> Result { - Store::record_directory(self, directory, true).await - } } // Only public for testing. diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index 277e576ea76..e0ffbffe73c 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -24,6 +24,8 @@ use remexec::{ use tonic::{Code, Request, Status}; use workunit_store::{in_workunit, Metric, ObservationMetric}; +use crate::StoreError; + #[derive(Clone)] pub struct ByteStore { instance_name: Option, @@ -121,10 +123,10 @@ impl ByteStore { &self, digest: Digest, mut write_to_buffer: WriteToBuffer, - ) -> Result<(), String> + ) -> Result<(), StoreError> where WriteToBuffer: FnMut(std::fs::File) -> WriteResult, - WriteResult: Future>, + WriteResult: Future>, { let write_buffer = tempfile::tempfile().map_err(|e| { format!( @@ -179,8 +181,8 @@ impl ByteStore { ) .await .map_err(|err| match err { - ByteStoreError::Grpc(status) => status_to_str(status), - ByteStoreError::Other(msg) => msg, + ByteStoreError::Grpc(status) => status_to_str(status).into(), + ByteStoreError::Other(msg) => msg.into(), }) } diff --git a/src/rust/engine/fs/store/src/snapshot.rs b/src/rust/engine/fs/store/src/snapshot.rs index 6c0208faaac..036ff76b701 100644 --- a/src/rust/engine/fs/store/src/snapshot.rs +++ b/src/rust/engine/fs/store/src/snapshot.rs @@ -17,9 +17,8 @@ use fs::{ PreparedPathGlobs, SymlinkBehavior, EMPTY_DIGEST_TREE, }; use hashing::{Digest, EMPTY_DIGEST}; -use protos::gen::build::bazel::remote::execution::v2 as remexec; -use crate::Store; +use crate::{Store, StoreError}; /// The listing of a DirectoryDigest. /// @@ -88,21 +87,13 @@ impl Snapshot { }) } - pub async fn from_digest(store: Store, digest: DirectoryDigest) -> Result { + pub async fn from_digest(store: Store, digest: DirectoryDigest) -> Result { Ok(Self { digest: digest.as_digest(), tree: store.load_digest_trie(digest).await?, }) } - pub async fn get_directory_or_err( - store: Store, - digest: Digest, - ) -> Result { - let maybe_dir = store.load_directory(digest).await?; - maybe_dir.ok_or_else(|| format!("{:?} was not known", digest)) - } - /// /// Capture a Snapshot of a presumed-immutable piece of the filesystem. /// @@ -126,7 +117,9 @@ impl Snapshot { // Attempt to use the digest hint to load a Snapshot without expanding the globs; otherwise, // expand the globs to capture a Snapshot. 
let snapshot_result = if let Some(digest) = digest_hint { - Snapshot::from_digest(store.clone(), digest).await + Snapshot::from_digest(store.clone(), digest) + .await + .map_err(|e| e.to_string()) } else { Err("No digest hint provided.".to_string()) }; diff --git a/src/rust/engine/fs/store/src/snapshot_ops.rs b/src/rust/engine/fs/store/src/snapshot_ops.rs index ace2b2a9f0f..a29ada825b5 100644 --- a/src/rust/engine/fs/store/src/snapshot_ops.rs +++ b/src/rust/engine/fs/store/src/snapshot_ops.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::convert::From; +use std::fmt::{Debug, Display}; use std::iter::Iterator; use async_trait::async_trait; @@ -15,7 +16,6 @@ use futures::future::{self, FutureExt}; use hashing::Digest; use itertools::Itertools; use log::log_enabled; -use protos::gen::build::bazel::remote::execution::v2 as remexec; /// /// Parameters used to determine which files and directories to operate on within a parent snapshot. @@ -35,7 +35,7 @@ pub struct SubsetParams { async fn merge_directories( store: T, dir_digests: Vec, -) -> Result { +) -> Result { let trees = future::try_join_all( dir_digests .into_iter() @@ -52,7 +52,7 @@ async fn merge_directories( let err_string = match render_merge_error(&store, merge_error).await { Ok(e) | Err(e) => e, }; - return Err(err_string); + return Err(err_string.into()); } }; @@ -98,8 +98,8 @@ async fn render_merge_error( } String::from_utf8_lossy(bytes.to_vec().as_slice()).to_string() }) - .await? - .unwrap_or_else(|| "".to_string()); + .await + .unwrap_or_else(|_| "".to_string()); let detail = format!("{}{}", header, contents); let res: Result<_, String> = Ok((file.name().to_owned(), detail)); res @@ -140,7 +140,7 @@ async fn render_merge_error( })) .collect(); - let res: Result, String> = Ok(enumerated_details); + let res: Result, T::Error> = Ok(enumerated_details); res } .await @@ -163,23 +163,20 @@ async fn render_merge_error( /// #[async_trait] pub trait SnapshotOps: Clone + Send + Sync + 'static { + type Error: Debug + Display + From; + async fn load_file_bytes_with T + Send + Sync + 'static>( &self, digest: Digest, f: F, - ) -> Result, String>; - - async fn load_digest_trie(&self, digest: DirectoryDigest) -> Result; - async fn load_directory(&self, digest: Digest) -> Result, String>; - async fn load_directory_or_err(&self, digest: Digest) -> Result; + ) -> Result; - async fn record_digest_trie(&self, tree: DigestTrie) -> Result; - async fn record_directory(&self, directory: &remexec::Directory) -> Result; + async fn load_digest_trie(&self, digest: DirectoryDigest) -> Result; /// /// Given N Snapshots, returns a new Snapshot that merges them. 
/// - async fn merge(&self, digests: Vec) -> Result { + async fn merge(&self, digests: Vec) -> Result { merge_directories(self.clone(), digests).await } @@ -187,7 +184,7 @@ pub trait SnapshotOps: Clone + Send + Sync + 'static { &self, digest: DirectoryDigest, prefix: &RelativePath, - ) -> Result { + ) -> Result { Ok( self .load_digest_trie(digest) @@ -201,7 +198,7 @@ pub trait SnapshotOps: Clone + Send + Sync + 'static { &self, digest: DirectoryDigest, prefix: &RelativePath, - ) -> Result { + ) -> Result { Ok( self .load_digest_trie(digest) @@ -215,7 +212,7 @@ pub trait SnapshotOps: Clone + Send + Sync + 'static { &self, directory_digest: DirectoryDigest, params: SubsetParams, - ) -> Result { + ) -> Result { let input_tree = self.load_digest_trie(directory_digest.clone()).await?; let path_stats = input_tree .expand_globs(params.globs, None) @@ -233,7 +230,7 @@ pub trait SnapshotOps: Clone + Send + Sync + 'static { Ok(DigestTrie::from_path_stats(path_stats, &files)?.into()) } - async fn create_empty_dir(&self, path: &RelativePath) -> Result { + async fn create_empty_dir(&self, path: &RelativePath) -> Result { self.add_prefix(EMPTY_DIRECTORY_DIGEST.clone(), path).await } } diff --git a/src/rust/engine/fs/store/src/snapshot_tests.rs b/src/rust/engine/fs/store/src/snapshot_tests.rs index dc4f584bf85..af62e0dd70b 100644 --- a/src/rust/engine/fs/store/src/snapshot_tests.rs +++ b/src/rust/engine/fs/store/src/snapshot_tests.rs @@ -8,7 +8,7 @@ use tempfile; use testutil::data::TestDirectory; use testutil::make_file; -use crate::{OneOffStoreFileByDigest, RelativePath, Snapshot, SnapshotOps, Store}; +use crate::{OneOffStoreFileByDigest, RelativePath, Snapshot, SnapshotOps, Store, StoreError}; use fs::{ Dir, DirectoryDigest, File, GitignoreStyleExcludes, GlobExpansionConjunction, GlobMatching, PathGlobs, PathStat, PosixFS, StrictGlobMatching, @@ -308,11 +308,7 @@ async fn snapshot_merge_two_files() { .ensure_directory_digest_persisted(merged.clone()) .await .unwrap(); - let merged_root_directory = store - .load_directory(merged.as_digest()) - .await - .unwrap() - .unwrap(); + let merged_root_directory = store.load_directory(merged.as_digest()).await.unwrap(); assert_eq!(merged_root_directory.files.len(), 0); assert_eq!(merged_root_directory.directories.len(), 1); @@ -325,7 +321,6 @@ async fn snapshot_merge_two_files() { let merged_child_directory = store .load_directory(merged_child_dirnode_digest.unwrap()) .await - .unwrap() .unwrap(); assert_eq!(merged_child_dirnode.name, common_dir_name); @@ -448,16 +443,7 @@ async fn strip_dir_not_in_store() { let digest = TestDirectory::nested().directory_digest(); let prefix = RelativePath::new(PathBuf::from("cats")).unwrap(); let result = store.strip_prefix(digest.clone(), &prefix).await; - assert_eq!( - result, - Err( - format!( - "Could not walk unknown directory at \"\": {:?}", - digest.as_digest() - ) - .into() - ) - ); + assert!(matches!(result, Err(StoreError::MissingDigest { .. 
})),); } #[tokio::test] diff --git a/src/rust/engine/fs/store/src/tests.rs b/src/rust/engine/fs/store/src/tests.rs index f239b88b937..0a950b3d30b 100644 --- a/src/rust/engine/fs/store/src/tests.rs +++ b/src/rust/engine/fs/store/src/tests.rs @@ -16,7 +16,7 @@ use mock::StubCAS; use protos::gen::build::bazel::remote::execution::v2 as remexec; use workunit_store::WorkunitStore; -use crate::{EntryType, FileContent, Store, UploadSummary, MEGABYTES}; +use crate::{EntryType, FileContent, Store, StoreError, UploadSummary, MEGABYTES}; pub(crate) const STORE_BATCH_API_SIZE_LIMIT: usize = 4 * 1024 * 1024; @@ -55,7 +55,7 @@ pub fn extra_big_file_bytes() -> Bytes { bytes.freeze() } -pub async fn load_file_bytes(store: &Store, digest: Digest) -> Result, String> { +pub async fn load_file_bytes(store: &Store, digest: Digest) -> Result { store .load_file_bytes_with(digest, |bytes| Bytes::copy_from_slice(bytes)) .await @@ -120,7 +120,7 @@ async fn load_file_prefers_local() { let cas = new_cas(1024); assert_eq!( load_file_bytes(&new_store(dir.path(), &cas.address()), testdata.digest()).await, - Ok(Some(testdata.bytes())) + Ok(testdata.bytes()) ); assert_eq!(0, cas.read_request_count()); } @@ -141,7 +141,6 @@ async fn load_directory_prefers_local() { new_store(dir.path(), &cas.address()) .load_directory(testdir.digest(),) .await - .unwrap() .unwrap(), testdir.directory() ); @@ -157,7 +156,7 @@ async fn load_file_falls_back_and_backfills() { let cas = new_cas(1024); assert_eq!( load_file_bytes(&new_store(dir.path(), &cas.address()), testdata.digest()).await, - Ok(Some(testdata.bytes())), + Ok(testdata.bytes()), "Read from CAS" ); assert_eq!(1, cas.read_request_count()); @@ -184,7 +183,6 @@ async fn load_directory_falls_back_and_backfills() { new_store(dir.path(), &cas.address()) .load_directory(testdir.digest(),) .await - .unwrap() .unwrap(), testdir.directory() ); @@ -227,17 +225,16 @@ async fn load_recursive_directory() { assert_eq!( load_file_bytes(&new_local_store(dir.path()), roland.digest()).await, - Ok(Some(roland.bytes())) + Ok(roland.bytes()) ); assert_eq!( load_file_bytes(&new_local_store(dir.path()), catnip.digest()).await, - Ok(Some(catnip.bytes())) + Ok(catnip.bytes()) ); assert_eq!( new_local_store(dir.path()) .load_directory(testdir_digest,) .await - .unwrap() .unwrap(), testdir_directory ); @@ -245,7 +242,6 @@ async fn load_recursive_directory() { new_local_store(dir.path()) .load_directory(recursive_testdir_digest.as_digest()) .await - .unwrap() .unwrap(), recursive_testdir_directory ); @@ -256,28 +252,24 @@ async fn load_file_missing_is_none() { let dir = TempDir::new().unwrap(); let cas = new_empty_cas(); - assert_eq!( - load_file_bytes( - &new_store(dir.path(), &cas.address()), - TestData::roland().digest() - ) - .await, - Ok(None) - ); + let result = load_file_bytes( + &new_store(dir.path(), &cas.address()), + TestData::roland().digest(), + ) + .await; + assert!(matches!(result, Err(StoreError::MissingDigest { .. })),); assert_eq!(1, cas.read_request_count()); } #[tokio::test] -async fn load_directory_missing_is_none() { +async fn load_directory_missing_errors() { let dir = TempDir::new().unwrap(); let cas = new_empty_cas(); - assert_eq!( - new_store(dir.path(), &cas.address()) - .load_directory(TestDirectory::containing_roland().digest(),) - .await, - Ok(None) - ); + let result = new_store(dir.path(), &cas.address()) + .load_directory(TestDirectory::containing_roland().digest()) + .await; + assert!(matches!(result, Err(StoreError::MissingDigest { .. 
})),); assert_eq!(1, cas.read_request_count()); } @@ -299,7 +291,9 @@ async fn load_file_remote_error_is_error() { cas.read_request_count() ); assert!( - error.contains("StubCAS is configured to always fail"), + error + .to_string() + .contains("StubCAS is configured to always fail"), "Bad error message" ); } @@ -320,7 +314,9 @@ async fn load_directory_remote_error_is_error() { cas.read_request_count() ); assert!( - error.contains("StubCAS is configured to always fail"), + error + .to_string() + .contains("StubCAS is configured to always fail"), "Bad error message" ); } @@ -523,8 +519,8 @@ async fn expand_missing_directory() { .await .expect_err("Want error"); assert!( - error.contains(&format!("{:?}", digest)), - "Bad error message: {}", + matches!(error, StoreError::MissingDigest { .. }), + "Bad error: {}", error ); } @@ -545,10 +541,7 @@ async fn expand_directory_missing_subdir() { .await .expect_err("Want error"); assert!( - error.contains(&format!( - "{}", - TestDirectory::containing_roland().fingerprint() - )), + matches!(error, StoreError::MissingDigest { .. }), "Bad error message: {}", error ); @@ -752,9 +745,10 @@ async fn upload_missing_files() { .ensure_remote_has_recursive(vec![testdata.digest()]) .await .expect_err("Want error"); - assert_eq!( - error, - format!("Failed to expand digest {:?}: Not found", testdata.digest()) + assert!( + matches!(error, StoreError::MissingDigest { .. }), + "Bad error: {}", + error ); } @@ -777,13 +771,10 @@ async fn upload_missing_file_in_directory() { .ensure_remote_has_recursive(vec![testdir.digest()]) .await .expect_err("Want error"); - assert_eq!( - error, - format!( - "Failed to upload File {:?}: Not found in local store.", - TestData::roland().digest() - ), - "Bad error message" + assert!( + matches!(error, StoreError::MissingDigest { .. }), + "Bad error: {}", + error ); } @@ -886,7 +877,6 @@ async fn instance_name_download() { store_with_remote .load_file_bytes_with(TestData::roland().digest(), |b| Bytes::copy_from_slice(b)) .await - .unwrap() .unwrap(), TestData::roland().bytes() ) @@ -971,7 +961,6 @@ async fn auth_download() { store_with_remote .load_file_bytes_with(TestData::roland().digest(), |b| Bytes::copy_from_slice(b)) .await - .unwrap() .unwrap(), TestData::roland().bytes() ) diff --git a/src/rust/engine/process_execution/src/bounded.rs b/src/rust/engine/process_execution/src/bounded.rs index 29673c32129..ef408595542 100644 --- a/src/rust/engine/process_execution/src/bounded.rs +++ b/src/rust/engine/process_execution/src/bounded.rs @@ -15,7 +15,7 @@ use tokio::sync::{Notify, Semaphore, SemaphorePermit}; use tokio::time::sleep; use workunit_store::{in_workunit, RunningWorkunit}; -use crate::{Context, FallibleProcessResultWithPlatform, Process}; +use crate::{Context, FallibleProcessResultWithPlatform, Process, ProcessError}; lazy_static! { // TODO: Runtime formatting is unstable in Rust, so we imitate it. 
@@ -60,7 +60,7 @@ impl crate::CommandRunner for CommandRunner { context: Context, workunit: &mut RunningWorkunit, process: Process, - ) -> Result { + ) -> Result { let semaphore_acquisition = self.sema.acquire(process.concurrency_available); let permit = in_workunit!( "acquire_command_runner_slot", @@ -113,11 +113,14 @@ impl crate::CommandRunner for CommandRunner { ) .collect(); if !matched { - return Err(format!( - "Process {} set `concurrency_available={}`, but did not include \ + return Err( + format!( + "Process {} set `concurrency_available={}`, but did not include \ the `{}` template variable in its arguments.", - process.description, process.concurrency_available, *CONCURRENCY_TEMPLATE_RE - )); + process.description, process.concurrency_available, *CONCURRENCY_TEMPLATE_RE + ) + .into(), + ); } } diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs index 3511b57cc13..8468e3fa07a 100644 --- a/src/rust/engine/process_execution/src/cache.rs +++ b/src/rust/engine/process_execution/src/cache.rs @@ -10,13 +10,13 @@ use prost::Message; use protos::gen::build::bazel::remote::execution::v2 as remexec; use protos::gen::pants::cache::{CacheKey, CacheKeyType}; use serde::{Deserialize, Serialize}; -use store::Store; +use store::{Store, StoreError}; use workunit_store::{ in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata, }; use crate::{ - Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, + Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError, ProcessMetadata, ProcessResultSource, }; @@ -58,7 +58,7 @@ impl crate::CommandRunner for CommandRunner { context: Context, workunit: &mut RunningWorkunit, req: Process, - ) -> Result { + ) -> Result { let cache_lookup_start = Instant::now(); let write_failures_to_cache = req.cache_scope == ProcessCacheScope::Always; let key = CacheKey { @@ -148,7 +148,7 @@ impl CommandRunner { &self, context: &Context, action_key: &CacheKey, - ) -> Result, String> { + ) -> Result, StoreError> { use remexec::ExecuteResponse; // See whether there is a cache entry. @@ -177,7 +177,11 @@ impl CommandRunner { ) .await? 
} else { - return Err("action result missing from ExecuteResponse".into()); + return Err( + "action result missing from ExecuteResponse" + .to_owned() + .into(), + ); } } else { return Ok(None); @@ -207,7 +211,7 @@ impl CommandRunner { &self, action_key: &CacheKey, result: &FallibleProcessResultWithPlatform, - ) -> Result<(), String> { + ) -> Result<(), StoreError> { let stdout_digest = result.stdout_digest; let stderr_digest = result.stderr_digest; @@ -255,6 +259,7 @@ impl CommandRunner { ) })?; - self.cache.store(action_key, bytes_to_store).await + self.cache.store(action_key, bytes_to_store).await?; + Ok(()) } } diff --git a/src/rust/engine/process_execution/src/cache_tests.rs b/src/rust/engine/process_execution/src/cache_tests.rs index eaa3b52aa60..eeb185b1ed4 100644 --- a/src/rust/engine/process_execution/src/cache_tests.rs +++ b/src/rust/engine/process_execution/src/cache_tests.rs @@ -12,12 +12,12 @@ use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::{ CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, ImmutableInputs, - NamedCaches, Process, ProcessMetadata, + NamedCaches, Process, ProcessError, ProcessMetadata, }; struct RoundtripResults { - uncached: Result, - maybe_cached: Result, + uncached: Result, + maybe_cached: Result, } fn create_local_runner() -> (Box, Store, TempDir) { @@ -158,7 +158,6 @@ async fn recover_from_missing_store_contents() { let output_dir = store .load_directory(output_dir_digest.as_digest()) .await - .unwrap() .unwrap(); let output_child_digest = output_dir .files diff --git a/src/rust/engine/process_execution/src/immutable_inputs.rs b/src/rust/engine/process_execution/src/immutable_inputs.rs index d32673b0d9f..e9fa81d44bb 100644 --- a/src/rust/engine/process_execution/src/immutable_inputs.rs +++ b/src/rust/engine/process_execution/src/immutable_inputs.rs @@ -6,7 +6,7 @@ use async_oncecell::OnceCell; use fs::{DirectoryDigest, Permissions, RelativePath}; use hashing::Digest; use parking_lot::Mutex; -use store::Store; +use store::{Store, StoreError}; use tempfile::TempDir; use crate::WorkdirSymlink; @@ -40,7 +40,7 @@ impl ImmutableInputs { } /// Returns an absolute Path to immutably consume the given Digest from. 
- async fn path(&self, directory_digest: DirectoryDigest) -> Result { + async fn path(&self, directory_digest: DirectoryDigest) -> Result { let digest = directory_digest.as_digest(); let cell = self.contents.lock().entry(digest).or_default().clone(); @@ -105,7 +105,7 @@ impl ImmutableInputs { pub(crate) async fn local_paths( &self, immutable_inputs: &BTreeMap, - ) -> Result, String> { + ) -> Result, StoreError> { let dsts = futures::future::try_join_all( immutable_inputs .values() diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 05b506ca72e..c0294e43f7d 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -29,6 +29,7 @@ extern crate derivative; use std::collections::{BTreeMap, BTreeSet}; use std::convert::{TryFrom, TryInto}; +use std::fmt::{self, Display}; use std::path::PathBuf; use async_trait::async_trait; @@ -42,7 +43,7 @@ use itertools::Itertools; use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::ExecutedActionMetadata; use serde::{Deserialize, Serialize}; -use store::{SnapshotOps, Store}; +use store::{SnapshotOps, Store, StoreError}; use workunit_store::{RunId, RunningWorkunit, WorkunitStore}; pub mod bounded; @@ -80,6 +81,49 @@ pub use crate::immutable_inputs::ImmutableInputs; pub use crate::named_caches::{CacheName, NamedCaches}; pub use crate::remote_cache::RemoteCacheWarningsBehavior; +#[derive(Clone, Debug, PartialEq)] +pub enum ProcessError { + /// A Digest was not present in either of the local or remote Stores. + MissingDigest(String, Digest), + /// All other error types. + Unclassified(String), +} + +impl ProcessError { + pub fn enrich(self, prefix: &str) -> Self { + match self { + Self::MissingDigest(s, d) => Self::MissingDigest(format!("{prefix}: {s}"), d), + Self::Unclassified(s) => Self::Unclassified(format!("{prefix}: {s}")), + } + } +} + +impl Display for ProcessError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::MissingDigest(s, d) => { + write!(f, "{s}: {d:?}") + } + Self::Unclassified(s) => write!(f, "{s}"), + } + } +} + +impl From for ProcessError { + fn from(err: StoreError) -> Self { + match err { + StoreError::MissingDigest(s, d) => Self::MissingDigest(s, d), + StoreError::Unclassified(s) => Self::Unclassified(s), + } + } +} + +impl From for ProcessError { + fn from(err: String) -> Self { + Self::Unclassified(err) + } +} + #[derive( PartialOrd, Ord, Clone, Copy, Debug, DeepSizeOf, Eq, PartialEq, Hash, Serialize, Deserialize, )] @@ -254,7 +298,7 @@ impl InputDigests { input_files: DirectoryDigest, immutable_inputs: BTreeMap, use_nailgun: Vec, - ) -> Result { + ) -> Result { // Collect all digests into `complete`. let mut complete_digests = try_join_all( immutable_inputs @@ -288,17 +332,20 @@ impl InputDigests { }) } - pub async fn new_from_merged(store: &Store, from: Vec) -> Result { + pub async fn new_from_merged(store: &Store, from: Vec) -> Result { let mut merged_immutable_inputs = BTreeMap::new(); for input_digests in from.iter() { let size_before = merged_immutable_inputs.len(); let immutable_inputs = &input_digests.immutable_inputs; merged_immutable_inputs.append(&mut immutable_inputs.clone()); if size_before + immutable_inputs.len() != merged_immutable_inputs.len() { - return Err(format!( - "Tried to merge two-or-more immutable inputs at the same path with different values! 
\ + return Err( + format!( + "Tried to merge two-or-more immutable inputs at the same path with different values! \ The collision involved one of the entries in: {immutable_inputs:?}" - )); + ) + .into(), + ); } } @@ -747,7 +794,7 @@ pub trait CommandRunner: Send + Sync { context: Context, workunit: &mut RunningWorkunit, req: Process, - ) -> Result; + ) -> Result; } // TODO(#8513) possibly move to the MEPR struct, or to the hashing crate? diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index 3eada86c54b..5fcf45a2c0c 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -24,7 +24,7 @@ use futures::stream::{BoxStream, StreamExt, TryStreamExt}; use log::{debug, info}; use nails::execution::ExitCode; use shell_quote::bash; -use store::{OneOffStoreFileByDigest, Snapshot, Store}; +use store::{OneOffStoreFileByDigest, Snapshot, Store, StoreError}; use tokio::process::{Child, Command}; use tokio::sync::RwLock; use tokio::time::{timeout, Duration}; @@ -34,7 +34,7 @@ use workunit_store::{in_workunit, Level, Metric, RunningWorkunit}; use crate::{ Context, FallibleProcessResultWithPlatform, ImmutableInputs, NamedCaches, Platform, Process, - ProcessResultMetadata, ProcessResultSource, + ProcessError, ProcessResultMetadata, ProcessResultSource, }; pub const USER_EXECUTABLE_MODE: u32 = 0o100755; @@ -252,7 +252,7 @@ impl super::CommandRunner for CommandRunner { context: Context, _workunit: &mut RunningWorkunit, req: Process, - ) -> Result { + ) -> Result { let req_debug_repr = format!("{:#?}", req); in_workunit!( "run_local_process", @@ -326,7 +326,7 @@ impl super::CommandRunner for CommandRunner { // // Given that this is expected to be rare, we dump the entire process definition in the // error. - format!("Failed to execute: {}\n\n{}", req_debug_repr, msg) + ProcessError::Unclassified(format!("Failed to execute: {}\n\n{}", req_debug_repr, msg)) }) .await; @@ -475,7 +475,7 @@ pub trait CapturedWorkdir { workdir_token: Self::WorkdirToken, exclusive_spawn: bool, platform: Platform, - ) -> Result { + ) -> Result { let start_time = Instant::now(); // Spawn the process. @@ -570,7 +570,7 @@ pub trait CapturedWorkdir { metadata: result_metadata, }) } - Err(msg) => Err(msg), + Err(msg) => Err(msg.into()), } } @@ -641,7 +641,7 @@ pub async fn prepare_workdir( executor: task_executor::Executor, named_caches: &NamedCaches, immutable_inputs: &ImmutableInputs, -) -> Result { +) -> Result { // Collect the symlinks to create for immutable inputs or named caches. 
let workdir_symlinks = immutable_inputs .local_paths(&req.input_digests.immutable_inputs) @@ -677,7 +677,7 @@ pub async fn prepare_workdir( Permissions::Writable, ) .await - },) + }) .await?; let workdir_path2 = workdir_path.clone(); diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index a3eca4efeca..845542cdae9 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -18,7 +18,7 @@ use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::{ local, CacheName, CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, ImmutableInputs, InputDigests, NamedCaches, Platform, Process, - RelativePath, + ProcessError, RelativePath, }; #[derive(PartialEq, Debug)] @@ -127,8 +127,8 @@ async fn binary_not_found() { let err_string = run_command_locally(Process::new(owned_string_vec(&["echo", "-n", "foo"]))) .await .expect_err("Want Err"); - assert!(err_string.contains("Failed to execute")); - assert!(err_string.contains("echo")); + assert!(err_string.to_string().contains("Failed to execute")); + assert!(err_string.to_string().contains("echo")); } #[tokio::test] @@ -766,7 +766,7 @@ fn named_caches_and_immutable_inputs(store: Store) -> (TempDir, NamedCaches, Imm ) } -async fn run_command_locally(req: Process) -> Result { +async fn run_command_locally(req: Process) -> Result { let (_, mut workunit) = WorkunitStore::setup_for_tests(); let work_dir = TempDir::new().unwrap(); let work_dir_path = work_dir.path().to_owned(); @@ -780,7 +780,7 @@ async fn run_command_locally_in_dir( workunit: &mut RunningWorkunit, store: Option, executor: Option, -) -> Result { +) -> Result { let store_dir = TempDir::new().unwrap(); let executor = executor.unwrap_or_else(|| task_executor::Executor::new()); let store = @@ -798,12 +798,10 @@ async fn run_command_locally_in_dir( let original = runner.run(Context::default(), workunit, req.into()).await?; let stdout_bytes = store .load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec()) - .await? - .unwrap(); + .await?; let stderr_bytes = store .load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec()) - .await? - .unwrap(); + .await?; Ok(LocalTestResult { original, stdout_bytes, diff --git a/src/rust/engine/process_execution/src/nailgun/mod.rs b/src/rust/engine/process_execution/src/nailgun/mod.rs index 691d4afda0e..c938a429659 100644 --- a/src/rust/engine/process_execution/src/nailgun/mod.rs +++ b/src/rust/engine/process_execution/src/nailgun/mod.rs @@ -13,7 +13,9 @@ use tokio::net::TcpStream; use workunit_store::{in_workunit, Metric, RunningWorkunit}; use crate::local::{prepare_workdir, CapturedWorkdir, ChildOutput}; -use crate::{Context, FallibleProcessResultWithPlatform, InputDigests, Platform, Process}; +use crate::{ + Context, FallibleProcessResultWithPlatform, InputDigests, Platform, Process, ProcessError, +}; #[cfg(test)] pub mod tests; @@ -114,7 +116,7 @@ impl super::CommandRunner for CommandRunner { context: Context, workunit: &mut RunningWorkunit, req: Process, - ) -> Result { + ) -> Result { if req.input_digests.use_nailgun.is_empty() { trace!("The request is not nailgunnable! 
diff --git a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs
index 6c34c8f30d2..958fafd7dcb 100644
--- a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs
+++ b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs
@@ -22,7 +22,7 @@ use task_executor::Executor;
 use tempfile::TempDir;
 
 use crate::local::prepare_workdir;
-use crate::{ImmutableInputs, NamedCaches, Process, ProcessMetadata};
+use crate::{ImmutableInputs, NamedCaches, Process, ProcessError, ProcessMetadata};
 
 lazy_static! {
   static ref NAILGUN_PORT_REGEX: Regex = Regex::new(r".*\s+port\s+(\d+)\.$").unwrap();
 }
@@ -88,7 +88,7 @@ impl NailgunPool {
     server_process: Process,
     named_caches: &NamedCaches,
     immutable_inputs: &ImmutableInputs,
-  ) -> Result<BorrowedNailgunProcess, String> {
+  ) -> Result<BorrowedNailgunProcess, ProcessError> {
     let name = server_process.description.clone();
     let requested_fingerprint = NailgunProcessFingerprint::new(name.clone(), &server_process)?;
     let mut process_ref = {
@@ -340,7 +340,7 @@ impl NailgunProcess {
     named_caches: &NamedCaches,
     immutable_inputs: &ImmutableInputs,
     nailgun_server_fingerprint: NailgunProcessFingerprint,
-  ) -> Result<NailgunProcess, String> {
+  ) -> Result<NailgunProcess, ProcessError> {
     let workdir = tempfile::Builder::new()
       .prefix("pants-sandbox-")
       .tempdir_in(workdir_base)
diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs
index 9f828f7b501..0b25b4e3a53 100644
--- a/src/rust/engine/process_execution/src/remote.rs
+++ b/src/rust/engine/process_execution/src/remote.rs
@@ -32,7 +32,7 @@ use remexec::{
   execution_client::ExecutionClient, Action, Command, ExecuteRequest, ExecuteResponse,
   ExecutedActionMetadata, ServerCapabilities, WaitExecutionRequest,
 };
-use store::{Snapshot, SnapshotOps, Store, StoreFileByDigest};
+use store::{Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest};
 use tonic::metadata::BinaryMetadataValue;
 use tonic::{Code, Request, Status};
 use tryfuture::try_future;
@@ -43,7 +43,7 @@ use workunit_store::{
 };
 
 use crate::{
-  Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope,
+  Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError,
   ProcessMetadata, ProcessResultMetadata, ProcessResultSource,
 };
 
@@ -69,10 +69,11 @@ pub enum OperationOrStatus {
 #[derive(Debug, PartialEq)]
 pub enum ExecutionError {
-  // String is the error message.
-  Fatal(String),
-  // Digests are Files and Directories which have been reported to be missing. May be incomplete.
-  MissingDigests(Vec<Digest>),
+  Fatal(ProcessError),
+  // Digests are Files and Directories which have been reported to be missing remotely (unlike
+  // `{Process,Store}Error::MissingDigest`, which indicates that a digest doesn't exist anywhere
+  // in the configured Stores). May be incomplete.
+  MissingRemoteDigests(Vec<Digest>),
   // The server indicated that the request hit a timeout. Generally this is the timeout that the
   // client has pushed down on the ExecutionRequest.
   Timeout,
@@ -392,31 +393,37 @@ impl CommandRunner {
     for violation in &precondition_failure.violations {
       if violation.r#type != "MISSING" {
-        return ExecutionError::Fatal(format!(
-          "Unknown PreconditionFailure violation: {:?}",
-          violation
-        ));
+        return ExecutionError::Fatal(
+          format!("Unknown PreconditionFailure violation: {:?}", violation).into(),
+        );
       }
 
       let parts: Vec<_> = violation.subject.split('/').collect();
       if parts.len() != 3 || parts[0] != "blobs" {
-        return ExecutionError::Fatal(format!(
-          "Received FailedPrecondition MISSING but didn't recognize subject {}",
-          violation.subject
-        ));
+        return ExecutionError::Fatal(
+          format!(
+            "Received FailedPrecondition MISSING but didn't recognize subject {}",
+            violation.subject
+          )
+          .into(),
+        );
       }
 
       let fingerprint = match Fingerprint::from_hex_string(parts[1]) {
         Ok(f) => f,
         Err(e) => {
-          return ExecutionError::Fatal(format!("Bad digest in missing blob: {}: {}", parts[1], e))
+          return ExecutionError::Fatal(
+            format!("Bad digest in missing blob: {}: {}", parts[1], e).into(),
+          )
         }
       };
 
       let size = match parts[2].parse::<usize>() {
         Ok(s) => s,
         Err(e) => {
-          return ExecutionError::Fatal(format!("Missing blob had bad size: {}: {}", parts[2], e))
+          return ExecutionError::Fatal(
+            format!("Missing blob had bad size: {}: {}", parts[2], e).into(),
+          )
         }
       };
 
@@ -425,11 +432,13 @@ impl CommandRunner {
 
     if missing_digests.is_empty() {
       return ExecutionError::Fatal(
-        "Error from remote execution: FailedPrecondition, but no details".to_owned(),
+        "Error from remote execution: FailedPrecondition, but no details"
+          .to_owned()
+          .into(),
       );
     }
 
-    ExecutionError::MissingDigests(missing_digests)
+    ExecutionError::MissingRemoteDigests(missing_digests)
   }
 
   // pub(crate) for testing
@@ -447,16 +456,19 @@ impl CommandRunner {
     use protos::gen::google::longrunning::operation::Result as OperationResult;
     let execute_response = match operation.result {
       Some(OperationResult::Response(response_any)) => {
-        remexec::ExecuteResponse::decode(&response_any.value[..])
-          .map_err(|e| ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e)))?
+        remexec::ExecuteResponse::decode(&response_any.value[..]).map_err(|e| {
+          ExecutionError::Fatal(format!("Invalid ExecuteResponse: {:?}", e).into())
+        })?
       }
       Some(OperationResult::Error(rpc_status)) => {
         warn!("protocol violation: REv2 prohibits setting Operation::error");
-        return Err(ExecutionError::Fatal(format_error(&rpc_status)));
+        return Err(ExecutionError::Fatal(format_error(&rpc_status).into()));
       }
       None => {
         return Err(ExecutionError::Fatal(
-          "Operation finished but no response supplied".to_string(),
+          "Operation finished but no response supplied"
+            .to_owned()
+            .into(),
        ));
       }
     };
@@ -478,7 +490,9 @@ impl CommandRunner {
     } else {
       warn!("REv2 protocol violation: action result not set");
       return Err(ExecutionError::Fatal(
-        "REv2 protocol violation: action result not set".into(),
+        "REv2 protocol violation: action result not set"
+          .to_owned()
+          .into(),
       ));
     };
 
@@ -495,7 +509,7 @@ impl CommandRunner {
         },
       )
       .await
-      .map_err(ExecutionError::Fatal);
+      .map_err(|e| ExecutionError::Fatal(e.into()));
     }
 
     rpc_status
@@ -510,11 +524,13 @@ impl CommandRunner {
       Code::FailedPrecondition => {
         let details = if status.details.is_empty() {
-          return Err(ExecutionError::Fatal(status.message));
+          return Err(ExecutionError::Fatal(status.message.into()));
         } else if status.details.len() > 1 {
           // TODO(tonic): Should we be able to handle multiple details protos?
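[Editor's note — standalone illustration, not part of this diff. The loop above recognizes FailedPrecondition MISSING subjects of the form "blobs/<fingerprint>/<size>"; a minimal sketch of just that parsing step follows. The helper name is invented.]

use hashing::{Digest, Fingerprint};

/// Hypothetical helper: parse a "blobs/<fingerprint>/<size>" violation subject.
fn parse_missing_blob_subject(subject: &str) -> Result<Digest, String> {
  let parts: Vec<_> = subject.split('/').collect();
  if parts.len() != 3 || parts[0] != "blobs" {
    return Err(format!("Did not recognize subject {subject}"));
  }
  let fingerprint = Fingerprint::from_hex_string(parts[1])
    .map_err(|e| format!("Bad digest in missing blob: {}: {}", parts[1], e))?;
  let size = parts[2]
    .parse::<usize>()
    .map_err(|e| format!("Missing blob had bad size: {}: {}", parts[2], e))?;
  Ok(Digest::new(fingerprint, size))
}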
          return Err(ExecutionError::Fatal(
-            "too many detail protos for precondition failure".into(),
+            "too many detail protos for precondition failure"
+              .to_owned()
+              .into(),
          ));
        } else {
          &status.details[0]
@@ -522,19 +538,21 @@ impl CommandRunner {
        let full_name = format!("type.googleapis.com/{}", "google.rpc.PreconditionFailure");
        if details.type_url != full_name {
-          return Err(ExecutionError::Fatal(format!(
+          return Err(ExecutionError::Fatal(
+            format!(
            "Received PreconditionFailure, but didn't know how to resolve it: {}, protobuf type {}",
            status.message, details.type_url
-          )));
+            )
+            .into(),
+          ));
        }
 
        // Decode the precondition failure.
        let precondition_failure = PreconditionFailure::decode(Cursor::new(&details.value))
          .map_err(|e| {
-            ExecutionError::Fatal(format!(
-              "Error deserializing PreconditionFailure proto: {:?}",
-              e
-            ))
+            ExecutionError::Fatal(
+              format!("Error deserializing PreconditionFailure proto: {:?}", e).into(),
+            )
          })?;
 
        Err(self.extract_missing_digests(&precondition_failure))
@@ -545,10 +563,13 @@ impl CommandRunner {
      | Code::ResourceExhausted
      | Code::Unavailable
      | Code::Unknown => Err(ExecutionError::Retryable(status.message)),
-      code => Err(ExecutionError::Fatal(format!(
-        "Error from remote execution: {:?}: {:?}",
-        code, status.message,
-      ))),
+      code => Err(ExecutionError::Fatal(
+        format!(
+          "Error from remote execution: {:?}: {:?}",
+          code, status.message,
+        )
+        .into(),
+      )),
    }
  }
 
@@ -566,7 +587,7 @@ impl CommandRunner {
    process: Process,
    context: &Context,
    workunit: &mut RunningWorkunit,
-  ) -> Result<FallibleProcessResultWithPlatform, String> {
+  ) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
    const MAX_RETRIES: u32 = 5;
    const MAX_BACKOFF_DURATION: Duration = Duration::from_secs(10);
 
@@ -646,7 +667,7 @@ impl CommandRunner {
          if num_retries >= MAX_RETRIES {
            workunit.increment_counter(Metric::RemoteExecutionRPCErrors, 1);
            return Err(
-              "Too many failures from server. The last event was the server disconnecting with no error given.".to_owned(),
+              "Too many failures from server. The last event was the server disconnecting with no error given.".to_owned().into(),
            );
          } else {
            // Increment the retry counter and allow loop to retry.
@@ -686,16 +707,15 @@ impl CommandRunner {
            trace!("retryable error: {}", e);
            if num_retries >= MAX_RETRIES {
              workunit.increment_counter(Metric::RemoteExecutionRPCErrors, 1);
-              return Err(format!(
-                "Too many failures from server. The last error was: {}",
-                e
-              ));
+              return Err(
+                format!("Too many failures from server. The last error was: {}", e).into(),
+              );
            } else {
              // Increment the retry counter and allow loop to retry.
              num_retries += 1;
            }
          }
-          ExecutionError::MissingDigests(missing_digests) => {
+          ExecutionError::MissingRemoteDigests(missing_digests) => {
            trace!(
              "Server reported missing digests; trying to upload: {:?}",
              missing_digests,
            );
@@ -708,7 +728,7 @@ impl CommandRunner {
          }
          ExecutionError::Timeout => {
            workunit.increment_counter(Metric::RemoteExecutionTimeouts, 1);
-            return populate_fallible_execution_result_for_timeout(
+            let result = populate_fallible_execution_result_for_timeout(
              &self.store,
              context,
              &process.description,
@@ -716,7 +736,8 @@ impl CommandRunner {
              start_time.elapsed(),
              self.platform,
            )
-            .await;
+            .await?;
+            return Ok(result);
          }
        },
      }
@@ -732,7 +753,7 @@ impl crate::CommandRunner for CommandRunner {
    context: Context,
    _workunit: &mut RunningWorkunit,
    request: Process,
-  ) -> Result<FallibleProcessResultWithPlatform, String> {
+  ) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
    // Retrieve capabilities for this server.
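[Editor's note — orientation sketch, not part of this diff. After this change the remote `ExecutionError` presumably reads roughly as below, reconstructed from the variants matched in this file; the variant order and comments are assumptions, and there may be variants this diff does not touch.]

use hashing::Digest;
use crate::ProcessError;

#[derive(Debug, PartialEq)]
pub enum ExecutionError {
  // A terminal error, already classified as a ProcessError.
  Fatal(ProcessError),
  // Digests the remote server reported missing; candidates for re-upload and retry.
  MissingRemoteDigests(Vec<Digest>),
  // The server hit the client-requested timeout.
  Timeout,
  // A transient error; retried up to MAX_RETRIES times in the loop above.
  Retryable(String),
}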
     let capabilities = self.get_capabilities().await?;
     trace!("RE capabilities: {:?}", &capabilities);
@@ -832,10 +853,7 @@ impl crate::CommandRunner for CommandRunner {
               &build_id, deadline_duration
             );
             workunit.increment_counter(Metric::RemoteExecutionTimeouts, 1);
-            Err(format!(
-              "remote execution timed out after {:?}",
-              deadline_duration
-            ))
+            Err(format!("remote execution timed out after {:?}", deadline_duration).into())
           }
         }
       },
@@ -1072,15 +1090,15 @@ pub async fn populate_fallible_execution_result_for_timeout(
 /// of the ActionResult/ExecuteResponse stored in the local cache. When
 /// `treat_tree_digest_as_final_directory_hack` is true, then that final merged directory
 /// will be extracted from the tree_digest of the single output directory.
-pub fn populate_fallible_execution_result(
+pub async fn populate_fallible_execution_result(
   store: Store,
   run_id: RunId,
   action_result: &remexec::ActionResult,
   platform: Platform,
   treat_tree_digest_as_final_directory_hack: bool,
   source: ProcessResultSource,
-) -> BoxFuture<'static, Result<FallibleProcessResultWithPlatform, String>> {
-  future::try_join3(
+) -> Result<FallibleProcessResultWithPlatform, StoreError> {
+  let (stdout_digest, stderr_digest, output_directory) = future::try_join3(
     extract_stdout(&store, action_result),
     extract_stderr(&store, action_result),
     extract_output_files(
       store.clone(),
       action_result,
       treat_tree_digest_as_final_directory_hack,
     ),
   )
@@ -1089,28 +1107,25 @@
-  .and_then(
-    move |(stdout_digest, stderr_digest, output_directory)| async move {
-      Ok(FallibleProcessResultWithPlatform {
-        stdout_digest,
-        stderr_digest,
-        exit_code: action_result.exit_code,
-        output_directory,
-        platform,
-        metadata: action_result.execution_metadata.clone().map_or(
-          ProcessResultMetadata::new(None, source, run_id),
-          |metadata| ProcessResultMetadata::new_from_metadata(metadata, source, run_id),
-        ),
-      })
-    },
-  )
-  .boxed()
+  .await?;
+
+  Ok(FallibleProcessResultWithPlatform {
+    stdout_digest,
+    stderr_digest,
+    exit_code: action_result.exit_code,
+    output_directory,
+    platform,
+    metadata: action_result.execution_metadata.clone().map_or(
+      ProcessResultMetadata::new(None, source, run_id),
+      |metadata| ProcessResultMetadata::new_from_metadata(metadata, source, run_id),
+    ),
+  })
 }
 
 fn extract_stdout<'a>(
   store: &Store,
   action_result: &'a remexec::ActionResult,
-) -> BoxFuture<'a, Result<Digest, String>> {
+) -> BoxFuture<'a, Result<Digest, StoreError>> {
   let store = store.clone();
   async move {
     if let Some(digest_proto) = &action_result.stdout_digest {
@@ -1133,7 +1148,7 @@ fn extract_stdout<'a>(
 fn extract_stderr<'a>(
   store: &Store,
   action_result: &'a remexec::ActionResult,
-) -> BoxFuture<'a, Result<Digest, String>> {
+) -> BoxFuture<'a, Result<Digest, StoreError>> {
   let store = store.clone();
   async move {
     if let Some(digest_proto) = &action_result.stderr_digest {
@@ -1157,7 +1172,7 @@ pub fn extract_output_files(
   store: Store,
   action_result: &remexec::ActionResult,
   treat_tree_digest_as_final_directory_hack: bool,
-) -> BoxFuture<'static, Result<DirectoryDigest, String>> {
+) -> BoxFuture<'static, Result<DirectoryDigest, StoreError>> {
   // HACK: The caching CommandRunner stores the digest of the Directory that merges all output
   // files and output directories in the `tree_digest` field of the `output_directories` field
   // of the ActionResult/ExecuteResponse stored in the local cache. When
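[Editor's note — orientation sketch, not part of this diff. The `StoreError` that these extraction helpers now return is presumably shaped as below, reconstructed from the `From<StoreError>` impls visible elsewhere in this change; the derives and the `From<String>` impl are assumptions.]

use hashing::Digest;

#[derive(Debug, PartialEq)]
pub enum StoreError {
  /// A Digest was not present in any of the configured Stores.
  MissingDigest(String, Digest),
  /// All other error types.
  Unclassified(String),
}

// Assumed: allows `format!(..)`-style messages to flow into StoreError via `.into()`/`?`.
impl From<String> for StoreError {
  fn from(err: String) -> Self {
    Self::Unclassified(err)
  }
}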
@@ -1169,18 +1184,22 @@ pub fn extract_output_files(
       &[ref directory] => {
         match require_digest(directory.tree_digest.as_ref()) {
           Ok(digest) => {
-            return future::ready::<Result<_, String>>(Ok(DirectoryDigest::from_persisted_digest(
-              digest,
-            )))
+            return future::ready::<Result<_, StoreError>>(Ok(
+              DirectoryDigest::from_persisted_digest(digest),
+            ))
             .boxed()
           }
-          Err(err) => return futures::future::err(err).boxed(),
+          Err(err) => return futures::future::err(err.into()).boxed(),
         };
       }
       _ => {
         return futures::future::err(
-          "illegal state: treat_tree_digest_as_final_directory_hack expected single output directory".to_owned()
-        ).boxed();
+          "illegal state: treat_tree_digest_as_final_directory_hack \
+           expected single output directory"
+            .to_owned()
+            .into(),
+        )
+        .boxed();
       }
     }
   }
@@ -1213,7 +1232,7 @@ pub fn extract_output_files(
         })
         .map_err(|err| {
           format!(
-            "Error saving remote output directory to local cache: {:?}",
+            "Error saving remote output directory to local cache: {}",
             err
           )
         }),
@@ -1285,7 +1304,7 @@ pub fn extract_output_files(
     store
       .merge(directory_digests)
-      .map_err(|err| format!("Error when merging output files and directories: {:?}", err))
+      .map_err(|err| err.enrich("Error when merging output files and directories"))
       .await
   }
   .boxed()
@@ -1328,7 +1347,7 @@ pub async fn check_action_cache(
   store: Store,
   eager_fetch: bool,
   timeout_duration: Duration,
-) -> Result<Option<FallibleProcessResultWithPlatform>, String> {
+) -> Result<Option<FallibleProcessResultWithPlatform>, ProcessError> {
   in_workunit!(
     "check_action_cache",
     Level::Debug,
@@ -1407,7 +1426,8 @@ pub async fn check_action_cache(
         }
         _ => {
           workunit.increment_counter(Metric::RemoteCacheReadErrors, 1);
-          Err(status_to_str(status))
+          // TODO: Ensure that we're catching missing digests.
+          Err(status_to_str(status).into())
         }
       },
     }
@@ -1449,7 +1469,7 @@ pub async fn ensure_action_uploaded(
   command_digest: Digest,
   action_digest: Digest,
   input_files: Option<DirectoryDigest>,
-) -> Result<(), String> {
+) -> Result<(), StoreError> {
   in_workunit!(
     "ensure_action_uploaded",
     Level::Trace,
diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs
index 2079bc82ddf..3e86a28b19a 100644
--- a/src/rust/engine/process_execution/src/remote_cache.rs
+++ b/src/rust/engine/process_execution/src/remote_cache.rs
@@ -17,14 +17,15 @@ use protos::gen::build::bazel::remote::execution::v2 as remexec;
 use protos::require_digest;
 use remexec::action_cache_client::ActionCacheClient;
 use remexec::{ActionResult, Command, Tree};
-use store::Store;
+use store::{Store, StoreError};
 use workunit_store::{
   in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, WorkunitMetadata,
 };
 
 use crate::remote::make_execute_request;
 use crate::{
-  Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessMetadata,
+  Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError,
+  ProcessMetadata,
 };
 
 #[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)]
@@ -183,7 +184,7 @@ impl CommandRunner {
     command: &Command,
     result: &FallibleProcessResultWithPlatform,
     store: &Store,
-  ) -> Result<(ActionResult, Vec<Digest>), String> {
+  ) -> Result<(ActionResult, Vec<Digest>), StoreError> {
     let output_trie = store
       .load_digest_trie(result.output_directory.clone())
       .await?;
@@ -248,8 +249,11 @@ impl CommandRunner {
     cache_lookup_start: Instant,
     action_digest: Digest,
     request: &Process,
-    mut local_execution_future: BoxFuture<'_, Result<FallibleProcessResultWithPlatform, String>>,
-  ) -> Result<(FallibleProcessResultWithPlatform, bool), String> {
+    mut local_execution_future: BoxFuture<
+      '_,
+      Result<FallibleProcessResultWithPlatform, ProcessError>,
+    >,
+  ) -> Result<(FallibleProcessResultWithPlatform, bool), ProcessError> {
     // A future to read from the cache and log the results accordingly.
     let cache_read_future = async {
       let response = crate::remote::check_action_cache(
@@ -274,7 +278,7 @@ impl CommandRunner {
           cached_response_opt
         }
         Err(err) => {
-          self.log_cache_error(err, CacheErrorType::ReadError);
+          self.log_cache_error(err.to_string(), CacheErrorType::ReadError);
           None
         }
       }
@@ -333,7 +337,7 @@ impl CommandRunner {
     command: &Command,
     action_digest: Digest,
     command_digest: Digest,
-  ) -> Result<(), String> {
+  ) -> Result<(), StoreError> {
     // Upload the Action and Command, but not the input files. See #12432.
     // Assumption: The Action and Command have already been stored locally.
     crate::remote::ensure_action_uploaded(&self.store, command_digest, action_digest, None).await?;
@@ -422,7 +426,7 @@ impl crate::CommandRunner for CommandRunner {
     context: Context,
     workunit: &mut RunningWorkunit,
     request: Process,
-  ) -> Result<FallibleProcessResultWithPlatform, String> {
+  ) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
     let cache_lookup_start = Instant::now();
     // Construct the REv2 ExecuteRequest and related data for this execution request.
     let (action, command, _execute_request) =
@@ -473,7 +477,7 @@ impl crate::CommandRunner for CommandRunner {
         match write_result {
           Ok(_) => workunit.increment_counter(Metric::RemoteCacheWriteSuccesses, 1),
           Err(err) => {
-            command_runner.log_cache_error(err, CacheErrorType::WriteError);
+            command_runner.log_cache_error(err.to_string(), CacheErrorType::WriteError);
             workunit.increment_counter(Metric::RemoteCacheWriteErrors, 1);
           }
         };
diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs
index 70ad0d567a3..3e1022399bb 100644
--- a/src/rust/engine/process_execution/src/remote_cache_tests.rs
+++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs
@@ -21,7 +21,7 @@ use workunit_store::{RunId, RunningWorkunit, WorkunitStore};
 
 use crate::remote::{ensure_action_stored_locally, make_execute_request};
 use crate::{
   CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, Platform,
-  Process, ProcessMetadata, ProcessResultMetadata, ProcessResultSource,
+  Process, ProcessError, ProcessMetadata, ProcessResultMetadata, ProcessResultSource,
   RemoteCacheWarningsBehavior,
 };
 
@@ -30,7 +30,7 @@ const CACHE_READ_TIMEOUT: Duration = Duration::from_secs(5);
 
 /// A mock of the local runner used for better hermeticity of the tests.
 #[derive(Clone)]
 struct MockLocalCommandRunner {
-  result: Result<FallibleProcessResultWithPlatform, String>,
+  result: Result<FallibleProcessResultWithPlatform, ProcessError>,
   call_counter: Arc<AtomicUsize>,
   delay: Duration,
 }
@@ -63,7 +63,7 @@ impl CommandRunnerTrait for MockLocalCommandRunner {
     _context: Context,
     _workunit: &mut RunningWorkunit,
     _req: Process,
-  ) -> Result<FallibleProcessResultWithPlatform, String> {
+  ) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
     sleep(self.delay).await;
     self.call_counter.fetch_add(1, Ordering::SeqCst);
     self.result.clone()
@@ -562,7 +562,7 @@ async fn make_action_result_basic() {
       _context: Context,
       _workunit: &mut RunningWorkunit,
       _req: Process,
-    ) -> Result<FallibleProcessResultWithPlatform, String> {
+    ) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
       unimplemented!()
     }
   }
diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs
index c3bd2905589..bffe3eb75c7 100644
--- a/src/rust/engine/process_execution/src/remote_tests.rs
+++ b/src/rust/engine/process_execution/src/remote_tests.rs
@@ -15,7 +15,7 @@ use protos::gen::google::longrunning::Operation;
 use remexec::ExecutedActionMetadata;
 use spectral::prelude::*;
 use spectral::{assert_that, string::StrAssertions};
-use store::{SnapshotOps, Store};
+use store::{SnapshotOps, Store, StoreError};
 use tempfile::TempDir;
 use testutil::data::{TestData, TestDirectory, TestTree};
 use testutil::{owned_string_vec, relative_paths};
@@ -24,7 +24,7 @@ use workunit_store::{RunId, WorkunitStore};
 use crate::remote::{digest, CommandRunner, ExecutionError, OperationOrStatus};
 use crate::{
   CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, InputDigests,
-  Platform, Process, ProcessCacheScope, ProcessMetadata,
+  Platform, Process, ProcessCacheScope, ProcessError, ProcessMetadata,
 };
 use fs::{RelativePath, EMPTY_DIRECTORY_DIGEST};
 use std::any::type_name;
@@ -886,8 +886,8 @@ async fn server_rejecting_execute_request_gives_error() {
   let error = run_command_remote(mock_server.address(), execute_request)
     .await
     .expect_err("Want Err");
-  assert_that(&error).contains("InvalidArgument");
-  assert_that(&error).contains("Did not expect this request");
+  assert_that(&error.to_string()).contains("InvalidArgument");
+  assert_that(&error.to_string()).contains("Did not expect this request");
 }
 
 #[tokio::test]
@@ -1221,7 +1221,6 @@ async fn ensure_inline_stdio_is_stored() {
     local_store
       .load_file_bytes_with(test_stdout.digest(), |v| Bytes::copy_from_slice(v))
       .await
-      .unwrap()
       .unwrap(),
     test_stdout.bytes()
   );
@@ -1229,7 +1228,6 @@ async fn ensure_inline_stdio_is_stored() {
     local_store
       .load_file_bytes_with(test_stderr.digest(), |v| Bytes::copy_from_slice(v))
      .await
-      .unwrap()
       .unwrap(),
     test_stderr.bytes()
   );
@@ -1325,7 +1323,7 @@ async fn initial_response_error() {
     .await
     .expect_err("Want Err");
 
-  assert_eq!(result, "Internal: Something went wrong");
+  assert_eq!(result.to_string(), "Internal: Something went wrong");
 }
 
 #[tokio::test]
@@ -1368,7 +1366,10 @@ async fn initial_response_missing_response_and_error() {
     .await
     .expect_err("Want Err");
 
-  assert_eq!(result, "Operation finished but no response supplied");
+  assert_eq!(
+    result.to_string(),
+    "Operation finished but no response supplied"
+  );
 }
 
 #[tokio::test]
@@ -1425,7 +1426,7 @@ async fn fails_after_retry_limit_exceeded() {
     .expect_err("Expected error");
 
   assert_eq!(
-    result,
+    result.to_string(),
     "Too many failures from server. The last error was: the bot running the task appears to be lost"
   );
 }
@@ -1485,7 +1486,7 @@ async fn fails_after_retry_limit_exceeded_with_stream_close() {
     .expect_err("Expected error");
 
   assert_eq!(
-    result,
+    result.to_string(),
     "Too many failures from server. The last event was the server disconnecting with no error given."
   );
 }
@@ -1665,7 +1666,7 @@ async fn execute_missing_file_errors_if_unknown() {
     .run(Context::default(), &mut workunit, cat_roland_request())
     .await
     .expect_err("Want error");
-  assert_contains(&error, &format!("{}", missing_digest.hash));
+  assert_contains(&error.to_string(), &format!("{}", missing_digest.hash));
 }
 
 #[tokio::test]
@@ -1759,7 +1760,7 @@ async fn extract_execute_response_missing_digests() {
   assert_eq!(
     extract_execute_response(operation, Platform::Linux_x86_64).await,
-    Err(ExecutionError::MissingDigests(missing_files))
+    Err(ExecutionError::MissingRemoteDigests(missing_files))
   );
 }
 
@@ -1780,7 +1781,7 @@ async fn extract_execute_response_missing_other_things() {
     .unwrap();
 
   match extract_execute_response(operation, Platform::Linux_x86_64).await {
-    Err(ExecutionError::Fatal(err)) => assert_contains(&err, "monkeys"),
+    Err(ExecutionError::Fatal(err)) => assert_contains(&err.to_string(), "monkeys"),
     other => assert!(false, "Want fatal error, got {:?}", other),
   };
 }
@@ -1798,7 +1799,7 @@ async fn extract_execute_response_other_failed_precondition() {
     .unwrap();
 
   match extract_execute_response(operation, Platform::Linux_x86_64).await {
-    Err(ExecutionError::Fatal(err)) => assert_contains(&err, "OUT_OF_CAPACITY"),
+    Err(ExecutionError::Fatal(err)) => assert_contains(&err.to_string(), "OUT_OF_CAPACITY"),
     other => assert!(false, "Want fatal error, got {:?}", other),
   };
 }
@@ -1813,7 +1814,9 @@ async fn extract_execute_response_missing_without_list() {
     .unwrap();
 
   match extract_execute_response(operation, Platform::Linux_x86_64).await {
-    Err(ExecutionError::Fatal(err)) => assert_contains(&err.to_lowercase(), "precondition"),
+    Err(ExecutionError::Fatal(err)) => {
+      assert_contains(&err.to_string().to_lowercase(), "precondition")
+    }
     other => assert!(false, "Want fatal error, got {:?}", other),
   };
 }
@@ -1839,7 +1842,7 @@ async fn extract_execute_response_other_status() {
   };
 
   match extract_execute_response(operation, Platform::Linux_x86_64).await {
-    Err(ExecutionError::Fatal(err)) => assert_contains(&err, "PermissionDenied"),
+    Err(ExecutionError::Fatal(err)) => assert_contains(&err.to_string(), "PermissionDenied"),
     other => assert!(false, "Want fatal error, got {:?}", other),
   };
 }
@@ -2291,19 +2294,17 @@
 pub(crate) async fn run_cmd_runner<R: CommandRunnerTrait>(
   request: Process,
   command_runner: R,
   store: Store,
-) -> Result<RemoteTestResult, String> {
+) -> Result<RemoteTestResult, ProcessError> {
   let (_, mut workunit) = WorkunitStore::setup_for_tests();
   let original = command_runner
     .run(Context::default(), &mut workunit, request)
     .await?;
   let stdout_bytes = store
     .load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec())
-    .await?
-    .unwrap();
+    .await?;
   let stderr_bytes = store
     .load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec())
-    .await?
-    .unwrap();
+    .await?;
   Ok(RemoteTestResult {
     original,
     stdout_bytes,
@@ -2338,7 +2339,10 @@ fn create_command_runner(
   (command_runner, store)
 }
 
-async fn run_command_remote(address: String, request: Process) -> Result<RemoteTestResult, String> {
+async fn run_command_remote(
+  address: String,
+  request: Process,
+) -> Result<RemoteTestResult, ProcessError> {
   let (_, mut workunit) = WorkunitStore::setup_for_tests();
   let cas = mock::StubCAS::builder()
     .file(&TestData::roland())
@@ -2352,12 +2356,10 @@ async fn run_command_remote(
   let stdout_bytes: Vec<u8> = store
     .load_file_bytes_with(original.stdout_digest, |bytes| bytes.to_vec())
     .await
-    .unwrap()
     .unwrap();
   let stderr_bytes: Vec<u8> = store
     .load_file_bytes_with(original.stderr_digest, |bytes| bytes.to_vec())
     .await
-    .unwrap()
     .unwrap();
 
   Ok(RemoteTestResult {
@@ -2425,7 +2425,7 @@ async fn extract_execute_response(
 
 async fn extract_output_files_from_response(
   execute_response: &remexec::ExecuteResponse,
-) -> Result<DirectoryDigest, String> {
+) -> Result<DirectoryDigest, StoreError> {
   let cas = mock::StubCAS::builder()
     .file(&TestData::roland())
     .directory(&TestDirectory::containing_roland())
diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs
index c1ecdf442e7..c6fda924984 100644
--- a/src/rust/engine/process_executor/src/main.rs
+++ b/src/rust/engine/process_executor/src/main.rs
@@ -41,7 +41,7 @@ use prost::Message;
 use protos::gen::build::bazel::remote::execution::v2::{Action, Command};
 use protos::gen::buildbarn::cas::UncachedActionResult;
 use protos::require_digest;
-use store::{SnapshotOps, Store};
+use store::Store;
 use structopt::StructOpt;
 use workunit_store::{in_workunit, Level, WorkunitStore};
 
@@ -342,13 +342,11 @@ async fn main() {
   let stdout: Vec<u8> = store
     .load_file_bytes_with(result.stdout_digest, |bytes| bytes.to_vec())
     .await
-    .unwrap()
     .unwrap();
 
   let stderr: Vec<u8> = store
     .load_file_bytes_with(result.stderr_digest, |bytes| bytes.to_vec())
     .await
-    .unwrap()
     .unwrap();
 
   print!("{}", String::from_utf8(stdout).unwrap());
@@ -458,8 +456,8 @@ async fn extract_request_from_action_digest(
 ) -> Result<(process_execution::Process, ProcessMetadata), String> {
   let action = store
     .load_file_bytes_with(action_digest, |bytes| Action::decode(bytes))
-    .await?
-    .ok_or_else(|| format!("Could not find action proto in CAS: {:?}", action_digest))?
+    .await
+    .map_err(|e| e.enrich("Could not load action proto from CAS").to_string())?
     .map_err(|err| {
       format!(
         "Error deserializing action proto {:?}: {:?}",
@@ -471,8 +469,11 @@ async fn extract_request_from_action_digest(
     .map_err(|err| format!("Bad Command digest: {:?}", err))?;
   let command = store
     .load_file_bytes_with(command_digest, |bytes| Command::decode(bytes))
-    .await?
-    .ok_or_else(|| format!("Could not find command proto in CAS: {:?}", command_digest))?
+    .await
+    .map_err(|e| {
+      e.enrich("Could not load command proto from CAS")
+        .to_string()
+    })?
     .map_err(|err| {
       format!(
         "Error deserializing command proto {:?}: {:?}",
@@ -496,8 +497,9 @@ async fn extract_request_from_action_digest(
   // In case the local Store doesn't have the input root Directory,
   // have it fetch it and identify it as a Directory, so that it doesn't get confused about the unknown metadata.
   store
-    .load_directory_or_err(input_digests.complete.as_digest())
-    .await?;
+    .load_directory(input_digests.complete.as_digest())
+    .await
+    .map_err(|e| e.to_string())?;
 
   let process = process_execution::Process {
     argv: command.arguments,
@@ -584,8 +586,8 @@ async fn extract_request_from_buildbarn_url(
     .load_file_bytes_with(action_result_digest, |bytes| {
       UncachedActionResult::decode(bytes)
     })
-    .await?
-    .ok_or_else(|| "Couldn't fetch action result proto".to_owned())?
+    .await
+    .map_err(|e| e.enrich("Could not load action result proto").to_string())?
     .map_err(|err| format!("Error deserializing action result proto: {:?}", err))?;
 
   require_digest(&action_result.action_digest)?
diff --git a/src/rust/engine/src/externs/fs.rs b/src/rust/engine/src/externs/fs.rs
index faae0385b86..30077297ba7 100644
--- a/src/rust/engine/src/externs/fs.rs
+++ b/src/rust/engine/src/externs/fs.rs
@@ -36,6 +36,13 @@ pub(crate) fn register(m: &PyModule) -> PyResult<()> {
   Ok(())
 }
 
+// TODO: This method is a marker, but in a followup PR we will need to propagate a particular
+// Exception type out of `@rule` bodies and back into `Failure::MissingDigest`, to allow for retry
+// via #11331.
+pub fn todo_possible_store_missing_digest(e: store::StoreError) -> PyErr {
+  PyException::new_err(e.to_string())
+}
+
 #[pyclass(name = "Digest")]
 #[derive(Clone, Debug, PartialEq)]
 pub struct PyDigest(pub DirectoryDigest);
diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs
index bea47decac4..28ed2473972 100644
--- a/src/rust/engine/src/externs/interface.rs
+++ b/src/rust/engine/src/externs/interface.rs
@@ -44,7 +44,7 @@ use workunit_store::{
   ArtifactOutput, ObservationMetric, UserMetadataItem, Workunit, WorkunitState, WorkunitStore,
 };
 
-use crate::externs::fs::PyFileDigest;
+use crate::externs::fs::{todo_possible_store_missing_digest, PyFileDigest};
 use crate::{
   externs, nodes, Context, Core, ExecutionRequest, ExecutionStrategyOptions, ExecutionTermination,
   Failure, Function, Intrinsic, Intrinsics, Key, LocalStoreOptions, Params, RemotingOptions, Rule,
@@ -473,7 +473,7 @@ fn py_result_from_root(py: Python, result: Result<Value, Failure>) -> PyResult {
     },
     Err(f) => {
       let (val, python_traceback, engine_traceback) = match f {
-        f @ Failure::Invalidated => {
+        f @ (Failure::Invalidated | Failure::MissingDigest { .. }) => {
           let msg = format!("{}", f);
           let python_traceback = Failure::native_traceback(&msg);
           (
@@ -782,7 +782,7 @@ async fn workunit_to_py_value(
       })?;
       let snapshot = store::Snapshot::from_digest(store, digest.clone())
         .await
-        .map_err(PyException::new_err)?;
+        .map_err(todo_possible_store_missing_digest)?;
       let gil = Python::acquire_gil();
       let py = gil.python();
       crate::nodes::Snapshot::store_snapshot(py, snapshot).map_err(PyException::new_err)?
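[Editor's note — hedged sketch, not part of this diff. The `enrich` helper called in the hunks above (`e.enrich("Could not load action proto from CAS")`, etc.) is presumably a small method on `StoreError` that prefixes context onto the message while preserving the `MissingDigest` classification — roughly:]

impl StoreError {
  /// Assumed shape: prepend `prefix` to the message without changing the variant,
  /// so a MissingDigest stays recoverable after context is added.
  pub fn enrich(self, prefix: &str) -> Self {
    match self {
      Self::MissingDigest(s, d) => Self::MissingDigest(format!("{prefix}: {s}"), d),
      Self::Unclassified(s) => Self::Unclassified(format!("{prefix}: {s}")),
    }
  }
}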
@@ -1400,7 +1400,7 @@ fn lease_files_in_graph(
         .executor
         .block_on(core.store().lease_all_recursively(digests.iter()))
     })
-    .map_err(PyException::new_err)
+    .map_err(todo_possible_store_missing_digest)
   })
 }
 
@@ -1487,7 +1487,7 @@ fn ensure_remote_has_recursive(
         .executor
         .block_on(core.store().ensure_remote_has_recursive(digests))
     })
-    .map_err(PyException::new_err)?;
+    .map_err(todo_possible_store_missing_digest)?;
     Ok(())
   })
 }
 
@@ -1507,7 +1507,7 @@ fn ensure_directory_digest_persisted(
         .executor
         .block_on(core.store().ensure_directory_digest_persisted(digest))
     })
-    .map_err(PyException::new_err)?;
+    .map_err(todo_possible_store_missing_digest)?;
     Ok(())
   })
 }
 
@@ -1530,17 +1530,13 @@ fn single_file_digests_to_bytes<'py>(
           externs::store_bytes(py, bytes)
         })
         .await
-        .and_then(|maybe_bytes| {
-          maybe_bytes
-            .ok_or_else(|| format!("Error loading bytes from digest: {:?}", py_file_digest.0))
-        })
     }
   });
 
   let bytes_values: Vec<PyObject> = py
     .allow_threads(|| core.executor.block_on(future::try_join_all(digest_futures)))
     .map(|values| values.into_iter().map(|val| val.into()).collect())
-    .map_err(PyException::new_err)?;
+    .map_err(todo_possible_store_missing_digest)?;
 
   let output_list = PyList::new(py, &bytes_values);
   Ok(output_list)
 
@@ -1577,7 +1573,7 @@ fn write_digest(
       )
       .await
     })
-    .map_err(PyValueError::new_err)
+    .map_err(todo_possible_store_missing_digest)
   })
 }
diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs
index 09d6b4de9a1..98f13e77811 100644
--- a/src/rust/engine/src/intrinsics.rs
+++ b/src/rust/engine/src/intrinsics.rs
@@ -19,18 +19,20 @@ use crate::tasks::Intrinsic;
 use crate::types::Types;
 use crate::Failure;
 
+use futures::future::{self, BoxFuture, FutureExt, TryFutureExt};
+use futures::try_join;
+use indexmap::IndexMap;
+use pyo3::{PyRef, Python};
+use tempfile::TempDir;
+use tokio::process;
+
 use fs::{
   safe_create_dir_all_ioerror, DirectoryDigest, Permissions, RelativePath, EMPTY_DIRECTORY_DIGEST,
 };
-use futures::future::{self, BoxFuture, FutureExt, TryFutureExt};
 use hashing::Digest;
-use indexmap::IndexMap;
 use process_execution::{CacheName, ManagedChild, NamedCaches};
-use pyo3::{PyRef, Python};
 use stdio::TryCloneAsFile;
 use store::{SnapshotOps, SubsetParams};
-use tempfile::TempDir;
-use tokio::process;
 
 type IntrinsicFn = Box<dyn Fn(Context, Vec<Value>) -> BoxFuture<'static, NodeResult<Value>> + Send + Sync>;
 
@@ -179,33 +181,15 @@ fn process_request_to_process_result(
     let result = context.get(process_request).await?.0;
 
-    let maybe_stdout = context
-      .core
-      .store()
-      .load_file_bytes_with(result.stdout_digest, |bytes: &[u8]| bytes.to_owned())
-      .await
-      .map_err(throw)?;
-
-    let maybe_stderr = context
-      .core
-      .store()
-      .load_file_bytes_with(result.stderr_digest, |bytes: &[u8]| bytes.to_owned())
-      .await
-      .map_err(throw)?;
-
-    let stdout_bytes = maybe_stdout.ok_or_else(|| {
-      throw(format!(
-        "Bytes from stdout Digest {:?} not found in store",
-        result.stdout_digest
-      ))
-    })?;
-
-    let stderr_bytes = maybe_stderr.ok_or_else(|| {
-      throw(format!(
-        "Bytes from stderr Digest {:?} not found in store",
-        result.stderr_digest
-      ))
-    })?;
+    let store = context.core.store();
+    let (stdout_bytes, stderr_bytes) = try_join!(
+      store
+        .load_file_bytes_with(result.stdout_digest, |bytes: &[u8]| bytes.to_owned())
+        .map_err(|e| e.enrich("Bytes from stdout")),
+      store
+        .load_file_bytes_with(result.stderr_digest, |bytes: &[u8]| bytes.to_owned())
+        .map_err(|e| e.enrich("Bytes from stderr"))
+    )?;
 
     let platform_name: String = result.platform.into();
     let gil = Python::acquire_gil();
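[Editor's note — illustration only, not part of the diff. The rewrite above replaces two sequential, fallible loads with `futures::try_join!`, which polls both futures concurrently and short-circuits on the first error. A minimal standalone demonstration, with invented names:]

use futures::try_join;

async fn load(name: &str) -> Result<String, String> {
  Ok(format!("contents of {name}"))
}

// Both loads are driven concurrently; the first Err cancels the other,
// which is the behavior the intrinsic above relies on.
async fn demo() -> Result<(), String> {
  let (stdout, stderr) = try_join!(load("stdout"), load("stderr"))?;
  println!("{stdout} / {stderr}");
  Ok(())
}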
@@ -215,11 +199,11 @@ fn process_request_to_process_result(
         context.core.types.process_result,
         &[
           externs::store_bytes(py, &stdout_bytes),
-          Snapshot::store_file_digest(py, result.stdout_digest).map_err(throw)?,
+          Snapshot::store_file_digest(py, result.stdout_digest)?,
           externs::store_bytes(py, &stderr_bytes),
-          Snapshot::store_file_digest(py, result.stderr_digest).map_err(throw)?,
+          Snapshot::store_file_digest(py, result.stderr_digest)?,
           externs::store_i64(py, result.exit_code.into()),
-          Snapshot::store_directory_digest(py, result.output_directory).map_err(throw)?,
+          Snapshot::store_directory_digest(py, result.output_directory)?,
           externs::unsafe_call(
             py,
             context.core.types.platform,
@@ -252,18 +236,13 @@ fn directory_digest_to_digest_contents(
     let digest = Python::with_gil(|py| {
       let py_digest = (*args[0]).as_ref(py);
       lift_directory_digest(py_digest)
-    })
-    .map_err(throw)?;
+    })?;
 
-    let digest_contents = context
-      .core
-      .store()
-      .contents_for_directory(digest)
-      .await
-      .map_err(throw)?;
+    let digest_contents = context.core.store().contents_for_directory(digest).await?;
 
     let gil = Python::acquire_gil();
-    Snapshot::store_digest_contents(gil.python(), &context, &digest_contents).map_err(throw)
+    let value = Snapshot::store_digest_contents(gil.python(), &context, &digest_contents)?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -276,19 +255,11 @@ fn directory_digest_to_digest_entries(
     let digest = Python::with_gil(|py| {
       let py_digest = (*args[0]).as_ref(py);
       lift_directory_digest(py_digest)
-    })
-    .map_err(throw)?;
-    let snapshot = context
-      .core
-      .store()
-      .entries_for_directory(digest)
-      .await
-      .and_then(move |digest_entries| {
-        let gil = Python::acquire_gil();
-        Snapshot::store_digest_entries(gil.python(), &context, &digest_entries)
-      })
-      .map_err(throw)?;
-    Ok(snapshot)
+    })?;
+    let digest_entries = context.core.store().entries_for_directory(digest).await?;
+    let gil = Python::acquire_gil();
+    let value = Snapshot::store_digest_entries(gil.python(), &context, &digest_entries)?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -308,14 +279,10 @@ fn remove_prefix_request_to_digest(
       let res: NodeResult<_> = Ok((py_remove_prefix.digest.clone(), prefix));
       res
     })?;
-    let digest = context
-      .core
-      .store()
-      .strip_prefix(digest, &prefix)
-      .await
-      .map_err(throw)?;
+    let digest = context.core.store().strip_prefix(digest, &prefix).await?;
     let gil = Python::acquire_gil();
-    Snapshot::store_directory_digest(gil.python(), digest).map_err(throw)
+    let value = Snapshot::store_directory_digest(gil.python(), digest)?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -336,14 +303,10 @@ fn add_prefix_request_to_digest(
       Ok((py_add_prefix.digest.clone(), prefix));
       res
     })?;
-    let digest = context
-      .core
-      .store()
-      .add_prefix(digest, &prefix)
-      .await
-      .map_err(throw)?;
+    let digest = context.core.store().add_prefix(digest, &prefix).await?;
     let gil = Python::acquire_gil();
-    Snapshot::store_directory_digest(gil.python(), digest).map_err(throw)
+    let value = Snapshot::store_directory_digest(gil.python(), digest)?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -357,9 +320,9 @@ fn digest_to_snapshot(context: Context, args: Vec<Value>) -> BoxFuture<'static,
     })?;
     let snapshot = store::Snapshot::from_digest(store, digest).await?;
     let gil = Python::acquire_gil();
-    Snapshot::store_snapshot(gil.python(), snapshot)
+    let value = Snapshot::store_snapshot(gil.python(), snapshot)?;
+    Ok(value)
   }
-  .map_err(throw)
   .boxed()
 }
 
@@ -377,9 +340,10 @@ fn merge_digests_request_to_digest(
       .map(|py_merge_digests| py_merge_digests.0.clone())
      .map_err(|e| throw(format!("{}", e)))
   })?;
-    let digest = store.merge(digests).await.map_err(throw)?;
+    let digest = store.merge(digests).await?;
     let gil = Python::acquire_gil();
-    Snapshot::store_directory_digest(gil.python(), digest).map_err(throw)
+    let value = Snapshot::store_directory_digest(gil.python(), digest)?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -392,7 +356,8 @@ fn download_file_to_digest(
     let key = Key::from_value(args.pop().unwrap()).map_err(Failure::from_py_err)?;
     let snapshot = context.get(DownloadedFile(key)).await?;
     let gil = Python::acquire_gil();
-    Snapshot::store_directory_digest(gil.python(), snapshot.into()).map_err(throw)
+    let value = Snapshot::store_directory_digest(gil.python(), snapshot.into())?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -409,7 +374,8 @@ fn path_globs_to_digest(
       .map_err(|e| throw(format!("Failed to parse PathGlobs: {}", e)))?;
     let snapshot = context.get(Snapshot::from_path_globs(path_globs)).await?;
     let gil = Python::acquire_gil();
-    Snapshot::store_directory_digest(gil.python(), snapshot.into()).map_err(throw)
+    let value = Snapshot::store_directory_digest(gil.python(), snapshot.into())?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -427,7 +393,8 @@ fn path_globs_to_paths(
      .map_err(|e| throw(format!("Failed to parse PathGlobs: {}", e)))?;
     let paths = context.get(Paths::from_path_globs(path_globs)).await?;
     let gil = Python::acquire_gil();
-    Paths::store_paths(gil.python(), &core, &paths).map_err(throw)
+    let value = Paths::store_paths(gil.python(), &core, &paths)?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -494,7 +461,7 @@ fn create_digest_to_digest(
           CreateDigestItem::Dir(path) => store
             .create_empty_dir(&path)
             .await
-            .map_err(|e| format!("{:?}", e)),
+            .map_err(|e| e.to_string()),
         }
       }
     })
@@ -502,10 +469,11 @@ fn create_digest_to_digest(
   let store = context.core.store();
   async move {
-    let digests = future::try_join_all(digest_futures).await.map_err(throw)?;
-    let digest = store.merge(digests).await.map_err(throw)?;
+    let digests = future::try_join_all(digest_futures).await?;
+    let digest = store.merge(digests).await?;
     let gil = Python::acquire_gil();
-    Snapshot::store_directory_digest(gil.python(), digest).map_err(throw)
+    let value = Snapshot::store_directory_digest(gil.python(), digest)?;
+    Ok(value)
   }
   .boxed()
 }
 
@@ -521,18 +489,16 @@ fn digest_subset_to_digest(
     let py_path_globs = externs::getattr(py_digest_subset, "globs").unwrap();
     let py_digest = externs::getattr(py_digest_subset, "digest").unwrap();
     let res: NodeResult<_> = Ok((
-      Snapshot::lift_prepared_path_globs(py_path_globs).map_err(throw)?,
-      lift_directory_digest(py_digest).map_err(throw)?,
+      Snapshot::lift_prepared_path_globs(py_path_globs)?,
+      lift_directory_digest(py_digest)?,
     ));
     res
   })?;
   let subset_params = SubsetParams { globs: path_globs };
-    let digest = store
-      .subset(original_digest, subset_params)
-      .await
-      .map_err(throw)?;
+    let digest = store.subset(original_digest, subset_params).await?;
     let gil = Python::acquire_gil();
-    Snapshot::store_directory_digest(gil.python(), digest).map_err(throw)
+    let value = Snapshot::store_directory_digest(gil.python(), digest)?;
+    Ok(value)
   }
   .boxed()
 }
diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs
index 18f36213fa5..d6bd8dd395f 100644
--- a/src/rust/engine/src/nodes.rs
+++ b/src/rust/engine/src/nodes.rs
@@ -428,8 +428,7 @@ impl WrappedNode for ExecuteProcess {
 
     let res = command_runner
       .run(execution_context, workunit, request.clone())
-      .await
-      .map_err(throw)?;
+      .await?;
 
     let definition = serde_json::to_string(&request)
       .map_err(|e| throw(format!("Failed to serialize process: {}", e)))?;
@@ -932,11 +931,11 @@ impl DownloadedFile {
       // If we hit the ObservedUrls cache, then we have successfully fetched this Digest from
       // this URL before. If we still have the bytes, then we skip fetching the content again.
       let usable_in_store = have_observed_url
-        && core
+        && (core
           .store()
           .load_file_bytes_with(digest, |_| ())
-          .await?
-          .is_some();
+          .await
+          .is_ok());
 
       if !usable_in_store {
         downloads::download(core.clone(), url, file_name, digest).await?;
diff --git a/src/rust/engine/src/python.rs b/src/rust/engine/src/python.rs
index cfd0b567464..cb33bbfd223 100644
--- a/src/rust/engine/src/python.rs
+++ b/src/rust/engine/src/python.rs
@@ -12,6 +12,10 @@ use pyo3::types::{PyDict, PyType};
 use pyo3::{FromPyObject, ToPyObject};
 use smallvec::SmallVec;
 
+use hashing::Digest;
+use process_execution::ProcessError;
+use store::StoreError;
+
 use crate::externs;
 
 ///
@@ -371,7 +375,10 @@ pub enum Failure {
   /// A Node failed because a filesystem change invalidated it or its inputs.
   /// A root requestor should usually immediately retry their request.
   Invalidated,
-  /// An error was thrown.
+  /// A Digest was missing from the configured Stores. This error may be recoverable if the source
+  /// of the missing Digest can be identified and retried (such as if it was produced by a cache).
+  MissingDigest(String, Digest),
+  /// An unclassified error was thrown.
   Throw {
     // A python exception value, which might have a python-level stacktrace
     val: Value,
@@ -389,6 +396,13 @@ impl Failure {
   pub fn with_pushed_frame(self, frame: &impl fmt::Display) -> Failure {
     match self {
       Failure::Invalidated => Failure::Invalidated,
+      md @ Failure::MissingDigest { .. } => {
+        // MissingDigest errors are usually handled at the WrappedNode boundary by restarting the
+        // producer of the missing digest. So a Failure will only end up with a new frame if it
+        // traversed the node boundary for some reason, in which case it is safe to discard the
+        // type information and convert into a Throw.
+        throw(md.to_string()).with_pushed_frame(frame)
+      }
       Failure::Throw {
         val,
         python_traceback,
@@ -454,6 +468,9 @@ impl fmt::Display for Failure {
   fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
     match self {
       Failure::Invalidated => write!(f, "Giving up on retrying due to changed files."),
+      Failure::MissingDigest(s, d) => {
+        write!(f, "Could not recover from missing digest: {s}: {d:?}")
+      }
       Failure::Throw { val, .. } => {
         let repr = Python::with_gil(|py| {
           let obj = (*val.0).as_ref(py);
@@ -465,6 +482,24 @@ impl fmt::Display for Failure {
   }
 }
 
+impl From<ProcessError> for Failure {
+  fn from(err: ProcessError) -> Self {
+    match err {
+      ProcessError::MissingDigest(s, d) => Self::MissingDigest(s, d),
+      ProcessError::Unclassified(s) => throw(s),
+    }
+  }
+}
+
+impl From<StoreError> for Failure {
+  fn from(err: StoreError) -> Self {
+    match err {
+      StoreError::MissingDigest(s, d) => Self::MissingDigest(s, d),
+      StoreError::Unclassified(s) => throw(s),
+    }
+  }
+}
+
 impl From<String> for Failure {
   fn from(err: String) -> Self {
     throw(err)
@@ -472,8 +507,8 @@ impl From<String> for Failure {
 }
 
 pub fn throw(msg: String) -> Failure {
-  let gil = Python::acquire_gil();
   let python_traceback = Failure::native_traceback(&msg);
+  let gil = Python::acquire_gil();
   Failure::Throw {
     val: externs::create_exception(gil.python(), msg),
     python_traceback,
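[Editor's note — hedged usage sketch, not part of this diff. With the new variant in place, a consumer of `Failure` can branch on recoverability; the helper below is hypothetical, and the retry policy it encodes is an assumption based on the doc comments above and the retry work referenced in #11331.]

use crate::python::Failure;

/// Hypothetical helper: decide whether a failed node is worth retrying.
fn is_recoverable(failure: &Failure) -> bool {
  match failure {
    // Filesystem invalidation: the root requestor should re-request.
    Failure::Invalidated => true,
    // A missing digest may be recoverable by re-running its producer.
    Failure::MissingDigest(..) => true,
    // Unclassified Python-level failures are terminal.
    Failure::Throw { .. } => false,
  }
}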