Skip to content

Commit

Permalink
Differentiate and propagate missing digest errors (#15761)
Browse files Browse the repository at this point in the history
In order to begin lazily fetching digests for #11331, we need to be able to differentiate errors caused by missing digests.

This change introduces `{StoreError,ProcessError,Failure}::MissingDigest`, which are bubbled up to the level of graph `Node`s, and which will allow us to introduce retry in a followup change. `MissingDigest` is produced by `load_bytes_with` and a few other low-level `Store` methods (which now return an `Err(MissingDigest)` rather than `Option`).

There should not be any behavior changes.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Jun 7, 2022
1 parent 5e208ec commit 1c495dd
Show file tree
Hide file tree
Showing 28 changed files with 758 additions and 712 deletions.
57 changes: 31 additions & 26 deletions src/rust/engine/fs/brfs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use log::{debug, error, warn};
use parking_lot::Mutex;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use protos::require_digest;
use store::Store;
use store::{Store, StoreError};
use tokio::signal::unix::{signal, SignalKind};
use tokio::task;
use tokio_stream::wrappers::SignalStream;
Expand Down Expand Up @@ -196,7 +196,7 @@ impl BuildResultFS {
.runtime
.block_on(async move { store.load_file_bytes_with(digest, |_| ()).await })
{
Ok(Some(())) => {
Ok(()) => {
let executable_inode = self.next_inode;
self.next_inode += 1;
let non_executable_inode = self.next_inode;
Expand Down Expand Up @@ -224,8 +224,8 @@ impl BuildResultFS {
non_executable_inode
}))
}
Ok(None) => Ok(None),
Err(err) => Err(err),
Err(StoreError::MissingDigest { .. }) => Ok(None),
Err(err) => Err(err.to_string()),
}
}
}
Expand All @@ -240,7 +240,7 @@ impl BuildResultFS {
.runtime
.block_on(async move { store.load_directory(digest).await })
{
Ok(Some(_)) => {
Ok(_) => {
// TODO: Kick off some background futures to pre-load the contents of this Directory into
// an in-memory cache. Keep a background CPU pool driving those Futures.
let inode = self.next_inode;
Expand All @@ -256,8 +256,8 @@ impl BuildResultFS {
);
Ok(Some(inode))
}
Ok(None) => Ok(None),
Err(err) => Err(err),
Err(StoreError::MissingDigest { .. }) => Ok(None),
Err(err) => Err(err.to_string()),
}
}
}
Expand Down Expand Up @@ -335,7 +335,7 @@ impl BuildResultFS {
.block_on(async move { store.load_directory(digest).await });

match maybe_directory {
Ok(Some(directory)) => {
Ok(directory) => {
let mut entries = vec![
ReaddirEntry {
inode: inode,
Expand Down Expand Up @@ -396,7 +396,7 @@ impl BuildResultFS {

Ok(entries)
}
Ok(None) => Err(libc::ENOENT),
Err(StoreError::MissingDigest { .. }) => Err(libc::ENOENT),
Err(err) => {
error!("Error loading directory {:?}: {}", digest, err);
Err(libc::EINVAL)
Expand Down Expand Up @@ -477,14 +477,18 @@ impl fuser::Filesystem for BuildResultFS {
.and_then(|cache_entry| {
let store = self.store.clone();
let parent_digest = cache_entry.digest;
self
let directory = self
.runtime
.block_on(async move { store.load_directory(parent_digest).await })
.map_err(|err| {
error!("Error reading directory {:?}: {}", parent_digest, err);
libc::EINVAL
})?
.and_then(|directory| self.node_for_digest(&directory, filename))
.map_err(|err| match err {
StoreError::MissingDigest { .. } => libc::ENOENT,
err => {
error!("Error reading directory {:?}: {}", parent_digest, err);
libc::EINVAL
}
})?;
self
.node_for_digest(&directory, filename)
.ok_or(libc::ENOENT)
})
.and_then(|node| match node {
Expand Down Expand Up @@ -583,19 +587,20 @@ impl fuser::Filesystem for BuildResultFS {
})
.await
})
.map(|v| {
if v.is_none() {
let maybe_reply = reply2.lock().take();
if let Some(reply) = maybe_reply {
reply.error(libc::ENOENT);
}
}
})
.or_else(|err| {
error!("Error loading bytes for {:?}: {}", digest, err);
let maybe_reply = reply2.lock().take();
if let Some(reply) = maybe_reply {
reply.error(libc::EINVAL);
match err {
StoreError::MissingDigest { .. } => {
if let Some(reply) = maybe_reply {
reply.error(libc::ENOENT);
}
}
err => {
error!("Error loading bytes for {:?}: {}", digest, err);
if let Some(reply) = maybe_reply {
reply.error(libc::EINVAL);
}
}
}
Ok(())
});
Expand Down
Loading

0 comments on commit 1c495dd

Please sign in to comment.