Backtrack execution for missing digests to make eager_fetch=false more resilient (#15850)

As described in #11331, `--remote-cache-eager-fetch` currently defaults to true in order to avoid having to deal with missing remote content later in the pipeline. This means that before treating a cache entry as a hit, we fully download its outputs.

In warm-cache situations, this can mean downloading a lot more than is strictly necessary. In theory, for a test run with a 100% cache hit rate, `eager_fetch=False` could download only stdio and no file content at all. In practice, high hit-rate runs see about 80% fewer bytes downloaded and 50% fewer RPCs than with `eager_fetch=True` (see the comment on #11331).
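
To make the difference concrete, here is a toy model of how many bytes each mode downloads for a run that only ever reads stdio. It is a sketch under stated assumptions, not the engine's code: `CachedResult`, `bytes_fetched_at_lookup`, and `bytes_fetched_for_stdio_only_run` are hypothetical names, and real results reference their outputs by digest rather than by length.

```rust
/// Hypothetical, simplified stand-in for a cached process result: only the
/// sizes of the three outputs that a real result references by digest.
pub struct CachedResult {
    pub stdout_len: u64,
    pub stderr_len: u64,
    pub output_files_len: u64,
}

/// Bytes downloaded at cache-lookup time, before the entry counts as a hit.
/// With eager fetch, everything is downloaded up front; without it, nothing
/// is downloaded until a consumer actually reads a digest.
pub fn bytes_fetched_at_lookup(result: &CachedResult, eager_fetch: bool) -> u64 {
    if eager_fetch {
        result.stdout_len + result.stderr_len + result.output_files_len
    } else {
        0
    }
}

/// Total bytes downloaded for a run in which only stdout and stderr are ever
/// read (for example, a fully cached test run that just reports results).
pub fn bytes_fetched_for_stdio_only_run(results: &[CachedResult], eager_fetch: bool) -> u64 {
    results
        .iter()
        .map(|r| {
            let upfront = bytes_fetched_at_lookup(r, eager_fetch);
            // Stdio is fetched lazily only if it was not already fetched up front.
            let lazy = if eager_fetch { 0 } else { r.stdout_len + r.stderr_len };
            upfront + lazy
        })
        .sum()
}
```

In this model the output files dominate the eager total, which is the intuition behind the savings reported above; the model does not attempt to reproduce the measured percentages.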

To begin moving toward disabling `eager_fetch` by default (and eventually, ideally, removing the flag entirely), this change begins "backtracking" when missing digests are encountered. Backtracking is implemented by "catching" `MissingDigest` errors (introduced in #15761), and invalidating their source `Node` in the graph. When a `Node` that produced a missing digest re-runs, it does so using progressively fewer caches (as introduced in #15854), in order to cache bust both local and remote partial cache entries.
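
The retry behavior described above can be sketched roughly as follows. This is an illustration only: the real change invalidates a `Node` in the graph rather than looping, and the names `RunError`, `CacheUse`, `caches_for_attempt`, and `run_with_backtracking`, as well as the specific order in which caches are dropped, are assumptions made for the example.

```rust
#[derive(Debug, PartialEq)]
pub enum RunError {
    /// A digest referenced by a cache entry could not be loaded.
    MissingDigest(String),
    /// Any other failure; backtracking does not help with these.
    Other(String),
}

/// Which caches a given attempt may consult.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CacheUse {
    pub remote: bool,
    pub local: bool,
}

/// Assumed ordering: the source only says that re-runs use
/// "progressively fewer caches", not which cache is dropped first.
pub fn caches_for_attempt(attempt: usize) -> CacheUse {
    match attempt {
        0 => CacheUse { remote: true, local: true },
        1 => CacheUse { remote: false, local: true },
        _ => CacheUse { remote: false, local: false },
    }
}

/// Runs `run_node` at least once. On a `MissingDigest` error it re-runs with
/// fewer caches, up to `max_attempts` total attempts. Any other outcome
/// (success or a different error) is returned immediately.
pub fn run_with_backtracking<T>(
    max_attempts: usize,
    mut run_node: impl FnMut(CacheUse) -> Result<T, RunError>,
) -> Result<T, RunError> {
    let mut attempt = 0;
    loop {
        match run_node(caches_for_attempt(attempt)) {
            Err(RunError::MissingDigest(_)) if attempt + 1 < max_attempts => {
                attempt += 1;
            }
            other => return other,
        }
    }
}
```

A caller would pass a closure that runs the process with the given caches enabled. If the remote cache entry points at a missing digest, the second attempt skips the remote cache and can succeed from the local cache or by actually executing.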

`eager_fetch=False` was already experimental, in that any `MissingDigest` error encountered later in the run would kill the entire run. Backtracking makes `eager_fetch=False` less experimental, in that we are now very likely to recover from a `MissingDigest` error. But it is still the case with `eager_fetch=False` that persistent remote infrastructure errors (those that last longer than our retry budget or timeout) could kill a run. Given that, we will likely want to gain more experience and further tune timeouts and retries before changing the default.

Fixes #11331.

[ci skip-build-wheels]
stuhood authored Jun 17, 2022
1 parent c5168c5 commit 7dd8605
Showing 22 changed files with 1,240 additions and 1,003 deletions.
2 changes: 2 additions & 0 deletions pants.toml
@@ -71,6 +71,8 @@ unmatched_build_file_globs = "error"
remote_store_address = "grpcs://cache.toolchain.com:443"
remote_instance_name = "main"
remote_auth_plugin = "toolchain.pants.auth.plugin:toolchain_auth_plugin"
+# See https://github.com/pantsbuild/pants/issues/11331.
+remote_cache_eager_fetch = false

[anonymous-telemetry]
enabled = true
9 changes: 8 additions & 1 deletion src/python/pants/engine/internals/native_engine.pyi
@@ -178,14 +178,21 @@ class PantsdClientException(Exception):
# ------------------------------------------------------------------------------

class PyStubCASBuilder:
-def always_errors(self) -> PyStubCASBuilder: ...
+def ac_always_errors(self) -> PyStubCASBuilder: ...
+def cas_always_errors(self) -> PyStubCASBuilder: ...
def build(self, executor: PyExecutor) -> PyStubCAS: ...

class PyStubCAS:
@classmethod
def builder(cls) -> PyStubCASBuilder: ...
@property
def address(self) -> str: ...
+def remove(self, digest: FileDigest) -> bool: ...
+def action_cache_len(self) -> int: ...
+
+# ------------------------------------------------------------------------------
+# (etc.)
+# ------------------------------------------------------------------------------

class RawFdRunner(Protocol):
def __call__(
8 changes: 4 additions & 4 deletions src/rust/engine/fs/store/src/remote_tests.rs
@@ -65,7 +65,7 @@ async fn missing_directory() {
#[tokio::test]
async fn load_file_grpc_error() {
let _ = WorkunitStore::setup_for_tests();
-let cas = StubCAS::always_errors();
+let cas = StubCAS::cas_always_errors();

let error = load_file_bytes(&new_byte_store(&cas), TestData::roland().digest())
.await
@@ -80,7 +80,7 @@ async fn load_file_grpc_error() {
#[tokio::test]
async fn load_directory_grpc_error() {
let _ = WorkunitStore::setup_for_tests();
-let cas = StubCAS::always_errors();
+let cas = StubCAS::cas_always_errors();

let error = load_directory_proto_bytes(
&new_byte_store(&cas),
@@ -212,7 +212,7 @@ async fn write_empty_file() {

#[tokio::test]
async fn write_file_errors() {
-let cas = StubCAS::always_errors();
+let cas = StubCAS::cas_always_errors();

let store = new_byte_store(&cas);
let error = store
@@ -290,7 +290,7 @@ async fn list_missing_digests_some_missing() {
#[tokio::test]
async fn list_missing_digests_error() {
let _ = WorkunitStore::setup_for_tests();
-let cas = StubCAS::always_errors();
+let cas = StubCAS::cas_always_errors();

let store = new_byte_store(&cas);

4 changes: 2 additions & 2 deletions src/rust/engine/fs/store/src/tests.rs
@@ -278,7 +278,7 @@ async fn load_file_remote_error_is_error() {
let dir = TempDir::new().unwrap();

let _ = WorkunitStore::setup_for_tests();
-let cas = StubCAS::always_errors();
+let cas = StubCAS::cas_always_errors();
let error = load_file_bytes(
&new_store(dir.path(), &cas.address()),
TestData::roland().digest(),
@@ -303,7 +303,7 @@ async fn load_directory_remote_error_is_error() {
let dir = TempDir::new().unwrap();

let _ = WorkunitStore::setup_for_tests();
-let cas = StubCAS::always_errors();
+let cas = StubCAS::cas_always_errors();
let error = new_store(dir.path(), &cas.address())
.load_directory(TestData::roland().digest())
.await
39 changes: 23 additions & 16 deletions src/rust/engine/process_execution/src/cache.rs
@@ -33,6 +33,7 @@ pub struct CommandRunner {
inner: Arc<dyn crate::CommandRunner>,
cache: PersistentCache,
file_store: Store,
+eager_fetch: bool,
metadata: ProcessMetadata,
}

@@ -41,12 +42,14 @@ impl CommandRunner {
inner: Arc<dyn crate::CommandRunner>,
cache: PersistentCache,
file_store: Store,
+eager_fetch: bool,
metadata: ProcessMetadata,
) -> CommandRunner {
CommandRunner {
inner,
cache,
file_store,
+eager_fetch,
metadata,
}
}
@@ -196,22 +199,26 @@ impl CommandRunner {
return Ok(None);
};

-// Ensure that all digests in the result are loadable, erroring if any are not.
-let _ = future::try_join_all(vec![
-self
-.file_store
-.ensure_local_has_file(result.stdout_digest)
-.boxed(),
-self
-.file_store
-.ensure_local_has_file(result.stderr_digest)
-.boxed(),
-self
-.file_store
-.ensure_local_has_recursive_directory(result.output_directory.clone())
-.boxed(),
-])
-.await?;
+// If eager_fetch is enabled, ensure that all digests in the result are loadable, erroring
+// if any are not. If eager_fetch is disabled, a Digest which is discovered to be missing later
+// on during execution will cause backtracking.
+if self.eager_fetch {
+let _ = future::try_join_all(vec![
+self
+.file_store
+.ensure_local_has_file(result.stdout_digest)
+.boxed(),
+self
+.file_store
+.ensure_local_has_file(result.stderr_digest)
+.boxed(),
+self
+.file_store
+.ensure_local_has_recursive_directory(result.output_directory.clone())
+.boxed(),
+])
+.await?;
+}

Ok(Some(result))
}
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/cache_tests.rs
@@ -58,6 +58,7 @@ fn create_cached_runner(
local.into(),
cache,
store,
+true,
ProcessMetadata::default(),
));

4 changes: 2 additions & 2 deletions src/rust/engine/process_execution/src/local.rs
@@ -483,7 +483,7 @@ pub trait CapturedWorkdir {
workdir_token: Self::WorkdirToken,
exclusive_spawn: bool,
platform: Platform,
-) -> Result<FallibleProcessResultWithPlatform, ProcessError> {
+) -> Result<FallibleProcessResultWithPlatform, String> {
let start_time = Instant::now();

// Spawn the process.
@@ -578,7 +578,7 @@ pub trait CapturedWorkdir {
metadata: result_metadata,
})
}
-Err(msg) => Err(msg.into()),
+Err(msg) => Err(msg),
}
}

7 changes: 4 additions & 3 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
@@ -149,7 +149,8 @@ impl super::CommandRunner for CommandRunner {
client_args,
client_main_class,
..
-} = ParsedJVMCommandLines::parse_command_lines(&req.argv)?;
+} = ParsedJVMCommandLines::parse_command_lines(&req.argv)
+.map_err(ProcessError::Unclassified)?;

let nailgun_name = CommandRunner::calculate_nailgun_name(&client_main_class);
let (client_input_digests, server_input_digests) =
@@ -173,7 +174,7 @@ impl super::CommandRunner for CommandRunner {
self.inner.immutable_inputs(),
)
.await
-.map_err(|e| format!("Failed to connect to nailgun! {}", e))?;
+.map_err(|e| e.enrich("Failed to connect to nailgun"))?;

// Prepare the workdir.
let exclusive_spawn = prepare_workdir(
@@ -204,7 +205,7 @@ impl super::CommandRunner for CommandRunner {
// release, it assumes that it has been canceled and kills the server.
nailgun_process.release().await?;

-res
+Ok(res?)
}
)
.await