Optionally increase the size of the pipe buffer
This commit attempts to address rust-lang/cargo#9739 by optionally
increasing the capacity of pipes created on Linux. This may be required
because the pipe can initially start out with only one page of buffer,
which apparently means that normal jobserver usage might cause the pipe
to deadlock with all writers blocked.
alexcrichton committed Jul 29, 2021
1 parent 5756d02 commit 865b7e3
Showing 2 changed files with 128 additions and 0 deletions.
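As context for the change, the commit's fix boils down to two `fcntl(2)` calls on the jobserver pipe: read the current capacity with `F_GETPIPE_SZ`, then request a larger one with `F_SETPIPE_SZ`. Below is a self-contained, Linux-only sketch of that dance with the libc bindings declared inline; the constant values and the `grow_to_two_pages` helper are illustrative assumptions, not part of the commit:

```rust
// Sketch of the fcntl(2) calls the commit relies on, with the C bindings
// declared inline so the example needs no external crates. F_SETPIPE_SZ
// and F_GETPIPE_SZ are the Linux-specific fcntl commands; this is
// illustrative and Linux-only, not the commit's actual code.
#[cfg(target_os = "linux")]
pub mod pipe_size {
    extern "C" {
        fn pipe(fds: *mut i32) -> i32;
        fn fcntl(fd: i32, cmd: i32, ...) -> i32;
        fn close(fd: i32) -> i32;
    }

    const F_SETPIPE_SZ: i32 = 1031; // Linux >= 2.6.35
    const F_GETPIPE_SZ: i32 = 1032;

    /// Returns (capacity before, capacity after asking for two 4 KiB pages).
    pub fn grow_to_two_pages() -> (i32, i32) {
        unsafe {
            let mut fds = [0i32; 2];
            assert_eq!(pipe(fds.as_mut_ptr()), 0);
            let before = fcntl(fds[1], F_GETPIPE_SZ);
            // Ask the kernel for 8192 bytes, as the commit does; the kernel
            // rounds the request up to a page multiple and returns the
            // capacity actually granted.
            let after = fcntl(fds[1], F_SETPIPE_SZ, 8192i32);
            close(fds[0]);
            close(fds[1]);
            (before, after)
        }
    }
}

fn main() {
    #[cfg(target_os = "linux")]
    {
        let (before, after) = pipe_size::grow_to_two_pages();
        println!("capacity before: {}, after: {}", before, after);
    }
}
```

Note that `F_SETPIPE_SZ` can also shrink a pipe, so the commit only issues it when the computed target exceeds the current capacity.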
106 changes: 106 additions & 0 deletions src/unix.rs
@@ -23,6 +23,7 @@ pub struct Acquired {
impl Client {
pub fn new(limit: usize) -> io::Result<Client> {
let client = unsafe { Client::mk()? };
client.configure_capacity(limit)?;
// I don't think the character written here matters, but I could be
// wrong!
for _ in 0..limit {
@@ -63,6 +64,70 @@ impl Client {
Ok(Client::from_fds(pipes[0], pipes[1]))
}

fn configure_capacity(&self, required_capacity: usize) -> io::Result<()> {
// On Linux we may need to increase the capacity of the pipe for the
// jobserver to work correctly. Linux seems to exhibit behavior where it
// implements a ring-buffer internally but apparently the ring-ness of
// the ring-buffer is connected to *pages* of the ring buffer rather
// than actual bytes of the ring buffer. This means that if the pipe has
// only one page of capacity we can hit a possible deadlock situation
// where a bunch of threads are writing to the pipe but they're all
// blocked, despite the current used capacity of the pipe being less
// than a page.
//
// This was first discovered in rust-lang/cargo#9739 where a system with
// a large amount of concurrency would hang in `cargo build` when the
// jobserver pipe only had one page of capacity. This was reduced to a
// reproduction program [1] which indeed showed that the system would
// deadlock if the capacity of the pipe was just one page.
//
// To fix this issue, on Linux only, we may increase the capacity of the
// pipe. The main thing here is that if the capacity of the pipe is a
// single page we try to increase it to two pages, otherwise we fail
// because a deadlock might happen. While we're at it this goes ahead
// and factors in the `required_capacity` requested by the client to
// this calculation as well. If for some reason you want 10_000 units of
// concurrency in the pipe that means we'll need more than 2 pages
// (typically 8192 bytes), so we round that up to 3 pages as well.
//
// Someone with more understanding of linux pipes and how they buffer
// internally should probably review this at some point. The exact cause
// of the deadlock seems a little uncertain and it's not clear why the
// example program [1] deadlocks and why simply adding another page
// fixes things. Is this a kernel bug? Do we need to always guarantee at
// least one free page? I'm not sure! Hopefully for now this is enough
// to fix the problem until machines start having more than 4k cores,
// which seems like it might be a while.
//
// [1]: https://github.com/rust-lang/cargo/issues/9739#issuecomment-889183009
#[cfg(target_os = "linux")]
unsafe {
let page_size = libc::sysconf(libc::_SC_PAGESIZE);
let actual_capacity = cvt(libc::fcntl(self.write.as_raw_fd(), libc::F_GETPIPE_SZ))?;

if let Some(c) = calculate_capacity(
required_capacity,
actual_capacity as usize,
page_size as usize,
) {
cvt(libc::fcntl(self.write.as_raw_fd(), libc::F_SETPIPE_SZ, c)).map_err(|e| {
io::Error::new(
e.kind(),
format!(
"failed to increase jobserver pipe capacity from {} to {}; \
jobserver otherwise might deadlock",
actual_capacity, c,
),
)
})?;
}
}

Ok(())
}

pub unsafe fn open(s: &str) -> Option<Client> {
let mut parts = s.splitn(2, ',');
let read = parts.next().unwrap();
@@ -337,3 +402,44 @@ extern "C" fn sigusr1_handler(
) {
// nothing to do
}

#[allow(dead_code)]
fn calculate_capacity(
required_capacity: usize,
actual_capacity: usize,
page_size: usize,
) -> Option<usize> {
if actual_capacity < required_capacity {
let mut rounded_capacity = round_up_to(required_capacity, page_size);
if rounded_capacity < page_size * 2 {
rounded_capacity += page_size;
}
return Some(rounded_capacity);
}

if actual_capacity <= page_size {
return Some(page_size * 2);
}

return None;

fn round_up_to(a: usize, b: usize) -> usize {
assert!(b.is_power_of_two());
(a + (b - 1)) & (!(b - 1))
}
}

#[cfg(test)]
mod tests {
use super::calculate_capacity;

#[test]
fn test_calculate_capacity() {
assert_eq!(calculate_capacity(1, 65536, 4096), None);
assert_eq!(calculate_capacity(500, 65536, 4096), None);
assert_eq!(calculate_capacity(5000, 4096, 4096), Some(8192));
assert_eq!(calculate_capacity(1, 4096, 4096), Some(8192));
assert_eq!(calculate_capacity(4096, 4096, 4096), Some(8192));
assert_eq!(calculate_capacity(8192, 4096, 4096), Some(8192));
}
}
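The only non-obvious piece of `calculate_capacity` is the bit trick in `round_up_to`: adding `b - 1` pushes `a` past the next multiple of `b` unless it already sits on one, and masking with `!(b - 1)` clears the low bits, snapping back down to that boundary. This only works when `b` is a power of two, which is why the function asserts it. A standalone sketch of just that trick (not part of the commit) with a few worked values:

```rust
// Why `(a + (b - 1)) & !(b - 1)` rounds `a` up to a multiple of `b` when
// `b` is a power of two: `b - 1` is a mask of the low bits, so adding it
// carries past the next boundary unless `a` is already aligned, and the
// final mask zeroes the low bits, landing exactly on the boundary.
fn round_up_to(a: usize, b: usize) -> usize {
    assert!(b.is_power_of_two());
    (a + (b - 1)) & !(b - 1)
}

fn main() {
    assert_eq!(round_up_to(1, 4096), 4096);      // rounds up to one page
    assert_eq!(round_up_to(4096, 4096), 4096);   // already aligned: unchanged
    assert_eq!(round_up_to(4097, 4096), 8192);   // one byte over: next page
    assert_eq!(round_up_to(10_000, 4096), 12288); // three pages
    println!("ok");
}
```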
22 changes: 22 additions & 0 deletions tests/server.rs
@@ -156,3 +156,25 @@ fn zero_client() {
assert!(rx.try_recv().is_err());
}
}

#[test]
fn highly_concurrent() {
const N: usize = 10000;

let client = t!(Client::new(80));

let threads = (0..80)
.map(|_| {
let client = client.clone();
std::thread::spawn(move || {
for _ in 0..N {
drop(client.acquire().unwrap());
}
})
})
.collect::<Vec<_>>();

for t in threads {
t.join().unwrap();
}
}
