Skip to content

Commit

Permalink
Remove old actions with no listeners
Browse files Browse the repository at this point in the history
Implement scheduler side removal of actions with no listeners. Adds
disconnect_timeout_s configuration field with default of 60s. If the
client waiting on a given action is disconnected for longer than this
duration without reconnecting the scheduler will stop tracking it. This
does not remove it from the worker if the job has already been
dispatched.

fixes #338
  • Loading branch information
Zach Birenbaum committed Apr 4, 2024
1 parent 6b9e68e commit 2e287e5
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 1 deletion.
6 changes: 6 additions & 0 deletions nativelink-config/src/schedulers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ pub struct SimpleScheduler {
/// The strategy used to assign workers jobs.
#[serde(default)]
pub allocation_strategy: WorkerAllocationStrategy,

/// Remove action from queue after this much time has elapsed without a listener
/// amount of time in seconds.
/// Default: 60 (seconds)
#[serde(default, deserialize_with = "convert_numeric_with_shellexpand")]
pub disconnect_timeout_s: u64,
}

/// A scheduler that simply forwards requests to an upstream scheduler. This
Expand Down
82 changes: 81 additions & 1 deletion nativelink-scheduler/src/simple_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use std::time::{Instant, SystemTime, UNIX_EPOCH};

use async_trait::async_trait;
use futures::Future;
Expand Down Expand Up @@ -57,6 +57,10 @@ const DEFAULT_RETAIN_COMPLETED_FOR_S: u64 = 60;
/// If this changes, remember to change the documentation in the config.
const DEFAULT_MAX_JOB_RETRIES: usize = 3;

/// Default timeout for actions without any listeners
/// If this changes, remember to change the documentation in the config.
const DEFAULT_DISCONNECT_TIMEOUT_S: u64 = 60;

