diff --git a/cas/scheduler/scheduler.rs b/cas/scheduler/scheduler.rs
index 662141158..534d001b3 100644
--- a/cas/scheduler/scheduler.rs
+++ b/cas/scheduler/scheduler.rs
@@ -342,6 +342,11 @@ impl SchedulerImpl {
             return Err(make_input_err!("{}", msg));
         }
 
+        let did_complete = match action_stage {
+            ActionStage::Completed(_) => true,
+            _ => false,
+        };
+
         Arc::make_mut(&mut running_action.action.current_state).stage = action_stage;
 
         let send_result = running_action
@@ -355,6 +360,16 @@ impl SchedulerImpl {
             );
         }
 
+        if did_complete {
+            let worker = self
+                .workers
+                .workers
+                .get_mut(worker_id)
+                .ok_or_else(|| make_input_err!("WorkerId '{}' does not exist in workers map", worker_id))?;
+            worker.restore_platform_properties(&action_info.platform_properties);
+            self.do_try_match();
+        }
+
         // TODO(allada) We should probably hold a small queue of recent actions for debugging.
         // Right now it will drop the action which also disconnects listeners here.
         Ok(())
@@ -469,11 +484,13 @@ impl Scheduler {
                 }
            })
            .collect();
-        for worker_id in worker_ids_to_remove {
+        for worker_id in worker_ids_to_remove.as_slice() {
            log::warn!("Worker {} timed out, removing from pool", worker_id);
            inner.immediate_evict_worker(&worker_id);
        }
-        inner.do_try_match();
+        if !worker_ids_to_remove.is_empty() {
+            inner.do_try_match();
+        }
        Ok(())
    }
 
diff --git a/cas/scheduler/tests/scheduler_test.rs b/cas/scheduler/tests/scheduler_test.rs
index 4a8032215..968b9d246 100644
--- a/cas/scheduler/tests/scheduler_test.rs
+++ b/cas/scheduler/tests/scheduler_test.rs
@@ -709,4 +709,129 @@ mod scheduler_tests {
 
         Ok(())
     }
