Skip to content

Commit

Permalink
Merge pull request #76 from cuviper/rayon
Browse files Browse the repository at this point in the history
Stream the parallel xz/gz tarball generation
  • Loading branch information
alexcrichton authored Jan 17, 2018
2 parents 745a020 + 5926023 commit b55e0fc
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 35 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ path = "src/main.rs"
[dependencies]
error-chain = "0.11.0"
flate2 = "1.0.1"
rayon = "0.9"
tar = "0.4.13"
walkdir = "1.0.7"
xz2 = "0.1.3"
xz2 = "0.1.4"

[dependencies.clap]
features = ["yaml"]
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#[macro_use]
extern crate error_chain;
extern crate flate2;
extern crate rayon;
extern crate tar;
extern crate walkdir;
extern crate xz2;
Expand Down
89 changes: 55 additions & 34 deletions src/tarballer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
// except according to those terms.

use std::fs::File;
use std::io::Write;
use std::io::{self, Write, BufWriter};
use std::path::Path;
use std::sync::Arc;
use std::thread;

use flate2;
use flate2::write::GzEncoder;
use rayon;
use tar::{Builder, Header};
use walkdir::WalkDir;
use xz2::write::XzEncoder;
Expand Down Expand Up @@ -57,41 +56,42 @@ impl Tarballer {
.chain_err(|| "failed to collect file paths")?;
files.sort_by(|a, b| a.bytes().rev().cmp(b.bytes().rev()));

// Write the tar into both encoded files. We write all directories
// first, so files may be directly created. (see rustup.rs#1092)
let mut builder = Builder::new(Vec::new());
for path in dirs {
let src = Path::new(&self.work_dir).join(&path);
builder.append_dir(&path, &src)
.chain_err(|| format!("failed to tar dir '{}'", src.display()))?;
}
for path in files {
let src = Path::new(&self.work_dir).join(&path);
let file = open_file(&src)?;
builder.append_data(&mut header(&src, &file)?, &path, &file)
.chain_err(|| format!("failed to tar file '{}'", src.display()))?;
}
let contents = builder.into_inner()
.chain_err(|| "failed to finish writing .tar stream")?;
let contents = Arc::new(contents);

// Prepare the .tar.gz file
let contents2 = contents.clone();
let t = thread::spawn(move || {
let mut gz = GzEncoder::new(create_new_file(tar_gz)?,
flate2::Compression::best());
gz.write_all(&contents2).chain_err(|| "failed to write .gz")?;
gz.finish().chain_err(|| "failed to finish .gz")
});
let gz = GzEncoder::new(create_new_file(tar_gz)?, flate2::Compression::best());

// Prepare the .tar.xz file
let mut xz = XzEncoder::new(create_new_file(tar_xz)?, 9);
xz.write_all(&contents).chain_err(|| "failed to write .xz")?;
xz.finish().chain_err(|| "failed to finish .xz")?;
let xz = XzEncoder::new(create_new_file(tar_xz)?, 9);

t.join().unwrap()?;

Ok(())
// Write the tar into both encoded files. We write all directories
// first, so files may be directly created. (see rustup.rs#1092)
let tee = RayonTee(xz, gz);
let buf = BufWriter::with_capacity(1024 * 1024, tee);
let mut builder = Builder::new(buf);

let pool = rayon::Configuration::new().num_threads(2).build().unwrap();
pool.install(move || {
for path in dirs {
let src = Path::new(&self.work_dir).join(&path);
builder.append_dir(&path, &src)
.chain_err(|| format!("failed to tar dir '{}'", src.display()))?;
}
for path in files {
let src = Path::new(&self.work_dir).join(&path);
let file = open_file(&src)?;
builder.append_data(&mut header(&src, &file)?, &path, &file)
.chain_err(|| format!("failed to tar file '{}'", src.display()))?;
}
let RayonTee(xz, gz) = builder.into_inner()
.chain_err(|| "failed to finish writing .tar stream")?
.into_inner().ok().unwrap();

// Finish both encoded files
let (rxz, rgz) = rayon::join(
|| xz.finish().chain_err(|| "failed to finish .tar.xz file"),
|| gz.finish().chain_err(|| "failed to finish .tar.gz file"),
);
rxz.and(rgz).and(Ok(()))
})
}
}

Expand Down Expand Up @@ -138,3 +138,24 @@ fn get_recursive_paths<P, Q>(root: P, name: Q) -> Result<(Vec<String>, Vec<Strin
}
Ok((dirs, files))
}

struct RayonTee<A, B>(A, B);

impl<A: Write + Send, B: Write + Send> Write for RayonTee<A, B> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_all(buf)?;
Ok(buf.len())
}

fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
let (a, b) = (&mut self.0, &mut self.1);
let (ra, rb) = rayon::join(|| a.write_all(buf), || b.write_all(buf));
ra.and(rb)
}

fn flush(&mut self) -> io::Result<()> {
let (a, b) = (&mut self.0, &mut self.1);
let (ra, rb) = rayon::join(|| a.flush(), || b.flush());
ra.and(rb)
}
}

0 comments on commit b55e0fc

Please sign in to comment.