From 87a988670a9ebf372a83806ca91e26de13d405a0 Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Wed, 3 Aug 2022 19:05:57 -0400 Subject: [PATCH] Implement a `validate` mode to reduce network usage for remote caches (#16398) As reported in #16096, backtracking still has some kinks to work out, and is unlikely to be fully stable before `2.13.0` ships. To gain "most" of the network-usage benefit of deferring fetching cache content while significantly narrowing the window in which backtracking is necessary, this change replaces the `remote_cache_eager_fetch` flag with a `cache_content_behavior` ternary option. As described in the `help` for the new option, the modes are: * `fetch`: Eagerly fetches cache content: the default. Equivalent to `eager_fetch=True`, the previous default. * `validate`: Validates that cache content exists either locally or remotely, without actually fetching files. This cannot be the default (without changing behavior), because it can still require backtracking if cache content expires during the lifetime of `pantsd` or a single run. * `defer`: Does not validate or fetch content: equivalent to `eager_fetch=False`. Hopefully can eventually be made the default once all issues like #16096 are resolved. Because `validate` narrows the window in which we backtrack to cases where content expires during a run, it should be relatively safe for use even within the `2.13.x` release. 
[ci skip-build-wheels] --- pants.toml | 6 +- .../pants/engine/internals/scheduler.py | 2 +- src/python/pants/option/global_options.py | 69 ++++++++++++++++-- src/rust/engine/fs/store/src/lib.rs | 72 ++++++++++++++++++- src/rust/engine/fs/store/src/local.rs | 14 ++++ src/rust/engine/fs/store/src/remote.rs | 6 +- src/rust/engine/fs/store/src/remote_tests.rs | 10 +-- .../engine/process_execution/src/cache.rs | 36 +++------- .../process_execution/src/cache_tests.rs | 7 +- src/rust/engine/process_execution/src/lib.rs | 63 +++++++++++++++- .../process_execution/src/remote_cache.rs | 46 +++++------- .../src/remote_cache_tests.rs | 54 ++++++++------ src/rust/engine/process_executor/src/main.rs | 5 +- src/rust/engine/src/context.rs | 22 +++--- src/rust/engine/src/externs/interface.rs | 6 +- .../remote_cache_integration_test.py | 52 +++++++++++++- 16 files changed, 350 insertions(+), 120 deletions(-) diff --git a/pants.toml b/pants.toml index 1653e397ba4..a0a34394368 100644 --- a/pants.toml +++ b/pants.toml @@ -71,8 +71,10 @@ unmatched_build_file_globs = "error" remote_store_address = "grpcs://cache.toolchain.com:443" remote_instance_name = "main" remote_auth_plugin = "toolchain.pants.auth.plugin:toolchain_auth_plugin" -# TODO: See https://github.com/pantsbuild/pants/issues/16096. -remote_cache_eager_fetch = true +# TODO: May cause tests which experience missing digests to hang. If you experience an +# issue, please change this to `fetch` and then report an issue on: +# https://github.com/pantsbuild/pants/issues/16096. 
+cache_content_behavior = "validate" [anonymous-telemetry] enabled = true diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py index b088c284cdb..e1315812e94 100644 --- a/src/python/pants/engine/internals/scheduler.py +++ b/src/python/pants/engine/internals/scheduler.py @@ -180,7 +180,7 @@ def __init__( store_rpc_concurrency=execution_options.remote_store_rpc_concurrency, store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit, cache_warnings_behavior=execution_options.remote_cache_warnings.value, - cache_eager_fetch=execution_options.remote_cache_eager_fetch, + cache_content_behavior=execution_options.cache_content_behavior.value, cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency, cache_read_timeout_millis=execution_options.remote_cache_read_timeout_millis, execution_extra_platform_properties=tuple( diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 24b70de8e2e..49b0eb5bb89 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -23,7 +23,7 @@ is_in_container, pants_version, ) -from pants.base.deprecated import warn_or_error +from pants.base.deprecated import resolve_conflicting_options, warn_or_error from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior from pants.engine.environment import CompleteEnvironment from pants.engine.internals.native_engine import PyExecutor @@ -112,6 +112,13 @@ class RemoteCacheWarningsBehavior(Enum): backoff = "backoff" +@enum.unique +class CacheContentBehavior(Enum): + fetch = "fetch" + validate = "validate" + defer = "defer" + + @enum.unique class AuthPluginState(Enum): OK = "ok" @@ -353,6 +360,7 @@ class ExecutionOptions: process_execution_remote_parallelism: int process_execution_cache_namespace: str | None process_execution_graceful_shutdown_timeout: int + cache_content_behavior: CacheContentBehavior 
process_total_child_memory_usage: int | None process_per_child_memory_usage: int @@ -365,7 +373,6 @@ class ExecutionOptions: remote_store_rpc_concurrency: int remote_store_batch_api_size_limit: int - remote_cache_eager_fetch: bool remote_cache_warnings: RemoteCacheWarningsBehavior remote_cache_rpc_concurrency: int remote_cache_read_timeout_millis: int @@ -398,6 +405,7 @@ def from_options( process_execution_cache_namespace=bootstrap_options.process_execution_cache_namespace, process_execution_graceful_shutdown_timeout=bootstrap_options.process_execution_graceful_shutdown_timeout, process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun, + cache_content_behavior=GlobalOptions.resolve_cache_content_behavior(bootstrap_options), process_total_child_memory_usage=bootstrap_options.process_total_child_memory_usage, process_per_child_memory_usage=bootstrap_options.process_per_child_memory_usage, # Remote store setup. @@ -409,7 +417,6 @@ def from_options( remote_store_rpc_concurrency=dynamic_remote_options.store_rpc_concurrency, remote_store_batch_api_size_limit=bootstrap_options.remote_store_batch_api_size_limit, # Remote cache setup. - remote_cache_eager_fetch=bootstrap_options.remote_cache_eager_fetch, remote_cache_warnings=bootstrap_options.remote_cache_warnings, remote_cache_rpc_concurrency=dynamic_remote_options.cache_rpc_concurrency, remote_cache_read_timeout_millis=bootstrap_options.remote_cache_read_timeout_millis, @@ -482,6 +489,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions: process_execution_cache_namespace=None, process_cleanup=True, local_cache=True, + cache_content_behavior=CacheContentBehavior.fetch, process_execution_local_enable_nailgun=True, process_execution_graceful_shutdown_timeout=3, # Remote store setup. 
@@ -495,7 +503,6 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions: remote_store_rpc_concurrency=128, remote_store_batch_api_size_limit=4194304, # Remote cache setup. - remote_cache_eager_fetch=True, remote_cache_warnings=RemoteCacheWarningsBehavior.backoff, remote_cache_rpc_concurrency=128, remote_cache_read_timeout_millis=1500, @@ -1079,6 +1086,33 @@ class BootstrapOptions: """ ), ) + cache_content_behavior = EnumOption( + "--cache-content-behavior", + advanced=True, + default=DEFAULT_EXECUTION_OPTIONS.cache_content_behavior, + help=softwrap( + """ + Controls how the content of cache entries is handled during process execution. + + When using a remote cache, the `fetch` behavior will fetch remote cache content from the + remote store before considering the cache lookup a hit, while the `validate` behavior + will only validate (for either a local or remote cache) that the content exists, without + fetching it. + + The `defer` behavior, on the other hand, will neither fetch nor validate the cache + content before calling a cache hit a hit. This "defers" actually consuming the cache + entry until a consumer consumes it. + + The `defer` mode is the most network efficient (because it will completely skip network + requests in many cases), followed by the `validate` mode (since it can still skip + fetching the content if no consumer ends up needing it). But both the `validate` and + `defer` modes rely on an experimental feature called "backtracking" to attempt to + recover if content later turns out to be missing (`validate` has a much narrower window + for backtracking though, since content would need to disappear between validation and + consumption: generally, within one `pantsd` session). 
+ """ + ), + ) ca_certs_path = StrOption( "--ca-certs-path", advanced=True, @@ -1374,7 +1408,9 @@ class BootstrapOptions: remote_cache_eager_fetch = BoolOption( "--remote-cache-eager-fetch", advanced=True, - default=DEFAULT_EXECUTION_OPTIONS.remote_cache_eager_fetch, + default=(DEFAULT_EXECUTION_OPTIONS.cache_content_behavior != CacheContentBehavior.defer), + removal_version="2.15.0.dev1", + removal_hint="Use the `cache_content_behavior` option instead.", help=softwrap( """ Eagerly fetch relevant content from the remote store instead of lazily fetching. @@ -1887,6 +1923,29 @@ def create_py_executor(bootstrap_options: OptionValueContainer) -> PyExecutor: core_threads=bootstrap_options.rule_threads_core, max_threads=rule_threads_max ) + @staticmethod + def resolve_cache_content_behavior( + bootstrap_options: OptionValueContainer, + ) -> CacheContentBehavior: + resolved_value = resolve_conflicting_options( + old_option="remote_cache_eager_fetch", + new_option="cache_content_behavior", + old_scope="", + new_scope="", + old_container=bootstrap_options, + new_container=bootstrap_options, + ) + + if isinstance(resolved_value, bool): + # Is `remote_cache_eager_fetch`. + return CacheContentBehavior.fetch if resolved_value else CacheContentBehavior.defer + elif isinstance(resolved_value, CacheContentBehavior): + return resolved_value + else: + raise TypeError( + f"Unexpected option value for `cache_content_behavior`: {resolved_value}" + ) + @staticmethod def compute_pants_ignore(buildroot, global_options): """Computes the merged value of the `--pants-ignore` flag. 
diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index e5d038e7b68..6cf14e84535 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -35,7 +35,7 @@ mod snapshot_ops_tests; mod snapshot_tests; pub use crate::snapshot_ops::{SnapshotOps, SubsetParams}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{self, Debug, Display}; use std::fs::OpenOptions; use std::future::Future; @@ -749,8 +749,11 @@ impl Store { if Store::upload_is_faster_than_checking_whether_to_upload(&ingested_digests) { ingested_digests.keys().cloned().collect() } else { - let request = remote.find_missing_blobs_request(ingested_digests.keys()); - remote.list_missing_digests(request).await? + remote + .list_missing_digests( + remote.find_missing_blobs_request(ingested_digests.keys().cloned()), + ) + .await? }; future::try_join_all( @@ -853,6 +856,69 @@ impl Store { .await } + /// + /// Return true if the given directory and file digests are loadable from either the local or remote + /// Store, without downloading any file content. + /// + /// The given directory digests will be recursively expanded, so it is not necessary to + /// explicitly list their file digests in the file digests list. + /// + pub async fn exists_recursive( + &self, + directory_digests: impl IntoIterator, + file_digests: impl IntoIterator, + ) -> Result { + // Load directories, which implicitly validates that they exist. + let digest_tries = future::try_join_all( + directory_digests + .into_iter() + .map(|dd| self.load_digest_trie(dd)), + ) + .await?; + + // Collect all file digests. 
+ let mut file_digests = file_digests.into_iter().collect::>(); + for digest_trie in digest_tries { + digest_trie.walk(&mut |_, entry| match entry { + directory::Entry::File(f) => { + file_digests.insert(f.digest()); + } + directory::Entry::Directory(_) => (), + }); + } + + // Filter out file digests that exist locally. + // TODO: Implement a local batch API: see https://github.com/pantsbuild/pants/issues/16400. + let local_file_exists = future::try_join_all( + file_digests + .iter() + .map(|file_digest| self.local.exists(EntryType::File, *file_digest)) + .collect::>(), + ) + .await?; + let missing_locally = local_file_exists + .into_iter() + .zip(file_digests.into_iter()) + .filter_map(|(exists, digest)| if exists { None } else { Some(digest) }) + .collect::>(); + + // If there are any digests which don't exist locally, check remotely. + if missing_locally.is_empty() { + return Ok(true); + } + let remote = if let Some(remote) = self.remote.clone() { + remote + } else { + return Ok(false); + }; + let missing = remote + .store + .list_missing_digests(remote.store.find_missing_blobs_request(missing_locally)) + .await?; + + Ok(missing.is_empty()) + } + /// /// Ensure that a directory is locally loadable, which will download it from the Remote store as /// a sideeffect (if one is configured). diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs index 0180c6ca35b..9fca35d9135 100644 --- a/src/rust/engine/fs/store/src/local.rs +++ b/src/rust/engine/fs/store/src/local.rs @@ -338,6 +338,20 @@ impl ByteStore { .await } + pub async fn exists(&self, entry_type: EntryType, digest: Digest) -> Result { + if digest == EMPTY_DIGEST { + // Avoid I/O for this case. This allows some client-provided operations (like merging + // snapshots) to work without needing to first store the empty snapshot. 
+ return Ok(true); + } + + let dbs = match entry_type { + EntryType::Directory => self.inner.directory_dbs.clone(), + EntryType::File => self.inner.file_dbs.clone(), + }?; + dbs.exists(digest.hash).await + } + /// /// Loads bytes from the underlying LMDB store using the given function. Because the database is /// blocking, this accepts a function that views a slice rather than returning a clone of the diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index e4db49cc353..fb533f7c784 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -480,13 +480,13 @@ impl ByteStore { } } - pub fn find_missing_blobs_request<'a, Digests: Iterator>( + pub fn find_missing_blobs_request( &self, - digests: Digests, + digests: impl IntoIterator, ) -> remexec::FindMissingBlobsRequest { remexec::FindMissingBlobsRequest { instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(), - blob_digests: digests.map(|d| d.into()).collect::>(), + blob_digests: digests.into_iter().map(|d| d.into()).collect::>(), } } diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 52363edc543..87dba970f95 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -259,9 +259,7 @@ async fn list_missing_digests_none_missing() { let store = new_byte_store(&cas); assert_eq!( store - .list_missing_digests( - store.find_missing_blobs_request(vec![TestData::roland().digest()].iter()), - ) + .list_missing_digests(store.find_missing_blobs_request(vec![TestData::roland().digest()]),) .await, Ok(HashSet::new()) ); @@ -281,7 +279,7 @@ async fn list_missing_digests_some_missing() { assert_eq!( store - .list_missing_digests(store.find_missing_blobs_request(vec![digest].iter()),) + .list_missing_digests(store.find_missing_blobs_request(vec![digest])) .await, Ok(digest_set) ); @@ -295,9 +293,7 @@ async fn 
list_missing_digests_error() { let store = new_byte_store(&cas); let error = store - .list_missing_digests( - store.find_missing_blobs_request(vec![TestData::roland().digest()].iter()), - ) + .list_missing_digests(store.find_missing_blobs_request(vec![TestData::roland().digest()])) .await .expect_err("Want error"); assert!( diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs index 02f22dc4803..3fb5f94c97b 100644 --- a/src/rust/engine/process_execution/src/cache.rs +++ b/src/rust/engine/process_execution/src/cache.rs @@ -5,7 +5,6 @@ use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; use cache::PersistentCache; -use futures::{future, FutureExt}; use log::{debug, warn}; use prost::Message; use protos::gen::build::bazel::remote::execution::v2 as remexec; @@ -17,8 +16,8 @@ use workunit_store::{ }; use crate::{ - Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError, - ProcessMetadata, ProcessResultSource, + check_cache_content, CacheContentBehavior, Context, FallibleProcessResultWithPlatform, Platform, + Process, ProcessCacheScope, ProcessError, ProcessMetadata, ProcessResultSource, }; // TODO: Consider moving into protobuf as a CacheValue type. 
@@ -34,7 +33,7 @@ pub struct CommandRunner { cache: PersistentCache, file_store: Store, cache_read: bool, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, metadata: ProcessMetadata, } @@ -44,7 +43,7 @@ impl CommandRunner { cache: PersistentCache, file_store: Store, cache_read: bool, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, metadata: ProcessMetadata, ) -> CommandRunner { CommandRunner { @@ -52,7 +51,7 @@ impl CommandRunner { cache, file_store, cache_read, - eager_fetch, + cache_content_behavior, metadata, } } @@ -205,28 +204,11 @@ impl CommandRunner { return Ok(None); }; - // If eager_fetch is enabled, ensure that all digests in the result are loadable, erroring - // if any are not. If eager_fetch is disabled, a Digest which is discovered to be missing later - // on during execution will cause backtracking. - if self.eager_fetch { - let _ = future::try_join_all(vec![ - self - .file_store - .ensure_local_has_file(result.stdout_digest) - .boxed(), - self - .file_store - .ensure_local_has_file(result.stderr_digest) - .boxed(), - self - .file_store - .ensure_local_has_recursive_directory(result.output_directory.clone()) - .boxed(), - ]) - .await?; + if check_cache_content(&result, &self.file_store, self.cache_content_behavior).await? 
{ + Ok(Some(result)) + } else { + Ok(None) } - - Ok(Some(result)) } async fn store( diff --git a/src/rust/engine/process_execution/src/cache_tests.rs b/src/rust/engine/process_execution/src/cache_tests.rs index 028f5bb2f7b..68a8b07ab3e 100644 --- a/src/rust/engine/process_execution/src/cache_tests.rs +++ b/src/rust/engine/process_execution/src/cache_tests.rs @@ -11,8 +11,9 @@ use testutil::relative_paths; use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::{ - CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, ImmutableInputs, - NamedCaches, Process, ProcessError, ProcessMetadata, + CacheContentBehavior, CommandRunner as CommandRunnerTrait, Context, + FallibleProcessResultWithPlatform, ImmutableInputs, NamedCaches, Process, ProcessError, + ProcessMetadata, }; struct RoundtripResults { @@ -59,7 +60,7 @@ fn create_cached_runner( cache, store, true, - true, + CacheContentBehavior::Fetch, ProcessMetadata::default(), )); diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index df9c74fd182..953fe120837 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -44,7 +44,7 @@ use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::ExecutedActionMetadata; use serde::{Deserialize, Serialize}; use store::{SnapshotOps, Store, StoreError}; -use workunit_store::{RunId, RunningWorkunit, WorkunitStore}; +use workunit_store::{in_workunit, Level, RunId, RunningWorkunit, WorkunitStore}; pub mod bounded; #[cfg(test)] @@ -755,6 +755,67 @@ impl From for &'static str { } } +#[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)] +#[strum(serialize_all = "snake_case")] +pub enum CacheContentBehavior { + Fetch, + Validate, + Defer, +} + +/// +/// Optionally validate that all digests in the result are loadable, returning false if any are not. 
+/// +/// If content loading is deferred, a Digest which is discovered to be missing later on during +/// execution will cause backtracking. +/// +pub(crate) async fn check_cache_content( + response: &FallibleProcessResultWithPlatform, + store: &Store, + cache_content_behavior: CacheContentBehavior, +) -> Result { + match cache_content_behavior { + CacheContentBehavior::Fetch => { + let response = response.clone(); + let fetch_result = in_workunit!( + "eager_fetch_action_cache", + Level::Trace, + |_workunit| async move { + try_join_all(vec![ + store.ensure_local_has_file(response.stdout_digest).boxed(), + store.ensure_local_has_file(response.stderr_digest).boxed(), + store + .ensure_local_has_recursive_directory(response.output_directory) + .boxed(), + ]) + .await + } + ) + .await; + match fetch_result { + Err(StoreError::MissingDigest { .. }) => Ok(false), + Ok(_) => Ok(true), + Err(e) => Err(e), + } + } + CacheContentBehavior::Validate => { + let directory_digests = vec![response.output_directory.clone()]; + let file_digests = vec![response.stdout_digest, response.stderr_digest]; + in_workunit!( + "eager_validate_action_cache", + Level::Trace, + |_workunit| async move { + store + .exists_recursive(directory_digests, file_digests) + .await + } + ) + .await + } + CacheContentBehavior::Defer => Ok(true), + } +} + #[derive(Clone)] pub struct Context { workunit_store: WorkunitStore, diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 8162813bb6f..2f7091671fc 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use fs::{directory, DigestTrie, RelativePath}; -use futures::future::{self, BoxFuture, TryFutureExt}; +use futures::future::{BoxFuture, TryFutureExt}; use futures::FutureExt; use grpc_util::retry::{retry_call, status_is_retryable}; 
use grpc_util::{headers_to_http_header_map, layered_service, status_to_str, LayeredService}; @@ -23,8 +23,8 @@ use workunit_store::{ use crate::remote::{apply_headers, make_execute_request, populate_fallible_execution_result}; use crate::{ - Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError, - ProcessMetadata, ProcessResultSource, + check_cache_content, CacheContentBehavior, Context, FallibleProcessResultWithPlatform, Platform, + Process, ProcessCacheScope, ProcessError, ProcessMetadata, ProcessResultSource, }; use tonic::{Code, Request, Status}; @@ -54,7 +54,7 @@ pub struct CommandRunner { platform: Platform, cache_read: bool, cache_write: bool, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, warnings_behavior: RemoteCacheWarningsBehavior, read_errors_counter: Arc>>, write_errors_counter: Arc>>, @@ -74,7 +74,7 @@ impl CommandRunner { cache_read: bool, cache_write: bool, warnings_behavior: RemoteCacheWarningsBehavior, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, concurrency_limit: usize, read_timeout: Duration, ) -> Result { @@ -106,7 +106,7 @@ impl CommandRunner { platform, cache_read, cache_write, - eager_fetch, + cache_content_behavior, warnings_behavior, read_errors_counter: Arc::new(Mutex::new(BTreeMap::new())), write_errors_counter: Arc::new(Mutex::new(BTreeMap::new())), @@ -264,7 +264,7 @@ impl CommandRunner { &context, self.action_cache_client.clone(), self.store.clone(), - self.eager_fetch, + self.cache_content_behavior, self.read_timeout, ) .await; @@ -511,7 +511,7 @@ async fn check_action_cache( context: &Context, action_cache_client: Arc>, store: Store, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, timeout_duration: Duration, ) -> Result, ProcessError> { in_workunit!( @@ -555,29 +555,17 @@ async fn check_action_cache( .await .map_err(|e| Status::unavailable(format!("Output roots could not be loaded: {e}")))?; - if eager_fetch { - // NB: 
`ensure_local_has_file` and `ensure_local_has_recursive_directory` are internally - // retried. - let response = response.clone(); - in_workunit!( - "eager_fetch_action_cache", - Level::Trace, - desc = Some("eagerly fetching after action cache hit".to_owned()), - |_workunit| async move { - future::try_join_all(vec![ - store.ensure_local_has_file(response.stdout_digest).boxed(), - store.ensure_local_has_file(response.stderr_digest).boxed(), - store - .ensure_local_has_recursive_directory(response.output_directory) - .boxed(), - ]) - .await - } - ) + let cache_content_valid = check_cache_content(&response, &store, cache_content_behavior) .await - .map_err(|e| Status::unavailable(format!("Output content could not be loaded: {e}")))?; + .map_err(|e| { + Status::unavailable(format!("Output content could not be validated: {e}")) + })?; + + if cache_content_valid { + Ok(response) + } else { + Err(Status::not_found("")) } - Ok(response) }) .await; diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 9a5a4100ae5..012bcf0048f 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -20,9 +20,9 @@ use workunit_store::{RunId, RunningWorkunit, WorkunitStore}; use crate::remote::{ensure_action_stored_locally, make_execute_request}; use crate::{ - CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, Platform, - Process, ProcessError, ProcessMetadata, ProcessResultMetadata, ProcessResultSource, - RemoteCacheWarningsBehavior, + CacheContentBehavior, CommandRunner as CommandRunnerTrait, Context, + FallibleProcessResultWithPlatform, Platform, Process, ProcessError, ProcessMetadata, + ProcessResultMetadata, ProcessResultSource, RemoteCacheWarningsBehavior, }; const CACHE_READ_TIMEOUT: Duration = Duration::from_secs(5); @@ -127,7 +127,7 @@ fn create_local_runner( fn create_cached_runner( 
local: Box, store_setup: &StoreSetup, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, ) -> Box { Box::new( crate::remote_cache::CommandRunner::new( @@ -142,7 +142,7 @@ fn create_cached_runner( true, true, RemoteCacheWarningsBehavior::FirstOnly, - eager_fetch, + cache_content_behavior, 256, CACHE_READ_TIMEOUT, ) @@ -170,7 +170,7 @@ async fn cache_read_success() { let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; store_setup @@ -194,7 +194,7 @@ async fn cache_read_skipped_on_action_cache_errors() { let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 500); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; store_setup @@ -224,14 +224,14 @@ async fn cache_read_skipped_on_action_cache_errors() { assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); } -/// If the cache has any issues during reads from the store during eager_fetch, we should gracefully +/// If the cache cannot find a digest during a read from the store during fetch, we should gracefully /// fallback to the local runner. 
#[tokio::test] -async fn cache_read_skipped_on_store_errors() { +async fn cache_read_skipped_on_missing_digest() { let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 500); - let cache_runner = create_cached_runner(local_runner, &store_setup, true); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Fetch); // Claim that the process has a non-empty and not-persisted stdout digest. let (process, action_digest) = create_process(&store_setup).await; @@ -243,7 +243,9 @@ async fn cache_read_skipped_on_store_errors() { ); assert_eq!( - workunit_store.get_metrics().get("remote_cache_read_errors"), + workunit_store + .get_metrics() + .get("remote_cache_requests_uncached"), None ); assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -253,8 +255,10 @@ async fn cache_read_skipped_on_store_errors() { .unwrap(); assert_eq!(remote_result.exit_code, 1); assert_eq!( - workunit_store.get_metrics().get("remote_cache_read_errors"), - Some(&1) + workunit_store + .get_metrics() + .get("remote_cache_requests_uncached"), + Some(&1), ); assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); } @@ -266,10 +270,13 @@ async fn cache_read_skipped_on_store_errors() { async fn cache_read_eager_fetch() { let (_, mut workunit) = WorkunitStore::setup_for_tests(); - async fn run_process(eager_fetch: bool, workunit: &mut RunningWorkunit) -> (i32, usize) { + async fn run_process( + cache_content_behavior: CacheContentBehavior, + workunit: &mut RunningWorkunit, + ) -> (i32, usize) { let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000); - let cache_runner = create_cached_runner(local_runner, &store_setup, eager_fetch); + let cache_runner = create_cached_runner(local_runner, &store_setup, cache_content_behavior); let (process, action_digest) = 
create_process(&store_setup).await; store_setup.cas.action_cache.insert( @@ -289,11 +296,13 @@ async fn cache_read_eager_fetch() { (remote_result.exit_code, final_local_count) } - let (lazy_exit_code, lazy_local_call_count) = run_process(false, &mut workunit).await; + let (lazy_exit_code, lazy_local_call_count) = + run_process(CacheContentBehavior::Defer, &mut workunit).await; assert_eq!(lazy_exit_code, 0); assert_eq!(lazy_local_call_count, 0); - let (eager_exit_code, eager_local_call_count) = run_process(true, &mut workunit).await; + let (eager_exit_code, eager_local_call_count) = + run_process(CacheContentBehavior::Fetch, &mut workunit).await; assert_eq!(eager_exit_code, 1); assert_eq!(eager_local_call_count, 1); } @@ -314,7 +323,8 @@ async fn cache_read_speculation() { .build(), ); let (local_runner, local_runner_call_counter) = create_local_runner(1, local_delay_ms); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = + create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; if cache_hit { @@ -355,7 +365,7 @@ async fn cache_write_success() { let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(0, 100); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -387,7 +397,7 @@ async fn cache_write_not_for_failures() { let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 100); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let 
cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, _action_digest) = create_process(&store_setup).await; assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -415,7 +425,7 @@ async fn cache_write_does_not_block() { .build(), ); let (local_runner, local_runner_call_counter) = create_local_runner(0, 100); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -608,7 +618,7 @@ async fn make_action_result_basic() { true, true, RemoteCacheWarningsBehavior::FirstOnly, - false, + CacheContentBehavior::Defer, 256, CACHE_READ_TIMEOUT, ) diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index dc0cdbc08ab..139d4c870b5 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -36,7 +36,8 @@ use std::time::Duration; use fs::{DirectoryDigest, Permissions, RelativePath}; use hashing::{Digest, Fingerprint}; use process_execution::{ - Context, ImmutableInputs, InputDigests, NamedCaches, Platform, ProcessCacheScope, ProcessMetadata, + CacheContentBehavior, Context, ImmutableInputs, InputDigests, NamedCaches, Platform, + ProcessCacheScope, ProcessMetadata, }; use prost::Message; use protos::gen::build::bazel::remote::execution::v2::{Action, Command}; @@ -318,7 +319,7 @@ async fn main() { true, true, process_execution::remote_cache::RemoteCacheWarningsBehavior::Backoff, - false, + CacheContentBehavior::Defer, args.cache_rpc_concurrency, Duration::from_secs(2), ) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 2e0ae6643af..f2f6316a11c 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ 
-26,8 +26,8 @@ use hashing::Digest; use log::info; use parking_lot::Mutex; use process_execution::{ - self, bounded, local, nailgun, remote, remote_cache, CommandRunner, ImmutableInputs, NamedCaches, - Platform, ProcessMetadata, RemoteCacheWarningsBehavior, + self, bounded, local, nailgun, remote, remote_cache, CacheContentBehavior, CommandRunner, + ImmutableInputs, NamedCaches, Platform, ProcessMetadata, RemoteCacheWarningsBehavior, }; use protos::gen::build::bazel::remote::execution::v2::ServerCapabilities; use regex::Regex; @@ -93,7 +93,7 @@ pub struct RemotingOptions { pub store_rpc_concurrency: usize, pub store_batch_api_size_limit: usize, pub cache_warnings_behavior: RemoteCacheWarningsBehavior, - pub cache_eager_fetch: bool, + pub cache_content_behavior: CacheContentBehavior, pub cache_rpc_concurrency: usize, pub cache_read_timeout: Duration, pub execution_extra_platform_properties: Vec<(String, String)>, @@ -273,9 +273,13 @@ impl Core { local_cache_read: bool, local_cache_write: bool, ) -> Result, String> { - // TODO: Until we can deprecate letting the flag default, we implicitly disable - // eager_fetch when remote execution is in use. - let eager_fetch = remoting_opts.cache_eager_fetch && !remoting_opts.execution_enable; + // TODO: Until we can deprecate letting the flag default, we implicitly default + // cache_content_behavior when remote execution is in use. 
+ let cache_content_behavior = if remoting_opts.execution_enable { + CacheContentBehavior::Defer + } else { + remoting_opts.cache_content_behavior + }; if remote_cache_read || remote_cache_write { runner = Arc::new(remote_cache::CommandRunner::new( runner, @@ -289,7 +293,7 @@ impl Core { remote_cache_read, remote_cache_write, remoting_opts.cache_warnings_behavior, - eager_fetch, + cache_content_behavior, remoting_opts.cache_rpc_concurrency, remoting_opts.cache_read_timeout, )?); @@ -301,7 +305,7 @@ impl Core { local_cache.clone(), full_store.clone(), local_cache_read, - eager_fetch, + cache_content_behavior, process_execution_metadata.clone(), )); } @@ -475,7 +479,7 @@ impl Core { )?; let store = if (exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write) - && remoting_opts.cache_eager_fetch + && remoting_opts.cache_content_behavior == CacheContentBehavior::Fetch { // In remote cache mode with eager fetching, the only interaction with the remote CAS // should be through the remote cache code paths. 
Thus, the store seen by the rest of the diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 839f2953194..b22ce5b1462 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -29,7 +29,7 @@ use log::{self, debug, error, warn, Log}; use logging::logger::PANTS_LOGGER; use logging::{Logger, PythonLogLevel}; use petgraph::graph::{DiGraph, Graph}; -use process_execution::RemoteCacheWarningsBehavior; +use process_execution::{CacheContentBehavior, RemoteCacheWarningsBehavior}; use pyo3::exceptions::{PyException, PyIOError, PyKeyboardInterrupt, PyValueError}; use pyo3::prelude::{ pyclass, pyfunction, pymethods, pymodule, wrap_pyfunction, PyModule, PyObject, @@ -289,7 +289,7 @@ impl PyRemotingOptions { store_rpc_concurrency: usize, store_batch_api_size_limit: usize, cache_warnings_behavior: String, - cache_eager_fetch: bool, + cache_content_behavior: String, cache_rpc_concurrency: usize, cache_read_timeout_millis: u64, execution_extra_platform_properties: Vec<(String, String)>, @@ -312,7 +312,7 @@ impl PyRemotingOptions { store_batch_api_size_limit, cache_warnings_behavior: RemoteCacheWarningsBehavior::from_str(&cache_warnings_behavior) .unwrap(), - cache_eager_fetch, + cache_content_behavior: CacheContentBehavior::from_str(&cache_content_behavior).unwrap(), cache_rpc_concurrency, cache_read_timeout: Duration::from_millis(cache_read_timeout_millis), execution_extra_platform_properties, diff --git a/tests/python/pants_test/integration/remote_cache_integration_test.py b/tests/python/pants_test/integration/remote_cache_integration_test.py index 0ea60b4e4bd..f52560f33e1 100644 --- a/tests/python/pants_test/integration/remote_cache_integration_test.py +++ b/tests/python/pants_test/integration/remote_cache_integration_test.py @@ -5,6 +5,8 @@ import time +import pytest + from pants.core.util_rules import distdir from pants.core.util_rules.distdir import DistDir from pants.engine.fs 
import Digest, DigestContents, DigestEntries, FileDigest, FileEntry, Workspace @@ -12,7 +14,7 @@ from pants.engine.internals.native_engine import PyExecutor, PyStubCAS from pants.engine.process import Process, ProcessResult from pants.engine.rules import Get, SubsystemRule, goal_rule, rule -from pants.option.global_options import RemoteCacheWarningsBehavior +from pants.option.global_options import CacheContentBehavior, RemoteCacheWarningsBehavior from pants.testutil.pants_integration_test import run_pants from pants.testutil.rule_runner import QueryRule, RuleRunner from pants.util.logging import LogLevel @@ -113,7 +115,7 @@ def run() -> tuple[FileDigest, dict[str, int]]: rules=[entries_from_process, QueryRule(ProcessOutputEntries, [Process])], isolated_local_store=True, bootstrap_args=[ - "--no-remote-cache-eager-fetch", + "--cache-content-behavior=defer", "--no-local-cache", *remote_cache_args(cas.address), ], @@ -193,7 +195,7 @@ def run() -> dict[str, int]: rules=[mock_run, *distdir.rules(), SubsystemRule(MockRunSubsystem)], isolated_local_store=True, bootstrap_args=[ - "--no-remote-cache-eager-fetch", + "--cache-content-behavior=defer", "--no-local-cache", *remote_cache_args(cas.address), ], @@ -221,3 +223,47 @@ def run() -> dict[str, int]: assert metrics2["remote_cache_requests"] == 1 assert metrics2["remote_cache_requests_cached"] == 1 assert metrics2["backtrack_attempts"] == 1 + + +@pytest.mark.parametrize( + "cache_content_behavior", [CacheContentBehavior.validate, CacheContentBehavior.fetch] +) +def test_eager_validation(cache_content_behavior: CacheContentBehavior) -> None: + """Tests that --cache-content-behavior={validate,fetch} fail a lookup for missing content.""" + executor = PyExecutor(core_threads=2, max_threads=4) + cas = PyStubCAS.builder().build(executor) + + def run() -> dict[str, int]: + # Use an isolated store to ensure that the only content is in the remote/stub cache. 
+ rule_runner = RuleRunner( + rules=[mock_run, *distdir.rules(), SubsystemRule(MockRunSubsystem)], + isolated_local_store=True, + bootstrap_args=[ + f"--cache-content-behavior={cache_content_behavior.value}", + "--no-local-cache", + *remote_cache_args(cas.address), + ], + ) + result = rule_runner.run_goal_rule(MockRun, args=[]) + assert result.exit_code == 0 + + # Wait for any async cache writes to complete. + time.sleep(1) + return rule_runner.scheduler.get_metrics() + + # Run once to populate the remote cache, and validate that there is one entry afterwards. + assert cas.action_cache_len() == 0 + metrics1 = run() + assert cas.action_cache_len() == 1 + assert metrics1["remote_cache_requests"] == 1 + assert metrics1["remote_cache_requests_uncached"] == 1 + + # Then, remove the content from the remote store and run again. + assert cas.remove( + FileDigest("434728a410a78f56fc1b5899c3593436e61ab0c731e9072d95e96db290205e53", 8) + ) + metrics2 = run() + # Validate that we missed the cache, and that we didn't backtrack. + assert metrics2["remote_cache_requests"] == 1 + assert metrics2["remote_cache_requests_uncached"] == 1 + assert "backtrack_attempts" not in metrics2