+
+    /// This test ensures that platform property restrictions allow jobs to continue to run after
+    /// a job finishes on a specific worker (i.e. the worker's platform properties are restored).
+    #[tokio::test]
+    async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> Result<(), Error> {
+        const WORKER_ID: WorkerId = WorkerId(0x123456789111);
+
+        let scheduler = Scheduler::new(&SchedulerConfig::default());
+        let action_digest1 = DigestInfo::new([11u8; 32], 512);
+        let action_digest2 = DigestInfo::new([99u8; 32], 512);
+
+        let mut properties = HashMap::new();
+        properties.insert("prop1".to_string(), PlatformPropertyValue::Minimum(1));
+        let platform_properties = PlatformProperties { properties };
+        let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, platform_properties.clone()).await?;
+        let mut client1_rx = setup_action(&scheduler, action_digest1.clone(), platform_properties.clone()).await?;
+        let mut client2_rx = setup_action(&scheduler, action_digest2.clone(), platform_properties).await?;
+
+        match rx_from_worker.recv().await.unwrap().update {
+            Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
+            v => assert!(false, "Expected StartAction, got: {:?}", v),
+        }
+        {
+            // First client should be in an Executing state.
+            assert_eq!(client1_rx.borrow_and_update().stage, ActionStage::Executing);
+            // Second client should be in a Queued state.
+            assert_eq!(client2_rx.borrow_and_update().stage, ActionStage::Queued);
+        }
+
+        let action_result = ActionResult {
+            output_files: Default::default(),
+            output_folders: Default::default(),
+            output_file_symlinks: Default::default(),
+            output_directory_symlinks: Default::default(),
+            exit_code: 0,
+            stdout_digest: DigestInfo::new([6u8; 32], 19),
+            stderr_digest: DigestInfo::new([7u8; 32], 20),
+            execution_metadata: ExecutionMetadata {
+                worker: WORKER_ID.to_string(),
+                queued_timestamp: make_system_time(5),
+                worker_start_timestamp: make_system_time(6),
+                worker_completed_timestamp: make_system_time(7),
+                input_fetch_start_timestamp: make_system_time(8),
+                input_fetch_completed_timestamp: make_system_time(9),
+                execution_start_timestamp: make_system_time(10),
+                execution_completed_timestamp: make_system_time(11),
+                output_upload_start_timestamp: make_system_time(12),
+                output_upload_completed_timestamp: make_system_time(13),
+            },
+            server_logs: Default::default(),
+        };
+
+        // Tell scheduler our first task is completed.
+        scheduler
+            .update_action(
+                &WORKER_ID,
+                &ActionInfoHashKey {
+                    digest: action_digest1.clone(),
+                    salt: 0,
+                },
+                ActionStage::Completed(action_result.clone()),
+            )
+            .await?;
+
+        // Ensure the client was notified of the event.
+        assert!(
+            client1_rx.changed().await.is_ok(),
+            "Client should have been notified of event"
+        );
+
+        {
+            // First action should now be completed.
+            let action_state = client1_rx.borrow_and_update();
+            let mut expected_action_state = ActionState {
+                // Name is a random string, so we ignore it and just make it the same.
+                name: action_state.name.clone(),
+                action_digest: action_digest1.clone(),
+                stage: ActionStage::Completed(action_result.clone()),
+            };
+            // We now know the name of the action so populate it.
+            expected_action_state.name = action_state.name.clone();
+            assert_eq!(action_state.as_ref(), &expected_action_state);
+        }
+
+        // At this stage the scheduler should have restored the worker's platform properties and the
+        // next task should be executing on the same worker.
+
+        {
+            // Our second client should now be executing.
+            match rx_from_worker.recv().await.unwrap().update {
+                Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
+                v => assert!(false, "Expected StartAction, got: {:?}", v),
+            }
+            // Other tests check full data. We only care if client thinks we are Executing.
+            assert_eq!(client2_rx.borrow_and_update().stage, ActionStage::Executing);
+        }
+
+        // Tell scheduler our second task is completed.
+        scheduler
+            .update_action(
+                &WORKER_ID,
+                &ActionInfoHashKey {
+                    digest: action_digest2.clone(),
+                    salt: 0,
+                },
+                ActionStage::Completed(action_result.clone()),
+            )
+            .await?;
+
+        {
+            // Our second client should be notified it completed.
+            let action_state = client2_rx.borrow_and_update();
+            let mut expected_action_state = ActionState {
+                // Name is a random string, so we ignore it and just make it the same.
+                name: action_state.name.clone(),
+                action_digest: action_digest2.clone(),
+                stage: ActionStage::Completed(action_result.clone()),
+            };
+            // We now know the name of the action so populate it.
+            expected_action_state.name = action_state.name.clone();
+            assert_eq!(action_state.as_ref(), &expected_action_state);
+        }
+
+        Ok(())
+    }
 }
diff --git a/cas/scheduler/worker.rs b/cas/scheduler/worker.rs
index fa5f77ebd..32384f525 100644
--- a/cas/scheduler/worker.rs
+++ b/cas/scheduler/worker.rs
@@ -147,6 +147,17 @@ impl Worker {
             }
         }
     }
+
+    pub fn restore_platform_properties(&mut self, props: &PlatformProperties) {
+        for (property, prop_value) in &props.properties {
+            if let PlatformPropertyValue::Minimum(value) = prop_value {
+                let worker_props = &mut self.platform_properties.properties;
+                if let PlatformPropertyValue::Minimum(worker_value) = worker_props.get_mut(property).unwrap() {
+                    *worker_value += value;
+                }
+            }
+        }
+    }
 }
 
 impl PartialEq for Worker {