Skip to content

Commit

Permalink
Local process cache validates that digests exist locally before hitti…
Browse files Browse the repository at this point in the history
…ng (pantsbuild#10789)

When hitting the local process cache, ensure that all of the process outputs exist (and as a sideffect, that they are downloaded locally if a remote cache is configured). Added and fixed a test for this case.

An alternative implementation would have been to guarantee that a cache entry must exist only if all of the digests it requires are transitively reachable. But the local cache and the filesystem store use two different LMDB stores, which means that we cannot transactionally update them in a way that would rule out a cache entry existing even though its file content had been garbage collected... and it's not clear that merging those stores is desirable.

Fixes pantsbuild#10719. In addition to the test, I lowered the lease time and garbage collection times and validated that the case described on pantsbuild#10719 is no longer reproducible.
  • Loading branch information
stuhood committed Sep 29, 2020
1 parent 31cdbc8 commit 888e071
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 71 deletions.
8 changes: 4 additions & 4 deletions src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ def _run_and_return_roots(self, session, execution_request):
def lease_files_in_graph(self, session):
self._native.lib.lease_files_in_graph(self._scheduler, session)

def garbage_collect_store(self):
self._native.lib.garbage_collect_store(self._scheduler)
def garbage_collect_store(self, target_size_bytes: int) -> None:
self._native.lib.garbage_collect_store(self._scheduler, target_size_bytes)

def new_session(
self,
Expand Down Expand Up @@ -613,5 +613,5 @@ def materialize_directories(
def lease_files_in_graph(self):
self._scheduler.lease_files_in_graph(self._session)

def garbage_collect_store(self):
self._scheduler.garbage_collect_store()
def garbage_collect_store(self, target_size_bytes: int) -> None:
self._scheduler.garbage_collect_store(target_size_bytes)
12 changes: 9 additions & 3 deletions src/python/pants/pantsd/service/store_gc_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ class StoreGCService(PantsService):
This service both ensures that in-use files continue to be present in the engine's Store, and
performs occasional garbage collection to bound the size of the engine's Store.
NB: The lease extension interval should be significantly less than the rust-side
sharded_lmdb::DEFAULT_LEASE_TIME to ensure that valid leases are extended well before they
might expire.
"""

def __init__(
self,
scheduler: Scheduler,
period_secs=10,
lease_extension_interval_secs=(30 * 60),
gc_interval_secs=(4 * 60 * 60),
lease_extension_interval_secs=(15 * 60),
gc_interval_secs=(1 * 60 * 60),
target_size_bytes=(4 * 1024 * 1024 * 1024),
):
super().__init__()
self._scheduler_session = scheduler.new_session(
Expand All @@ -31,6 +36,7 @@ def __init__(
self._period_secs = period_secs
self._lease_extension_interval_secs = lease_extension_interval_secs
self._gc_interval_secs = gc_interval_secs
self._target_size_bytes = target_size_bytes

self._set_next_gc()
self._set_next_lease_extension()
Expand All @@ -53,7 +59,7 @@ def _maybe_garbage_collect(self):
if time.time() < self._next_gc:
return
self._logger.info("Garbage collecting store")
self._scheduler_session.garbage_collect_store()
self._scheduler_session.garbage_collect_store(self._target_size_bytes)
self._logger.info("Done garbage collecting store")
self._set_next_gc()

Expand Down
52 changes: 37 additions & 15 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,13 @@ impl Store {
}
}

///
/// Remove a file locally, returning true if it existed, or false otherwise.
///
pub async fn remove_file(&self, digest: Digest) -> Result<bool, String> {
self.local.remove(EntryType::File, digest).await
}

///
/// Store a file locally.
///
Expand Down Expand Up @@ -539,8 +546,8 @@ impl Store {
}

///
/// Download a directory from Remote ByteStore recursively to the local one. Called only with the
/// Digest of a Directory.
/// Ensure that a directory is locally loadable, which will download it from the Remote store as
/// a sideeffect (if one is configured). Called only with the Digest of a Directory.
///
pub fn ensure_local_has_recursive_directory(&self, dir_digest: Digest) -> BoxFuture<(), String> {
let loaded_directory = {
Expand All @@ -564,13 +571,9 @@ impl Store {
.map(|file_node| {
let file_digest = try_future!(file_node.get_digest().try_into());
let store = store.clone();
Box::pin(async move {
store
.load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(()))
.await
})
.compat()
.to_boxed()
Box::pin(async move { store.ensure_local_has_file(file_digest).await })
.compat()
.to_boxed()
})
.collect::<Vec<_>>();

Expand All @@ -590,6 +593,24 @@ impl Store {
.to_boxed()
}

///
/// Ensure that a file is locally loadable, which will download it from the Remote store as
/// a sideeffect (if one is configured). Called only with the Digest of a File.
///
pub async fn ensure_local_has_file(&self, file_digest: Digest) -> Result<(), String> {
let result = self
.load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(()))
.await?;
if result.is_some() {
Ok(())
} else {
Err(format!(
"File {:?} did not exist in the store.",
file_digest
))
}
}

pub async fn lease_all_recursively<'a, Ds: Iterator<Item = &'a Digest>>(
&self,
digests: Ds,
Expand All @@ -609,13 +630,14 @@ impl Store {
match self.local.shrink(target_size_bytes, shrink_behavior) {
Ok(size) => {
if size > target_size_bytes {
Err(format!(
"Garbage collection attempted to target {} bytes but could only shrink to {} bytes",
target_size_bytes, size
))
} else {
Ok(())
log::warn!(
"Garbage collection attempted to shrink the store to {} bytes but {} bytes \
are currently in use.",
target_size_bytes,
size
)
}
Ok(())
}
Err(err) => Err(format!("Garbage collection failed: {:?}", err)),
}
Expand Down
8 changes: 8 additions & 0 deletions src/rust/engine/fs/store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,14 @@ impl ByteStore {
Ok(())
}

pub async fn remove(&self, entry_type: EntryType, digest: Digest) -> Result<bool, String> {
let dbs = match entry_type {
EntryType::Directory => self.inner.directory_dbs.clone(),
EntryType::File => self.inner.file_dbs.clone(),
};
dbs?.remove(digest.0).await
}

pub async fn store_bytes(
&self,
entry_type: EntryType,
Expand Down
37 changes: 29 additions & 8 deletions src/rust/engine/process_execution/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use futures::compat::Future01CompatExt;
use futures01::Future;
use futures::{future as future03, FutureExt};
use log::{debug, warn};
use protobuf::Message;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -66,7 +66,7 @@ impl crate::CommandRunner for CommandRunner {
match self.lookup(key).await {
Ok(Some(result)) => return Ok(result),
Err(err) => {
warn!(
debug!(
"Error loading process execution result from local cache: {} - continuing to execute",
err
);
Expand All @@ -80,7 +80,7 @@ impl crate::CommandRunner for CommandRunner {
let result = command_runner.underlying.run(req, context).await?;
if result.exit_code == 0 {
if let Err(err) = command_runner.store(key, &result).await {
debug!(
warn!(
"Error storing process execution result to local cache: {} - ignoring and continuing",
err
);
Expand All @@ -97,6 +97,7 @@ impl CommandRunner {
) -> Result<Option<FallibleProcessResultWithPlatform>, String> {
use bazel_protos::remote_execution::ExecuteResponse;

// See whether there is a cache entry.
let maybe_execute_response: Option<(ExecuteResponse, Platform)> = self
.process_execution_store
.load_bytes_with(fingerprint, move |bytes| {
Expand All @@ -114,19 +115,39 @@ impl CommandRunner {
})
.await?;

if let Some((execute_response, platform)) = maybe_execute_response {
// Deserialize the cache entry if it existed.
let result = if let Some((execute_response, platform)) = maybe_execute_response {
crate::remote::populate_fallible_execution_result(
self.file_store.clone(),
execute_response,
vec![],
platform,
)
.map(Some)
.compat()
.await
.await?
} else {
Ok(None)
}
return Ok(None);
};

// Ensure that all digests in the result are loadable, erroring if any are not.
let _ = future03::try_join_all(vec![
self
.file_store
.ensure_local_has_file(result.stdout_digest)
.boxed(),
self
.file_store
.ensure_local_has_file(result.stderr_digest)
.boxed(),
self
.file_store
.ensure_local_has_recursive_directory(result.output_directory)
.compat()
.boxed(),
])
.await?;

Ok(Some(result))
}

async fn store(
Expand Down
Loading

0 comments on commit 888e071

Please sign in to comment.