/// An action that is being awaited on and last known state.
struct AwaitedAction {
action_info: Arc<ActionInfo>,
Expand All @@ -71,6 +75,19 @@ struct AwaitedAction {

/// Worker that is currently running this action, None if unassigned.
worker_id: Option<WorkerId>,

/// Updated on every client connect and periodically while it has listeners.
last_update_timestamp: Arc<AtomicU64>,
}

impl AwaitedAction {
pub fn set_last_update_timestamp(&self, timestamp: u64) {
self.last_update_timestamp
.store(timestamp, Ordering::Relaxed);
}
pub fn get_last_update_timestamp(&self) -> u64 {
self.last_update_timestamp.load(Ordering::Relaxed)
}
}

struct Workers {
Expand Down Expand Up @@ -227,6 +244,8 @@ struct SimpleSchedulerImpl {
/// Notify task<->worker matching engine that work needs to be done.
tasks_or_workers_change_notify: Arc<Notify>,
metrics: Arc<Metrics>,
/// How long the server will wait for a client to reconnect before removing the action from the queue.
disconnect_timeout_s: u64,
}

impl SimpleSchedulerImpl {
Expand Down Expand Up @@ -305,6 +324,7 @@ impl SimpleSchedulerImpl {
attempts: 0,
last_error: None,
worker_id: None,
last_update_timestamp: Arc::new(AtomicU64::new(0)),
},
);

Expand Down Expand Up @@ -340,6 +360,20 @@ impl SimpleSchedulerImpl {
.map(Self::subscribe_to_channel)
}

fn set_action_last_update_for_test(
&self,
unique_qualifier: &ActionInfoHashKey,
timestamp: u64,
) {
let awaited_action = self
.queued_actions_set
.get(unique_qualifier)
.and_then(|action_info| self.queued_actions.get(action_info))
.or_else(|| self.active_actions.get(unique_qualifier))
.expect("Could not find action");
awaited_action.set_last_update_timestamp(timestamp)
}

fn retry_action(&mut self, action_info: &Arc<ActionInfo>, worker_id: &WorkerId, err: Error) {
match self.active_actions.remove(action_info) {
Some(running_action) => {
Expand Down Expand Up @@ -434,6 +468,22 @@ impl SimpleSchedulerImpl {
let action_infos: Vec<Arc<ActionInfo>> =
self.queued_actions.keys().rev().cloned().collect();
for action_info in action_infos {
// add update to queued action update timestamp here
let action = self.queued_actions.get_mut(&action_info).unwrap();
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
if action.notify_channel.receiver_count() > 0 {
action.set_last_update_timestamp(now);
} else if action.get_last_update_timestamp() + self.disconnect_timeout_s < now {
warn!(
"Client disconnect timeout elapsed - Removing action with digest hash {}",
action_info.unique_qualifier.digest.hash_str()
);
self.queued_actions_set.remove(&action_info);
self.queued_actions.remove(&action_info);
}
let Some(awaited_action) = self.queued_actions.get(action_info.as_ref()) else {
error!(
"queued_actions out of sync with itself for action {}",
Expand Down Expand Up @@ -490,6 +540,21 @@ impl SimpleSchedulerImpl {
awaited_action.attempts += 1;
self.active_actions.insert(action_info, awaited_action);
}

let mut remove_actions = Vec::new();
for running_action in self.active_actions.values() {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
if running_action.notify_channel.receiver_count() > 0 {
running_action.set_last_update_timestamp(now);
} else if running_action.get_last_update_timestamp() + self.disconnect_timeout_s < now {
remove_actions.push(running_action.action_info.clone())
}
}
self.active_actions
.retain(|x, _| !remove_actions.contains(x));
}

fn update_action_with_internal_error(
Expand Down Expand Up @@ -688,6 +753,11 @@ impl SimpleScheduler {
max_job_retries = DEFAULT_MAX_JOB_RETRIES;
}

let mut disconnect_timeout_s = scheduler_cfg.disconnect_timeout_s;
if disconnect_timeout_s == 0 {
disconnect_timeout_s = DEFAULT_DISCONNECT_TIMEOUT_S;
}

let tasks_or_workers_change_notify = Arc::new(Notify::new());

let metrics = Arc::new(Metrics::default());
Expand All @@ -703,6 +773,7 @@ impl SimpleScheduler {
max_job_retries,
tasks_or_workers_change_notify: tasks_or_workers_change_notify.clone(),
metrics: metrics.clone(),
disconnect_timeout_s,
}));
let weak_inner = Arc::downgrade(&inner);
Self {
Expand Down Expand Up @@ -735,6 +806,15 @@ impl SimpleScheduler {
}
}

pub fn set_action_last_update_for_test(
&self,
unique_qualifier: &ActionInfoHashKey,
timestamp: u64,
) {
let inner = self.get_inner_lock();
inner.set_action_last_update_for_test(unique_qualifier, timestamp)
}

/// Checks to see if the worker exists in the worker pool. Should only be used in unit tests.
#[must_use]
pub fn contains_worker_for_test(&self, worker_id: &WorkerId) -> bool {
Expand Down
75 changes: 75 additions & 0 deletions nativelink-scheduler/tests/simple_scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1602,4 +1602,79 @@ mod scheduler_tests {

Ok(())
}

#[tokio::test]
async fn ensure_actions_with_disconnected_clients_are_dropped() -> Result<(), Error> {
const WORKER_ID: WorkerId = WorkerId(0x1234_5678_9111);
const DISCONNECT_TIMEOUT_S: u64 = 1;

let scheduler = SimpleScheduler::new_with_callback(
&nativelink_config::schedulers::SimpleScheduler {
disconnect_timeout_s: DISCONNECT_TIMEOUT_S,
..Default::default()
},
|| async move {},
);
let action1_digest = DigestInfo::new([98u8; 32], 512);
let action2_digest = DigestInfo::new([99u8; 32], 512);

let mut rx_from_worker =
setup_new_worker(&scheduler, WORKER_ID, PlatformProperties::default()).await?;
let insert_timestamp = make_system_time(1);

let client_rx = setup_action(
&scheduler,
action1_digest,
PlatformProperties::default(),
insert_timestamp,
)
.await?;

// Drop our receiver.
let unique_qualifier = client_rx.borrow().unique_qualifier.clone();
drop(client_rx);

// Allow task<->worker matcher to run.
tokio::task::yield_now().await;

// Set last update timestamp to be older than DISCONNECT_TIMEOUT_S.
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
- (DISCONNECT_TIMEOUT_S + 1);
scheduler.set_action_last_update_for_test(&unique_qualifier, timestamp);

{
// Other tests check full data. We only care if we got StartAction.
match rx_from_worker.recv().await.unwrap().update {
Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
v => panic!("Expected StartAction, got : {v:?}"),
}
}

// Setup a second action so matching engine is scheduled to rerun.
let client_rx = setup_action(
&scheduler,
action2_digest,
PlatformProperties::default(),
insert_timestamp,
)
.await?;
drop(client_rx);

// Allow task<->worker matcher to run.
tokio::task::yield_now().await;

// Check to make sure that the action was removed.
assert!(
scheduler
.find_existing_action(&unique_qualifier)
.await
.is_none(),
"Expected action to be removed"
);

Ok(())
}
}

0 comments on commit 2e287e5

Please sign in to comment.