From 5926023f7e6fff92ee9fc7ef7cd7e36502792b81 Mon Sep 17 00:00:00 2001
From: Josh Stone
Date: Tue, 16 Jan 2018 17:25:26 -0800
Subject: [PATCH] Stream the parallel xz/gz tarball generation

This melds the serial-`Tee` and parallel-batched approaches from before
and after commit adea17e. Now we can get the same multithreaded speedup
without having to build the entire uncompressed tarball in memory first.

The new `impl Write for RayonTee` uses `rayon::join` to split the
compression work for each buffer to separate threads. This is scoped,
so it can be fully zero-copy, sharing the input buffer directly. This
is all wrapped in a 1 MiB `BufWriter` to balance the cost of thread
wake-ups and synchronization.

The net performance is unchanged, using around 125% CPU -- approximately
4:1 time spent in xz versus gz. The overall memory use is much reduced,
now independent of the tarball size -- just a few MiB on top of the
fixed-cost 674 MiB compressor memory requirements of `xz -9`.
---
 Cargo.toml       |  3 +-
 src/lib.rs       |  1 +
 src/tarballer.rs | 89 ++++++++++++++++++++++++++++++------------------
 3 files changed, 58 insertions(+), 35 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 2535744..050e222 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,9 +11,10 @@ path = "src/main.rs"
 [dependencies]
 error-chain = "0.11.0"
 flate2 = "1.0.1"
+rayon = "0.9"
 tar = "0.4.13"
 walkdir = "1.0.7"
-xz2 = "0.1.3"
+xz2 = "0.1.4"
 
 [dependencies.clap]
 features = ["yaml"]
diff --git a/src/lib.rs b/src/lib.rs
index b9375df..c8acc26 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,6 +11,7 @@
 #[macro_use]
 extern crate error_chain;
 extern crate flate2;
+extern crate rayon;
 extern crate tar;
 extern crate walkdir;
 extern crate xz2;
diff --git a/src/tarballer.rs b/src/tarballer.rs
index cd6adbe..f7a09fb 100644
--- a/src/tarballer.rs
+++ b/src/tarballer.rs
@@ -9,13 +9,12 @@
 // except according to those terms.
 
 use std::fs::File;
-use std::io::Write;
+use std::io::{self, Write, BufWriter};
 use std::path::Path;
-use std::sync::Arc;
-use std::thread;
 
 use flate2;
 use flate2::write::GzEncoder;
+use rayon;
 use tar::{Builder, Header};
 use walkdir::WalkDir;
 use xz2::write::XzEncoder;
@@ -57,41 +56,42 @@ impl Tarballer {
             .chain_err(|| "failed to collect file paths")?;
         files.sort_by(|a, b| a.bytes().rev().cmp(b.bytes().rev()));
 
-        // Write the tar into both encoded files. We write all directories
-        // first, so files may be directly created. (see rustup.rs#1092)
-        let mut builder = Builder::new(Vec::new());
-        for path in dirs {
-            let src = Path::new(&self.work_dir).join(&path);
-            builder.append_dir(&path, &src)
-                .chain_err(|| format!("failed to tar dir '{}'", src.display()))?;
-        }
-        for path in files {
-            let src = Path::new(&self.work_dir).join(&path);
-            let file = open_file(&src)?;
-            builder.append_data(&mut header(&src, &file)?, &path, &file)
-                .chain_err(|| format!("failed to tar file '{}'", src.display()))?;
-        }
-        let contents = builder.into_inner()
-            .chain_err(|| "failed to finish writing .tar stream")?;
-        let contents = Arc::new(contents);
-
         // Prepare the .tar.gz file
-        let contents2 = contents.clone();
-        let t = thread::spawn(move || {
-            let mut gz = GzEncoder::new(create_new_file(tar_gz)?,
-                                        flate2::Compression::best());
-            gz.write_all(&contents2).chain_err(|| "failed to write .gz")?;
-            gz.finish().chain_err(|| "failed to finish .gz")
-        });
+        let gz = GzEncoder::new(create_new_file(tar_gz)?, flate2::Compression::best());
 
         // Prepare the .tar.xz file
-        let mut xz = XzEncoder::new(create_new_file(tar_xz)?, 9);
-        xz.write_all(&contents).chain_err(|| "failed to write .xz")?;
-        xz.finish().chain_err(|| "failed to finish .xz")?;
+        let xz = XzEncoder::new(create_new_file(tar_xz)?, 9);
 
-        t.join().unwrap()?;
-
-        Ok(())
+        // Write the tar into both encoded files. We write all directories
+        // first, so files may be directly created. (see rustup.rs#1092)
+        let tee = RayonTee(xz, gz);
+        let buf = BufWriter::with_capacity(1024 * 1024, tee);
+        let mut builder = Builder::new(buf);
+
+        let pool = rayon::Configuration::new().num_threads(2).build().unwrap();
+        pool.install(move || {
+            for path in dirs {
+                let src = Path::new(&self.work_dir).join(&path);
+                builder.append_dir(&path, &src)
+                    .chain_err(|| format!("failed to tar dir '{}'", src.display()))?;
+            }
+            for path in files {
+                let src = Path::new(&self.work_dir).join(&path);
+                let file = open_file(&src)?;
+                builder.append_data(&mut header(&src, &file)?, &path, &file)
+                    .chain_err(|| format!("failed to tar file '{}'", src.display()))?;
+            }
+            let RayonTee(xz, gz) = builder.into_inner()
+                .chain_err(|| "failed to finish writing .tar stream")?
+                .into_inner().ok().unwrap();
+
+            // Finish both encoded files
+            let (rxz, rgz) = rayon::join(
+                || xz.finish().chain_err(|| "failed to finish .tar.xz file"),
+                || gz.finish().chain_err(|| "failed to finish .tar.gz file"),
+            );
+            rxz.and(rgz).and(Ok(()))
+        })
     }
 }
 
@@ -138,3 +138,24 @@ fn get_recursive_paths<P, Q>(root: P, name: Q) -> Result<(Vec<String>, Vec<String>)>
     }
     Ok((dirs, files))
 }
+
+struct RayonTee<A, B>(A, B);
+
+impl<A: Write + Send, B: Write + Send> Write for RayonTee<A, B> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.write_all(buf)?;
+        Ok(buf.len())
+    }
+
+    fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
+        let (a, b) = (&mut self.0, &mut self.1);
+        let (ra, rb) = rayon::join(|| a.write_all(buf), || b.write_all(buf));
+        ra.and(rb)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        let (a, b) = (&mut self.0, &mut self.1);
+        let (ra, rb) = rayon::join(|| a.flush(), || b.flush());
+        ra.and(rb)
+    }
+}