From 692e4de6c44ce070b448235428736d9d73eea997 Mon Sep 17 00:00:00 2001
From: Blizzardc0der <111754819+blizzardc0der@users.noreply.github.com>
Date: Sat, 6 Apr 2024 01:04:11 +0800
Subject: [PATCH] [Breaking] Remove completeness checking logic in CacheLookupScheduler

Since the CompletenessCheckingStore has landed, users who need completeness
checking for their action cache store should now use that store instead. (#826)

This breaking change only affects the CacheLookupScheduler.
---
 nativelink-config/src/schedulers.rs              |  8 +-
 .../src/cache_lookup_scheduler.rs                | 62 ++++----------
 .../src/default_scheduler_factory.rs             |  4 -
 .../tests/cache_lookup_scheduler_test.rs         | 82 +------------------
 4 files changed, 21 insertions(+), 135 deletions(-)

diff --git a/nativelink-config/src/schedulers.rs b/nativelink-config/src/schedulers.rs
index 09f79d6e0..ff447dca4 100644
--- a/nativelink-config/src/schedulers.rs
+++ b/nativelink-config/src/schedulers.rs
@@ -145,15 +145,11 @@ pub struct GrpcScheduler {
 #[derive(Deserialize, Debug)]
 #[serde(deny_unknown_fields)]
 pub struct CacheLookupScheduler {
-    /// The reference to the action cache store to use to returned cached
+    /// The reference to the action cache store used to return cached
     /// actions from rather than running them again.
+    /// To prevent unintended issues, this store should probably be a CompletenessCheckingStore.
    pub ac_store: StoreRefName,
 
-    /// The reference to the CAS which contains the outputs from the cached
-    /// actions to verify that the outputs still exist before returning a
-    /// cached result.
-    pub cas_store: StoreRefName,
-
     /// The nested scheduler to use if cache lookup fails.
     pub scheduler: Box<SchedulerConfig>,
 }
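Migration note for the config change above: with `cas_store` gone from this struct, verification that cached outputs still exist has to come from the store graph rather than from the scheduler. A rough sketch of what that wiring might look like is below; the store names are invented and the exact `completeness_checking` field spellings are assumptions to be checked against nativelink-config, not something this patch defines:

    {
      "stores": {
        "CAS_STORE": { "memory": {} },
        "AC_STORE": {
          // Assumed shape: an AC backend plus the CAS used to verify that the
          // outputs referenced by a cached ActionResult still exist.
          "completeness_checking": {
            "backend": { "memory": {} },
            "cas_store": { "ref_store": { "name": "CAS_STORE" } }
          }
        }
      },
      "schedulers": {
        "MAIN_SCHEDULER": {
          "cache_lookup": {
            // The scheduler now only takes the AC reference; pointing it at the
            // completeness-checking wrapper restores the old "outputs must still
            // exist in the CAS" guarantee.
            "ac_store": "AC_STORE",
            "scheduler": { "simple": {} }
          }
        }
      }
    }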
diff --git a/nativelink-scheduler/src/cache_lookup_scheduler.rs b/nativelink-scheduler/src/cache_lookup_scheduler.rs
index b1e60cab0..c9e0dd154 100644
--- a/nativelink-scheduler/src/cache_lookup_scheduler.rs
+++ b/nativelink-scheduler/src/cache_lookup_scheduler.rs
@@ -35,6 +35,7 @@ use tokio::select;
 use tokio::sync::watch;
 use tokio_stream::wrappers::WatchStream;
 use tonic::Request;
+use tracing::warn;
 
 use crate::action_scheduler::ActionScheduler;
 use crate::platform_property_manager::PlatformPropertyManager;
@@ -45,10 +46,8 @@
 type CheckActions = HashMap>>>;
 
 pub struct CacheLookupScheduler {
-    /// A reference to the CAS which is used to validate all the outputs of a
-    /// cached ActionResult still exist.
-    cas_store: Arc<dyn Store>,
     /// A reference to the AC to find existing actions in.
+    /// To prevent unintended issues, this store should probably be a CompletenessCheckingStore.
     ac_store: Arc<dyn Store>,
     /// The "real" scheduler to use to perform actions if they were not found
     /// in the action cache.
@@ -85,40 +84,6 @@ async fn get_action_from_store(
     }
 }
-async fn validate_outputs_exist(
-    cas_store: &Arc<dyn Store>,
-    action_result: &ProtoActionResult,
-) -> bool {
-    // Verify that output_files and output_directories are available in the cas.
-    let mut required_digests = Vec::with_capacity(
-        action_result.output_files.len() + action_result.output_directories.len(),
-    );
-    for digest in action_result
-        .output_files
-        .iter()
-        .filter_map(|output_file| output_file.digest.as_ref())
-        .chain(
-            action_result
-                .output_directories
-                .iter()
-                .filter_map(|output_file| output_file.tree_digest.as_ref()),
-        )
-    {
-        let Ok(digest) = DigestInfo::try_from(digest) else {
-            return false;
-        };
-        required_digests.push(digest);
-    }
-
-    let Ok(sizes) = Pin::new(cas_store.as_ref())
-        .has_many(&required_digests)
-        .await
-    else {
-        return false;
-    };
-    sizes.into_iter().all(|size| size.is_some())
-}
-
 fn subscribe_to_existing_action(
     cache_check_actions: &MutexGuard,
     unique_qualifier: &ActionInfoHashKey,
 ) -> Option> {
@@ -136,12 +101,10 @@ impl CacheLookupScheduler {
     pub fn new(
-        cas_store: Arc<dyn Store>,
         ac_store: Arc<dyn Store>,
         action_scheduler: Arc<dyn ActionScheduler>,
     ) -> Result<Self, Error> {
         Ok(Self {
-            cas_store,
             ac_store,
             action_scheduler,
             cache_check_actions: Default::default(),
@@ -193,7 +156,6 @@ impl ActionScheduler for CacheLookupScheduler {
         };
 
         let ac_store = self.ac_store.clone();
-        let cas_store = self.cas_store.clone();
         let action_scheduler = self.action_scheduler.clone();
         // We need this spawn because we are returning a stream and this spawn will populate the stream's data.
         tokio::spawn(async move {
@@ -207,12 +169,20 @@
                 get_action_from_store(Pin::new(ac_store.as_ref()), *action_digest, instance_name)
                     .await
             {
-                if validate_outputs_exist(&cas_store, &action_result).await {
-                    // Found in the cache, return the result immediately.
-                    Arc::make_mut(&mut current_state).stage =
-                        ActionStage::CompletedFromCache(action_result);
-                    let _ = tx.send(current_state);
-                    return;
+                match Pin::new(ac_store.clone().as_ref())
+                    .has(*action_digest)
+                    .await
+                {
+                    Ok(Some(_)) => {
+                        Arc::make_mut(&mut current_state).stage =
+                            ActionStage::CompletedFromCache(action_result);
+                        let _ = tx.send(current_state);
+                        return;
+                    }
+                    Err(err) => {
+                        warn!("Error while calling `has` on `ac_store` in `CacheLookupScheduler`'s `add_action` function: {}", err);
+                    }
+                    _ => {}
                 }
             }
             // Not in cache, forward to upstream and proxy state.
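For reviewers skimming the hunk above: the scheduler no longer decodes the cached ActionResult and probes the CAS for every referenced output; it only asks the AC store whether the digest exists, and leaves output validation to the store stack (for example a CompletenessCheckingStore in front of the real AC). A minimal standalone sketch of that decision follows; `is_cached` is a hypothetical helper used only for illustration and is not part of this patch:

    use std::pin::Pin;

    use nativelink_util::common::DigestInfo;
    use nativelink_util::store_trait::Store;
    use tracing::warn;

    /// Sketch only: mirrors the `match` added to `add_action` above and returns
    /// true when the action can be served as `CompletedFromCache`.
    async fn is_cached(ac_store: Pin<&dyn Store>, action_digest: DigestInfo) -> bool {
        match ac_store.has(action_digest).await {
            // The AC (or a CompletenessCheckingStore in front of it) reports the
            // entry as present, so the cached result can be returned immediately.
            Ok(Some(_size)) => true,
            // Not cached: fall through to the real scheduler.
            Ok(None) => false,
            // Same policy as the patch: log the store error and run the action.
            Err(err) => {
                warn!("Error while calling `has` on `ac_store`: {}", err);
                false
            }
        }
    }

Callers would pin the store handle the same way the patch does, e.g. `is_cached(Pin::new(ac_store.as_ref()), *action_digest).await`.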
diff --git a/nativelink-scheduler/src/default_scheduler_factory.rs b/nativelink-scheduler/src/default_scheduler_factory.rs
index 6bd29c934..8ebb7233b 100644
--- a/nativelink-scheduler/src/default_scheduler_factory.rs
+++ b/nativelink-scheduler/src/default_scheduler_factory.rs
@@ -61,9 +61,6 @@ fn inner_scheduler_factory(
         }
         SchedulerConfig::grpc(config) => (Some(Arc::new(GrpcScheduler::new(config)?)), None),
         SchedulerConfig::cache_lookup(config) => {
-            let cas_store = store_manager
-                .get_store(&config.cas_store)
-                .err_tip(|| format!("'cas_store': '{}' does not exist", config.cas_store))?;
             let ac_store = store_manager
                 .get_store(&config.ac_store)
                 .err_tip(|| format!("'ac_store': '{}' does not exist", config.ac_store))?;
@@ -71,7 +68,6 @@
                 inner_scheduler_factory(&config.scheduler, store_manager, None, visited_schedulers)
                     .err_tip(|| "In nested CacheLookupScheduler construction")?;
             let cache_lookup_scheduler = Arc::new(CacheLookupScheduler::new(
-                cas_store,
                 ac_store,
                 action_scheduler.err_tip(|| "Nested scheduler is not an action scheduler")?,
             )?);
diff --git a/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs b/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs
index f5dcf8602..011717352 100644
--- a/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs
+++ b/nativelink-scheduler/tests/cache_lookup_scheduler_test.rs
@@ -23,22 +23,18 @@ mod utils {
 }
 
 use futures::join;
-use nativelink_error::{Error, ResultExt};
+use nativelink_error::Error;
 use nativelink_proto::build::bazel::remote::execution::v2::ActionResult as ProtoActionResult;
 use nativelink_scheduler::action_scheduler::ActionScheduler;
 use nativelink_scheduler::cache_lookup_scheduler::CacheLookupScheduler;
 use nativelink_scheduler::platform_property_manager::PlatformPropertyManager;
 use nativelink_store::memory_store::MemoryStore;
-use nativelink_util::action_messages::{
-    ActionInfoHashKey, ActionResult, ActionStage, ActionState, DirectoryInfo,
-};
+use nativelink_util::action_messages::{ActionInfoHashKey, ActionResult, ActionStage, ActionState};
 use nativelink_util::common::DigestInfo;
 use nativelink_util::store_trait::Store;
 use prost::Message;
 use tokio::sync::watch;
 use tokio::{self};
-use tokio_stream::wrappers::WatchStream;
-use tokio_stream::StreamExt;
 use utils::mock_scheduler::MockActionScheduler;
 use utils::scheduler_utils::{make_base_action_info, INSTANCE_NAME};
@@ -50,14 +46,10 @@ struct TestContext {
 
 fn make_cache_scheduler() -> Result<TestContext, Error> {
     let mock_scheduler = Arc::new(MockActionScheduler::new());
-    let cas_store = Arc::new(MemoryStore::new(
-        &nativelink_config::stores::MemoryStore::default(),
-    ));
     let ac_store = Arc::new(MemoryStore::new(
         &nativelink_config::stores::MemoryStore::default(),
     ));
-    let cache_scheduler =
-        CacheLookupScheduler::new(cas_store, ac_store.clone(), mock_scheduler.clone())?;
+    let cache_scheduler = CacheLookupScheduler::new(ac_store.clone(), mock_scheduler.clone())?;
     Ok(TestContext {
         mock_scheduler,
         ac_store,
@@ -92,74 +84,6 @@ mod cache_lookup_scheduler_tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn add_action_does_cache_lookup() -> Result<(), Error> {
-        let context = make_cache_scheduler()?;
-        let action_info = make_base_action_info(UNIX_EPOCH);
-        let action_result = ProtoActionResult::from(ActionResult::default());
-        let store_pin = Pin::new(context.ac_store.as_ref());
-        store_pin
-            .update_oneshot(*action_info.digest(), action_result.encode_to_vec().into())
-            .await?;
-        let watch_channel = context
-            .cache_scheduler
-            .add_action(action_info.clone())
-            .await?;
-        let mut watch_stream = WatchStream::new(watch_channel);
-        if watch_stream
-            .next()
-            .await
-            .err_tip(|| "Getting initial state")?
-            .stage
-            != ActionStage::CacheCheck
-        {
-            panic!("Not performing a cache check");
-        }
-        let cached_action_state = watch_stream
-            .next()
-            .await
-            .err_tip(|| "Getting post-cache result")?;
-        let ActionStage::CompletedFromCache(proto_action_result) =
-            cached_action_state.stage.clone()
-        else {
-            panic!("Did not complete from cache");
-        };
-        assert_eq!(action_info.digest(), cached_action_state.action_digest());
-        assert_eq!(action_result, proto_action_result);
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn add_action_validates_outputs() -> Result<(), Error> {
-        let context = make_cache_scheduler()?;
-        let action_info = make_base_action_info(UNIX_EPOCH);
-        let mut action_result = ActionResult::default();
-        action_result.output_folders.push(DirectoryInfo {
-            path: "".to_string(),
-            tree_digest: DigestInfo {
-                size_bytes: 1,
-                packed_hash: [8; 32],
-            },
-        });
-        let action_result = ProtoActionResult::from(action_result);
-        let store_pin = Pin::new(context.ac_store.as_ref());
-        store_pin
-            .update_oneshot(*action_info.digest(), action_result.encode_to_vec().into())
-            .await?;
-        let (_forward_watch_channel_tx, forward_watch_channel_rx) =
-            watch::channel(Arc::new(ActionState {
-                unique_qualifier: action_info.unique_qualifier.clone(),
-                stage: ActionStage::Queued,
-            }));
-        let _ = join!(
-            context.cache_scheduler.add_action(action_info),
-            context
-                .mock_scheduler
-                .expect_add_action(Ok(forward_watch_channel_rx))
-        );
-        Ok(())
-    }
-
     #[tokio::test]
     async fn add_action_handles_skip_cache() -> Result<(), Error> {
         let context = make_cache_scheduler()?;