Skip to content

Commit

Permalink
Merge pull request #34 from alexcrichton/fix-linux-deadlock
Browse files Browse the repository at this point in the history
Optionally increase the size of the pipe buffer
  • Loading branch information
alexcrichton authored Aug 4, 2021
2 parents 5756d02 + 865b7e3 commit 8bfabd1
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
106 changes: 106 additions & 0 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct Acquired {
impl Client {
pub fn new(limit: usize) -> io::Result<Client> {
let client = unsafe { Client::mk()? };
client.configure_capacity(limit)?;
// I don't think the character written here matters, but I could be
// wrong!
for _ in 0..limit {
Expand Down Expand Up @@ -63,6 +64,70 @@ impl Client {
Ok(Client::from_fds(pipes[0], pipes[1]))
}

/// Ensure the jobserver pipe can buffer at least `required_capacity` token
/// bytes without risking the single-page pipe deadlock described below.
///
/// On non-Linux targets this is a no-op and always returns `Ok(())`.
/// On Linux it may grow the pipe via `fcntl(F_SETPIPE_SZ)`; it returns an
/// error (with context in the message) if that resize fails.
fn configure_capacity(&self, required_capacity: usize) -> io::Result<()> {
    // On Linux we may need to increase the capacity of the pipe for the
    // jobserver to work correctly. Linux seems to exhibit behavior where it
    // implements a ring-buffer internally but apparently the ring-ness of
    // the ring-buffer is connected to *pages* of the ring buffer rather
    // than actual bytes of the ring buffer. This means that if the pipe has
    // only one page of capacity we can hit a possible deadlock situation
    // where a bunch of threads are writing to the pipe but they're all
    // blocked, despite the current used capacity of the pipe being less
    // than a page.
    //
    // This was first discovered in rust-lang/cargo#9739 where a system with
    // a large amount of concurrency would hang in `cargo build` when the
    // jobserver pipe only had one page of capacity. This was reduced to a
    // reproduction program [1] which indeed showed that the system would
    // deadlock if the capacity of the pipe was just one page.
    //
    // To fix this issue, on Linux only, we may increase the capacity of the
    // pipe. The main thing here is that if the capacity of the pipe is a
    // single page we try to increase it to two pages, otherwise we fail
    // because a deadlock might happen. While we're at it this goes ahead
    // and factors in the `required_capacity` requested by the client to
    // this calculation as well. If for some reason you want 10_000 units of
    // concurrency in the pipe that means we'll need more than 2 pages
    // (typically 8192 bytes), so we round that up to 3 pages as well.
    //
    // Someone with more understanding of linux pipes and how they buffer
    // internally should probably review this at some point. The exact cause
    // of the deadlock seems a little uncertain and it's not clear why the
    // example program [1] deadlocks and why simply adding another page
    // fixes things. Is this a kernel bug? Do we need to always guarantee at
    // least one free page? I'm not sure! Hopefully for now this is enough
    // to fix the problem until machines start having more than 4k cores,
    // which seems like it might be awhile.
    //
    // [1]: https://github.com/rust-lang/cargo/issues/9739#issuecomment-889183009
    #[cfg(target_os = "linux")]
    unsafe {
        // SAFETY: `sysconf` and `fcntl` are called with valid, documented
        // arguments on a file descriptor this `Client` owns.
        let page_size = libc::sysconf(libc::_SC_PAGESIZE);
        // Query the pipe's current capacity in bytes (Linux-specific fcntl).
        let actual_capacity = cvt(libc::fcntl(self.write.as_raw_fd(), libc::F_GETPIPE_SZ))?;

        // `calculate_capacity` returns `Some(new_size)` only when a resize
        // is needed (capacity below requirement, or exactly one page).
        if let Some(c) = calculate_capacity(
            required_capacity,
            actual_capacity as usize,
            page_size as usize,
        ) {
            cvt(libc::fcntl(self.write.as_raw_fd(), libc::F_SETPIPE_SZ, c)).map_err(|e| {
                // Attach context so a failed resize is diagnosable; the
                // kernel may refuse sizes above /proc/sys/fs/pipe-max-size.
                io::Error::new(
                    e.kind(),
                    format!(
                        "failed to increase jobserver pipe capacity from {} to {}; \
                         jobserver otherwise might deadlock",
                        actual_capacity, c,
                    ),
                )
            })?;
        }
    }

    Ok(())
}

pub unsafe fn open(s: &str) -> Option<Client> {
let mut parts = s.splitn(2, ',');
let read = parts.next().unwrap();
Expand Down Expand Up @@ -337,3 +402,44 @@ extern "C" fn sigusr1_handler(
) {
// nothing to do
}

// Only referenced from the Linux-specific resize path (and tests), so it is
// dead code on other targets.
#[allow(dead_code)]
/// Decide whether a pipe holding `actual_capacity` bytes must be resized to
/// safely carry `required_capacity` token bytes.
///
/// Returns `Some(new_size)` when a resize is needed, `None` when the current
/// capacity is already adequate. The returned size is always a whole number
/// of pages and always at least two pages, to avoid the single-page pipe
/// deadlock worked around by `configure_capacity`.
fn calculate_capacity(
    required_capacity: usize,
    actual_capacity: usize,
    page_size: usize,
) -> Option<usize> {
    // Round `value` up to the next multiple of `multiple`, which must be a
    // power of two (page sizes always are).
    fn round_up_to(value: usize, multiple: usize) -> usize {
        assert!(multiple.is_power_of_two());
        (value + (multiple - 1)) & !(multiple - 1)
    }

    if actual_capacity < required_capacity {
        // Too small for all tokens: grow to the requirement rounded up to
        // whole pages, but never below the two-page deadlock floor.
        let rounded = round_up_to(required_capacity, page_size);
        Some(rounded.max(page_size * 2))
    } else if actual_capacity <= page_size {
        // Tokens fit, but a single-page pipe can still deadlock — bump to
        // two pages.
        Some(page_size * 2)
    } else {
        // Multiple pages and enough room: leave the pipe alone.
        None
    }
}

#[cfg(test)]
mod tests {
    use super::calculate_capacity;

    /// Table-driven check of the resize decision: each row is
    /// (required, actual, page_size) -> expected resize request.
    #[test]
    fn test_calculate_capacity() {
        let cases = [
            // Plenty of multi-page capacity: no resize.
            (1, 65536, 4096, None),
            (500, 65536, 4096, None),
            // Requirement exceeds capacity: grow to two pages.
            (5000, 4096, 4096, Some(8192)),
            // Single-page pipe always grows to two pages.
            (1, 4096, 4096, Some(8192)),
            (4096, 4096, 4096, Some(8192)),
            (8192, 4096, 4096, Some(8192)),
        ];
        for &(required, actual, page, expected) in cases.iter() {
            assert_eq!(calculate_capacity(required, actual, page), expected);
        }
    }
}
22 changes: 22 additions & 0 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,25 @@ fn zero_client() {
assert!(rx.try_recv().is_err());
}
}

/// Stress test: many threads hammering acquire/release on a shared client.
/// Regression coverage for the single-page pipe deadlock (rust-lang/cargo#9739).
#[test]
fn highly_concurrent() {
    // Acquisitions per worker thread.
    const N: usize = 10000;

    let client = t!(Client::new(80));

    let mut workers = Vec::new();
    for _ in 0..80 {
        let handle = client.clone();
        workers.push(std::thread::spawn(move || {
            for _ in 0..N {
                // Acquire a token and release it immediately.
                let token = handle.acquire().unwrap();
                drop(token);
            }
        }));
    }

    for worker in workers {
        worker.join().unwrap();
    }
}

0 comments on commit 8bfabd1

Please sign in to comment.