
Add a GNU make jobserver implementation to Cargo #4110

Merged · 1 commit · Jun 2, 2017
Add a GNU make jobserver implementation to Cargo
This commit adds a GNU make jobserver implementation to Cargo, both as a client
of existing jobservers and also a creator of new jobservers. The jobserver is
actually just an IPC semaphore which manifests itself as a pipe with N bytes
of tokens on Unix and a literal IPC semaphore on Windows. The rough protocol
is then: if you want to run a job you acquire the semaphore (read a byte on
Unix or wait on the semaphore on Windows) and then you release it when you're
done.
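
For illustration, here's a minimal sketch of the Unix half of that protocol,
assuming the two inherited pipe descriptors have already been parsed out of
`MAKEFLAGS` (GNU make passes them via `--jobserver-fds=R,W`; newer versions
use `--jobserver-auth`). The `jobserver` crate handles all of this, plus the
Windows semaphore, robustly:

```rust
// Hedged sketch of the Unix jobserver protocol, not Cargo's actual code.
// `read_fd`/`write_fd` are the pipe ends inherited from the parent make.
extern crate libc;

use std::io;
use std::os::unix::io::RawFd;

// Block until another client releases a token, then take it (one byte).
fn acquire(read_fd: RawFd) -> io::Result<u8> {
    let mut token = 0u8;
    let n = unsafe { libc::read(read_fd, &mut token as *mut u8 as *mut libc::c_void, 1) };
    if n == 1 { Ok(token) } else { Err(io::Error::last_os_error()) }
}

// Release a token by writing the same byte back into the pipe.
fn release(write_fd: RawFd, token: u8) -> io::Result<()> {
    let n = unsafe { libc::write(write_fd, &token as *const u8 as *const libc::c_void, 1) };
    if n == 1 { Ok(()) } else { Err(io::Error::last_os_error()) }
}
```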

All the hairy details of the jobserver implementation are housed in the
`jobserver` crate on crates.io instead of Cargo. This should hopefully make it
much easier for the compiler to also share a jobserver implementation
eventually.

The main tricky bit here is that on Unix and Windows acquiring a jobserver token
will block the calling thread. We need to either wait for a running job to exit
or acquire a new token when we want to spawn a new job. To handle this the
current implementation spawns a helper thread that does the blocking and sends a
message back to Cargo when it receives a token. Shutting down this thread
gracefully is a little tricky as well, but more details can be found in the
`jobserver` crate.
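
Condensed, the helper-thread pattern looks roughly like this (a sketch against
the `jobserver` crate's API as used in the `job_queue.rs` diff below; the token
counts are made up):

```rust
extern crate jobserver;

use std::sync::mpsc::channel;
use jobserver::Client;

fn main() {
    // Create a fresh jobserver with 3 extra tokens, i.e. 4 parallel jobs
    // counting ourselves, as Cargo does when none is inherited.
    let client = Client::new(3).expect("failed to create jobserver");

    // The helper thread does the blocking acquire and forwards each token
    // over a channel, so the main event loop never blocks on the jobserver.
    let (tx, rx) = channel();
    let helper = client.into_helper_thread(move |token| {
        drop(tx.send(token));
    }).expect("failed to create helper thread");

    // Ask for a token whenever we want to run a job beyond the first...
    helper.request_token();
    let token = rx.recv().unwrap().expect("failed to acquire token");

    // ... run the job, then drop the token to release it back.
    drop(token);

    // Dropping `helper` is what shuts the thread down gracefully.
    drop(helper);
}
```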

Unfortunately crates are unlikely to see an immediate benefit from this once
implemented. Most crates that shell out to `make` do so with a manual `-jN`
argument, and this overrides the jobserver in the environment, creating a new
jobserver in the sub-make. If the `-jN` argument is removed, however, then
`make` will share Cargo's jobserver and properly limit parallelism.
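
As a concrete example, a hypothetical `build.rs` that wants to cooperate would
invoke `make` without an explicit `-j` flag, letting the jobserver Cargo passes
down through the environment (see the `inherit_jobserver` call in the
`custom_build.rs` diff below) take effect:

```rust
// build.rs (hypothetical): cooperate with Cargo's jobserver.
use std::process::Command;

fn main() {
    let status = Command::new("make")
        // Note: no `-jN` argument. Passing one would override the inherited
        // jobserver, and the two build systems would oversubscribe the CPU.
        .current_dir("native") // assumed directory holding a vendored Makefile
        .status()
        .expect("failed to spawn make");
    assert!(status.success(), "make exited with an error");
}
```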

Closes #1744
alexcrichton committed Jun 2, 2017
commit cbf25a9b0ae5ac6f5b6da96e645b7fa6a75dc245
13 changes: 13 additions & 0 deletions Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
@@ -29,6 +29,7 @@ fs2 = "0.4"
git2 = "0.6"
git2-curl = "0.7"
glob = "0.2"
jobserver = "0.1.2"
libc = "0.2"
libgit2-sys = "0.6"
log = "0.3"
@@ -37,8 +38,8 @@ rustc-serialize = "0.3"
semver = { version = "0.7.0", features = ["serde"] }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_ignored = "0.0.3"
serde_json = "1.0"
shell-escape = "0.1"
tar = { version = "0.4", default-features = false }
tempdir = "0.3"
1 change: 1 addition & 0 deletions src/cargo/lib.rs
@@ -16,6 +16,7 @@ extern crate flate2;
extern crate fs2;
extern crate git2;
extern crate glob;
extern crate jobserver;
extern crate libc;
extern crate libgit2_sys;
extern crate num_cpus;
1 change: 1 addition & 0 deletions src/cargo/ops/cargo_clean.rs
@@ -37,6 +37,7 @@ pub fn clean(ws: &Workspace, opts: &CleanOptions) -> CargoResult<()> {
host_triple: host_triple,
requested_target: opts.target.map(|s| s.to_owned()),
release: opts.release,
jobs: 1,
..BuildConfig::default()
},
profiles)?;
5 changes: 5 additions & 0 deletions src/cargo/ops/cargo_compile.rs
@@ -639,6 +639,11 @@ fn scrape_build_config(config: &Config,
jobs: Option<u32>,
target: Option<String>)
-> CargoResult<ops::BuildConfig> {
if jobs.is_some() && config.jobserver_from_env().is_some() {
config.shell().warn("a `-j` argument was passed to Cargo but Cargo is \
also configured with an external jobserver in \
its environment, ignoring the `-j` parameter")?;
}
let cfg_jobs = match config.get_i64("build.jobs")? {
Some(v) => {
if v.val <= 0 {
19 changes: 19 additions & 0 deletions src/cargo/ops/cargo_rustc/context.rs
@@ -8,6 +8,8 @@ use std::path::{Path, PathBuf};
use std::str::{self, FromStr};
use std::sync::Arc;

use jobserver::Client;

use core::{Package, PackageId, PackageSet, Resolve, Target, Profile};
use core::{TargetKind, Profiles, Dependency, Workspace};
use core::dependency::Kind as DepKind;
@@ -43,6 +45,7 @@ pub struct Context<'a, 'cfg: 'a> {
pub build_scripts: HashMap<Unit<'a>, Arc<BuildScripts>>,
pub links: Links<'a>,
pub used_in_plugin: HashSet<Unit<'a>>,
pub jobserver: Client,

host: Layout,
target: Option<Layout>,
@@ -94,6 +97,21 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
config.rustc()?.verbose_version.contains("-dev");
let incremental_enabled = incremental_enabled && is_nightly;

// Load up the jobserver that we'll use to manage our parallelism. This
// is the same as the GNU make implementation of a jobserver, and
// intentionally so! It's hoped that we can interact with GNU make and
// all share the same jobserver.
//
// Note that if we don't have a jobserver in our environment then we
// create our own, and we create it with `n-1` tokens because one token
// is ourself, a running process.
let jobserver = match config.jobserver_from_env() {
Some(c) => c.clone(),
None => Client::new(build_config.jobs as usize - 1).chain_err(|| {
"failed to create jobserver"
})?,
};

Ok(Context {
ws: ws,
host: host_layout,
@@ -114,6 +132,7 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
links: Links::new(),
used_in_plugin: HashSet::new(),
incremental_enabled: incremental_enabled,
jobserver: jobserver,
})
}

3 changes: 2 additions & 1 deletion src/cargo/ops/cargo_rustc/custom_build.rs
@@ -115,7 +115,8 @@ fn build_work<'a, 'cfg>(cx: &mut Context<'a, 'cfg>, unit: &Unit<'a>)
.env("PROFILE", if cx.build_config.release { "release" } else { "debug" })
.env("HOST", cx.host_triple())
.env("RUSTC", &cx.config.rustc()?.path)
.env("RUSTDOC", &*cx.config.rustdoc()?);
.env("RUSTDOC", &*cx.config.rustdoc()?)
.inherit_jobserver(&cx.jobserver);

if let Some(links) = unit.pkg.manifest().links() {
cmd.env("CARGO_MANIFEST_LINKS", links);
132 changes: 92 additions & 40 deletions src/cargo/ops/cargo_rustc/job_queue.rs
@@ -1,15 +1,17 @@
use std::collections::HashSet;
use std::collections::hash_map::HashMap;
use std::fmt;
use std::io::Write;
use std::io::{self, Write};
use std::mem;
use std::sync::mpsc::{channel, Sender, Receiver};

use crossbeam::{self, Scope};
use jobserver::{Acquired, HelperThread};
use term::color::YELLOW;

use core::{PackageId, Target, Profile};
use util::{Config, DependencyQueue, Fresh, Dirty, Freshness};
use util::{CargoResult, ProcessBuilder, profile, internal};
use util::{CargoResult, ProcessBuilder, profile, internal, CargoResultExt};
use {handle_error};

use super::{Context, Kind, Unit};
@@ -21,10 +23,9 @@ use super::job::Job;
/// actual compilation step of each package. Packages enqueue units of work and
/// then later on the entire graph is processed and compiled.
pub struct JobQueue<'a> {
jobs: usize,
queue: DependencyQueue<Key<'a>, Vec<(Job, Freshness)>>,
tx: Sender<(Key<'a>, Message)>,
rx: Receiver<(Key<'a>, Message)>,
tx: Sender<Message<'a>>,
rx: Receiver<Message<'a>>,
active: usize,
pending: HashMap<Key<'a>, PendingBuild>,
compiled: HashSet<&'a PackageId>,
@@ -51,36 +52,35 @@ struct Key<'a> {
}

pub struct JobState<'a> {
tx: Sender<(Key<'a>, Message)>,
key: Key<'a>,
tx: Sender<Message<'a>>,
}

enum Message {
enum Message<'a> {
Run(String),
Stdout(String),
Stderr(String),
Finish(CargoResult<()>),
Token(io::Result<Acquired>),
Finish(Key<'a>, CargoResult<()>),
}

impl<'a> JobState<'a> {
pub fn running(&self, cmd: &ProcessBuilder) {
let _ = self.tx.send((self.key, Message::Run(cmd.to_string())));
let _ = self.tx.send(Message::Run(cmd.to_string()));
}

pub fn stdout(&self, out: &str) {
let _ = self.tx.send((self.key, Message::Stdout(out.to_string())));
let _ = self.tx.send(Message::Stdout(out.to_string()));
}

pub fn stderr(&self, err: &str) {
let _ = self.tx.send((self.key, Message::Stderr(err.to_string())));
let _ = self.tx.send(Message::Stderr(err.to_string()));
}
}

impl<'a> JobQueue<'a> {
pub fn new<'cfg>(cx: &Context<'a, 'cfg>) -> JobQueue<'a> {
let (tx, rx) = channel();
JobQueue {
jobs: cx.jobs() as usize,
queue: DependencyQueue::new(),
tx: tx,
rx: rx,
@@ -113,56 +113,100 @@ impl<'a> JobQueue<'a> {
pub fn execute(&mut self, cx: &mut Context) -> CargoResult<()> {
let _p = profile::start("executing the job graph");

// We need to give a handle to the send half of our message queue to the
// jobserver helper thread. Unfortunately though we need the handle to be
// `'static` as that's typically what's required when spawning a
// thread!
//
// To work around this we transmute the `Sender` to a static lifetime.
// We're only sending "longer living" messages and we should also
// destroy all references to the channel before this function exits as
// the destructor for the `helper` object will ensure the associated
// thread i sno longer running.
Member:

s/i sno/is no

Member:

Interesting, looks like this transmute is always safe, because `Sender` is contravariant.

Member (Author):

That's what I originally thought, yeah, but actually I don't think so (unfortunately). I believe `Drop` for `Sender` may run destructors for items in the internal queue (not yet received), which means that if you persist a `Sender` beyond the lifetime of the items, it'll access data outside of its lifetime.

(Not in this case, though: the `Sender` should always go away with the stack frame.)
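
To illustrate the hazard being discussed, a deliberately unsound sketch
(hypothetical types, not Cargo's code): if a lifetime-extended channel outlives
the borrow while a message whose destructor touches the borrowed data is still
queued, dropping the channel is a use-after-free.

```rust
use std::mem;
use std::sync::mpsc::{channel, Receiver, Sender};

// A message whose destructor reads its borrowed data.
struct Loud<'a>(&'a String);

impl<'a> Drop for Loud<'a> {
    fn drop(&mut self) {
        println!("dropping a message that borrows: {}", self.0);
    }
}

fn main() {
    let escaped: (Sender<Loud<'static>>, Receiver<Loud<'static>>);
    {
        let s = String::from("short-lived");
        let (tx, rx) = channel();
        tx.send(Loud(&s)).unwrap();
        // Deliberately unsound: pretend the channel (and its queued
        // message) can outlive `s`, as an escaped transmuted Sender could.
        escaped = unsafe { mem::transmute((tx, rx)) };
    } // `s` is freed here, but the message borrowing it is still queued.

    // Dropping the channel now runs `Loud::drop` on a stale borrow: a
    // use-after-free. The PR avoids this because its transmuted `Sender`
    // never outlives the stack frame owning the messages' data.
    drop(escaped);
}
```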

//
// As a result, this `transmute` to a longer lifetime should be safe in
// practice.
let tx = self.tx.clone();
let tx = unsafe {
mem::transmute::<Sender<Message<'a>>, Sender<Message<'static>>>(tx)
};
let helper = cx.jobserver.clone().into_helper_thread(move |token| {
drop(tx.send(Message::Token(token)));
}).chain_err(|| {
"failed to create helper thread for jobserver management"
})?;

crossbeam::scope(|scope| {
self.drain_the_queue(cx, scope)
self.drain_the_queue(cx, scope, &helper)
})
}

fn drain_the_queue(&mut self, cx: &mut Context, scope: &Scope<'a>)
fn drain_the_queue(&mut self,
cx: &mut Context,
scope: &Scope<'a>,
jobserver_helper: &HelperThread)
-> CargoResult<()> {
use std::time::Instant;

let mut tokens = Vec::new();
let mut queue = Vec::new();
trace!("queue: {:#?}", self.queue);

// Iteratively execute the entire dependency graph. Each turn of the
// loop starts out by scheduling as much work as possible (up to the
// maximum number of parallel jobs). A local queue is maintained
// separately from the main dependency queue as one dequeue may actually
// dequeue quite a bit of work (e.g. 10 binaries in one project).
// maximum number of parallel jobs we have tokens for). A local queue
// is maintained separately from the main dependency queue as one
// dequeue may actually dequeue quite a bit of work (e.g. 10 binaries
// in one project).
//
// After a job has finished we update our internal state if it was
// successful and otherwise wait for pending work to finish if it failed
// and then immediately return.
let mut error = None;
let start_time = Instant::now();
loop {
while error.is_none() && self.active < self.jobs {
if !queue.is_empty() {
let (key, job, fresh) = queue.remove(0);
self.run(key, fresh, job, cx.config, scope)?;
} else if let Some((fresh, key, jobs)) = self.queue.dequeue() {
let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
f.combine(fresh)
});
self.pending.insert(key, PendingBuild {
amt: jobs.len(),
fresh: total_fresh,
});
queue.extend(jobs.into_iter().map(|(job, f)| {
(key, job, f.combine(fresh))
}));
} else {
break
// Dequeue as much work as we can, learning about everything
// possible that can run. Note that this is also the point where we
// start requesting job tokens. Each job after the first needs to
// request a token.
while let Some((fresh, key, jobs)) = self.queue.dequeue() {
let total_fresh = jobs.iter().fold(fresh, |fresh, &(_, f)| {
f.combine(fresh)
});
self.pending.insert(key, PendingBuild {
amt: jobs.len(),
fresh: total_fresh,
});
for (job, f) in jobs {
queue.push((key, job, f.combine(fresh)));
if self.active + queue.len() > 0 {
jobserver_helper.request_token();
}
}
}

// Now that we've learned of all possible work that we can execute
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
while error.is_none() && self.active < tokens.len() + 1 && !queue.is_empty() {
let (key, job, fresh) = queue.remove(0);
self.run(key, fresh, job, cx.config, scope)?;
}

// If after all that we're not actually running anything then we're
// done!
if self.active == 0 {
break
}

let (key, msg) = self.rx.recv().unwrap();
// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
tokens.truncate(self.active - 1);

match msg {
match self.rx.recv().unwrap() {
Message::Run(cmd) => {
cx.config.shell().verbose(|c| c.status("Running", &cmd))?;
}
@@ -176,9 +220,13 @@ impl<'a> JobQueue<'a> {
writeln!(cx.config.shell().err(), "{}", err)?;
}
}
Message::Finish(result) => {
Message::Finish(key, result) => {
info!("end: {:?}", key);
self.active -= 1;
if self.active > 0 {
assert!(tokens.len() > 0);
drop(tokens.pop());
}
match result {
Ok(()) => self.finish(key, cx)?,
Err(e) => {
@@ -198,6 +246,11 @@ impl<'a> JobQueue<'a> {
}
}
}
Message::Token(acquired_token) => {
tokens.push(acquired_token.chain_err(|| {
"failed to acquire jobserver token"
})?);
}
}
}

@@ -244,9 +297,8 @@ impl<'a> JobQueue<'a> {
scope.spawn(move || {
let res = job.run(fresh, &JobState {
tx: my_tx.clone(),
key: key,
});
my_tx.send((key, Message::Finish(res))).unwrap();
my_tx.send(Message::Finish(key, res)).unwrap();
});

// Print out some nice progress information
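
As a toy model of the token accounting in this file (hypothetical helpers, not
Cargo's code): the calling process itself counts as one implicit token, so
`active` running jobs should be backed by exactly `active - 1` explicitly
acquired tokens.

```rust
// Mirrors `self.active < tokens.len() + 1` above: the first job rides the
// implicit token, every further concurrent job needs an explicit one.
fn may_spawn(active: usize, tokens: usize) -> bool {
    active < tokens + 1
}

// Mirrors `tokens.truncate(self.active - 1)`: tokens beyond `active - 1`
// were acquired but never used, and go back to the jobserver on drop.
fn tokens_to_keep(active: usize) -> usize {
    active.saturating_sub(1)
}

fn main() {
    assert!(may_spawn(0, 0));  // the first job needs no explicit token
    assert!(!may_spawn(1, 0)); // a second concurrent job does
    assert!(may_spawn(1, 1));
    assert_eq!(tokens_to_keep(3), 2); // 3 running jobs hold 2 tokens
}
```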