Commit 871bc2a

Prune stale servers in the scheduler when handling heartbeat or status rather than when allocating jobs. Fixes #538

chmanchester committed Nov 6, 2019
1 parent 919e3a2 commit 871bc2a

Showing 1 changed file with 47 additions and 35 deletions.

src/bin/sccache-dist/main.rs

@@ -45,7 +45,7 @@ use std::io::{self, Write};
 use std::net::SocketAddr;
 use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Mutex;
+use std::sync::{Mutex, MutexGuard};
 use std::time::{Duration, Instant};
 use syslog::Facility;

@@ -518,6 +518,46 @@ impl Scheduler {
             servers: Mutex::new(HashMap::new()),
         }
     }
+
+    fn prune_servers(
+        &self,
+        servers: &mut MutexGuard<HashMap<ServerId, ServerDetails>>,
+        jobs: &mut MutexGuard<BTreeMap<JobId, JobDetail>>,
+    ) {
+        let now = Instant::now();
+
+        let mut dead_servers = Vec::new();
+
+        for (&server_id, details) in servers.iter_mut() {
+            if now.duration_since(details.last_seen) > dist::http::HEARTBEAT_TIMEOUT {
+                dead_servers.push(server_id);
+            }
+        }
+
+        for server_id in dead_servers {
+            warn!(
+                "Server {} appears to be dead, pruning it in the scheduler",
+                server_id.addr()
+            );
+            let server_details = servers
+                .remove(&server_id)
+                .expect("server went missing from map");
+            for job_id in server_details.jobs_assigned {
+                warn!(
+                    "Non-terminated job {} was cleaned up in server pruning",
+                    job_id
+                );
+                // A job may be missing here if it failed to allocate
+                // initially, so just warn if it's not present.
+                if jobs.remove(&job_id).is_none() {
+                    warn!(
+                        "Non-terminated job {} assignment originally failed.",
+                        job_id
+                    );
+                }
+            }
+        }
+    }
 }
 
 impl SchedulerIncoming for Scheduler {
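
The new prune_servers helper takes the two MutexGuards its callers are already holding, so the pruning always runs with both the jobs and servers locks held and the helper never has to acquire a lock itself. As a self-contained illustration of the same pruning idea, the following sketch uses HashMap::retain with simplified, hypothetical stand-in types (integer ServerId and JobId, a Vec for jobs_assigned) and takes the heartbeat timeout as a parameter; it is a sketch of the technique, not the sccache-dist implementation.

use std::collections::{BTreeMap, HashMap};
use std::time::{Duration, Instant};

// Hypothetical stand-ins for the real sccache-dist types, for illustration only.
type ServerId = u64;
type JobId = u64;

struct ServerDetails {
    last_seen: Instant,
    jobs_assigned: Vec<JobId>,
}

struct JobDetail;

// Same idea as the new helper: drop servers whose last heartbeat is older than
// the timeout, and forget any jobs that were assigned to them.
fn prune_servers_sketch(
    servers: &mut HashMap<ServerId, ServerDetails>,
    jobs: &mut BTreeMap<JobId, JobDetail>,
    timeout: Duration,
) {
    let now = Instant::now();
    let mut orphaned_jobs = Vec::new();
    servers.retain(|_, details| {
        let alive = now.duration_since(details.last_seen) <= timeout;
        if !alive {
            // Remember the jobs of a pruned server so they can be dropped below.
            orphaned_jobs.append(&mut details.jobs_assigned);
        }
        alive
    });
    for job_id in orphaned_jobs {
        jobs.remove(&job_id);
    }
}

fn main() {
    let mut servers = HashMap::new();
    let mut jobs = BTreeMap::new();
    servers.insert(
        1 as ServerId,
        ServerDetails {
            last_seen: Instant::now(),
            jobs_assigned: vec![7],
        },
    );
    jobs.insert(7 as JobId, JobDetail);
    std::thread::sleep(Duration::from_millis(10));
    // With a zero timeout the server's last heartbeat is already too old.
    prune_servers_sketch(&mut servers, &mut jobs, Duration::ZERO);
    assert!(servers.is_empty());
    assert!(jobs.is_empty());
}

Collecting the ids to drop while iterating (orphaned_jobs here, dead_servers in the commit) and removing them afterwards avoids mutating a map while it is being traversed.
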
@@ -529,21 +569,14 @@
     ) -> Result<AllocJobResult> {
         let (job_id, server_id, auth) = {
             // LOCKS
             let mut jobs = self.jobs.lock().unwrap();
             let mut servers = self.servers.lock().unwrap();
 
-            // TODO: NLL would let us simplify this
-            let mut dead_servers = vec![];
             let res = {
                 let mut best = None;
                 let mut best_err = None;
                 let mut best_load: f64 = MAX_PER_CORE_LOAD;
                 let now = Instant::now();
                 for (&server_id, details) in servers.iter_mut() {
-                    if now.duration_since(details.last_seen) > dist::http::HEARTBEAT_TIMEOUT {
-                        dead_servers.push(server_id);
-                        continue;
-                    }
                     let load = details.jobs_assigned.len() as f64 / details.num_cpus as f64;
 
                     if let Some(last_error) = details.last_error {
@@ -613,31 +646,6 @@
                 }
             };
 
-            // We iterated over all servers, prune any that haven't had a heartbeat for too long
-            for server_id in dead_servers {
-                warn!(
-                    "Server {} appears to be dead, pruning it in the scheduler",
-                    server_id.addr()
-                );
-                let server_details = servers
-                    .remove(&server_id)
-                    .expect("server went missing from map");
-                for job_id in server_details.jobs_assigned {
-                    warn!(
-                        "Non-terminated job {} was cleaned up in server pruning",
-                        job_id
-                    );
-                    // A job may be missing here if it failed to allocate
-                    // initially, so just warn if it's not present.
-                    if jobs.remove(&job_id).is_none() {
-                        warn!(
-                            "Non-terminated job {} assignment originally failed.",
-                            job_id
-                        );
-                    }
-                }
-            }
-
             if let Some(res) = res {
                 res
             } else {
@@ -706,6 +714,8 @@
         let mut jobs = self.jobs.lock().unwrap();
         let mut servers = self.servers.lock().unwrap();
 
+        self.prune_servers(&mut servers, &mut jobs);
+
         match servers.get_mut(&server_id) {
             Some(ref mut details) if details.server_nonce == server_nonce => {
                 let now = Instant::now();
@@ -856,8 +866,10 @@
 
     fn handle_status(&self) -> Result<SchedulerStatusResult> {
         // LOCKS
-        let jobs = self.jobs.lock().unwrap();
-        let servers = self.servers.lock().unwrap();
+        let mut jobs = self.jobs.lock().unwrap();
+        let mut servers = self.servers.lock().unwrap();
 
+        self.prune_servers(&mut servers, &mut jobs);
+
         Ok(SchedulerStatusResult {
             num_servers: servers.len(),
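
Both call sites added here (the heartbeat handler above and handle_status) take the jobs lock first and the servers lock second, matching the existing // LOCKS comments, and only then call prune_servers with the guards they hold. Keeping one consistent acquisition order across handlers means no two of them can each hold one lock while waiting for the other, so pruning can run from several entry points without risking deadlock. Below is a toy sketch of that calling pattern with assumed names and String placeholders in place of the real ServerDetails and JobDetail types; it only illustrates the locking shape, not the scheduler logic.

use std::collections::{BTreeMap, HashMap};
use std::sync::{Mutex, MutexGuard};

struct Scheduler {
    jobs: Mutex<BTreeMap<u64, String>>,
    servers: Mutex<HashMap<u64, String>>,
}

impl Scheduler {
    // Runs with both guards already held, so it never locks anything itself
    // and cannot deadlock against its caller.
    fn prune_servers(
        &self,
        _servers: &mut MutexGuard<HashMap<u64, String>>,
        _jobs: &mut MutexGuard<BTreeMap<u64, String>>,
    ) {
        // The real helper removes stale servers and their jobs here.
    }

    fn handle_status(&self) -> usize {
        // LOCKS: jobs first, then servers, in every handler.
        let mut jobs = self.jobs.lock().unwrap();
        let mut servers = self.servers.lock().unwrap();
        self.prune_servers(&mut servers, &mut jobs);
        jobs.len() + servers.len()
    }
}

fn main() {
    let scheduler = Scheduler {
        jobs: Mutex::new(BTreeMap::new()),
        servers: Mutex::new(HashMap::new()),
    };
    assert_eq!(scheduler.handle_status(), 0);
}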
