Fix scheduler so platform properties are properly restored
There was a bug where platform properties were not being restored to the worker
after its task completed, and worker<->task matchmaking was not being triggered
when a task completed.
allada committed Jun 17, 2022
1 parent bb010f2 commit 059b0ef
Showing 3 changed files with 155 additions and 2 deletions.
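
The fix revolves around PlatformPropertyValue::Minimum values acting as consumable capacity: assigning an action to a worker uses some of that capacity, and completing the action must give it back, otherwise the worker looks permanently full and queued work never matches. Below is a minimal, self-contained sketch of that accounting, written only to illustrate the idea behind the diff; the WorkerProps struct and the consume/restore helpers are hypothetical stand-ins rather than this repository's actual types (only the Minimum-value shape mirrors what the tests below construct).

use std::collections::HashMap;

// Illustrative stand-ins for the scheduler's property types (hypothetical,
// not the repository's definitions). Only the Minimum variant matters here,
// since it is the one treated as consumable capacity.
#[derive(Clone, Debug, PartialEq)]
enum PlatformPropertyValue {
    Minimum(u64),
    Exact(String),
}

#[derive(Default)]
struct WorkerProps {
    properties: HashMap<String, PlatformPropertyValue>,
}

impl WorkerProps {
    // Hypothetical "consume" step: when an action is assigned, subtract its
    // Minimum requirements from the worker's remaining capacity.
    fn consume(&mut self, required: &HashMap<String, PlatformPropertyValue>) {
        for (name, value) in required {
            if let PlatformPropertyValue::Minimum(needed) = value {
                if let Some(PlatformPropertyValue::Minimum(avail)) = self.properties.get_mut(name) {
                    *avail -= needed;
                }
            }
        }
    }

    // Mirror of the restore step this commit adds: on completion, add the
    // Minimum requirements back so the next queued action can be matched.
    fn restore(&mut self, required: &HashMap<String, PlatformPropertyValue>) {
        for (name, value) in required {
            if let PlatformPropertyValue::Minimum(needed) = value {
                if let Some(PlatformPropertyValue::Minimum(avail)) = self.properties.get_mut(name) {
                    *avail += needed;
                }
            }
        }
    }
}

fn main() {
    let mut worker = WorkerProps::default();
    worker.properties.insert("prop1".to_string(), PlatformPropertyValue::Minimum(1));
    worker.properties.insert("os".to_string(), PlatformPropertyValue::Exact("linux".to_string()));

    let mut required = HashMap::new();
    required.insert("prop1".to_string(), PlatformPropertyValue::Minimum(1));

    worker.consume(&required); // capacity drops to 0; a second identical action cannot match
    worker.restore(&required); // without this step the worker would look full forever
    assert_eq!(worker.properties["prop1"], PlatformPropertyValue::Minimum(1));
    assert_eq!(worker.properties["os"], PlatformPropertyValue::Exact("linux".to_string()));
}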
21 changes: 19 additions & 2 deletions cas/scheduler/scheduler.rs
@@ -342,6 +342,11 @@ impl SchedulerImpl {
return Err(make_input_err!("{}", msg));
}

let did_complete = match action_stage {
ActionStage::Completed(_) => true,
_ => false,
};

Arc::make_mut(&mut running_action.action.current_state).stage = action_stage;

let send_result = running_action
@@ -355,6 +360,16 @@
);
}

if did_complete {
let worker = self
.workers
.workers
.get_mut(worker_id)
.ok_or_else(|| make_input_err!("WorkerId '{}' does not exist in workers map", worker_id))?;
worker.restore_platform_properties(&action_info.platform_properties);
self.do_try_match();
}

// TODO(allada) We should probably hold a small queue of recent actions for debugging.
// Right now it will drop the action which also disconnects listeners here.
Ok(())
@@ -469,11 +484,13 @@ impl Scheduler {
}
})
.collect();
for worker_id in worker_ids_to_remove {
for worker_id in worker_ids_to_remove.as_slice() {
log::warn!("Worker {} timed out, removing from pool", worker_id);
inner.immediate_evict_worker(&worker_id);
}
inner.do_try_match();
if !worker_ids_to_remove.is_empty() {
inner.do_try_match();
}
Ok(())
}

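Two small notes on the scheduler.rs hunks above, offered only as commentary on the diff. did_complete is computed before the following line because action_stage is moved into current_state.stage by the Arc::make_mut assignment. The match expression itself is equivalent to the standard matches! macro; a more compact spelling of the same predicate (purely illustrative, not part of the commit) would be:

let did_complete = matches!(action_stage, ActionStage::Completed(_));

The did_complete branch then looks up the worker that ran the action, restores the action's platform properties to it, and immediately re-runs do_try_match so queued work can claim the freed capacity.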
125 changes: 125 additions & 0 deletions cas/scheduler/tests/scheduler_test.rs
@@ -709,4 +709,129 @@ mod scheduler_tests {

Ok(())
}

/// Ensures that platform property restrictions allow jobs to continue to run after
/// a job finishes on a specific worker (i.e. its platform properties are restored).
#[tokio::test]
async fn run_two_jobs_on_same_worker_with_platform_properties_restrictions() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x123456789111);

let scheduler = Scheduler::new(&SchedulerConfig::default());
let action_digest1 = DigestInfo::new([11u8; 32], 512);
let action_digest2 = DigestInfo::new([99u8; 32], 512);

let mut properties = HashMap::new();
properties.insert("prop1".to_string(), PlatformPropertyValue::Minimum(1));
let platform_properties = PlatformProperties { properties };
let mut rx_from_worker = setup_new_worker(&scheduler, WORKER_ID, platform_properties.clone()).await?;
let mut client1_rx = setup_action(&scheduler, action_digest1.clone(), platform_properties.clone()).await?;
let mut client2_rx = setup_action(&scheduler, action_digest2.clone(), platform_properties).await?;

match rx_from_worker.recv().await.unwrap().update {
Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
v => assert!(false, "Expected StartAction, got : {:?}", v),
}
{
// First client should be in an Executing state.
assert_eq!(client1_rx.borrow_and_update().stage, ActionStage::Executing);
// Second client should be in a queued state.
assert_eq!(client2_rx.borrow_and_update().stage, ActionStage::Queued);
}

let action_result = ActionResult {
output_files: Default::default(),
output_folders: Default::default(),
output_file_symlinks: Default::default(),
output_directory_symlinks: Default::default(),
exit_code: 0,
stdout_digest: DigestInfo::new([6u8; 32], 19),
stderr_digest: DigestInfo::new([7u8; 32], 20),
execution_metadata: ExecutionMetadata {
worker: WORKER_ID.to_string(),
queued_timestamp: make_system_time(5),
worker_start_timestamp: make_system_time(6),
worker_completed_timestamp: make_system_time(7),
input_fetch_start_timestamp: make_system_time(8),
input_fetch_completed_timestamp: make_system_time(9),
execution_start_timestamp: make_system_time(10),
execution_completed_timestamp: make_system_time(11),
output_upload_start_timestamp: make_system_time(12),
output_upload_completed_timestamp: make_system_time(13),
},
server_logs: Default::default(),
};

// Tell scheduler our first task is completed.
scheduler
.update_action(
&WORKER_ID,
&ActionInfoHashKey {
digest: action_digest1.clone(),
salt: 0,
},
ActionStage::Completed(action_result.clone()),
)
.await?;

// Ensure client did get notified.
assert!(
client1_rx.changed().await.is_ok(),
"Client should have been notified of event"
);

{
// First action should now be completed.
let action_state = client1_rx.borrow_and_update();
let mut expected_action_state = ActionState {
// Name is a random string, so we ignore it and just make it the same.
name: action_state.name.clone(),
action_digest: action_digest1.clone(),
stage: ActionStage::Completed(action_result.clone()),
};
// We now know the name of the action so populate it.
expected_action_state.name = action_state.name.clone();
assert_eq!(action_state.as_ref(), &expected_action_state);
}

// At this stage it should have added back any platform_properties and the next
// task should be executing on the same worker.

{
// Our second client should now be executing.
match rx_from_worker.recv().await.unwrap().update {
Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
v => assert!(false, "Expected StartAction, got : {:?}", v),
}
// Other tests check full data. We only care if client thinks we are Executing.
assert_eq!(client2_rx.borrow_and_update().stage, ActionStage::Executing);
}

// Tell scheduler our second task is completed.
scheduler
.update_action(
&WORKER_ID,
&ActionInfoHashKey {
digest: action_digest2.clone(),
salt: 0,
},
ActionStage::Completed(action_result.clone()),
)
.await?;

{
// Our second client should be notified it completed.
let action_state = client2_rx.borrow_and_update();
let mut expected_action_state = ActionState {
// Name is a random string, so we ignore it and just make it the same.
name: action_state.name.clone(),
action_digest: action_digest2.clone(),
stage: ActionStage::Completed(action_result.clone()),
};
// We now know the name of the action so populate it.
expected_action_state.name = action_state.name.clone();
assert_eq!(action_state.as_ref(), &expected_action_state);
}

Ok(())
}
}
11 changes: 11 additions & 0 deletions cas/scheduler/worker.rs
@@ -147,6 +147,17 @@ impl Worker {
}
}
}

pub fn restore_platform_properties(&mut self, props: &PlatformProperties) {
for (property, prop_value) in &props.properties {
if let PlatformPropertyValue::Minimum(value) = prop_value {
let worker_props = &mut self.platform_properties.properties;
if let PlatformPropertyValue::Minimum(worker_value) = worker_props.get_mut(property).unwrap() {
*worker_value += value;
}
}
}
}
}

impl PartialEq for Worker {
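A note on the new Worker::restore_platform_properties above: it only adds back Minimum values (other property variants are left untouched), and the get_mut(property).unwrap() assumes the worker's property map already contains every property named in the action's requirements, which holds when matchmaking only assigns actions whose properties the worker advertises. A more defensive spelling of the same loop, purely illustrative and not part of the commit, could skip unknown properties instead of unwrapping:

pub fn restore_platform_properties(&mut self, props: &PlatformProperties) {
    for (property, prop_value) in &props.properties {
        if let PlatformPropertyValue::Minimum(value) = prop_value {
            // Skip properties the worker does not advertise instead of panicking.
            if let Some(PlatformPropertyValue::Minimum(worker_value)) =
                self.platform_properties.properties.get_mut(property)
            {
                *worker_value += value;
            }
        }
    }
}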
