Skip to content

Commit

Permalink
Implement a validate mode to reduce network usage for remote caches (
Browse files Browse the repository at this point in the history
…pantsbuild#16398)

As reported in pantsbuild#16096, backtracking still has some kinks to work out, and is unlikely to be fully stable before `2.13.0` ships.

To gain "most" of the network-usage benefit of deferring fetching cache content while significantly narrowing the window in which backtracking is necessary, this change replaces the `remote_cache_eager_fetch` flag with a `cache_content_behavior` ternary option. As described in the `help` for the new option, the modes are:
* `fetch`: Eagerly fetches cache content: the default. Equivalent to `eager_fetch=True`, the previous default.
* `validate`: Validates that cache content exists either locally or remotely, without actually fetching files. This cannot be the default (without changing behavior), because it can still require backtracking if cache content expires during the lifetime of `pantsd` or a single run.
* `defer`: Does not validate or fetch content: equivalent to `eager_fetch=False`. Hopefully can eventually be made the default once all issues like pantsbuild#16096 are resolved.

Because `validate` narrows the window in which we backtrack to cases where content expires during a run, it should be relatively safe for use even within the `2.13.x` release.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored and cczona committed Sep 1, 2022
1 parent 1ab01d2 commit 10f6386
Show file tree
Hide file tree
Showing 16 changed files with 349 additions and 122 deletions.
8 changes: 4 additions & 4 deletions pants.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ build_ignore.add = [

unmatched_build_file_globs = "error"

# NB: Users must still set `--remote-cache-{read,write}` to enable the remote cache.
# TODO: May cause tests which experience missing digests to hang.
# See https://github.com/pantsbuild/pants/issues/16096.
remote_cache_eager_fetch = false
# TODO: May cause tests which experience missing digests to hang. If you experience an
# issue, please change this to `fetch` and then report an issue on:
# https://github.com/pantsbuild/pants/issues/16096.
cache_content_behavior = "validate"

[anonymous-telemetry]
enabled = true
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def __init__(
store_rpc_concurrency=execution_options.remote_store_rpc_concurrency,
store_batch_api_size_limit=execution_options.remote_store_batch_api_size_limit,
cache_warnings_behavior=execution_options.remote_cache_warnings.value,
cache_eager_fetch=execution_options.remote_cache_eager_fetch,
cache_content_behavior=execution_options.cache_content_behavior.value,
cache_rpc_concurrency=execution_options.remote_cache_rpc_concurrency,
cache_read_timeout_millis=execution_options.remote_cache_read_timeout_millis,
execution_extra_platform_properties=tuple(
Expand Down
68 changes: 63 additions & 5 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
is_in_container,
pants_version,
)
from pants.base.deprecated import deprecated_conditional
from pants.base.deprecated import deprecated_conditional, resolve_conflicting_options
from pants.base.glob_match_error_behavior import GlobMatchErrorBehavior
from pants.engine.environment import CompleteEnvironment
from pants.engine.internals.native_engine import PyExecutor
Expand Down Expand Up @@ -112,6 +112,13 @@ class RemoteCacheWarningsBehavior(Enum):
backoff = "backoff"


# The allowed values of the `cache_content_behavior` option, which controls how the content
# of cache entries is handled during process execution.
@enum.unique
class CacheContentBehavior(Enum):
# Fetch remote cache content from the remote store before considering a lookup a hit.
fetch = "fetch"
# Only validate (for either a local or remote cache) that the content exists, without
# fetching it.
validate = "validate"
# Neither fetch nor validate the content before declaring a hit: consumption is deferred
# until a consumer actually needs the entry.
defer = "defer"


@enum.unique
class AuthPluginState(Enum):
OK = "ok"
Expand Down Expand Up @@ -473,6 +480,7 @@ class ExecutionOptions:
process_execution_remote_parallelism: int
process_execution_cache_namespace: str | None
process_execution_graceful_shutdown_timeout: int
cache_content_behavior: CacheContentBehavior

process_total_child_memory_usage: int | None
process_per_child_memory_usage: int
Expand All @@ -485,7 +493,6 @@ class ExecutionOptions:
remote_store_rpc_concurrency: int
remote_store_batch_api_size_limit: int

remote_cache_eager_fetch: bool
remote_cache_warnings: RemoteCacheWarningsBehavior
remote_cache_rpc_concurrency: int
remote_cache_read_timeout_millis: int
Expand Down Expand Up @@ -518,6 +525,7 @@ def from_options(
process_execution_cache_namespace=bootstrap_options.process_execution_cache_namespace,
process_execution_graceful_shutdown_timeout=bootstrap_options.process_execution_graceful_shutdown_timeout,
process_execution_local_enable_nailgun=bootstrap_options.process_execution_local_enable_nailgun,
cache_content_behavior=GlobalOptions.resolve_cache_content_behavior(bootstrap_options),
process_total_child_memory_usage=bootstrap_options.process_total_child_memory_usage,
process_per_child_memory_usage=bootstrap_options.process_per_child_memory_usage,
# Remote store setup.
Expand All @@ -529,7 +537,6 @@ def from_options(
remote_store_rpc_concurrency=dynamic_remote_options.store_rpc_concurrency,
remote_store_batch_api_size_limit=bootstrap_options.remote_store_batch_api_size_limit,
# Remote cache setup.
remote_cache_eager_fetch=bootstrap_options.remote_cache_eager_fetch,
remote_cache_warnings=bootstrap_options.remote_cache_warnings,
remote_cache_rpc_concurrency=dynamic_remote_options.cache_rpc_concurrency,
remote_cache_read_timeout_millis=bootstrap_options.remote_cache_read_timeout_millis,
Expand Down Expand Up @@ -602,6 +609,7 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
process_execution_cache_namespace=None,
process_cleanup=True,
local_cache=True,
cache_content_behavior=CacheContentBehavior.fetch,
process_execution_local_enable_nailgun=True,
process_execution_graceful_shutdown_timeout=3,
# Remote store setup.
Expand All @@ -615,7 +623,6 @@ def from_options(cls, options: OptionValueContainer) -> LocalStoreOptions:
remote_store_rpc_concurrency=128,
remote_store_batch_api_size_limit=4194304,
# Remote cache setup.
remote_cache_eager_fetch=True,
remote_cache_warnings=RemoteCacheWarningsBehavior.backoff,
remote_cache_rpc_concurrency=128,
remote_cache_read_timeout_millis=1500,
Expand Down Expand Up @@ -1148,6 +1155,32 @@ class BootstrapOptions:
"""
),
)
cache_content_behavior = EnumOption(
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.cache_content_behavior,
help=softwrap(
"""
Controls how the content of cache entries is handled during process execution.
When using a remote cache, the `fetch` behavior will fetch remote cache content from the
remote store before considering the cache lookup a hit, while the `validate` behavior
will only validate (for either a local or remote cache) that the content exists, without
fetching it.
The `defer` behavior, on the other hand, will neither fetch nor validate the cache
content before calling a cache hit a hit. This "defers" actually consuming the cache
entry until a consumer consumes it.
The `defer` mode is the most network efficient (because it will completely skip network
requests in many cases), followed by the `validate` mode (since it can still skip
fetching the content if no consumer ends up needing it). But both the `validate` and
`defer` modes rely on an experimental feature called "backtracking" to attempt to
recover if content later turns out to be missing (`validate` has a much narrower window
for backtracking though, since content would need to disappear between validation and
consumption: generally, within one `pantsd` session).
"""
),
)
ca_certs_path = StrOption(
advanced=True,
default=None,
Expand Down Expand Up @@ -1416,7 +1449,9 @@ class BootstrapOptions:
)
remote_cache_eager_fetch = BoolOption(
advanced=True,
default=DEFAULT_EXECUTION_OPTIONS.remote_cache_eager_fetch,
default=(DEFAULT_EXECUTION_OPTIONS.cache_content_behavior != CacheContentBehavior.defer),
removal_version="2.15.0.dev1",
removal_hint="Use the `cache_content_behavior` option instead.",
help=softwrap(
"""
Eagerly fetch relevant content from the remote store instead of lazily fetching.
Expand Down Expand Up @@ -1835,6 +1870,29 @@ def create_py_executor(bootstrap_options: OptionValueContainer) -> PyExecutor:
core_threads=bootstrap_options.rule_threads_core, max_threads=rule_threads_max
)

@staticmethod
def resolve_cache_content_behavior(
    bootstrap_options: OptionValueContainer,
) -> CacheContentBehavior:
    """Compute the effective `CacheContentBehavior` from the bootstrap options.

    Delegates to `resolve_conflicting_options` to choose between the deprecated
    `remote_cache_eager_fetch` option and its replacement `cache_content_behavior`, both
    read from the global scope of `bootstrap_options`.

    A `CacheContentBehavior` result is returned unchanged. A `bool` result is taken to be
    the value of `remote_cache_eager_fetch`, and is translated: `True` becomes `fetch` and
    `False` becomes `defer`.

    :raises TypeError: if the resolved value is neither a `bool` nor a `CacheContentBehavior`.
    """
    chosen = resolve_conflicting_options(
        old_option="remote_cache_eager_fetch",
        new_option="cache_content_behavior",
        old_scope="",
        new_scope="",
        old_container=bootstrap_options,
        new_container=bootstrap_options,
    )

    # The new option already carries the right type.
    if isinstance(chosen, CacheContentBehavior):
        return chosen

    # Anything that is not a bool at this point cannot have come from either option.
    if not isinstance(chosen, bool):
        raise TypeError(f"Unexpected option value for `cache_content_behavior`: {chosen}")

    # A bool is the legacy `remote_cache_eager_fetch` value.
    if chosen:
        return CacheContentBehavior.fetch
    return CacheContentBehavior.defer

@staticmethod
def compute_pants_ignore(buildroot, global_options):
"""Computes the merged value of the `--pants-ignore` flag.
Expand Down
72 changes: 69 additions & 3 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod snapshot_ops_tests;
mod snapshot_tests;
pub use crate::snapshot_ops::{SnapshotOps, SubsetParams};

use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{self, Debug, Display};
use std::fs::OpenOptions;
use std::future::Future;
Expand Down Expand Up @@ -749,8 +749,11 @@ impl Store {
if Store::upload_is_faster_than_checking_whether_to_upload(&ingested_digests) {
ingested_digests.keys().cloned().collect()
} else {
let request = remote.find_missing_blobs_request(ingested_digests.keys());
remote.list_missing_digests(request).await?
remote
.list_missing_digests(
remote.find_missing_blobs_request(ingested_digests.keys().cloned()),
)
.await?
};

future::try_join_all(
Expand Down Expand Up @@ -853,6 +856,69 @@ impl Store {
.await
}

///
/// Return true if the given directory and file digests are loadable from either the local or remote
/// Store, without downloading any file content.
///
/// The given directory digests will be recursively expanded, so it is not necessary to
/// explicitly list their file digests in the file digests list.
///
/// Returns `Ok(false)` when at least one file digest is absent locally and either no remote store
/// is configured, or the remote store reports a digest as missing.
///
/// Errors from loading the directories, from the local existence checks, and from the remote
/// missing-digests request are propagated to the caller.
/// NOTE(review): a directory digest which cannot be loaded presumably surfaces as an `Err` from
/// `load_digest_trie` rather than as `Ok(false)` -- confirm against callers.
///
pub async fn exists_recursive(
  &self,
  directory_digests: impl IntoIterator<Item = DirectoryDigest>,
  file_digests: impl IntoIterator<Item = Digest>,
) -> Result<bool, StoreError> {
  // Load directories, which implicitly validates that they exist.
  let digest_tries = future::try_join_all(
    directory_digests
      .into_iter()
      .map(|dd| self.load_digest_trie(dd)),
  )
  .await?;

  // Collect all file digests into a set, so that each distinct digest is only checked once.
  let mut file_digests = file_digests.into_iter().collect::<HashSet<_>>();
  for digest_trie in digest_tries {
    digest_trie.walk(&mut |_, entry| match entry {
      directory::Entry::File(f) => {
        file_digests.insert(f.digest());
      }
      directory::Entry::Directory(_) => (),
    });
  }

  // Freeze the deduplicated digests into a Vec: the existence results below are matched back to
  // their digests by position, and a Vec gives a single, well-defined order to zip against
  // (rather than relying on two separate traversals of a HashSet agreeing with one another).
  let file_digests = file_digests.into_iter().collect::<Vec<_>>();

  // Filter out file digests that exist locally.
  // TODO: Implement a local batch API: see https://github.com/pantsbuild/pants/issues/16400.
  let local_file_exists = future::try_join_all(
    file_digests
      .iter()
      .map(|file_digest| self.local.exists(EntryType::File, *file_digest))
      .collect::<Vec<_>>(),
  )
  .await?;
  let missing_locally = file_digests
    .into_iter()
    .zip(local_file_exists)
    .filter_map(|(digest, exists)| if exists { None } else { Some(digest) })
    .collect::<Vec<_>>();

  // If there are any digests which don't exist locally, check remotely.
  if missing_locally.is_empty() {
    return Ok(true);
  }
  let remote = match self.remote.clone() {
    Some(remote) => remote,
    // Without a remote store, anything missing locally is simply missing.
    None => return Ok(false),
  };
  let missing = remote
    .store
    .list_missing_digests(remote.store.find_missing_blobs_request(missing_locally))
    .await?;

  Ok(missing.is_empty())
}

///
/// Ensure that a directory is locally loadable, which will download it from the Remote store as
/// a side effect (if one is configured).
Expand Down
14 changes: 14 additions & 0 deletions src/rust/engine/fs/store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,20 @@ impl ByteStore {
.await
}

///
/// Return whether an entry of the given type with the given digest is present in this local
/// store.
///
/// The empty digest is always reported as present. Otherwise the database matching `entry_type`
/// is looked up by the digest's hash; a failure to obtain that database, or an error from the
/// lookup itself, is returned as an `Err`.
///
pub async fn exists(&self, entry_type: EntryType, digest: Digest) -> Result<bool, String> {
  // Short-circuit the empty digest without any I/O, so that client-provided operations (such as
  // merging snapshots) work even if the empty snapshot was never explicitly stored.
  if digest == EMPTY_DIGEST {
    return Ok(true);
  }

  let sharded_dbs = match entry_type {
    EntryType::File => self.inner.file_dbs.clone()?,
    EntryType::Directory => self.inner.directory_dbs.clone()?,
  };
  sharded_dbs.exists(digest.hash).await
}

///
/// Loads bytes from the underlying LMDB store using the given function. Because the database is
/// blocking, this accepts a function that views a slice rather than returning a clone of the
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,13 +480,13 @@ impl ByteStore {
}
}

pub fn find_missing_blobs_request<'a, Digests: Iterator<Item = &'a Digest>>(
pub fn find_missing_blobs_request(
&self,
digests: Digests,
digests: impl IntoIterator<Item = Digest>,
) -> remexec::FindMissingBlobsRequest {
remexec::FindMissingBlobsRequest {
instance_name: self.instance_name.as_ref().cloned().unwrap_or_default(),
blob_digests: digests.map(|d| d.into()).collect::<Vec<_>>(),
blob_digests: digests.into_iter().map(|d| d.into()).collect::<Vec<_>>(),
}
}

Expand Down
10 changes: 3 additions & 7 deletions src/rust/engine/fs/store/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,7 @@ async fn list_missing_digests_none_missing() {
let store = new_byte_store(&cas);
assert_eq!(
store
.list_missing_digests(
store.find_missing_blobs_request(vec![TestData::roland().digest()].iter()),
)
.list_missing_digests(store.find_missing_blobs_request(vec![TestData::roland().digest()]),)
.await,
Ok(HashSet::new())
);
Expand All @@ -281,7 +279,7 @@ async fn list_missing_digests_some_missing() {

assert_eq!(
store
.list_missing_digests(store.find_missing_blobs_request(vec![digest].iter()),)
.list_missing_digests(store.find_missing_blobs_request(vec![digest]))
.await,
Ok(digest_set)
);
Expand All @@ -295,9 +293,7 @@ async fn list_missing_digests_error() {
let store = new_byte_store(&cas);

let error = store
.list_missing_digests(
store.find_missing_blobs_request(vec![TestData::roland().digest()].iter()),
)
.list_missing_digests(store.find_missing_blobs_request(vec![TestData::roland().digest()]))
.await
.expect_err("Want error");
assert!(
Expand Down
Loading

0 comments on commit 10f6386

Please sign in to comment.