Implement WorkerStateManager for simple scheduler
Implementation of `WorkerStateManager`, the beginnings of being able to
tuck mutations behind a trait for `StateManager`. `StateManager` now wraps
the inner state structure `StateManagerImpl`.
All mutations should be done under a lock on `inner` in the future.
Moves the implementation code of `update_action` into the `WorkerStateManager` trait.
Adam Singer committed Jun 24, 2024
1 parent 97abbcd commit 57154a3
Showing 7 changed files with 319 additions and 161 deletions.
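
The core of the change is a newtype wrapper: `StateManager` holds a private `StateManagerImpl`, and mutating operations move behind traits such as `WorkerStateManager`. A minimal sketch of that pattern, with simplified fields and a hypothetical `record_attempt` method standing in for the real trait surface:

pub struct StateManagerImpl {
    max_job_retries: usize,
    attempts: usize,
}

#[repr(transparent)]
pub struct StateManager {
    inner: StateManagerImpl,
}

pub trait WorkerStateManager {
    // Mutations funnel through the trait instead of touching `inner` directly.
    fn record_attempt(&mut self) -> bool;
}

impl WorkerStateManager for StateManager {
    fn record_attempt(&mut self) -> bool {
        self.inner.attempts += 1;
        self.inner.attempts <= self.inner.max_job_retries
    }
}

fn main() {
    let mut sm = StateManager {
        inner: StateManagerImpl { max_job_retries: 3, attempts: 0 },
    };
    assert!(sm.record_attempt()); // first attempt stays within the retry budget
}

The `#[repr(transparent)]` attribute, which the real struct also carries, guarantees the wrapper has the same memory layout as its single field.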
2 changes: 1 addition & 1 deletion nativelink-scheduler/src/operation_state_manager.rs
@@ -119,7 +119,7 @@ pub trait WorkerStateManager
/// did not change with a modified timestamp in order to prevent
/// the operation from being considered stale and being rescheduled.
async fn update_operation(
-        &self,
+        &mut self,
operation_id: OperationId,
worker_id: WorkerId,
action_stage: Result<ActionStage, Error>,
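
The one-line change above switches `update_operation` from `&self` to `&mut self`, so the trait method has exclusive access to scheduler state. The commit message says mutations should eventually happen under a lock on `inner`; a hedged sketch of that future direction (the `Mutex` shape here is an assumption, not something this commit implements):

use std::sync::Mutex;

struct Inner {
    pending: usize,
}

struct Manager {
    inner: Mutex<Inner>,
}

impl Manager {
    // Once a lock guards `inner`, `&self` is enough: the Mutex serializes mutations.
    fn complete_one(&self) {
        let mut inner = self.inner.lock().expect("lock poisoned");
        inner.pending = inner.pending.saturating_sub(1);
    }
}

fn main() {
    let m = Manager { inner: Mutex::new(Inner { pending: 2 }) };
    m.complete_one();
    assert_eq!(m.inner.lock().unwrap().pending, 1);
}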
67 changes: 49 additions & 18 deletions nativelink-scheduler/src/scheduler_state/metrics.rs
@@ -19,27 +19,58 @@ pub(crate) struct Metrics {
    pub(crate) add_action_joined_running_action: CounterWithTime,
    pub(crate) add_action_joined_queued_action: CounterWithTime,
    pub(crate) add_action_new_action_created: CounterWithTime,
+    pub(crate) update_action_missing_action_result: CounterWithTime,
+    pub(crate) update_action_from_wrong_worker: CounterWithTime,
+    pub(crate) update_action_no_more_listeners: CounterWithTime,
+    pub(crate) workers_evicted: CounterWithTime,
+    pub(crate) workers_evicted_with_running_action: CounterWithTime,
+    pub(crate) retry_action: CounterWithTime,
+    pub(crate) retry_action_max_attempts_reached: CounterWithTime,
+    pub(crate) retry_action_no_more_listeners: CounterWithTime,
+    pub(crate) retry_action_but_action_missing: CounterWithTime,
}

impl Metrics {
pub fn gather_metrics(&self, c: &mut CollectorState) {
-        c.publish_with_labels(
-            "add_action",
-            &self.add_action_joined_running_action,
-            "Stats about add_action().",
-            vec![("result".into(), "joined_running_action".into())],
-        );
-        c.publish_with_labels(
-            "add_action",
-            &self.add_action_joined_queued_action,
-            "Stats about add_action().",
-            vec![("result".into(), "joined_queued_action".into())],
-        );
-        c.publish_with_labels(
-            "add_action",
-            &self.add_action_new_action_created,
-            "Stats about add_action().",
-            vec![("result".into(), "new_action_created".into())],
-        );
+        {
+            c.publish_with_labels(
+                "add_action",
+                &self.add_action_joined_running_action,
+                "Stats about add_action().",
+                vec![("result".into(), "joined_running_action".into())],
+            );
+            c.publish_with_labels(
+                "add_action",
+                &self.add_action_joined_queued_action,
+                "Stats about add_action().",
+                vec![("result".into(), "joined_queued_action".into())],
+            );
+            c.publish_with_labels(
+                "add_action",
+                &self.add_action_new_action_created,
+                "Stats about add_action().",
+                vec![("result".into(), "new_action_created".into())],
+            );
+        }
+        {
+            c.publish_with_labels(
+                "update_action_errors",
+                &self.update_action_missing_action_result,
+                "Stats about errors when worker sends update_action() to scheduler. These errors are not complete, just the most common.",
+                vec![("result".into(), "missing_action_result".into())],
+            );
+            c.publish_with_labels(
+                "update_action_errors",
+                &self.update_action_from_wrong_worker,
+                "Stats about errors when worker sends update_action() to scheduler. These errors are not complete, just the most common.",
+                vec![("result".into(), "from_wrong_worker".into())],
+            );
+            c.publish_with_labels(
+                "update_action_errors",
+                &self.update_action_no_more_listeners,
+                "Stats about errors when worker sends update_action() to scheduler. These errors are not complete, just the most common.",
+                vec![("result".into(), "no_more_listeners".into())],
+            );
+        }
}
}
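
The new `update_action_errors` metrics reuse one metric name and distinguish failure modes through a `result` label, mirroring how the `add_action` counters are published. A rough sketch of that labeling pattern with toy stand-ins (the real `CollectorState` and `CounterWithTime` live in nativelink-util and have richer APIs):

struct Counter(u64);

struct Collector {
    published: Vec<(String, u64, Vec<(String, String)>)>,
}

impl Collector {
    // One metric name, many label sets: dashboards can group or filter on `result`.
    fn publish_with_labels(&mut self, name: &str, c: &Counter, labels: Vec<(String, String)>) {
        self.published.push((name.to_string(), c.0, labels));
    }
}

fn main() {
    let missing = Counter(3);
    let wrong_worker = Counter(1);
    let mut col = Collector { published: Vec::new() };
    col.publish_with_labels(
        "update_action_errors",
        &missing,
        vec![("result".into(), "missing_action_result".into())],
    );
    col.publish_with_labels(
        "update_action_errors",
        &wrong_worker,
        vec![("result".into(), "from_wrong_worker".into())],
    );
    assert_eq!(col.published.len(), 2);
}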
229 changes: 226 additions & 3 deletions nativelink-scheduler/src/scheduler_state/state_manager.rs
@@ -15,22 +15,29 @@
use std::cmp;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::SystemTime;

use async_trait::async_trait;
use futures::stream;
use hashbrown::{HashMap, HashSet};
-use nativelink_error::{Error, ResultExt};
-use nativelink_util::action_messages::{ActionInfo, ActionStage, ActionState, OperationId};
+use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
+use nativelink_util::action_messages::{
+    ActionInfo, ActionResult, ActionStage, ActionState, ExecutionMetadata, OperationId, WorkerId,
+};
use tokio::sync::watch;
use tokio::sync::watch::error::SendError;
use tracing::{event, Level};

use crate::operation_state_manager::{
ActionStateResult, ActionStateResultStream, ClientStateManager, OperationFilter,
WorkerStateManager,
};
use crate::scheduler_state::awaited_action::AwaitedAction;
use crate::scheduler_state::client_action_state_result::ClientActionStateResult;
use crate::scheduler_state::completed_action::CompletedAction;
use crate::scheduler_state::metrics::Metrics;
use crate::scheduler_state::workers::Workers;
use crate::worker::WorkerUpdate;

#[repr(transparent)]
pub(crate) struct StateManager {
@@ -45,6 +52,7 @@ impl StateManager {
active_actions: HashMap<Arc<ActionInfo>, AwaitedAction>,
recently_completed_actions: HashSet<CompletedAction>,
metrics: Arc<Metrics>,
max_job_retries: usize,
) -> Self {
Self {
inner: StateManagerImpl {
@@ -54,6 +62,7 @@ impl StateManager {
active_actions,
recently_completed_actions,
metrics,
max_job_retries,
},
}
}
@@ -102,6 +111,110 @@ pub(crate) struct StateManagerImpl {
pub(crate) recently_completed_actions: HashSet<CompletedAction>,

pub(crate) metrics: Arc<Metrics>,

/// Default times a job can retry before failing.
pub(crate) max_job_retries: usize,
}

impl StateManager {
/// Modifies the `stage` of `current_state` within `AwaitedAction` and sends
/// the new state over the notification channel.
///
/// # Discussion
///
/// The use of `Arc::make_mut` is potentially dangerous because it clones the data and
/// invalidates all weak references to it. However, in this context, it is considered
/// safe because the data is going to be re-sent back out. The primary reason for using
/// `Arc` is to reduce the number of copies, not to enforce read-only access. This approach
/// ensures that all downstream components receive the same pointer. If an update occurs
/// while another thread is operating on the data, it is acceptable, since the other thread
/// will receive another update with the new version.
///
fn mutate_stage(
awaited_action: &mut AwaitedAction,
action_stage: ActionStage,
) -> Result<(), SendError<Arc<ActionState>>> {
Arc::make_mut(&mut awaited_action.current_state).stage = action_stage;
awaited_action
.notify_channel
.send(awaited_action.current_state.clone())
}
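
    // Aside (illustration, not part of this commit): the clone-on-write behavior
    // described above is plain `Arc::make_mut` semantics. With a second strong
    // reference alive, the data is cloned rather than mutated in place, so the
    // other handle keeps the previous value:
    //
    //     let mut state = Arc::new(String::from("Queued"));
    //     let subscriber = Arc::clone(&state); // e.g. a watch channel's copy
    //     *Arc::make_mut(&mut state) = String::from("Executing");
    //     assert_eq!(*subscriber, "Queued");   // old value preserved
    //     assert_eq!(*state, "Executing");     // the fresh clone was mutated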

/// Modifies the `priority` of `action_info` within `ActionInfo`.
///
fn mutate_priority(action_info: &mut Arc<ActionInfo>, priority: i32) {
Arc::make_mut(action_info).priority = priority;
}

/// Evicts the worker from the pool and requeues any actions that were executing on it.
fn immediate_evict_worker(&mut self, worker_id: &WorkerId, err: Error) {
if let Some(mut worker) = self.inner.workers.remove_worker(worker_id) {
self.inner.metrics.workers_evicted.inc();
// We don't care if we fail to send message to worker, this is only a best attempt.
let _ = worker.notify_update(WorkerUpdate::Disconnect);
for action_info in worker.running_action_infos.drain() {
self.inner.metrics.workers_evicted_with_running_action.inc();
self.retry_action(&action_info, worker_id, err.clone());
}
}
}

fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId, err: Error) {
match self.inner.active_actions.remove(action_info) {
Some(running_action) => {
let mut awaited_action: AwaitedAction = running_action;
let send_result = if awaited_action.attempts >= self.inner.max_job_retries {
self.inner.metrics.retry_action_max_attempts_reached.inc();
let action_stage_completed = ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: format!("{worker_id}"),
..ExecutionMetadata::default()
},
error: Some(err.merge(make_err!(
Code::Internal,
"Job cancelled because it attempted to execute too many times and failed"
))),
..ActionResult::default()
});
StateManager::mutate_stage(&mut awaited_action, action_stage_completed)
// Do not put the action back in the queue here, as this action attempted to run too many
// times.
} else {
self.inner.metrics.retry_action.inc();
let send_result =
StateManager::mutate_stage(&mut awaited_action, ActionStage::Queued);
self.inner.queued_actions_set.insert(action_info.clone());
self.inner
.queued_actions
.insert(action_info.clone(), awaited_action);
send_result
};

if send_result.is_err() {
self.inner.metrics.retry_action_no_more_listeners.inc();
// Don't remove this task; keep it around for a bit in case the client
// disconnected and will reconnect and ask for the same job to be executed
// again.
event!(
Level::WARN,
?action_info,
?worker_id,
"Action has no more listeners during evict_worker()"
);
}
}
None => {
self.inner.metrics.retry_action_but_action_missing.inc();
event!(
Level::ERROR,
?action_info,
?worker_id,
"Worker stated it was running an action, but it was not in the active_actions"
);
}
}
}
}

#[async_trait]
@@ -132,7 +245,7 @@ impl ClientStateManager for StateManager {

// In the event our task is higher priority than the one already scheduled, increase
// the priority of the scheduled one.
-        Arc::make_mut(&mut arc_action_info).priority = new_priority;
+        StateManager::mutate_priority(&mut arc_action_info, new_priority);

let result = Arc::new(ClientActionStateResult::new(
queued_action.notify_channel.subscribe(),
@@ -201,3 +314,113 @@ impl ClientStateManager for StateManager {
Ok(Box::pin(stream::iter(action_result)))
}
}

#[async_trait]
impl WorkerStateManager for StateManager {
async fn update_operation(
&mut self,
operation_id: OperationId,
worker_id: WorkerId,
action_stage: Result<ActionStage, Error>,
) -> Result<(), Error> {
// TODO(adams): action_stage can be sent an error along the worker_api_server code path of
// inner_execution_response. update_operation, which is indirectly called by update_action,
// needs to support the code/logic implemented by update_action_with_internal_error
// when action_stage is an error.
let action_stage = action_stage.expect("Unimplemented error in update_operation()");
let action_info_hash_key = operation_id.unique_qualifier;
if !action_stage.has_action_result() {
self.inner.metrics.update_action_missing_action_result.inc();
event!(
Level::ERROR,
?action_info_hash_key,
?worker_id,
?action_stage,
"Worker sent error while updating action. Removing worker"
);
let err = make_err!(
Code::Internal,
"Worker '{worker_id}' set the action_stage of running action {action_info_hash_key:?} to {action_stage:?}. Removing worker.",
);
self.immediate_evict_worker(&worker_id, err.clone());
return Err(err);
}

let (action_info, mut running_action) = self
.inner
.active_actions
.remove_entry(&action_info_hash_key)
.err_tip(|| {
format!("Could not find action info in active actions: {action_info_hash_key:?}")
})?;

if running_action.worker_id != Some(worker_id) {
self.inner.metrics.update_action_from_wrong_worker.inc();
let err = match running_action.worker_id {
    Some(running_action_worker_id) => make_err!(
Code::Internal,
"Got a result from a worker that should not be running the action, Removing worker. Expected worker {running_action_worker_id} got worker {worker_id}",
),
None => make_err!(
Code::Internal,
"Got a result from a worker that should not be running the action, Removing worker. Expected action to be unassigned got worker {worker_id}",
),
};
event!(
Level::ERROR,
?action_info,
?worker_id,
?running_action.worker_id,
?err,
"Got a result from a worker that should not be running the action, Removing worker"
);
// First put it back in our active_actions or we will drop the task.
self.inner
.active_actions
.insert(action_info, running_action);
self.immediate_evict_worker(&worker_id, err.clone());
return Err(err);
}

let send_result = StateManager::mutate_stage(&mut running_action, action_stage);

if !running_action.current_state.stage.is_finished() {
if send_result.is_err() {
self.inner.metrics.update_action_no_more_listeners.inc();
event!(
Level::WARN,
?action_info,
?worker_id,
"Action has no more listeners during update_action()"
);
}
// If the operation is not finished it means the worker is still working on it, so put it
// back or else we will lose track of the task.
self.inner
.active_actions
.insert(action_info, running_action);
return Ok(());
}

// Keep in case this is asked for soon.
self.inner
.recently_completed_actions
.insert(CompletedAction {
completed_time: SystemTime::now(),
state: running_action.current_state,
});

let worker = self
.inner
.workers
.workers
.get_mut(&worker_id)
.ok_or_else(|| {
make_input_err!("WorkerId '{}' does not exist in workers map", worker_id)
})?;
worker.complete_action(&action_info);

Ok(())
}
}
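
Stepping back, `update_operation` resolves every worker update to one of three outcomes: evict the worker (error stage or a report from the wrong worker), re-insert the action while it is still running, or record it as completed. A distilled, self-contained sketch of that dispatch, with illustrative names:

#[derive(Debug, PartialEq)]
enum Outcome {
    EvictWorker, // worker sent an error stage, or the wrong worker reported
    KeepRunning, // stage not finished: put back into active_actions
    Complete,    // finished: move into recently_completed_actions
}

fn dispatch(stage_has_result: bool, worker_matches: bool, finished: bool) -> Outcome {
    if !stage_has_result || !worker_matches {
        Outcome::EvictWorker
    } else if !finished {
        Outcome::KeepRunning
    } else {
        Outcome::Complete
    }
}

fn main() {
    assert_eq!(dispatch(false, true, false), Outcome::EvictWorker);
    assert_eq!(dispatch(true, true, false), Outcome::KeepRunning);
    assert_eq!(dispatch(true, true, true), Outcome::Complete);
}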