From 888e071c8c14074f0c51293240c79a10d5d3ed66 Mon Sep 17 00:00:00 2001
From: Stu Hood
Date: Wed, 16 Sep 2020 10:45:32 -0700
Subject: [PATCH] Local process cache validates that digests exist locally before hitting (#10789)

When hitting the local process cache, ensure that all of the process outputs
exist (and, as a side effect, that they are downloaded locally if a remote
cache is configured). Added and fixed a test for this case.

An alternative implementation would have been to guarantee that a cache entry
exists only if all of the digests it requires are transitively reachable. But
the local cache and the filesystem store use two different LMDB stores, which
means that we cannot transactionally update them in a way that would rule out
a cache entry existing even though its file content had been garbage
collected... and it's not clear that merging those stores is desirable.

Fixes #10719.

In addition to the test, I lowered the lease extension and garbage collection
intervals and validated that the case described in #10719 is no longer
reproducible.
---
 .../pants/engine/internals/scheduler.py      |   8 +-
 .../pants/pantsd/service/store_gc_service.py |  12 +-
 src/rust/engine/fs/store/src/lib.rs          |  52 +++++--
 src/rust/engine/fs/store/src/local.rs        |   8 +
 .../engine/process_execution/src/cache.rs    |  37 ++++-
 .../process_execution/src/cache_tests.rs     | 145 +++++++++++++-----
 src/rust/engine/sharded_lmdb/src/lib.rs      |  25 +++
 src/rust/engine/src/externs/interface.rs     |  16 +-
 8 files changed, 232 insertions(+), 71 deletions(-)

diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py
index 1276e436625..1c306ef4475 100644
--- a/src/python/pants/engine/internals/scheduler.py
+++ b/src/python/pants/engine/internals/scheduler.py
@@ -283,8 +283,8 @@ def _run_and_return_roots(self, session, execution_request):
     def lease_files_in_graph(self, session):
         self._native.lib.lease_files_in_graph(self._scheduler, session)
 
-    def garbage_collect_store(self):
-        self._native.lib.garbage_collect_store(self._scheduler)
+    def garbage_collect_store(self, target_size_bytes: int) -> None:
+        self._native.lib.garbage_collect_store(self._scheduler, target_size_bytes)
 
     def new_session(
         self,
@@ -613,5 +613,5 @@ def materialize_directories(
     def lease_files_in_graph(self):
         self._scheduler.lease_files_in_graph(self._session)
 
-    def garbage_collect_store(self):
-        self._scheduler.garbage_collect_store()
+    def garbage_collect_store(self, target_size_bytes: int) -> None:
+        self._scheduler.garbage_collect_store(target_size_bytes)
diff --git a/src/python/pants/pantsd/service/store_gc_service.py b/src/python/pants/pantsd/service/store_gc_service.py
index a3995f434e6..00859911c06 100644
--- a/src/python/pants/pantsd/service/store_gc_service.py
+++ b/src/python/pants/pantsd/service/store_gc_service.py
@@ -13,14 +13,19 @@ class StoreGCService(PantsService):
 
     This service both ensures that in-use files continue to be present in the engine's Store, and
     performs occasional garbage collection to bound the size of the engine's Store.
+
+    NB: The lease extension interval should be significantly less than the rust-side
+    sharded_lmdb::DEFAULT_LEASE_TIME to ensure that valid leases are extended well before they
+    might expire.
""" def __init__( self, scheduler: Scheduler, period_secs=10, - lease_extension_interval_secs=(30 * 60), - gc_interval_secs=(4 * 60 * 60), + lease_extension_interval_secs=(15 * 60), + gc_interval_secs=(1 * 60 * 60), + target_size_bytes=(4 * 1024 * 1024 * 1024), ): super().__init__() self._scheduler_session = scheduler.new_session( @@ -31,6 +36,7 @@ def __init__( self._period_secs = period_secs self._lease_extension_interval_secs = lease_extension_interval_secs self._gc_interval_secs = gc_interval_secs + self._target_size_bytes = target_size_bytes self._set_next_gc() self._set_next_lease_extension() @@ -53,7 +59,7 @@ def _maybe_garbage_collect(self): if time.time() < self._next_gc: return self._logger.info("Garbage collecting store") - self._scheduler_session.garbage_collect_store() + self._scheduler_session.garbage_collect_store(self._target_size_bytes) self._logger.info("Done garbage collecting store") self._set_next_gc() diff --git a/src/rust/engine/fs/store/src/lib.rs b/src/rust/engine/fs/store/src/lib.rs index b33b3024c3e..17cff68f81c 100644 --- a/src/rust/engine/fs/store/src/lib.rs +++ b/src/rust/engine/fs/store/src/lib.rs @@ -277,6 +277,13 @@ impl Store { } } + /// + /// Remove a file locally, returning true if it existed, or false otherwise. + /// + pub async fn remove_file(&self, digest: Digest) -> Result { + self.local.remove(EntryType::File, digest).await + } + /// /// Store a file locally. /// @@ -539,8 +546,8 @@ impl Store { } /// - /// Download a directory from Remote ByteStore recursively to the local one. Called only with the - /// Digest of a Directory. + /// Ensure that a directory is locally loadable, which will download it from the Remote store as + /// a sideeffect (if one is configured). Called only with the Digest of a Directory. /// pub fn ensure_local_has_recursive_directory(&self, dir_digest: Digest) -> BoxFuture<(), String> { let loaded_directory = { @@ -564,13 +571,9 @@ impl Store { .map(|file_node| { let file_digest = try_future!(file_node.get_digest().try_into()); let store = store.clone(); - Box::pin(async move { - store - .load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(())) - .await - }) - .compat() - .to_boxed() + Box::pin(async move { store.ensure_local_has_file(file_digest).await }) + .compat() + .to_boxed() }) .collect::>(); @@ -590,6 +593,24 @@ impl Store { .to_boxed() } + /// + /// Ensure that a file is locally loadable, which will download it from the Remote store as + /// a sideeffect (if one is configured). Called only with the Digest of a File. 
+  ///
+  pub async fn ensure_local_has_file(&self, file_digest: Digest) -> Result<(), String> {
+    let result = self
+      .load_bytes_with(EntryType::File, file_digest, |_| Ok(()), |_| Ok(()))
+      .await?;
+    if result.is_some() {
+      Ok(())
+    } else {
+      Err(format!(
+        "File {:?} did not exist in the store.",
+        file_digest
+      ))
+    }
+  }
+
   pub async fn lease_all_recursively<'a, Ds: Iterator<Item = &'a Digest>>(
     &self,
     digests: Ds,
@@ -609,13 +630,14 @@ impl Store {
     match self.local.shrink(target_size_bytes, shrink_behavior) {
       Ok(size) => {
         if size > target_size_bytes {
-          Err(format!(
-            "Garbage collection attempted to target {} bytes but could only shrink to {} bytes",
-            target_size_bytes, size
-          ))
-        } else {
-          Ok(())
+          log::warn!(
+            "Garbage collection attempted to shrink the store to {} bytes but {} bytes \
+            are currently in use.",
+            target_size_bytes,
+            size
+          )
         }
+        Ok(())
       }
       Err(err) => Err(format!("Garbage collection failed: {:?}", err)),
     }
diff --git a/src/rust/engine/fs/store/src/local.rs b/src/rust/engine/fs/store/src/local.rs
index 9a885015be7..0d026cd1f3f 100644
--- a/src/rust/engine/fs/store/src/local.rs
+++ b/src/rust/engine/fs/store/src/local.rs
@@ -240,6 +240,14 @@ impl ByteStore {
     Ok(())
   }
 
+  pub async fn remove(&self, entry_type: EntryType, digest: Digest) -> Result<bool, String> {
+    let dbs = match entry_type {
+      EntryType::Directory => self.inner.directory_dbs.clone(),
+      EntryType::File => self.inner.file_dbs.clone(),
+    };
+    dbs?.remove(digest.0).await
+  }
+
   pub async fn store_bytes(
     &self,
     entry_type: EntryType,
diff --git a/src/rust/engine/process_execution/src/cache.rs b/src/rust/engine/process_execution/src/cache.rs
index a8e39d0489d..038465613af 100644
--- a/src/rust/engine/process_execution/src/cache.rs
+++ b/src/rust/engine/process_execution/src/cache.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::compat::Future01CompatExt;
-use futures01::Future;
+use futures::{future as future03, FutureExt};
 use log::{debug, warn};
 use protobuf::Message;
 use serde::{Deserialize, Serialize};
@@ -66,7 +66,7 @@ impl crate::CommandRunner for CommandRunner {
     match self.lookup(key).await {
       Ok(Some(result)) => return Ok(result),
       Err(err) => {
-        warn!(
+        debug!(
          "Error loading process execution result from local cache: {} - continuing to execute",
          err
        );
@@ -80,7 +80,7 @@ impl crate::CommandRunner for CommandRunner {
     let result = command_runner.underlying.run(req, context).await?;
     if result.exit_code == 0 {
       if let Err(err) = command_runner.store(key, &result).await {
-        debug!(
+        warn!(
          "Error storing process execution result to local cache: {} - ignoring and continuing",
          err
        );
@@ -97,6 +97,7 @@ impl CommandRunner {
   ) -> Result<Option<FallibleProcessResultWithPlatform>, String> {
     use bazel_protos::remote_execution::ExecuteResponse;
 
+    // See whether there is a cache entry.
     let maybe_execute_response: Option<(ExecuteResponse, Platform)> = self
       .process_execution_store
       .load_bytes_with(fingerprint, move |bytes| {
@@ -114,19 +115,39 @@ impl CommandRunner {
       })
       .await?;
 
-    if let Some((execute_response, platform)) = maybe_execute_response {
+    // Deserialize the cache entry if it existed.
+    let result = if let Some((execute_response, platform)) = maybe_execute_response {
       crate::remote::populate_fallible_execution_result(
         self.file_store.clone(),
         execute_response,
         vec![],
         platform,
       )
-      .map(Some)
       .compat()
-      .await
+      .await?
     } else {
-      Ok(None)
-    }
+      return Ok(None);
+    };
+
+    // Ensure that all digests in the result are loadable, erroring if any are not.
+    let _ = future03::try_join_all(vec![
+      self
+        .file_store
+        .ensure_local_has_file(result.stdout_digest)
+        .boxed(),
+      self
+        .file_store
+        .ensure_local_has_file(result.stderr_digest)
+        .boxed(),
+      self
+        .file_store
+        .ensure_local_has_recursive_directory(result.output_directory)
+        .compat()
+        .boxed(),
+    ])
+    .await?;
+
+    Ok(Some(result))
   }
 
   async fn store(
diff --git a/src/rust/engine/process_execution/src/cache_tests.rs b/src/rust/engine/process_execution/src/cache_tests.rs
index 2287f3a1ea6..744df675453 100644
--- a/src/rust/engine/process_execution/src/cache_tests.rs
+++ b/src/rust/engine/process_execution/src/cache_tests.rs
@@ -2,10 +2,12 @@ use crate::{
   CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, NamedCaches,
   Process, ProcessMetadata,
 };
+
+use std::convert::TryInto;
 use std::io::Write;
 use std::path::PathBuf;
-use std::sync::Arc;
 
+use futures::compat::Future01CompatExt;
 use sharded_lmdb::{ShardedLmdb, DEFAULT_LEASE_TIME};
 use store::Store;
 use tempfile::TempDir;
@@ -17,20 +19,55 @@ struct RoundtripResults {
   uncached: Result<FallibleProcessResultWithPlatform, String>,
   maybe_cached: Result<FallibleProcessResultWithPlatform, String>,
 }
 
-async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
+fn create_local_runner() -> (Box<dyn CommandRunnerTrait>, Store, TempDir) {
   let runtime = task_executor::Executor::new(Handle::current());
-  let work_dir = TempDir::new().unwrap();
-  let named_cache_dir = TempDir::new().unwrap();
-  let store_dir = TempDir::new().unwrap();
-  let store = Store::local_only(runtime.clone(), store_dir.path()).unwrap();
-  let local = crate::local::CommandRunner::new(
+  let base_dir = TempDir::new().unwrap();
+  let named_cache_dir = base_dir.path().join("named_cache_dir");
+  let store_dir = base_dir.path().join("store_dir");
+  let store = Store::local_only(runtime.clone(), store_dir).unwrap();
+  let runner = Box::new(crate::local::CommandRunner::new(
     store.clone(),
     runtime.clone(),
-    work_dir.path().to_owned(),
-    NamedCaches::new(named_cache_dir.path().to_owned()),
+    base_dir.path().to_owned(),
+    NamedCaches::new(named_cache_dir),
     true,
-  );
+  ));
+  (runner, store, base_dir)
+}
+
+fn create_cached_runner(
+  local: Box<dyn CommandRunnerTrait>,
+  store: Store,
+) -> (Box<dyn CommandRunnerTrait>, TempDir) {
+  let runtime = task_executor::Executor::new(Handle::current());
+  let cache_dir = TempDir::new().unwrap();
+  let max_lmdb_size = 50 * 1024 * 1024; //50 MB - I didn't pick that number but it seems reasonable.
+
+  let process_execution_store = ShardedLmdb::new(
+    cache_dir.path().to_owned(),
+    max_lmdb_size,
+    runtime.clone(),
+    DEFAULT_LEASE_TIME,
+  )
+  .unwrap();
+
+  let metadata = ProcessMetadata {
+    instance_name: None,
+    cache_key_gen_version: None,
+    platform_properties: vec![],
+  };
+  let runner = Box::new(crate::cache::CommandRunner::new(
+    local.into(),
+    process_execution_store,
+    store,
+    metadata,
+  ));
+
+  (runner, cache_dir)
+}
+
+fn create_script(script_exit_code: i8) -> (Process, PathBuf, TempDir) {
   let script_dir = TempDir::new().unwrap();
   let script_path = script_dir.path().join("script");
   std::fs::File::create(&script_path)
@@ -44,40 +81,25 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
     })
     .unwrap();
 
-  let request = Process::new(vec![
+  let process = Process::new(vec![
     testutil::path::find_bash(),
     format!("{}", script_path.display()),
   ])
   .output_files(vec![PathBuf::from("roland")].into_iter().collect());
 
-  let local_result = local.run(request.clone().into(), Context::default()).await;
-
-  let cache_dir = TempDir::new().unwrap();
-  let max_lmdb_size = 50 * 1024 * 1024; //50 MB - I didn't pick that number but it seems reasonable.
+  (process, script_path, script_dir)
+}
 
-  let process_execution_store = ShardedLmdb::new(
-    cache_dir.path().to_owned(),
-    max_lmdb_size,
-    runtime.clone(),
-    DEFAULT_LEASE_TIME,
-  )
-  .unwrap();
+async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
+  let (local, store, _local_runner_dir) = create_local_runner();
+  let (process, script_path, _script_dir) = create_script(script_exit_code);
 
-  let metadata = ProcessMetadata {
-    instance_name: None,
-    cache_key_gen_version: None,
-    platform_properties: vec![],
-  };
+  let local_result = local.run(process.clone().into(), Context::default()).await;
 
-  let caching = crate::cache::CommandRunner::new(
-    Arc::new(local),
-    process_execution_store,
-    store.clone(),
-    metadata,
-  );
+  let (caching, _cache_dir) = create_cached_runner(local, store.clone());
 
   let uncached_result = caching
-    .run(request.clone().into(), Context::default())
+    .run(process.clone().into(), Context::default())
     .await;
 
   assert_eq!(local_result, uncached_result);
@@ -86,7 +108,7 @@ async fn run_roundtrip(script_exit_code: i8) -> RoundtripResults {
   // fail due to a FileNotFound error. So, If the second run succeeds, that implies that the
   // cache was successfully used.
   std::fs::remove_file(&script_path).unwrap();
-  let maybe_cached_result = caching.run(request.into(), Context::default()).await;
+  let maybe_cached_result = caching.run(process.into(), Context::default()).await;
 
   RoundtripResults {
     uncached: uncached_result,
@@ -107,3 +129,56 @@ async fn failures_not_cached() {
   assert_eq!(results.uncached.unwrap().exit_code, 1);
   assert_eq!(results.maybe_cached.unwrap().exit_code, 127); // aka the return code for file not found
 }
+
+#[tokio::test]
+async fn recover_from_missing_store_contents() {
+  let (local, store, _local_runner_dir) = create_local_runner();
+  let (caching, _cache_dir) = create_cached_runner(local, store.clone());
+  let (process, _script_path, _script_dir) = create_script(0);
+
+  // Run once to cache the process.
+  let first_result = caching
+    .run(process.clone().into(), Context::default())
+    .await
+    .unwrap();
+
+  // Delete the first child of the output directory parent to confirm that we ensure that more
+  // than just the root of the output is present when hitting the cache.
+  {
+    let output_dir_digest = first_result.output_directory;
+    let (output_dir, _) = store
+      .load_directory(output_dir_digest)
+      .await
+      .unwrap()
+      .unwrap();
+    let output_child_digest = output_dir
+      .get_files()
+      .first()
+      .unwrap()
+      .get_digest()
+      .try_into()
+      .unwrap();
+    let removed = store.remove_file(output_child_digest).await.unwrap();
+    assert!(removed);
+    assert!(store
+      .contents_for_directory(output_dir_digest)
+      .compat()
+      .await
+      .err()
+      .is_some())
+  }
+
+  // Ensure that we don't fail if we re-run.
+  let second_result = caching
+    .run(process.clone().into(), Context::default())
+    .await
+    .unwrap();
+
+  // And that the entire output directory can be loaded.
+  assert!(store
+    .contents_for_directory(second_result.output_directory)
+    .compat()
+    .await
+    .ok()
+    .is_some())
+}
diff --git a/src/rust/engine/sharded_lmdb/src/lib.rs b/src/rust/engine/sharded_lmdb/src/lib.rs
index bf6a617e225..bfd71288865 100644
--- a/src/rust/engine/sharded_lmdb/src/lib.rs
+++ b/src/rust/engine/sharded_lmdb/src/lib.rs
@@ -242,6 +242,31 @@ impl ShardedLmdb {
     self.lmdbs.values().cloned().collect()
   }
 
+  pub async fn remove(&self, fingerprint: Fingerprint) -> Result<bool, String> {
+    let store = self.clone();
+    self
+      .executor
+      .spawn_blocking(move || {
+        let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::schema_version());
+        let (env, db, _lease_database) = store.get(&fingerprint);
+        let del_res = env.begin_rw_txn().and_then(|mut txn| {
+          txn.del(db, &effective_key, None)?;
+          txn.commit()
+        });
+
+        match del_res {
+          Ok(()) => Ok(true),
+          Err(lmdb::Error::NotFound) => Ok(false),
+          Err(err) => Err(format!(
+            "Error removing versioned key {:?}: {}",
+            effective_key.to_hex(),
+            err
+          )),
+        }
+      })
+      .await
+  }
+
   pub async fn exists(&self, fingerprint: Fingerprint) -> Result<bool, String> {
     let store = self.clone();
     let effective_key = VersionedFingerprint::new(fingerprint, ShardedLmdb::schema_version());
diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs
index 29a1f49db03..872d87352f5 100644
--- a/src/rust/engine/src/externs/interface.rs
+++ b/src/rust/engine/src/externs/interface.rs
@@ -202,7 +202,7 @@ py_module_initializer!(native_engine, |py, m| {
   m.add(
     py,
     "garbage_collect_store",
-    py_fn!(py, garbage_collect_store(a: PyScheduler)),
+    py_fn!(py, garbage_collect_store(a: PyScheduler, b: usize)),
   )?;
   m.add(
     py,
@@ -1228,13 +1228,17 @@ fn set_panic_handler(_: Python) -> PyUnitResult {
   Ok(None)
 }
 
-fn garbage_collect_store(py: Python, scheduler_ptr: PyScheduler) -> PyUnitResult {
+fn garbage_collect_store(
+  py: Python,
+  scheduler_ptr: PyScheduler,
+  target_size_bytes: usize,
+) -> PyUnitResult {
   with_scheduler(py, scheduler_ptr, |scheduler| {
     py.allow_threads(|| {
-      scheduler.core.store().garbage_collect(
-        store::DEFAULT_LOCAL_STORE_GC_TARGET_BYTES,
-        store::ShrinkBehavior::Fast,
-      )
+      scheduler
+        .core
+        .store()
+        .garbage_collect(target_size_bytes, store::ShrinkBehavior::Fast)
     })
     .map_err(|e| PyErr::new::<exc::Exception, _>(py, (e,)))
    .map(|()| None)
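
Below is a minimal, self-contained sketch of the cache-hit validation flow that this patch adds, for readers who want the shape of the change without the diff context. It is a sketch only: the Digest, Store, and CachedResult types here are hypothetical stand-ins rather than the real store::Store or FallibleProcessResultWithPlatform APIs, and the real implementation (CommandRunner::lookup in process_execution/src/cache.rs) additionally walks the output directory recursively via ensure_local_has_recursive_directory and runs the checks concurrently with future03::try_join_all.

// Sketch only: stand-in types, not the real pants engine APIs.
use std::collections::HashSet;

/// Stand-in for a content-addressed digest.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Digest(u64);

/// Stand-in for the subset of Store behavior that the lookup relies on.
struct Store {
  locally_present: HashSet<Digest>,
}

impl Store {
  /// Succeeds only if the digest is loadable locally. (In the real Store this call can also
  /// download the file from a configured remote as a side effect.)
  fn ensure_local_has_file(&self, digest: Digest) -> Result<(), String> {
    if self.locally_present.contains(&digest) {
      Ok(())
    } else {
      Err(format!("File {:?} did not exist in the store.", digest))
    }
  }
}

/// Stand-in for a deserialized cache entry.
struct CachedResult {
  stdout_digest: Digest,
  stderr_digest: Digest,
  output_file_digests: Vec<Digest>,
}

/// Return a cache hit only if every digest the entry references is loadable; otherwise report an
/// error so that the caller treats it as a miss and re-runs the process.
fn validated_cache_hit(
  store: &Store,
  entry: Option<CachedResult>,
) -> Result<Option<CachedResult>, String> {
  let entry = match entry {
    Some(entry) => entry,
    None => return Ok(None),
  };
  store.ensure_local_has_file(entry.stdout_digest)?;
  store.ensure_local_has_file(entry.stderr_digest)?;
  for digest in &entry.output_file_digests {
    store.ensure_local_has_file(*digest)?;
  }
  Ok(Some(entry))
}

fn main() {
  let store = Store {
    locally_present: vec![Digest(1), Digest(2)].into_iter().collect(),
  };
  let entry = CachedResult {
    stdout_digest: Digest(1),
    stderr_digest: Digest(2),
    // Digest(3) stands for output file content that was garbage collected locally.
    output_file_digests: vec![Digest(3)],
  };
  // The lookup reports an error, so a caller falls back to re-executing the process instead of
  // failing with a missing-file error at materialization time.
  assert!(validated_cache_hit(&store, Some(entry)).is_err());
}

Surfacing an error from the lookup (rather than returning the stale entry) matches how run() treats lookup failures in the cache.rs change above: the error is logged and the process is re-executed, which re-populates both the file store and the cache entry.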