diff --git a/library/std/src/io/buffered/bufwriter.rs b/library/std/src/io/buffered/bufwriter.rs index 3ec272fea6668..e03d2eb8baf7a 100644 --- a/library/std/src/io/buffered/bufwriter.rs +++ b/library/std/src/io/buffered/bufwriter.rs @@ -2,6 +2,34 @@ use crate::fmt; use crate::io::{ self, Error, ErrorKind, IntoInnerError, IoSlice, Seek, SeekFrom, Write, DEFAULT_BUF_SIZE, }; +use crate::iter::FusedIterator; + +/// Helper macro for a common write pattern. Write a buffer using the given +/// function call, then use the returned usize to get the unwritten tail of +/// the buffer. +/// +/// Example: +/// +/// ``` +/// // Use a ? for an i/o operation +/// let tail = tail!(self.flush_buf_vectored(buf)?); +/// +/// // omit the ? for an infallible operation +/// let tail = tail!(self.write_to_buffer(buf)); +/// ``` +macro_rules! tail { + ($($write:ident).+ ($buf:expr)) => {{ + let buf = $buf; + let written = $($write).+ (buf); + &buf[written..] + }}; + + ($($write:ident).+ ($buf:expr) ? ) => {{ + let buf = $buf; + let written = $($write).+ (buf)?; + &buf[written..] + }}; +} /// Wraps a writer and buffers its output. /// @@ -72,6 +100,51 @@ pub struct BufWriter { panicked: bool, } +/// Helper struct for BufWriter::flush_buf to ensure the buffer is updated +/// after all the writes are complete. It tracks the number of written bytes +/// and drains them all from the front of the buffer when dropped. +struct BufGuard<'a> { + buffer: &'a mut Vec, + written: usize, +} + +impl<'a> BufGuard<'a> { + fn new(buffer: &'a mut Vec) -> Self { + Self { buffer, written: 0 } + } + + /// The unwritten part of the buffer + fn remaining(&self) -> &[u8] { + &self.buffer[self.written..] 
+ } + + /// Flag some bytes as removed from the front of the buffer + fn consume(&mut self, amt: usize) { + self.written += amt; + } + + /// true if all of the bytes have been written + fn done(&self) -> bool { + self.written >= self.buffer.len() + } + + /// Used in vectored flush mode; reports how many *extra* bytes after + /// `buffer` (ie, new bytes from the caller) were written + fn extra_written(&self) -> Option { + self.written.checked_sub(self.buffer.len()) + } +} + +impl Drop for BufGuard<'_> { + fn drop(&mut self) { + if self.written >= self.buffer.len() { + self.buffer.clear(); + } else if self.written > 0 { + self.buffer.drain(..self.written); + } + } +} + impl BufWriter { /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, /// but may change in the future. @@ -115,45 +188,9 @@ impl BufWriter { /// `write`), any 0-length writes from `inner` must be reported as i/o /// errors from this method. pub(super) fn flush_buf(&mut self) -> io::Result<()> { - /// Helper struct to ensure the buffer is updated after all the writes - /// are complete. It tracks the number of written bytes and drains them - /// all from the front of the buffer when dropped. - struct BufGuard<'a> { - buffer: &'a mut Vec, - written: usize, - } - - impl<'a> BufGuard<'a> { - fn new(buffer: &'a mut Vec) -> Self { - Self { buffer, written: 0 } - } - - /// The unwritten part of the buffer - fn remaining(&self) -> &[u8] { - &self.buffer[self.written..] 
- } - - /// Flag some bytes as removed from the front of the buffer - fn consume(&mut self, amt: usize) { - self.written += amt; - } - - /// true if all of the bytes have been written - fn done(&self) -> bool { - self.written >= self.buffer.len() - } - } - - impl Drop for BufGuard<'_> { - fn drop(&mut self) { - if self.written > 0 { - self.buffer.drain(..self.written); - } - } - } - let mut guard = BufGuard::new(&mut self.buf); let inner = self.inner.as_mut().unwrap(); + while !guard.done() { self.panicked = true; let r = inner.write(guard.remaining()); @@ -174,6 +211,47 @@ impl BufWriter { Ok(()) } + /// Same as flush_buf, but uses vector operations to attempt to *also* + /// flush an incoming buffer. The returned usize is the number of bytes + /// successfully written from the *new* buf. This method will loop until + /// the entire *current* buffer is flushed, even if that means 0 bytes + /// from the new buffer were written. + pub(super) fn flush_buf_vectored(&mut self, buf: &[u8]) -> io::Result { + let inner = self.inner.as_mut().unwrap(); + + if !inner.is_write_vectored() { + self.flush_buf()?; + return Ok(0); + } + + let mut guard = BufGuard::new(&mut self.buf); + + // Continue looping only as long as there is unwritten content in self.buf + loop { + match guard.extra_written() { + None => { + let buffers = [IoSlice::new(guard.remaining()), IoSlice::new(buf)]; + self.panicked = true; + let r = inner.write_vectored(&buffers); + self.panicked = false; + + match r { + Ok(0) => { + return Err(Error::new( + ErrorKind::WriteZero, + "failed to write the buffered data", + )); + } + Ok(n) => guard.consume(n), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Some(extra) => return Ok(extra), + } + } + } + /// Buffer some data without flushing it, regardless of the size of the /// data. Writes as much as possible without exceeding capacity. Returns /// the number of bytes written. 
@@ -260,6 +338,11 @@ impl BufWriter { self.buf.capacity() } + /// Returns the unused buffer capacity. + fn available(&self) -> usize { + self.capacity() - self.buf.len() + } + /// Unwraps this `BufWriter`, returning the underlying writer. /// /// The buffer is written out before returning the writer. @@ -291,64 +374,153 @@ impl BufWriter { #[stable(feature = "rust1", since = "1.0.0")] impl Write for BufWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - if self.buf.len() + buf.len() > self.buf.capacity() { - self.flush_buf()?; - } - // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 - if buf.len() >= self.buf.capacity() { - self.panicked = true; - let r = self.get_mut().write(buf); - self.panicked = false; - r + // We assume that callers of `write` prefer to avoid split writes where + // possible, so if the incoming buf doesn't fit in remaining available + // buffer, we pre-flush rather than doing a partial write to fill it. + // + // During the pre-flush, though, we attempt a vectored write of both + // the buffered bytes and the new bytes. In the worst case, this will + // be the same as a typical pre-flush, since by default vectored + // writes just do a normal write of the first buffer. In the best case, + // we were able to do some additional writing during a single syscall. + let written = match buf.len() > self.available() { + true => self.flush_buf_vectored(buf)?, + false => 0, + }; + let tail = &buf[written..]; + + // If the incoming buf doesn't fit in our buffer, even after we flushed + // it to make room, we should forward it directly (via inner.write). + // However, if the vectored flush successfully wrote some of `buf`, + // we're now obligated to return Ok(..) before trying any more + // fallible i/o operations. 
+ let tail_written = if tail.len() < self.capacity() { + self.write_to_buf(tail) + } else if written > 0 { + 0 } else { - self.buf.extend_from_slice(buf); - Ok(buf.len()) - } + // It's guaranteed at this point that the buffer is empty, because + // if it wasn't, it would have been flushed earlier in this function. + self.get_mut().write(tail)? + }; + + Ok(written + tail_written) } fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { - // Normally, `write_all` just calls `write` in a loop. We can do better - // by calling `self.get_mut().write_all()` directly, which avoids - // round trips through the buffer in the event of a series of partial - // writes in some circumstances. - if self.buf.len() + buf.len() > self.buf.capacity() { - self.flush_buf()?; + // Unlike with `write`, we assume that a caller of `write_all` is + // interested in minimizing system calls even if the buffer is split. + // This method tries to fill up the buffer as much as possible before + // flushing, whereas `write` prefers not to split incoming bufs. + + // Bypass the buffer if the incoming write is larger than the + // whole buffer. Use a vectored write to attempt to write the new + // data and the existing buffer in a single operation + let buf = match buf.len() >= self.capacity() { + true => match tail!(self.flush_buf_vectored(buf)?) { + // If the vectored write flushed everything at once, we're done!
+ [] => return Ok(()), + + // If what's left after the vector flush is *still* larger than + // the buffer, bypass the buffer and forward it directly + tail if tail.len() >= self.capacity() => return self.get_mut().write_all(tail), + + // Otherwise, we're going to buffer whatever's left of the user input + tail => tail, + }, + false => buf, + }; + + // In order to reduce net writes in aggregate, we buffer as much as + // possible, then forward, then buffer the rest + let buf = tail!(self.write_to_buf(buf)); + if !buf.is_empty() { + let buf = tail!(self.flush_buf_vectored(buf)?); + + // At this point, because we know that buf.len() < self.capacity(), + // and that the buffer has been flushed, we know that this will + // succeed in totality + self.write_to_buf(buf); } - // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 - if buf.len() >= self.buf.capacity() { - self.panicked = true; - let r = self.get_mut().write_all(buf); - self.panicked = false; - r - } else { - self.buf.extend_from_slice(buf); - Ok(()) + + // If, at this point, the buffer is full, we may as well eagerly + // attempt to flush, so that the next write will have an empty + // buffer. + if self.available() == 0 { + self.flush_buf()?; } + + Ok(()) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { - let total_len = bufs.iter().map(|b| b.len()).sum::(); - if self.buf.len() + total_len > self.buf.capacity() { - self.flush_buf()?; - } - // FIXME: Why no len > capacity? Why not buffer len == capacity? #72919 - if total_len >= self.buf.capacity() { - self.panicked = true; - let r = self.get_mut().write_vectored(bufs); - self.panicked = false; - r + if let Some(buf) = only_one(bufs, |b| !b.is_empty()) { + // If there's exactly 1 incoming buffer, `Self::write` can make + // use of self.inner.write_vectored to attempt to combine flushing + // the existing buffer with writing the new one.
+ self.write(buf) + } else if self.get_ref().is_write_vectored() { + let total_len: usize = bufs.iter().map(|buf| buf.len()).sum(); + + if total_len > self.available() { + self.flush_buf()?; + } + + if total_len >= self.capacity() { + self.get_mut().write_vectored(bufs) + } else { + // Correctness note: we've already verified that none of these + // will overflow the buffer, because total_len < capacity + bufs.iter().for_each(|buf| self.buf.extend_from_slice(buf)); + Ok(total_len) + } } else { - bufs.iter().for_each(|b| self.buf.extend_from_slice(b)); - Ok(total_len) + // Because the inner writer doesn't have native vectored write + // support, we should take care of buffering together the individual + // incoming bufs, even if the *total* length is larger than our + // buffer. We only want to skip our buffer if an *individual* write + // exceeds our buffer capacity. + let mut total_buffered = 0; + + for buf in bufs { + if total_buffered == 0 { + if buf.len() > self.available() { + // If an individual write would overflow our remaining + // capacity and we haven't buffered anything yet, + // pre-flush before buffering (same as with regular + // write()). + self.flush_buf()?; + } + + if buf.len() >= self.capacity() && self.buf.is_empty() { + // If an individual buffer exceeds our *total* capacity + // and we haven't buffered anything yet, just forward + // it to the underlying device + return self.get_mut().write(buf); + } + } + + // Buffer as much as possible until we reach full capacity. + // Once we've buffered at least 1 byte, we're obligated to + // return Ok(..) before attempting any fallible i/o operations, + // so once the buffer is full we immediately return. 
+ total_buffered += self.write_to_buf(buf); + if self.available() == 0 { + break; + } + } + + Ok(total_buffered) } } fn is_write_vectored(&self) -> bool { - self.get_ref().is_write_vectored() + true } fn flush(&mut self) -> io::Result<()> { - self.flush_buf().and_then(|()| self.get_mut().flush()) + self.flush_buf()?; + self.get_mut().flush() } } @@ -359,7 +531,7 @@ where { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("BufWriter") - .field("writer", &self.inner.as_ref().unwrap()) + .field("writer", self.get_ref()) .field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity())) .finish() } @@ -385,3 +557,22 @@ impl Drop for BufWriter { } } } + +/// Similar to iter.find, this method searches an iterator for an item +/// matching a predicate, but returns it only if it is the *only* item +/// matching that predicate. Used to check if there is exactly one non-empty +/// buffer in a list input to write_vectored. +/// +/// FIXME: delete this function and replace it with slice::trim if that becomes +/// a thing (https://github.com/rust-lang/rfcs/issues/2547) +fn only_one(iter: I, filter: impl Fn(&I::Item) -> bool) -> Option +where + I: IntoIterator, + I::IntoIter: FusedIterator, +{ + let mut iter = iter.into_iter().filter(filter); + match (iter.next(), iter.next()) { + (Some(item), None) => Some(item), + _ => None, + } +} diff --git a/library/std/src/io/buffered/linewritershim.rs b/library/std/src/io/buffered/linewritershim.rs index a80d08db8692e..2c7819d1b04ed 100644 --- a/library/std/src/io/buffered/linewritershim.rs +++ b/library/std/src/io/buffered/linewritershim.rs @@ -20,6 +20,13 @@ impl<'a, W: Write> LineWriterShim<'a, W> { Self { buffer } } + /// Get a reference to the inner writer (that is, the writer wrapped by + /// the BufWriter). Be careful with this writer, as writes to it will + /// bypass the buffer. 
+ fn inner_ref(&self) -> &W { + self.buffer.get_ref() + } + /// Get a mutable reference to the inner writer (that is, the writer /// wrapped by the BufWriter). Be careful with this writer, as writes to /// it will bypass the buffer. @@ -71,25 +78,29 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { Some(newline_idx) => newline_idx + 1, }; + // This is what we're going to try to write directly to the inner + // writer. The rest will be buffered, if nothing goes wrong. + let lines = &buf[..newline_idx]; + // Flush existing content to prepare for our write. We have to do this // before attempting to write `buf` in order to maintain consistency; // if we add `buf` to the buffer then try to flush it all at once, // we're obligated to return Ok(), which would mean suppressing any // errors that occur during flush. - self.buffer.flush_buf()?; - - // This is what we're going to try to write directly to the inner - // writer. The rest will be buffered, if nothing goes wrong. - let lines = &buf[..newline_idx]; - - // Write `lines` directly to the inner writer. In keeping with the - // `write` convention, make at most one attempt to add new (unbuffered) - // data. Because this write doesn't touch the BufWriter state directly, - // and the buffer is known to be empty, we don't need to worry about - // self.buffer.panicked here. - let flushed = self.inner_mut().write(lines)?; + // + // We can, however, use a vectored write to attempt to write the lines + // at the same time as the buffer. + let flushed = match self.buffer.flush_buf_vectored(lines)? { + // Write `lines` directly to the inner writer. In keeping with the + // `write` convention, make at most one attempt to add new (unbuffered) + // data. Because this write doesn't touch the BufWriter state directly, + // and the buffer is known to be empty, we don't need to worry about + // self.buffer.panicked here. 
+ 0 => self.inner_mut().write(lines)?, + flushed => flushed, + }; - // If buffer returns Ok(0), propagate that to the caller without + // If the write returns Ok(0), propagate that to the caller without // doing additional buffering; otherwise we're just guaranteeing // an "ErrorKind::WriteZero" later. if flushed == 0 { @@ -159,9 +170,16 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { /// get the benefits of more granular partial-line handling without losing /// anything in efficiency fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + // FIXME: BufWriter recently received some optimized handling of + // vectored writes; update this method to take advantage of those + // updates. In particular, BufWriter::is_write_vectored is always true, + // because BufWriter::write_vectored takes special care to buffer + // together the incoming sub-buffers when !W::is_write_vectored, while + // still using W::write_vectored when it is. + // If there's no specialized behavior for write_vectored, just use // write. This has the benefit of more granular partial-line handling. 
- if !self.is_write_vectored() { + if !self.inner_ref().is_write_vectored() { return match bufs.iter().find(|buf| !buf.is_empty()) { Some(buf) => self.write(buf), None => Ok(0), @@ -178,7 +196,6 @@ impl<'a, W: Write> Write for LineWriterShim<'a, W> { // If there are no new newlines (that is, if this write is less than // one line), just do a regular buffered write let last_newline_buf_idx = match last_newline_buf_idx { - // No newlines; just do a normal buffered write None => { self.flush_if_completed_line()?; return self.buffer.write_vectored(bufs); diff --git a/library/std/src/io/buffered/tests.rs b/library/std/src/io/buffered/tests.rs index 66a64f667baa4..a39e425ace4e2 100644 --- a/library/std/src/io/buffered/tests.rs +++ b/library/std/src/io/buffered/tests.rs @@ -9,12 +9,15 @@ pub struct ShortReader { lengths: Vec, } -// FIXME: rustfmt and tidy disagree about the correct formatting of this -// function. This leads to issues for users with editors configured to -// rustfmt-on-save. impl Read for ShortReader { fn read(&mut self, _: &mut [u8]) -> io::Result { - if self.lengths.is_empty() { Ok(0) } else { Ok(self.lengths.remove(0)) } + // Note for developers: if your editor is fighting you about the + // correct rustfmt here, make sure you're using nightly + if self.lengths.is_empty() { + Ok(0) + } else { + Ok(self.lengths.remove(0)) + } } } @@ -243,8 +246,11 @@ fn test_buffered_reader_seek_underflow_discard_buffer_between_seeks() { assert_eq!(reader.buffer().len(), 0); } +/// Basic tests of BufWriter when the wrapped writer supports vectored writes. +/// BufWriter will use vectored writes to attempt to combine buffer flushes +/// with incoming writes. 
#[test] -fn test_buffered_writer() { +fn test_buffered_writer_inner_vectored() { let inner = Vec::new(); let mut writer = BufWriter::with_capacity(2, inner); @@ -270,8 +276,8 @@ fn test_buffered_writer() { assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); writer.write(&[6]).unwrap(); - assert_eq!(writer.buffer(), [6]); - assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6]); writer.write(&[7, 8]).unwrap(); assert_eq!(writer.buffer(), []); @@ -286,6 +292,141 @@ fn test_buffered_writer() { assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); } +#[test] +fn test_buffered_writer_write() { + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + writer.write(b"ab").unwrap(); + assert_eq!(writer.get_ref(), b""); + assert_eq!(writer.buffer(), b"ab"); + + // `write` prefers to avoid split writes + writer.write(b"cde").unwrap(); + assert_eq!(writer.get_ref(), b"ab"); + assert_eq!(writer.buffer(), b"cde"); + + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // `write` skips the buffer if the write is >= capacity + writer.write(b"abcd").unwrap(); + assert_eq!(writer.get_ref(), b"abcd"); + assert_eq!(writer.buffer(), b""); + + writer.write(b"efghi").unwrap(); + assert_eq!(writer.get_ref(), b"abcdefghi"); + assert_eq!(writer.buffer(), b""); + + // `write` fills the buffer + writer.write(b"jk").unwrap(); + writer.write(b"lm").unwrap(); + assert_eq!(writer.get_ref(), b"abcdefghi"); + assert_eq!(writer.buffer(), b"jklm"); +} + +#[test] +fn test_buffered_writer_write_all_unvectored() { + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // write_all will perform split writes as necessary such that the minimal + // number of writes are performed on inner. 
Additionally, it will eagerly + // flush if a write_all fully fills the buffer + writer.write_all(b"abc").unwrap(); + writer.write_all(b"abc").unwrap(); + + assert_eq!(writer.buffer(), b"bc"); + assert_eq!(writer.get_ref(), b"abca"); + + writer.write_all(b"abc").unwrap(); + writer.write_all(b"abc").unwrap(); + + assert_eq!(writer.buffer(), b""); + assert_eq!(writer.get_ref(), b"abcabcabcabc"); + assert_eq!(writer.get_ref().write_count, 3); +} + +#[test] +fn test_buffered_writer_write_all_vectored() { + let inner = ProgrammableSink { enable_vectored: true, ..ProgrammableSink::default() }; + let mut writer = BufWriter::with_capacity(10, inner); + + writer.write_all(b"abc").unwrap(); + writer.write_all(b"abc").unwrap(); + writer.write_all(b"abc").unwrap(); + + assert_eq!(writer.buffer(), b"abcabcabc"); + assert_eq!(writer.get_ref(), b""); + assert_eq!(writer.get_ref().write_count, 0); + + // This should be used to fill the buffer, but then the whole thing should + // be sent as a single vectored write + writer.write_all(b"aaaaaa").unwrap(); + + assert_eq!(writer.buffer(), b""); + assert_eq!(writer.get_ref(), b"abcabcabcaaaaaa"); + assert_eq!(writer.get_ref().write_count, 1); +} + +#[test] +fn test_buffered_writer_write_vectored() { + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // A vectored write is buffered, even if the total size is larger than the + // buffer + let input = + [IoSlice::new(b"aa"), IoSlice::new(b"bb"), IoSlice::new(b"cc"), IoSlice::new(b"dd")]; + + assert_eq!(writer.write_vectored(&input).unwrap(), 4); + assert_eq!(writer.buffer(), b"aabb"); + assert_eq!(writer.get_ref(), b""); + + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // If the first encountered buffer is large, it is forwarded directly + let input = [IoSlice::new(b""), IoSlice::new(b"abcdefg")]; + + assert_eq!(writer.write_vectored(&input).unwrap(), 7); + assert_eq!(writer.buffer(), 
b""); + assert_eq!(writer.get_ref(), b"abcdefg"); + assert_eq!(writer.get_ref().write_count, 1); + + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(4, inner); + + // if a subsequent encountered buffer is large, it is buffered (because of + // the infallibility requirement) + let input = [IoSlice::new(b"a"), IoSlice::new(b"bcdefg")]; + + assert_eq!(writer.write_vectored(&input).unwrap(), 4); + assert_eq!(writer.buffer(), b"abcd"); + assert_eq!(writer.get_ref(), b""); + assert_eq!(writer.get_ref().write_count, 0); +} + +// An old implementation of BufWriter had a bug when deciding whether to +// forward a write directly (skipping the buffer) where it would forward even +// if there was buffered data from a previous write. This is a regression test +// for that bug. +#[test] +fn test_buffered_writer_vectored_corner_case() -> io::Result<()> { + let inner = ProgrammableSink::default(); + let mut writer = BufWriter::with_capacity(10, inner); + + assert_eq!(writer.write(b"aaaa")?, 4); + + let input = [IoSlice::new(b"bbbbbb"), IoSlice::new(b"cccccc")]; + + writer.write_vectored(&input)?; + writer.flush()?; + assert_eq!(&writer.get_ref().buffer[..10], b"aaaabbbbbb"); + + Ok(()) +} + #[test] fn test_buffered_writer_inner_flushes() { let mut w = BufWriter::with_capacity(3, Vec::new()); @@ -451,31 +592,38 @@ fn bench_buffered_writer(b: &mut test::Bencher) { /// A simple `Write` target, designed to be wrapped by `LineWriter` / /// `BufWriter` / etc, that can have its `write` & `flush` behavior /// configured -#[derive(Default, Clone)] +#[derive(Default, Debug, Clone)] struct ProgrammableSink { - // Writes append to this slice + /// Writes append to this slice pub buffer: Vec, - // Flush sets this flag + /// Flush sets this flag pub flushed: bool, - // If true, writes will always be an error + /// Each `write` call increments this + pub write_count: usize, + + /// If true, writes will always be an error pub always_write_error: bool, - // If 
true, flushes will always be an error + /// If true, flushes will always be an error pub always_flush_error: bool, - // If set, only up to this number of bytes will be written in a single - // call to `write` + /// If set, only up to this number of bytes will be written in a single + /// call to `write` pub accept_prefix: Option, - // If set, counts down with each write, and writes return an error - // when it hits 0 + /// If set, counts down with each write, and writes return an error + /// when it hits 0 pub max_writes: Option, - // If set, attempting to write when max_writes == Some(0) will be an - // error; otherwise, it will return Ok(0). + /// If set, attempting to write when max_writes == Some(0) will be an + /// error; otherwise, it will return Ok(0). pub error_after_max_writes: bool, + + /// If set, vectored writes are enabled. All of the above configuration + /// will apply to each write_vectored call as though it was a single write. + pub enable_vectored: bool, } impl Write for ProgrammableSink { @@ -500,6 +648,7 @@ impl Write for ProgrammableSink { let data = &data[..len]; self.buffer.extend_from_slice(data); + self.write_count += 1; Ok(len) } @@ -512,6 +661,73 @@ impl Write for ProgrammableSink { Ok(()) } } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { + if !self.enable_vectored { + // If we're not vectored, use the default behavior, which is to + // write the first non-empty buf with write. 
+ return match bufs.iter().find(|b| !b.is_empty()) { + Some(buf) => self.write(buf), + None => Ok(0), + }; + } + + if self.always_write_error { + return Err(io::Error::new(io::ErrorKind::Other, "test - always_write_error")); + } + + match self.max_writes { + Some(0) if self.error_after_max_writes => { + return Err(io::Error::new(io::ErrorKind::Other, "test - max_writes")); + } + Some(0) => return Ok(0), + Some(ref mut count) => *count -= 1, + None => {} + } + + let total_written = match self.accept_prefix { + None => bufs.iter().fold(0, |len, buf| { + self.buffer.extend_from_slice(buf); + len + buf.len() + }), + Some(mut len) => { + let mut written = 0; + + for buf in bufs { + if len == 0 { + break; + } + + let buf = &buf[..buf.len().min(len)]; + self.buffer.extend_from_slice(buf); + written += buf.len(); + len -= buf.len(); + } + + written + } + }; + + self.write_count += 1; + Ok(total_written) + } + + fn is_write_vectored(&self) -> bool { + self.enable_vectored + } +} + +/// PartialEq allows for easy comparison of the contents of a ProgrammableSink +impl PartialEq<[u8]> for ProgrammableSink { + fn eq(&self, other: &[u8]) -> bool { + self.buffer == other + } +} + +impl PartialEq<[u8; N]> for ProgrammableSink { + fn eq(&self, other: &[u8; N]) -> bool { + self.buffer == other + } } /// Previously the `LineWriter` could successfully write some bytes but diff --git a/library/std/src/lib.rs b/library/std/src/lib.rs index 96a7755c68821..fcc0371d4bba6 100644 --- a/library/std/src/lib.rs +++ b/library/std/src/lib.rs @@ -279,6 +279,7 @@ #![feature(maybe_uninit_extra)] #![feature(maybe_uninit_ref)] #![feature(maybe_uninit_slice)] +#![feature(min_const_generics)] #![feature(min_specialization)] #![feature(needs_panic_runtime)] #![feature(negative_impls)]