diff --git a/pants.toml b/pants.toml index 1653e397ba4..a0a34394368 100644 --- a/pants.toml +++ b/pants.toml @@ -71,8 +71,10 @@ unmatched_build_file_globs = "error" remote_store_address = "grpcs://cache.toolchain.com:443" remote_instance_name = "main" remote_auth_plugin = "toolchain.pants.auth.plugin:toolchain_auth_plugin" -# TODO: See https://github.com/pantsbuild/pants/issues/16096. -remote_cache_eager_fetch = true +# TODO: May cause tests which experience missing digests to hang. If you experience an +# issue, please change this to `fetch` and then report an issue on: +# https://github.com/pantsbuild/pants/issues/16096. +cache_content_behavior = "validate" [anonymous-telemetry] enabled = true diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py index b088c284cdb..e1315812e94 100644 --- a/src/python/pants/engine/internals/scheduler.py +++ b/src/python/pants/engine/internals/scheduler.py @@ -180,7 +180,7 @@ def __init__( store_rpc_concurrency=execution_options.remote_store_rpc_concurrency, store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit, cache_warnings_behavior=execution_options.remote_cache_warnings.value, - cache_eager_fetch=execution_options.remote_cache_eager_fetch, + cache_content_behavior=execution_options.cache_content_behavior.value, cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency, cache_read_timeout_millis=execution_options.remote_cache_read_timeout_millis, execution_extra_platform_properties=tuple( diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 24b70de8e2e..49b0eb5bb89 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -23,7 +23,7 @@ is_in_container, pants_version, ) -from pants.base.deprecated import warn_or_error +from pants.base.deprecated import resolve_conflicting_options, warn_or_error from pants.base.glob_match_error_behavior 
import GlobMatchErrorBehavior from pants.engine.environment import CompleteEnvironment from pants.engine.internals.native_engine import PyExecutor @@ -112,6 +112,13 @@ class RemoteCacheWarningsBehavior(Enum): backoff = "backoff" +@enum.unique +class CacheContentBehavior(Enum): + fetch = "fetch" + validate = "validate" + defer = "defer" + + @enum.unique class AuthPluginState(Enum): OK = "ok" @@ -353,6 +360,7 @@ class ExecutionOptions: process_execution_remote_parallelism: int process_execution_cache_namespace: str | None process_execution_graceful_shutdown_timeout: int + cache_content_behavior: CacheContentBehavior process_total_child_memory_usage: int | None process_per_child_memory_usage: int @@ -365,7 +373,6 @@ class ExecutionOptions: remote_store_rpc_concurrency: int remote_store_batch_api_size_limit: int - remote_cache_eager_fetch: bool remote_cache_warnings: RemoteCacheWarningsBehavior remote_cache_rpc_concurrency: int remote_cache_read_timeout_millis: int @@ -398,6 +405,7 @@ def from_options( process_execution_cache_namespace=bootstrap_options.process_execution_cache_namespace, process_execution_graceful_shutdown_timeout=bootstrap_options.process_execution_graceful_shutdown_timeout, process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun, + cache_content_behavior=GlobalOptions.resolve_cache_content_behavior(bootstrap_options), process_total_child_memory_usage=bootstrap_options.process_total_child_memory_usage, process_per_child_memory_usage=bootstrap_options.process_per_child_memory_usage, # Remote store setup. @@ -409,7 +417,6 @@ def from_options( remote_store_rpc_concurrency=dynamic_remote_options.store_rpc_concurrency, remote_store_batch_api_size_limit=bootstrap_options.remote_store_batch_api_size_limit, # Remote cache setup. 
- remote_cache_eager_fetch=bootstrap_options.remote_cache_eager_fetch, remote_cache_warnings=bootstrap_options.remote_cache_warnings, remote_cache_rpc_concurrency=dynamic_remote_options.cache_rpc_concurrency, remote_cache_read_timeout_millis=bootstrap_options.remote_cache_read_timeout_millis, @@ -482,6 +489,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions: process_execution_cache_namespace=None, process_cleanup=True, local_cache=True, + cache_content_behavior=CacheContentBehavior.fetch, process_execution_local_enable_nailgun=True, process_execution_graceful_shutdown_timeout=3, # Remote store setup. @@ -495,7 +503,6 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions: remote_store_rpc_concurrency=128, remote_store_batch_api_size_limit=4194304, # Remote cache setup. - remote_cache_eager_fetch=True, remote_cache_warnings=RemoteCacheWarningsBehavior.backoff, remote_cache_rpc_concurrency=128, remote_cache_read_timeout_millis=1500, @@ -1079,6 +1086,33 @@ class BootstrapOptions: """ ), ) + cache_content_behavior = EnumOption( + "--cache-content-behavior", + advanced=True, + default=DEFAULT_EXECUTION_OPTIONS.cache_content_behavior, + help=softwrap( + """ + Controls how the content of cache entries is handled during process execution. + + When using a remote cache, the `fetch` behavior will fetch remote cache content from the + remote store before considering the cache lookup a hit, while the `validate` behavior + will only validate (for either a local or remote cache) that the content exists, without + fetching it. + + The `defer` behavior, on the other hand, will neither fetch nor validate the cache + content before calling a cache hit a hit. This "defers" actually consuming the cache + entry until a consumer consumes it. 
+ + The `defer` mode is the most network efficient (because it will completely skip network + requests in many cases), followed by the `validate` mode (since it can still skip + fetching the content if no consumer ends up needing it). But both the `validate` and + `defer` modes rely on an experimental feature called "backtracking" to attempt to + recover if content later turns out to be missing (`validate` has a much narrower window + for backtracking though, since content would need to disappear between validation and + consumption: generally, within one `pantsd` session). + """ + ), + ) ca_certs_path = StrOption( "--ca-certs-path", advanced=True, @@ -1374,7 +1408,9 @@ class BootstrapOptions: remote_cache_eager_fetch = BoolOption( "--remote-cache-eager-fetch", advanced=True, - default=DEFAULT_EXECUTION_OPTIONS.remote_cache_eager_fetch, + default=(DEFAULT_EXECUTION_OPTIONS.cache_content_behavior != CacheContentBehavior.defer), + removal_version="2.15.0.dev1", + removal_hint="Use the `cache_content_behavior` option instead.", help=softwrap( """ Eagerly fetch relevant content from the remote store instead of lazily fetching. @@ -1887,6 +1923,29 @@ def create_py_executor(bootstrap_options: OptionValueContainer) -> PyExecutor: core_threads=bootstrap_options.rule_threads_core, max_threads=rule_threads_max ) + @staticmethod + def resolve_cache_content_behavior( + bootstrap_options: OptionValueContainer, + ) -> CacheContentBehavior: + resolved_value = resolve_conflicting_options( + old_option="remote_cache_eager_fetch", + new_option="cache_content_behavior", + old_scope="", + new_scope="", + old_container=bootstrap_options, + new_container=bootstrap_options, + ) + + if isinstance(resolved_value, bool): + # Is `remote_cache_eager_fetch`. 
+ return CacheContentBehavior.fetch if resolved_value else CacheContentBehavior.defer + elif isinstance(resolved_value, CacheContentBehavior): + return resolved_value + else: + raise TypeError( + f"Unexpected option value for `cache_content_behavior`: {resolved_value}" + ) + @staticmethod def compute_pants_ignore(buildroot, global_options): """Computes the merged value of the `--pants-ignore` flag. diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index e5d038e7b68..6cf14e84535 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -35,7 +35,7 @@ mod snapshot_ops_tests; mod snapshot_tests; pub use crate::snapshot_ops::{SnapshotOps, SubsetParams}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{self, Debug, Display}; use std::fs::OpenOptions; use std::future::Future; @@ -749,8 +749,11 @@ impl Store { if Store::upload_is_faster_than_checking_whether_to_upload(&ingested_digests) { ingested_digests.keys().cloned().collect() } else { - let request = remote.find_missing_blobs_request(ingested_digests.keys()); - remote.list_missing_digests(request).await? + remote + .list_missing_digests( + remote.find_missing_blobs_request(ingested_digests.keys().cloned()), + ) + .await? }; future::try_join_all( @@ -853,6 +856,69 @@ impl Store { .await } + /// + /// Return true if the given directory and file digests are loadable from either the local or remote + /// Store, without downloading any file content. + /// + /// The given directory digests will be recursively expanded, so it is not necessary to + /// explicitly list their file digests in the file digests list. + /// + pub async fn exists_recursive( + &self, + directory_digests: impl IntoIterator, + file_digests: impl IntoIterator, + ) -> Result { + // Load directories, which implicitly validates that they exist. 
+ let digest_tries = future::try_join_all( + directory_digests + .into_iter() + .map(|dd| self.load_digest_trie(dd)), + ) + .await?; + + // Collect all file digests. + let mut file_digests = file_digests.into_iter().collect::>(); + for digest_trie in digest_tries { + digest_trie.walk(&mut |_, entry| match entry { + directory::Entry::File(f) => { + file_digests.insert(f.digest()); + } + directory::Entry::Directory(_) => (), + }); + } + + // Filter out file digests that exist locally. + // TODO: Implement a local batch API: see https://github.com/pantsbuild/pants/issues/16400. + let local_file_exists = future::try_join_all( + file_digests + .iter() + .map(|file_digest| self.local.exists(EntryType::File, *file_digest)) + .collect::>(), + ) + .await?; + let missing_locally = local_file_exists + .into_iter() + .zip(file_digests.into_iter()) + .filter_map(|(exists, digest)| if exists { None } else { Some(digest) }) + .collect::>(); + + // If there are any digests which don't exist locally, check remotely. + if missing_locally.is_empty() { + return Ok(true); + } + let remote = if let Some(remote) = self.remote.clone() { + remote + } else { + return Ok(false); + }; + let missing = remote + .store + .list_missing_digests(remote.store.find_missing_blobs_request(missing_locally)) + .await?; + + Ok(missing.is_empty()) + } + /// /// Ensure that a directory is locally loadable, which will download it from the Remote store as /// a sideeffect (if one is configured). diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs index 0180c6ca35b..9fca35d9135 100644 --- a/src/rust/engine/fs/store/src/local.rs +++ b/src/rust/engine/fs/store/src/local.rs @@ -338,6 +338,20 @@ impl ByteStore { .await } + pub async fn exists(&self, entry_type: EntryType, digest: Digest) -> Result { + if digest == EMPTY_DIGEST { + // Avoid I/O for this case. 
This allows some client-provided operations (like merging + // snapshots) to work without needing to first store the empty snapshot. + return Ok(true); + } + + let dbs = match entry_type { + EntryType::Directory => self.inner.directory_dbs.clone(), + EntryType::File => self.inner.file_dbs.clone(), + }?; + dbs.exists(digest.hash).await + } + /// /// Loads bytes from the underlying LMDB store using the given function. Because the database is /// blocking, this accepts a function that views a slice rather than returning a clone of the diff --git a/src/rust/engine/fs/store/src/remote.rs b/src/rust/engine/fs/store/src/remote.rs index e4db49cc353..fb533f7c784 100644 --- a/src/rust/engine/fs/store/src/remote.rs +++ b/src/rust/engine/fs/store/src/remote.rs @@ -480,13 +480,13 @@ impl ByteStore { } } - pub fn find_missing_blobs_request<'a, Digests: Iterator>( + pub fn find_missing_blobs_request( &self, - digests: Digests, + digests: impl IntoIterator, ) -> remexec::FindMissingBlobsRequest { remexec::FindMissingBlobsRequest { instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(), - blob_digests: digests.map(|d| d.into()).collect::>(), + blob_digests: digests.into_iter().map(|d| d.into()).collect::>(), } } diff --git a/src/rust/engine/fs/store/src/remote_tests.rs b/src/rust/engine/fs/store/src/remote_tests.rs index 52363edc543..87dba970f95 100644 --- a/src/rust/engine/fs/store/src/remote_tests.rs +++ b/src/rust/engine/fs/store/src/remote_tests.rs @@ -259,9 +259,7 @@ async fn list_missing_digests_none_missing() { let store = new_byte_store(&cas); assert_eq!( store - .list_missing_digests( - store.find_missing_blobs_request(vec![TestData::roland().digest()].iter()), - ) + .list_missing_digests(store.find_missing_blobs_request(vec![TestData::roland().digest()]),) .await, Ok(HashSet::new()) ); @@ -281,7 +279,7 @@ async fn list_missing_digests_some_missing() { assert_eq!( store - .list_missing_digests(store.find_missing_blobs_request(vec![digest].iter()),) + 
.list_missing_digests(store.find_missing_blobs_request(vec![digest])) .await, Ok(digest_set) ); @@ -295,9 +293,7 @@ async fn list_missing_digests_error() { let store = new_byte_store(&cas); let error = store - .list_missing_digests( - store.find_missing_blobs_request(vec![TestData::roland().digest()].iter()), - ) + .list_missing_digests(store.find_missing_blobs_request(vec![TestData::roland().digest()])) .await .expect_err("Want error"); assert!( diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs index 02f22dc4803..3fb5f94c97b 100644 --- a/src/rust/engine/process_execution/src/cache.rs +++ b/src/rust/engine/process_execution/src/cache.rs @@ -5,7 +5,6 @@ use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; use cache::PersistentCache; -use futures::{future, FutureExt}; use log::{debug, warn}; use prost::Message; use protos::gen::build::bazel::remote::execution::v2 as remexec; @@ -17,8 +16,8 @@ use workunit_store::{ }; use crate::{ - Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError, - ProcessMetadata, ProcessResultSource, + check_cache_content, CacheContentBehavior, Context, FallibleProcessResultWithPlatform, Platform, + Process, ProcessCacheScope, ProcessError, ProcessMetadata, ProcessResultSource, }; // TODO: Consider moving into protobuf as a CacheValue type. 
@@ -34,7 +33,7 @@ pub struct CommandRunner { cache: PersistentCache, file_store: Store, cache_read: bool, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, metadata: ProcessMetadata, } @@ -44,7 +43,7 @@ impl CommandRunner { cache: PersistentCache, file_store: Store, cache_read: bool, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, metadata: ProcessMetadata, ) -> CommandRunner { CommandRunner { @@ -52,7 +51,7 @@ impl CommandRunner { cache, file_store, cache_read, - eager_fetch, + cache_content_behavior, metadata, } } @@ -205,28 +204,11 @@ impl CommandRunner { return Ok(None); }; - // If eager_fetch is enabled, ensure that all digests in the result are loadable, erroring - // if any are not. If eager_fetch is disabled, a Digest which is discovered to be missing later - // on during execution will cause backtracking. - if self.eager_fetch { - let _ = future::try_join_all(vec![ - self - .file_store - .ensure_local_has_file(result.stdout_digest) - .boxed(), - self - .file_store - .ensure_local_has_file(result.stderr_digest) - .boxed(), - self - .file_store - .ensure_local_has_recursive_directory(result.output_directory.clone()) - .boxed(), - ]) - .await?; + if check_cache_content(&result, &self.file_store, self.cache_content_behavior).await? 
{ + Ok(Some(result)) + } else { + Ok(None) } - - Ok(Some(result)) } async fn store( diff --git a/src/rust/engine/process_execution/src/cache_tests.rs b/src/rust/engine/process_execution/src/cache_tests.rs index 028f5bb2f7b..68a8b07ab3e 100644 --- a/src/rust/engine/process_execution/src/cache_tests.rs +++ b/src/rust/engine/process_execution/src/cache_tests.rs @@ -11,8 +11,9 @@ use testutil::relative_paths; use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::{ - CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, ImmutableInputs, - NamedCaches, Process, ProcessError, ProcessMetadata, + CacheContentBehavior, CommandRunner as CommandRunnerTrait, Context, + FallibleProcessResultWithPlatform, ImmutableInputs, NamedCaches, Process, ProcessError, + ProcessMetadata, }; struct RoundtripResults { @@ -59,7 +60,7 @@ fn create_cached_runner( cache, store, true, - true, + CacheContentBehavior::Fetch, ProcessMetadata::default(), )); diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index df9c74fd182..953fe120837 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -44,7 +44,7 @@ use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::ExecutedActionMetadata; use serde::{Deserialize, Serialize}; use store::{SnapshotOps, Store, StoreError}; -use workunit_store::{RunId, RunningWorkunit, WorkunitStore}; +use workunit_store::{in_workunit, Level, RunId, RunningWorkunit, WorkunitStore}; pub mod bounded; #[cfg(test)] @@ -755,6 +755,67 @@ impl From for &'static str { } } +#[derive(Clone, Copy, Debug, PartialEq, strum_macros::EnumString)] +#[strum(serialize_all = "snake_case")] +pub enum CacheContentBehavior { + Fetch, + Validate, + Defer, +} + +/// +/// Optionally validate that all digests in the result are loadable, returning false if any are not. 
+/// +/// If content loading is deferred, a Digest which is discovered to be missing later on during +/// execution will cause backtracking. +/// +pub(crate) async fn check_cache_content( + response: &FallibleProcessResultWithPlatform, + store: &Store, + cache_content_behavior: CacheContentBehavior, +) -> Result { + match cache_content_behavior { + CacheContentBehavior::Fetch => { + let response = response.clone(); + let fetch_result = in_workunit!( + "eager_fetch_action_cache", + Level::Trace, + |_workunit| async move { + try_join_all(vec![ + store.ensure_local_has_file(response.stdout_digest).boxed(), + store.ensure_local_has_file(response.stderr_digest).boxed(), + store + .ensure_local_has_recursive_directory(response.output_directory) + .boxed(), + ]) + .await + } + ) + .await; + match fetch_result { + Err(StoreError::MissingDigest { .. }) => Ok(false), + Ok(_) => Ok(true), + Err(e) => Err(e), + } + } + CacheContentBehavior::Validate => { + let directory_digests = vec![response.output_directory.clone()]; + let file_digests = vec![response.stdout_digest, response.stderr_digest]; + in_workunit!( + "eager_validate_action_cache", + Level::Trace, + |_workunit| async move { + store + .exists_recursive(directory_digests, file_digests) + .await + } + ) + .await + } + CacheContentBehavior::Defer => Ok(true), + } +} + #[derive(Clone)] pub struct Context { workunit_store: WorkunitStore, diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index 8162813bb6f..2f7091671fc 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; use async_trait::async_trait; use fs::{directory, DigestTrie, RelativePath}; -use futures::future::{self, BoxFuture, TryFutureExt}; +use futures::future::{BoxFuture, TryFutureExt}; use futures::FutureExt; use grpc_util::retry::{retry_call, status_is_retryable}; 
use grpc_util::{headers_to_http_header_map, layered_service, status_to_str, LayeredService}; @@ -23,8 +23,8 @@ use workunit_store::{ use crate::remote::{apply_headers, make_execute_request, populate_fallible_execution_result}; use crate::{ - Context, FallibleProcessResultWithPlatform, Platform, Process, ProcessCacheScope, ProcessError, - ProcessMetadata, ProcessResultSource, + check_cache_content, CacheContentBehavior, Context, FallibleProcessResultWithPlatform, Platform, + Process, ProcessCacheScope, ProcessError, ProcessMetadata, ProcessResultSource, }; use tonic::{Code, Request, Status}; @@ -54,7 +54,7 @@ pub struct CommandRunner { platform: Platform, cache_read: bool, cache_write: bool, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, warnings_behavior: RemoteCacheWarningsBehavior, read_errors_counter: Arc>>, write_errors_counter: Arc>>, @@ -74,7 +74,7 @@ impl CommandRunner { cache_read: bool, cache_write: bool, warnings_behavior: RemoteCacheWarningsBehavior, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, concurrency_limit: usize, read_timeout: Duration, ) -> Result { @@ -106,7 +106,7 @@ impl CommandRunner { platform, cache_read, cache_write, - eager_fetch, + cache_content_behavior, warnings_behavior, read_errors_counter: Arc::new(Mutex::new(BTreeMap::new())), write_errors_counter: Arc::new(Mutex::new(BTreeMap::new())), @@ -264,7 +264,7 @@ impl CommandRunner { &context, self.action_cache_client.clone(), self.store.clone(), - self.eager_fetch, + self.cache_content_behavior, self.read_timeout, ) .await; @@ -511,7 +511,7 @@ async fn check_action_cache( context: &Context, action_cache_client: Arc>, store: Store, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, timeout_duration: Duration, ) -> Result, ProcessError> { in_workunit!( @@ -555,29 +555,17 @@ async fn check_action_cache( .await .map_err(|e| Status::unavailable(format!("Output roots could not be loaded: {e}")))?; - if eager_fetch { - // NB: 
`ensure_local_has_file` and `ensure_local_has_recursive_directory` are internally - // retried. - let response = response.clone(); - in_workunit!( - "eager_fetch_action_cache", - Level::Trace, - desc = Some("eagerly fetching after action cache hit".to_owned()), - |_workunit| async move { - future::try_join_all(vec![ - store.ensure_local_has_file(response.stdout_digest).boxed(), - store.ensure_local_has_file(response.stderr_digest).boxed(), - store - .ensure_local_has_recursive_directory(response.output_directory) - .boxed(), - ]) - .await - } - ) + let cache_content_valid = check_cache_content(&response, &store, cache_content_behavior) .await - .map_err(|e| Status::unavailable(format!("Output content could not be loaded: {e}")))?; + .map_err(|e| { + Status::unavailable(format!("Output content could not be validated: {e}")) + })?; + + if cache_content_valid { + Ok(response) + } else { + Err(Status::not_found("")) } - Ok(response) }) .await; diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 9a5a4100ae5..012bcf0048f 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -20,9 +20,9 @@ use workunit_store::{RunId, RunningWorkunit, WorkunitStore}; use crate::remote::{ensure_action_stored_locally, make_execute_request}; use crate::{ - CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, Platform, - Process, ProcessError, ProcessMetadata, ProcessResultMetadata, ProcessResultSource, - RemoteCacheWarningsBehavior, + CacheContentBehavior, CommandRunner as CommandRunnerTrait, Context, + FallibleProcessResultWithPlatform, Platform, Process, ProcessError, ProcessMetadata, + ProcessResultMetadata, ProcessResultSource, RemoteCacheWarningsBehavior, }; const CACHE_READ_TIMEOUT: Duration = Duration::from_secs(5); @@ -127,7 +127,7 @@ fn create_local_runner( fn create_cached_runner( 
local: Box, store_setup: &StoreSetup, - eager_fetch: bool, + cache_content_behavior: CacheContentBehavior, ) -> Box { Box::new( crate::remote_cache::CommandRunner::new( @@ -142,7 +142,7 @@ fn create_cached_runner( true, true, RemoteCacheWarningsBehavior::FirstOnly, - eager_fetch, + cache_content_behavior, 256, CACHE_READ_TIMEOUT, ) @@ -170,7 +170,7 @@ async fn cache_read_success() { let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; store_setup @@ -194,7 +194,7 @@ async fn cache_read_skipped_on_action_cache_errors() { let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 500); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; store_setup @@ -224,14 +224,14 @@ async fn cache_read_skipped_on_action_cache_errors() { assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); } -/// If the cache has any issues during reads from the store during eager_fetch, we should gracefully +/// If the cache cannot find a digest during a read from the store during fetch, we should gracefully /// fallback to the local runner. 
#[tokio::test] -async fn cache_read_skipped_on_store_errors() { +async fn cache_read_skipped_on_missing_digest() { let (workunit_store, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 500); - let cache_runner = create_cached_runner(local_runner, &store_setup, true); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Fetch); // Claim that the process has a non-empty and not-persisted stdout digest. let (process, action_digest) = create_process(&store_setup).await; @@ -243,7 +243,9 @@ async fn cache_read_skipped_on_store_errors() { ); assert_eq!( - workunit_store.get_metrics().get("remote_cache_read_errors"), + workunit_store + .get_metrics() + .get("remote_cache_requests_uncached"), None ); assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -253,8 +255,10 @@ async fn cache_read_skipped_on_store_errors() { .unwrap(); assert_eq!(remote_result.exit_code, 1); assert_eq!( - workunit_store.get_metrics().get("remote_cache_read_errors"), - Some(&1) + workunit_store + .get_metrics() + .get("remote_cache_requests_uncached"), + Some(&1), ); assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 1); } @@ -266,10 +270,13 @@ async fn cache_read_skipped_on_store_errors() { async fn cache_read_eager_fetch() { let (_, mut workunit) = WorkunitStore::setup_for_tests(); - async fn run_process(eager_fetch: bool, workunit: &mut RunningWorkunit) -> (i32, usize) { + async fn run_process( + cache_content_behavior: CacheContentBehavior, + workunit: &mut RunningWorkunit, + ) -> (i32, usize) { let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 1000); - let cache_runner = create_cached_runner(local_runner, &store_setup, eager_fetch); + let cache_runner = create_cached_runner(local_runner, &store_setup, cache_content_behavior); let (process, action_digest) = 
create_process(&store_setup).await; store_setup.cas.action_cache.insert( @@ -289,11 +296,13 @@ async fn cache_read_eager_fetch() { (remote_result.exit_code, final_local_count) } - let (lazy_exit_code, lazy_local_call_count) = run_process(false, &mut workunit).await; + let (lazy_exit_code, lazy_local_call_count) = + run_process(CacheContentBehavior::Defer, &mut workunit).await; assert_eq!(lazy_exit_code, 0); assert_eq!(lazy_local_call_count, 0); - let (eager_exit_code, eager_local_call_count) = run_process(true, &mut workunit).await; + let (eager_exit_code, eager_local_call_count) = + run_process(CacheContentBehavior::Fetch, &mut workunit).await; assert_eq!(eager_exit_code, 1); assert_eq!(eager_local_call_count, 1); } @@ -314,7 +323,8 @@ async fn cache_read_speculation() { .build(), ); let (local_runner, local_runner_call_counter) = create_local_runner(1, local_delay_ms); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = + create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; if cache_hit { @@ -355,7 +365,7 @@ async fn cache_write_success() { let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(0, 100); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -387,7 +397,7 @@ async fn cache_write_not_for_failures() { let (_, mut workunit) = WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); let (local_runner, local_runner_call_counter) = create_local_runner(1, 100); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let 
cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, _action_digest) = create_process(&store_setup).await; assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -415,7 +425,7 @@ async fn cache_write_does_not_block() { .build(), ); let (local_runner, local_runner_call_counter) = create_local_runner(0, 100); - let cache_runner = create_cached_runner(local_runner, &store_setup, false); + let cache_runner = create_cached_runner(local_runner, &store_setup, CacheContentBehavior::Defer); let (process, action_digest) = create_process(&store_setup).await; assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -608,7 +618,7 @@ async fn make_action_result_basic() { true, true, RemoteCacheWarningsBehavior::FirstOnly, - false, + CacheContentBehavior::Defer, 256, CACHE_READ_TIMEOUT, ) diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index dc0cdbc08ab..139d4c870b5 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -36,7 +36,8 @@ use std::time::Duration; use fs::{DirectoryDigest, Permissions, RelativePath}; use hashing::{Digest, Fingerprint}; use process_execution::{ - Context, ImmutableInputs, InputDigests, NamedCaches, Platform, ProcessCacheScope, ProcessMetadata, + CacheContentBehavior, Context, ImmutableInputs, InputDigests, NamedCaches, Platform, + ProcessCacheScope, ProcessMetadata, }; use prost::Message; use protos::gen::build::bazel::remote::execution::v2::{Action, Command}; @@ -318,7 +319,7 @@ async fn main() { true, true, process_execution::remote_cache::RemoteCacheWarningsBehavior::Backoff, - false, + CacheContentBehavior::Defer, args.cache_rpc_concurrency, Duration::from_secs(2), ) diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 2e0ae6643af..f2f6316a11c 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ 
-26,8 +26,8 @@ use hashing::Digest; use log::info; use parking_lot::Mutex; use process_execution::{ - self, bounded, local, nailgun, remote, remote_cache, CommandRunner, ImmutableInputs, NamedCaches, - Platform, ProcessMetadata, RemoteCacheWarningsBehavior, + self, bounded, local, nailgun, remote, remote_cache, CacheContentBehavior, CommandRunner, + ImmutableInputs, NamedCaches, Platform, ProcessMetadata, RemoteCacheWarningsBehavior, }; use protos::gen::build::bazel::remote::execution::v2::ServerCapabilities; use regex::Regex; @@ -93,7 +93,7 @@ pub struct RemotingOptions { pub store_rpc_concurrency: usize, pub store_batch_api_size_limit: usize, pub cache_warnings_behavior: RemoteCacheWarningsBehavior, - pub cache_eager_fetch: bool, + pub cache_content_behavior: CacheContentBehavior, pub cache_rpc_concurrency: usize, pub cache_read_timeout: Duration, pub execution_extra_platform_properties: Vec<(String, String)>, @@ -273,9 +273,13 @@ impl Core { local_cache_read: bool, local_cache_write: bool, ) -> Result, String> { - // TODO: Until we can deprecate letting the flag default, we implicitly disable - // eager_fetch when remote execution is in use. - let eager_fetch = remoting_opts.cache_eager_fetch && !remoting_opts.execution_enable; + // TODO: Until we can deprecate letting the flag default, we implicitly default + // cache_content_behavior when remote execution is in use. 
+ let cache_content_behavior = if remoting_opts.execution_enable { + CacheContentBehavior::Defer + } else { + remoting_opts.cache_content_behavior + }; if remote_cache_read || remote_cache_write { runner = Arc::new(remote_cache::CommandRunner::new( runner, @@ -289,7 +293,7 @@ impl Core { remote_cache_read, remote_cache_write, remoting_opts.cache_warnings_behavior, - eager_fetch, + cache_content_behavior, remoting_opts.cache_rpc_concurrency, remoting_opts.cache_read_timeout, )?); @@ -301,7 +305,7 @@ impl Core { local_cache.clone(), full_store.clone(), local_cache_read, - eager_fetch, + cache_content_behavior, process_execution_metadata.clone(), )); } @@ -475,7 +479,7 @@ impl Core { )?; let store = if (exec_strategy_opts.remote_cache_read || exec_strategy_opts.remote_cache_write) - && remoting_opts.cache_eager_fetch + && remoting_opts.cache_content_behavior == CacheContentBehavior::Fetch { // In remote cache mode with eager fetching, the only interaction with the remote CAS // should be through the remote cache code paths.
Thus, the store seen by the rest of the diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 839f2953194..b22ce5b1462 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -29,7 +29,7 @@ use log::{self, debug, error, warn, Log}; use logging::logger::PANTS_LOGGER; use logging::{Logger, PythonLogLevel}; use petgraph::graph::{DiGraph, Graph}; -use process_execution::RemoteCacheWarningsBehavior; +use process_execution::{CacheContentBehavior, RemoteCacheWarningsBehavior}; use pyo3::exceptions::{PyException, PyIOError, PyKeyboardInterrupt, PyValueError}; use pyo3::prelude::{ pyclass, pyfunction, pymethods, pymodule, wrap_pyfunction, PyModule, PyObject, @@ -289,7 +289,7 @@ impl PyRemotingOptions { store_rpc_concurrency: usize, store_batch_api_size_limit: usize, cache_warnings_behavior: String, - cache_eager_fetch: bool, + cache_content_behavior: String, cache_rpc_concurrency: usize, cache_read_timeout_millis: u64, execution_extra_platform_properties: Vec<(String, String)>, @@ -312,7 +312,7 @@ impl PyRemotingOptions { store_batch_api_size_limit, cache_warnings_behavior: RemoteCacheWarningsBehavior::from_str(&cache_warnings_behavior) .unwrap(), - cache_eager_fetch, + cache_content_behavior: CacheContentBehavior::from_str(&cache_content_behavior).unwrap(), cache_rpc_concurrency, cache_read_timeout: Duration::from_millis(cache_read_timeout_millis), execution_extra_platform_properties, diff --git a/tests/python/pants_test/integration/remote_cache_integration_test.py b/tests/python/pants_test/integration/remote_cache_integration_test.py index 0ea60b4e4bd..f52560f33e1 100644 --- a/tests/python/pants_test/integration/remote_cache_integration_test.py +++ b/tests/python/pants_test/integration/remote_cache_integration_test.py @@ -5,6 +5,8 @@ import time +import pytest + from pants.core.util_rules import distdir from pants.core.util_rules.distdir import DistDir from pants.engine.fs 
import Digest, DigestContents, DigestEntries, FileDigest, FileEntry, Workspace @@ -12,7 +14,7 @@ from pants.engine.internals.native_engine import PyExecutor, PyStubCAS from pants.engine.process import Process, ProcessResult from pants.engine.rules import Get, SubsystemRule, goal_rule, rule -from pants.option.global_options import RemoteCacheWarningsBehavior +from pants.option.global_options import CacheContentBehavior, RemoteCacheWarningsBehavior from pants.testutil.pants_integration_test import run_pants from pants.testutil.rule_runner import QueryRule, RuleRunner from pants.util.logging import LogLevel @@ -113,7 +115,7 @@ def run() -> tuple[FileDigest, dict[str, int]]: rules=[entries_from_process, QueryRule(ProcessOutputEntries, [Process])], isolated_local_store=True, bootstrap_args=[ - "--no-remote-cache-eager-fetch", + "--cache-content-behavior=defer", "--no-local-cache", *remote_cache_args(cas.address), ], @@ -193,7 +195,7 @@ def run() -> dict[str, int]: rules=[mock_run, *distdir.rules(), SubsystemRule(MockRunSubsystem)], isolated_local_store=True, bootstrap_args=[ - "--no-remote-cache-eager-fetch", + "--cache-content-behavior=defer", "--no-local-cache", *remote_cache_args(cas.address), ], @@ -221,3 +223,47 @@ def run() -> dict[str, int]: assert metrics2["remote_cache_requests"] == 1 assert metrics2["remote_cache_requests_cached"] == 1 assert metrics2["backtrack_attempts"] == 1 + + +@pytest.mark.parametrize( + "cache_content_behavior", [CacheContentBehavior.validate, CacheContentBehavior.fetch] +) +def test_eager_validation(cache_content_behavior: CacheContentBehavior) -> None: + """Tests that --cache-content-behavior={validate,fetch} fail a lookup for missing content.""" + executor = PyExecutor(core_threads=2, max_threads=4) + cas = PyStubCAS.builder().build(executor) + + def run() -> dict[str, int]: + # Use an isolated store to ensure that the only content is in the remote/stub cache. 
+ rule_runner = RuleRunner( + rules=[mock_run, *distdir.rules(), SubsystemRule(MockRunSubsystem)], + isolated_local_store=True, + bootstrap_args=[ + f"--cache-content-behavior={cache_content_behavior.value}", + "--no-local-cache", + *remote_cache_args(cas.address), + ], + ) + result = rule_runner.run_goal_rule(MockRun, args=[]) + assert result.exit_code == 0 + + # Wait for any async cache writes to complete. + time.sleep(1) + return rule_runner.scheduler.get_metrics() + + # Run once to populate the remote cache, and validate that there is one entry afterwards. + assert cas.action_cache_len() == 0 + metrics1 = run() + assert cas.action_cache_len() == 1 + assert metrics1["remote_cache_requests"] == 1 + assert metrics1["remote_cache_requests_uncached"] == 1 + + # Then, remove the content from the remote store and run again. + assert cas.remove( + FileDigest("434728a410a78f56fc1b5899c3593436e61ab0c731e9072d95e96db290205e53", 8) + ) + metrics2 = run() + # Validate that we missed the cache, and that we didn't backtrack. + assert metrics2["remote_cache_requests"] == 1 + assert metrics2["remote_cache_requests_uncached"] == 1 + assert "backtrack_attempts" not in metrics2