Merge pull request #79 from alexcrichton/handle-eof
Add timeouts to cache lookups and handle server eofs
alexcrichton authored Mar 25, 2017
2 parents 57a2139 + b91b81a commit 9a3447f
Showing 3 changed files with 123 additions and 44 deletions.
82 changes: 50 additions & 32 deletions src/commands.rs
@@ -32,7 +32,7 @@ use number_prefix::{
Prefixed,
Standalone,
};
use protobuf::RepeatedField;
use protobuf::{RepeatedField, ProtobufError};
use protocol::{
CacheStats,
ClientRequest,
@@ -545,43 +545,61 @@ fn handle_compile_response<T>(mut creator: T,
CompileResponse::CompileStarted(_) => {
debug!("Server sent CompileStarted");
// Wait for CompileFinished.
let mut res = conn.read_one_response().chain_err(|| {
//TODO: something better here?
"error reading compile response from server"
})?;
if res.has_compile_finished() {
trace!("Server sent CompileFinished");
handle_compile_finished(res.take_compile_finished(),
stdout, stderr)
} else {
bail!("unexpected response from server")
match conn.read_one_response() {
Ok(mut res) => {
if res.has_compile_finished() {
trace!("Server sent CompileFinished");
return handle_compile_finished(res.take_compile_finished(),
stdout, stderr)
} else {
bail!("unexpected response from server")
}
}

// Currently the shutdown behavior of the remote sccache server
// is to wait at most N seconds for all active connections to
// finish and then close everything. If we get unlucky and don't
// get a response then we just forge ahead locally and run the
// compilation ourselves.
Err(ProtobufError::IoError(ref e))
if e.kind() == io::ErrorKind::UnexpectedEof => {
writeln!(io::stderr(),
"warning: sccache server looks like it shut down \
unexpectedly, compiling locally instead").unwrap();
}

Err(e) => return Err(e).chain_err(|| {
//TODO: something better here?
"error reading compile response from server"
})
}
}
CompileResponse::UnhandledCompile(_) => {
debug!("Server sent UnhandledCompile");
//TODO: possibly capture output here for testing.
let mut cmd = creator.new_command_sync(exe);
cmd.args(&cmdline)
.current_dir(cwd);
if log_enabled!(Trace) {
trace!("running command: {:?}", cmd);
}
let output = try!(core.run(run_input_output(cmd, None)));
if !output.stdout.is_empty() {
try!(stdout.write_all(&output.stdout));
}
if !output.stderr.is_empty() {
try!(stderr.write_all(&output.stderr));
}
Ok(output.status.code().unwrap_or_else(|| {
if let Some(sig) = status_signal(output.status) {
println!("Compile terminated by signal {}", sig);
}
// Arbitrary.
2
}))
}
};

//TODO: possibly capture output here for testing.
let mut cmd = creator.new_command_sync(exe);
cmd.args(&cmdline)
.current_dir(cwd);
if log_enabled!(Trace) {
trace!("running command: {:?}", cmd);
}
let output = try!(core.run(run_input_output(cmd, None)));
if !output.stdout.is_empty() {
try!(stdout.write_all(&output.stdout));
}
if !output.stderr.is_empty() {
try!(stderr.write_all(&output.stderr));
}
Ok(output.status.code().unwrap_or_else(|| {
if let Some(sig) = status_signal(output.status) {
println!("Compile terminated by signal {}", sig);
}
// Arbitrary.
2
}))
}

/// Send a `Compile` request to the sccache server `conn`, and handle the response.
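The commands.rs change above treats an UnexpectedEof from the server as an expected condition, since the server only waits a bounded time for active connections before closing, and falls back to compiling locally; any other error still propagates. Below is a small editorial sketch of that decision using only std types; the `Reply` enum and `read_reply` function are hypothetical stand-ins for the real protobuf response handling, not code from this commit.

use std::io::{self, Read};

// Hypothetical stand-in for the protobuf ServerResponse the real client reads.
enum Reply {
    Finished(Vec<u8>),
    ServerGone,
}

fn read_reply<R: Read>(conn: &mut R) -> io::Result<Reply> {
    let mut buf = [0u8; 4];
    match conn.read_exact(&mut buf) {
        Ok(()) => Ok(Reply::Finished(buf.to_vec())),
        // The server shuts down after waiting a bounded time for active
        // connections, so an EOF here means it went away: fall back rather
        // than failing the build.
        Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => Ok(Reply::ServerGone),
        // Anything else is a genuine error and is propagated to the caller.
        Err(e) => Err(e),
    }
}

fn main() {
    // An empty reader makes read_exact return UnexpectedEof.
    let mut conn: &[u8] = &[];
    match read_reply(&mut conn) {
        Ok(Reply::ServerGone) => println!("warning: server shut down, compiling locally"),
        Ok(Reply::Finished(bytes)) => println!("got {} bytes", bytes.len()),
        Err(e) => println!("error reading reply: {}", e),
    }
}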
60 changes: 49 additions & 11 deletions src/compiler/compiler.rs
@@ -49,6 +49,7 @@ use std::time::{
};
use tempdir::TempDir;
use util::fmt_duration_as_secs;
use tokio_core::reactor::{Handle, Timeout};

use errors::*;

@@ -99,7 +100,8 @@ pub trait CompilerHasher<T>: fmt::Debug + Send + 'static
arguments: Vec<String>,
cwd: String,
cache_control: CacheControl,
pool: CpuPool)
pool: CpuPool,
handle: Handle)
-> SFuture<(CompileResult, process::Output)>
{
let out_file = self.output_file().into_owned();
@@ -126,6 +128,20 @@ pub trait CompilerHasher<T>: fmt::Debug + Send + 'static
storage.get(&key)
};

// Set a maximum time limit for the cache to respond before we forge
// ahead ourselves with a compilation.
let timeout = Duration::new(60, 0);
let timeout = Timeout::new(timeout, &handle).into_future().flatten();

let cache_status = cache_status.map(Some);
let timeout = timeout.map(|_| None).chain_err(|| "timeout error");
let cache_status = cache_status.select(timeout).then(|r| {
match r {
Ok((e, _other)) => Ok(e),
Err((e, _other)) => Err(e),
}
});

// Check the result of the cache lookup.
Box::new(cache_status.and_then(move |result| {
let duration = start.elapsed();
@@ -135,7 +151,7 @@ pub trait CompilerHasher<T>: fmt::Debug + Send + 'static
.collect::<HashMap<_, _>>();

let miss_type = match result {
Cache::Hit(mut entry) => {
Some(Cache::Hit(mut entry)) => {
debug!("[{}]: Cache hit in {}", out_file, fmt_duration_as_secs(&duration));
let mut stdout = io::Cursor::new(vec!());
let mut stderr = io::Cursor::new(vec!());
@@ -161,14 +177,18 @@ pub trait CompilerHasher<T>: fmt::Debug + Send + 'static
(result, output)
})) as SFuture<_>
}
Cache::Miss => {
Some(Cache::Miss) => {
debug!("[{}]: Cache miss", out_file);
MissType::Normal
}
Cache::Recache => {
Some(Cache::Recache) => {
debug!("[{}]: Cache recache", out_file);
MissType::ForcedRecache
}
None => {
debug!("[{}]: Cache timed out", out_file);
MissType::TimedOut
}
};

// Cache miss, so compile it.
@@ -296,6 +316,8 @@ pub enum MissType {
Normal,
/// Cache lookup was overridden, recompilation was forced.
ForcedRecache,
/// Cache took too long to respond.
TimedOut,
}

/// Information about a successful cache write.
@@ -619,6 +641,7 @@ mod test {
use std::time::Duration;
use std::usize;
use test::utils::*;
use tokio_core::reactor::Core;

#[test]
fn test_detect_compiler_kind_gcc() {
@@ -717,6 +740,8 @@ mod test {
let creator = new_creator();
let f = TestFixture::new();
let pool = CpuPool::new(1);
let core = Core::new().unwrap();
let handle = core.handle();
let storage = DiskCache::new(&f.tempdir.path().join("cache"),
usize::MAX,
&pool);
@@ -753,7 +778,8 @@ mod test {
arguments.clone(),
cwd.clone(),
CacheControl::Default,
pool.clone()).wait().unwrap();
pool.clone(),
handle.clone()).wait().unwrap();
// Ensure that the object file was created.
assert_eq!(true, fs::metadata(&obj).and_then(|m| Ok(m.len() > 0)).unwrap());
match cached {
@@ -776,7 +802,8 @@ mod test {
arguments,
cwd,
CacheControl::Default,
pool.clone()).wait().unwrap();
pool.clone(),
handle).wait().unwrap();
// Ensure that the object file was created.
assert_eq!(true, fs::metadata(&obj).and_then(|m| Ok(m.len() > 0)).unwrap());
assert_eq!(CompileResult::CacheHit(Duration::new(0, 0)), cached);
@@ -792,6 +819,8 @@ mod test {
let creator = new_creator();
let f = TestFixture::new();
let pool = CpuPool::new(1);
let core = Core::new().unwrap();
let handle = core.handle();
let storage = DiskCache::new(&f.tempdir.path().join("cache"),
usize::MAX,
&pool);
@@ -828,7 +857,8 @@ mod test {
arguments.clone(),
cwd.clone(),
CacheControl::Default,
pool.clone()).wait().unwrap();
pool.clone(),
handle.clone()).wait().unwrap();
// Ensure that the object file was created.
assert_eq!(true, fs::metadata(&obj).and_then(|m| Ok(m.len() > 0)).unwrap());
match cached {
@@ -852,7 +882,8 @@ mod test {
arguments,
cwd,
CacheControl::Default,
pool).wait().unwrap();
pool,
handle).wait().unwrap();
// Ensure that the object file was created.
assert_eq!(true, fs::metadata(&obj).and_then(|m| Ok(m.len() > 0)).unwrap());
assert_eq!(CompileResult::CacheHit(Duration::new(0, 0)), cached);
@@ -868,6 +899,8 @@ mod test {
let creator = new_creator();
let f = TestFixture::new();
let pool = CpuPool::new(1);
let core = Core::new().unwrap();
let handle = core.handle();
let storage = DiskCache::new(&f.tempdir.path().join("cache"),
usize::MAX,
&pool);
@@ -908,7 +941,8 @@ mod test {
arguments.clone(),
cwd.clone(),
CacheControl::Default,
pool.clone()).wait().unwrap();
pool.clone(),
handle.clone()).wait().unwrap();
// Ensure that the object file was created.
assert_eq!(true, fs::metadata(&obj).and_then(|m| Ok(m.len() > 0)).unwrap());
match cached {
@@ -928,7 +962,8 @@ mod test {
arguments,
cwd,
CacheControl::ForceRecache,
pool).wait().unwrap();
pool,
handle).wait().unwrap();
// Ensure that the object file was created.
assert_eq!(true, fs::metadata(&obj).and_then(|m| Ok(m.len() > 0)).unwrap());
match cached {
@@ -950,6 +985,8 @@ mod test {
let creator = new_creator();
let f = TestFixture::new();
let pool = CpuPool::new(1);
let core = Core::new().unwrap();
let handle = core.handle();
let storage = DiskCache::new(&f.tempdir.path().join("cache"),
usize::MAX,
&pool);
@@ -973,7 +1010,8 @@ mod test {
arguments,
cwd,
CacheControl::Default,
pool).wait().unwrap();
pool,
handle).wait().unwrap();
assert_eq!(cached, CompileResult::Error);
assert_eq!(exit_status(1), res.status);
// Shouldn't get anything on stdout, since that would just be preprocessor spew!
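The heart of the compiler.rs change is a race between the cache lookup and a tokio-core Timeout: the lookup result is wrapped in Some, the timeout maps to None, and select() yields whichever future finishes first, so a slow cache degrades into a normal local compile. Below is a minimal editorial sketch of that pattern, assuming the futures 0.1 and tokio-core 0.1 APIs this crate uses; `cache_lookup` is a stand-in for `storage.get(&key)` that resolves immediately so the example terminates, not a real sccache call.

extern crate futures;
extern crate tokio_core;

use std::io;
use std::time::Duration;
use futures::{future, Future, IntoFuture};
use tokio_core::reactor::{Core, Timeout};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Stand-in for `storage.get(&key)`: a lookup that answers immediately.
    let cache_lookup = future::ok::<String, io::Error>("cache entry".to_string());

    // Give the cache at most 60 seconds, as the commit does.
    let timeout = Timeout::new(Duration::new(60, 0), &handle)
        .into_future()   // Result<Timeout, io::Error> -> future of a future
        .flatten();      // -> the Timeout future itself

    // A lookup result becomes Some(..); a fired timeout becomes None.
    let cache_lookup = cache_lookup.map(Some);
    let timeout = timeout.map(|_| None);

    // Race them: whichever finishes first decides the outcome.
    let raced = cache_lookup.select(timeout).then(|r| match r {
        Ok((winner, _loser)) => Ok(winner),
        Err((err, _loser)) => Err(err),
    });

    match core.run(raced) {
        Ok(Some(entry)) => println!("cache answered in time: {}", entry),
        Ok(None) => println!("cache lookup timed out; compiling locally"),
        Err(e) => println!("cache lookup error: {}", e),
    }
}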
25 changes: 24 additions & 1 deletion src/server.rs
@@ -244,6 +244,15 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
handle: handle.clone(),
timeout_dur: timeout,
};
let shutdown_idle = shutdown_idle.map(|a| {
info!("shutting down due to being idle");
a
});

let shutdown = shutdown.map(|a| {
info!("shutting down due to explicit signal");
a
});

let server = future::select_all(vec![
Box::new(server) as Box<Future<Item=_, Error=_>>,
@@ -255,6 +264,9 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
core.run(server)
.map_err(|p| p.0)?;

info!("moving into the shutdown phase now, waiting at most 10 seconds \
for all client requests to complete");

// Once our server has shut down either due to inactivity or a manual
// request we still need to give a bit of time for all active
// connections to finish. This `wait` future will resolve once all
@@ -265,6 +277,8 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
core.run(wait.select(Timeout::new(Duration::new(10, 0), &handle)?))
.map_err(|p| p.0)?;

info!("ok, fully shutting down now");

Ok(())
}
}
@@ -543,7 +557,8 @@ impl<C> SccacheService<C>
arguments,
cwd,
cache_control,
self.pool.clone());
self.pool.clone(),
self.handle.clone());
let me = self.clone();
let task = result.then(move |result| {
let mut res = ServerResponse::new();
@@ -569,6 +584,10 @@ impl<C> SccacheService<C>
stats.cache_misses += 1;
stats.forced_recaches += 1;
}
MissType::TimedOut => {
stats.cache_misses += 1;
stats.cache_timeouts += 1;
}
}
stats.cache_read_miss_duration += duration;
cache_write = Some(future);
@@ -649,6 +668,8 @@ struct ServerStats {
pub cache_hits: u64,
/// The count of cache misses for handled compile requests.
pub cache_misses: u64,
/// The count of cache misses because the cache took too long to respond.
pub cache_timeouts: u64,
/// The count of compilations which were successful but couldn't be cached.
pub non_cacheable_compilations: u64,
/// The count of compilations which forcibly ignored the cache.
@@ -678,6 +699,7 @@ impl Default for ServerStats {
cache_errors: u64::default(),
cache_hits: u64::default(),
cache_misses: u64::default(),
cache_timeouts: u64::default(),
non_cacheable_compilations: u64::default(),
forced_recaches: u64::default(),
cache_write_errors: u64::default(),
@@ -720,6 +742,7 @@ impl ServerStats {
set_stat!(stats_vec, self.requests_executed, "Compile requests executed");
set_stat!(stats_vec, self.cache_hits, "Cache hits");
set_stat!(stats_vec, self.cache_misses, "Cache misses");
set_stat!(stats_vec, self.cache_timeouts, "Cache timeouts");
set_stat!(stats_vec, self.forced_recaches, "Forced recaches");
set_stat!(stats_vec, self.cache_write_errors, "Cache write errors");
set_stat!(stats_vec, self.compile_fails, "Compilation failures");
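For context, the server.rs hunks race the accept loop against the idle timer and the explicit shutdown signal via future::select_all, then give in-flight requests a bounded grace period before exiting. The sketch below is an editorial approximation of that flow under the same futures 0.1 and tokio-core 0.1 assumptions; the `server`, `shutdown`, and `wait` futures are stand-ins, and the idle timer is shortened so the example finishes quickly.

extern crate futures;
extern crate tokio_core;

use std::io;
use std::time::Duration;
use futures::{future, Future};
use tokio_core::reactor::{Core, Timeout};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    // Stand-in for the accept loop: runs until something else wins the race.
    let server = future::empty::<(), io::Error>();
    // Stand-in for the idle-shutdown timer (short here so the demo finishes).
    let shutdown_idle = Timeout::new(Duration::from_millis(50), &handle).unwrap();
    // Stand-in for an explicit shutdown request that never arrives.
    let shutdown = future::empty::<(), io::Error>();

    // Whichever future resolves first ends the serving phase.
    let race = future::select_all(vec![
        Box::new(server) as Box<Future<Item = (), Error = io::Error>>,
        Box::new(shutdown_idle),
        Box::new(shutdown),
    ]);
    // On error select_all yields (error, index, remaining futures); keep only
    // the error, mirroring the commit's `.map_err(|p| p.0)`.
    core.run(race).map_err(|p| p.0).unwrap();

    // Then give in-flight work a bounded grace period; `wait` stands in for
    // the future that resolves once all active connections have finished.
    let wait = future::ok::<(), io::Error>(());
    let grace = Timeout::new(Duration::new(10, 0), &handle).unwrap();
    core.run(wait.select(grace)).map_err(|p| p.0).unwrap();

    println!("fully shut down");
}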
