diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index be5e562d56a..fbb465b5cc9 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -13,8 +13,13 @@ use crate::utils::utils; use std::collections::HashSet; use std::env; use std::fmt; -use std::io::Read; +use std::fs::OpenOptions; +use std::io::{self, ErrorKind as IOErrorKind, Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::mpsc::{channel, Sender}; + +use tar::EntryType; +use time::precise_time_s; /// The current metadata revision used by rust-installer pub const INSTALLER_VERSION: &str = "3"; @@ -215,8 +220,67 @@ impl<'a> TarPackage<'a> { } } +#[derive(Debug)] +pub enum Kind { + Directory, + File(Vec<u8>), +} + +#[derive(Debug)] +pub struct Item { + full_path: PathBuf, + kind: Kind, + start: f64, + finish: f64, + size: Option<usize>, + result: io::Result<()>, + mode: u32, +} + trait Unpacker { - fn handle(&mut self, unpacked: tar::Unpacked); + // Unpack a single item to disk. Feedback from the unpacking + // is given via the mpsc completion queue. Possibly for + // single threaded this will need to change - pending test + // outcomes. + fn handle(&mut self, item: Item); + + /// True if the unpacker is ready for more work. Unpackers MUST + /// always be ready for work if they have no in-progress work. + fn ready(&mut self) -> bool; + + // Wrap up any pending operations and close the transmit channel + // so that rx.iter() can be used (and thus a race-free termination). + fn join(&mut self); +} + +#[allow(unused_variables)] +pub fn write_file<P: AsRef<Path>, C: AsRef<[u8]>>( + path: P, + contents: C, + mode: u32, +) -> io::Result<()> { + let mut opts = OpenOptions::new(); + #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + opts.mode(mode); + } + opts.write(true) + .create(true) + .truncate(true) + .open(path.as_ref())?
+ .write_all(contents.as_ref()) +} + +pub fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> { + match std::fs::create_dir(path.as_ref()) { + Ok(_) => Ok(()), + Err(e) => match e.kind() { + // The optimistic directory creation code creates a race condition. + IOErrorKind::AlreadyExists => Ok(()), + _ => Err(e), + }, + } } mod threadedunpacker { @@ -230,10 +294,14 @@ n_files: Arc<AtomicUsize>, pool: threadpool::ThreadPool, notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + tx: Option<super::Sender<super::Item>>, } impl<'a> Unpacker<'a> { - pub fn new(notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Self { + pub fn new( + tx: super::Sender<super::Item>, + notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + ) -> Self { // Defaults to hardware thread count threads; this is suitable for // our needs as IO bound operations tend to show up as write latencies // rather than close latencies, so we don't need to look at @@ -245,10 +313,12 @@ n_files: Arc::new(AtomicUsize::new(0)), pool, notify_handler, + tx: Some(tx), } } pub fn new_with_threads( + tx: super::Sender<super::Item>, notify_handler: Option<&'a dyn Fn(Notification<'_>)>, thread_count: usize, ) -> Self { @@ -264,25 +334,41 @@ n_files: Arc::new(AtomicUsize::new(0)), pool, notify_handler, + tx: Some(tx), } } } impl<'a> super::Unpacker for Unpacker<'a> { - fn handle(&mut self, unpacked: tar::Unpacked) { - if let tar::Unpacked::File(f) = unpacked { - self.n_files.fetch_add(1, Ordering::Relaxed); - let n_files = self.n_files.clone(); - self.pool.execute(move || { - drop(f); - n_files.fetch_sub(1, Ordering::Relaxed); - }); - } + fn handle(&mut self, mut item: super::Item) { + let tx = self.tx.clone(); + self.n_files.fetch_add(1, Ordering::Relaxed); + let n_files = self.n_files.clone(); + self.pool.execute(move || { + // directories: make them, TODO: register with the dir existence cache. + // Files, write them.
+ item.result = match item.kind { + super::Kind::Directory => super::create_dir(&item.full_path), + super::Kind::File(ref contents) => { + super::write_file(&item.full_path, &contents, item.mode) + } + }; + n_files.fetch_sub(1, Ordering::Relaxed); + item.finish = super::precise_time_s(); + tx.as_ref() + .unwrap() + .send(item) + .expect("receiver should be listening"); + }) } - } - impl<'a> Drop for Unpacker<'a> { - fn drop(&mut self) { + fn ready(&mut self) -> bool { + // 5 is arbitrary; just a number to stop exhausting fd's on linux + // and still allow rapid decompression to generate work to dispatch + self.pool.queued_count() < 5 + } + + fn join(&mut self) { // Some explanation is in order. Even though the tar we are reading from (if // any) will have had its FileWithProgress download tracking // completed before we hit drop, that is not true if we are unwinding due to a @@ -300,7 +386,7 @@ self.notify_handler .map(|handler| handler(Notification::DownloadFinished)); self.notify_handler - .map(|handler| handler(Notification::DownloadPushUnits("handles"))); + .map(|handler| handler(Notification::DownloadPushUnits("iops"))); let mut prev_files = self.n_files.load(Ordering::Relaxed); self.notify_handler.map(|handler| { handler(Notification::DownloadContentLengthReceived( @@ -308,7 +394,7 @@ )) }); if prev_files > 50 { - println!("Closing {} deferred file handles", prev_files); + println!("{} deferred IO operations", prev_files); } let buf: Vec<u8> = vec![0; prev_files]; assert!(32767 > prev_files); @@ -328,20 +414,59 @@ .map(|handler| handler(Notification::DownloadFinished)); self.notify_handler .map(|handler| handler(Notification::DownloadPopUnits)); + // close the feedback channel so that blocking reads on it can + // complete.
+ self.tx = None; + } + } + + impl<'a> Drop for Unpacker<'a> { + fn drop(&mut self) { + use super::Unpacker; + self.join() } } } mod unpacker { use crate::utils::notifications::Notification; - pub struct Unpacker {} + pub struct Unpacker { + tx: Option<super::Sender<super::Item>>, + } impl Unpacker { - pub fn new<'a>(_notify_handler: Option<&'a dyn Fn(Notification<'_>)>) -> Unpacker { - Unpacker {} + pub fn new<'a>( + tx: super::Sender<super::Item>, + _notify_handler: Option<&'a dyn Fn(Notification<'_>)>, + ) -> Unpacker { + Unpacker { tx: Some(tx) } } } impl super::Unpacker for Unpacker { - fn handle(&mut self, _unpacked: tar::Unpacked) {} + fn handle(&mut self, mut item: super::Item) { + // Trivial single-threaded IO: + // directories: make them, TODO: register with the dir existence cache. + // Files, write them. + item.result = match item.kind { + super::Kind::Directory => super::create_dir(&item.full_path), + super::Kind::File(ref contents) => { + super::write_file(&item.full_path, &contents, item.mode) + } + }; + item.finish = super::precise_time_s(); + self.tx + .as_ref() + .unwrap() + .send(item) + .expect("receiver should be listening"); + } + + fn ready(&mut self) -> bool { + true + } + + fn join(&mut self) { + self.tx = None; + } } } @@ -350,22 +475,22 @@ fn unpack_without_first_dir<'a, R: Read>( path: &Path, notify_handler: Option<&'a dyn Fn(Notification<'_>)>, ) -> Result<()> { + let (tx, rx) = channel(); let mut unpacker : Box<dyn Unpacker> = // If this gets lots of use, consider exposing via the config file.
if let Ok(thread_str) = env::var("RUSTUP_CLOSE_THREADS") { if thread_str == "disabled" { - Box::new(unpacker::Unpacker::new(notify_handler)) + Box::new(unpacker::Unpacker::new(tx, notify_handler)) } else { if let Ok(thread_count) = thread_str.parse::<usize>() { - Box::new(threadedunpacker::Unpacker::new_with_threads(notify_handler, thread_count)) + Box::new(threadedunpacker::Unpacker::new_with_threads(tx, notify_handler, thread_count)) } else { - Box::new(threadedunpacker::Unpacker::new(notify_handler)) + Box::new(threadedunpacker::Unpacker::new(tx, notify_handler)) } } } else { - Box::new(threadedunpacker::Unpacker::new(notify_handler)) - } - ; + Box::new(threadedunpacker::Unpacker::new(tx, notify_handler)) + }; let entries = archive .entries() .chain_err(|| ErrorKind::ExtractingPackage)?; @@ -373,18 +498,104 @@ for entry in entries { let mut entry = entry.chain_err(|| ErrorKind::ExtractingPackage)?; + // Similarly if the path is anything other than a regular subdir let relpath = { let path = entry.path(); let path = path.chain_err(|| ErrorKind::ExtractingPackage)?; path.into_owned() }; + for part in relpath.components() { + match part { + std::path::Component::Normal(_) => {} + _ => return Err(ErrorKind::BadPath(relpath).into()), + } + } let mut components = relpath.components(); - // Throw away the first path component + // Throw away the first path component: we make our own root components.next(); let full_path = path.join(&components.as_path()); + let size = entry.header().size()?; + if size > 50_000_000 { + return Err(format!("File too big {} {}", relpath.display(), size).into()); + } + // Bail out if we get hard links, device nodes or any other unusual content + // - it is most likely an attack, as Rust's cross-platform nature precludes + // such artifacts + let kind = entry.header().entry_type(); + let (kind, size) = match kind { + EntryType::Directory => (Kind::Directory, None), + EntryType::Regular => { + let mut v =
Vec::with_capacity(size as usize); + entry.read_to_end(&mut v)?; + let len = v.len(); + (Kind::File(v), Some(len)) + } + _ => return Err(ErrorKind::UnsupportedKind(format!("{:?}", kind)).into()), + }; + + // Now we have a choice: + // - perform some IO in this thread + // - dispatch some or all IO to another thread + // known tradeoffs: + // NFS: network latency incurred on create, chmod, close calls + // WSLv1: Defender latency incurred on close calls; mutex shared with create calls + // Windows: Defender latency incurred on close calls + // Unix: limited open file count + // Defender : CPU limited, so more service points than cores brings no gain. + // Some machines: IO limited, more service points than cores brings more efficient + // Hello world footprint ~350MB, so around 400MB to install is considered ok. + // IO utilisation. + // All systems: dispatching to a thread has some overhead. + // Basic idea then is a locally measured congestion control problem. + // Underlying system has two + // dimensions - how much work we have queued, and how much work we execute + // at once. Queued work is both memory footprint, and unless each executor + // is performing complex logic, potentially concurrent work. + // Single core machines - thread anyway, they probably don't have SSDs? + // How many service points? Blocking latency due to networks and disks + // is independent of CPU: more threads will garner more throughput up + // to actual resource service capability. + // so: + // a) measure time around each IO op from dispatch to completion. + // b) create more threads than CPUs - 2x for now (because threadpool + // doesn't allow creating dynamically), with very shallow stacks + // (say 1MB) + // c) keep adding work while the P95? P80? of completion stays the same + // when pNN starts to increase either (i) we've saturated the system + // or (ii) other work coming in has saturated the system or (iii) this + // sort of work is a lot harder to complete.
We use NN<100 to avoid + // having jitter throttle us inappropriately. We use a high NN to + // avoid making the system perform poorly for the user / other users + // on shared components. Perhaps time-to-completion should be scaled by size. + // d) if we have a lot of (iii) we should respond to it the same as (i), so + // lets reduce this to (i) and (ii). Being unable to tell the difference + // between load we created and another's, we have to throttle back when + // the system saturates. Our most throttled position will be one service + // worker: dispatch IO, extract the next text, wait for IO completion, + // repeat. + // e) scaling up and down: TCP's lessons here are pretty good. So exponential + // up - single thread and measure. two, 4 etc. When Pnn goes bad back off + // for a time and then try again with linear increase (it could be case (ii) + // - lots of room to experiment here; working with a time based approach is important + // as that is the only way we can detect saturation: we are not facing + // loss or errors in this model. + // f) data gathering: record (name, bytes, start, duration) + // write to disk afterwards as a csv file? + + let item = Item { + full_path, + kind, + start: precise_time_s(), + finish: 0.0, + size, + result: Ok(()), + mode: entry.header().mode().ok().unwrap(), + }; + + // FUTURE: parallelise or delete (surely all distribution tars are well formed in this regard).
// Create the full path to the entry if it does not exist already - if let Some(parent) = full_path.parent() { + if let Some(parent) = item.full_path.parent() { if !checked_parents.contains(parent) { checked_parents.insert(parent.to_owned()); // It would be nice to optimise this stat out, but the tar could be like so: @@ -399,11 +610,29 @@ fn unpack_without_first_dir<'a, R: Read>( } } } - entry.set_preserve_mtime(false); - entry - .unpack(&full_path) - .map(|unpacked| unpacker.handle(unpacked)) - .chain_err(|| ErrorKind::ExtractingPackage)?; + // Ensure the unpacker can accept the work we have decided to submit. + while !unpacker.ready() { + for prev_item in rx.iter().take(1) { + // TODO capture metrics, add directories to created cache + prev_item + .result + .chain_err(|| ErrorKind::ExtractingPackage)?; + } + } + unpacker.handle(item); + // drain completed results to keep memory pressure low + for prev_item in rx.try_iter() { + // TODO capture metrics, add directories to created cache + prev_item + .result + .chain_err(|| ErrorKind::ExtractingPackage)?; + } + } + unpacker.join(); + // And drain final results + for item in rx.try_iter() { + // TODO capture metrics, add directories to created cache + item.result.chain_err(|| ErrorKind::ExtractingPackage)?; } Ok(()) diff --git a/src/errors.rs b/src/errors.rs index dfd09d8d41d..3779bb6b3fc 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -24,6 +24,7 @@ error_chain! { Temp(temp::Error); Io(io::Error); Open(opener::OpenError); + Thread(std::sync::mpsc::RecvError); } errors { @@ -325,6 +326,14 @@ error_chain! { NoExeName { description("couldn't determine self executable name") } + UnsupportedKind(v: String) { + description("unsupported tar entry") + display("tar entry kind '{}' is not supported", v) + } + BadPath(v: PathBuf) { + description("bad path in tar") + display("tar path '{}' is not supported", v.display()) + } } }