unify read_to_end and io::copy impls for reading into a Vec #118222

Merged 1 commit on Nov 29, 2023
library/std/src/io/copy.rs (69 changes: 3 additions & 66 deletions)
@@ -1,7 +1,6 @@
 use super::{BorrowedBuf, BufReader, BufWriter, Read, Result, Write, DEFAULT_BUF_SIZE};
 use crate::alloc::Allocator;
 use crate::cmp;
-use crate::cmp::min;
 use crate::collections::VecDeque;
 use crate::io::IoSlice;
 use crate::mem::MaybeUninit;
@@ -256,79 +255,17 @@ impl<I: Write + ?Sized> BufferedWriterSpec for BufWriter<I> {
     }
 }
 
-impl<A: Allocator> BufferedWriterSpec for Vec<u8, A> {
+impl BufferedWriterSpec for Vec<u8> {
     fn buffer_size(&self) -> usize {
         cmp::max(DEFAULT_BUF_SIZE, self.capacity() - self.len())
     }
 
     fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
-        let mut bytes = 0;
-
-        // avoid inflating empty/small vecs before we have determined that there's anything to read
-        if self.capacity() < DEFAULT_BUF_SIZE {
-            let stack_read_limit = DEFAULT_BUF_SIZE as u64;
-            bytes = stack_buffer_copy(&mut reader.take(stack_read_limit), self)?;
-            // fewer bytes than requested -> EOF reached
-            if bytes < stack_read_limit {
-                return Ok(bytes);
-            }
-        }
-
-        // don't immediately offer the vec's whole spare capacity, otherwise
-        // we might have to fully initialize it if the reader doesn't have a custom read_buf() impl
-        let mut max_read_size = DEFAULT_BUF_SIZE;
-
-        loop {
-            self.reserve(DEFAULT_BUF_SIZE);
-            let mut initialized_spare_capacity = 0;
-
-            loop {
-                let buf = self.spare_capacity_mut();
-                let read_size = min(max_read_size, buf.len());
-                let mut buf = BorrowedBuf::from(&mut buf[..read_size]);
-                // SAFETY: init is either 0 or the init_len from the previous iteration.
-                unsafe {
-                    buf.set_init(initialized_spare_capacity);
-                }
-                match reader.read_buf(buf.unfilled()) {
-                    Ok(()) => {
-                        let bytes_read = buf.len();
-
-                        // EOF
-                        if bytes_read == 0 {
-                            return Ok(bytes);
-                        }
-
-                        // the reader is returning short reads but it doesn't call ensure_init()
-                        if buf.init_len() < buf.capacity() {
-                            max_read_size = usize::MAX;
-                        }
-                        // the reader hasn't returned short reads so far
-                        if bytes_read == buf.capacity() {
-                            max_read_size *= 2;
-                        }
-
-                        initialized_spare_capacity = buf.init_len() - bytes_read;
-                        bytes += bytes_read as u64;
-                        // SAFETY: BorrowedBuf guarantees all of its filled bytes are init
-                        // and the number of read bytes can't exceed the spare capacity since
-                        // that's what the buffer is borrowing from.
-                        unsafe { self.set_len(self.len() + bytes_read) };
-
-                        // spare capacity full, reserve more
-                        if self.len() == self.capacity() {
-                            break;
-                        }
-                    }
-                    Err(e) if e.is_interrupted() => continue,
-                    Err(e) => return Err(e),
-                }
-            }
-        }
+        reader.read_to_end(self).map(|bytes| u64::try_from(bytes).expect("usize overflowed u64"))
     }
 }
 
-fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
+pub fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
     reader: &mut R,
     writer: &mut W,
 ) -> Result<u64> {
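With this change, `io::copy` into a `Vec<u8>` and `Read::read_to_end` share a single growth policy. A minimal sketch of the user-facing path that exercises the specialization above, assuming an illustrative reader and byte count (neither is from the PR):

```rust
use std::io::{self, Read};

fn main() -> io::Result<()> {
    // io::copy with a Vec<u8> sink dispatches to the BufferedWriterSpec
    // impl above, which now simply delegates to Read::read_to_end.
    let mut source = io::repeat(0x2a).take(100_000);
    let mut sink: Vec<u8> = Vec::new();

    let copied = io::copy(&mut source, &mut sink)?;
    assert_eq!(copied, 100_000);
    assert_eq!(sink.len(), 100_000);
    Ok(())
}
```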
library/std/src/io/copy/tests.rs (11 changes: 7 additions & 4 deletions)
@@ -82,13 +82,16 @@ fn copy_specializes_bufreader() {

 #[test]
 fn copy_specializes_to_vec() {
-    let cap = 123456;
-    let mut source = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
+    let cap = DEFAULT_BUF_SIZE * 10;
+    let mut source = ShortReader { cap, observed_buffer: 0, read_size: DEFAULT_BUF_SIZE };
     let mut sink = Vec::new();
-    assert_eq!(cap as u64, io::copy(&mut source, &mut sink).unwrap());
+    let copied = io::copy(&mut source, &mut sink).unwrap();
+    assert_eq!(cap as u64, copied);
+    assert_eq!(sink.len() as u64, copied);
     assert!(
         source.observed_buffer > DEFAULT_BUF_SIZE,
-        "expected a large buffer to be provided to the reader"
+        "expected a large buffer to be provided to the reader, got {}",
+        source.observed_buffer
     );
 }
 
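The test relies on `ShortReader`, which is defined elsewhere in tests.rs and not shown in this diff. Below is a sketch consistent with the fields and assertions above; the body is inferred, not quoted from the file. It serves `cap` bytes in chunks of at most `read_size` and records the largest buffer it was ever offered:

```rust
use std::cmp::{max, min};
use std::io::{self, Read};

struct ShortReader {
    cap: usize,
    read_size: usize,
    observed_buffer: usize,
}

impl Read for ShortReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Serve at most `read_size` bytes per call until `cap` is exhausted,
        // remembering the largest buffer the caller ever offered us.
        let bytes = min(self.cap, min(buf.len(), self.read_size));
        buf[..bytes].fill(0); // the data is irrelevant; the test only measures sizes
        self.cap -= bytes;
        self.observed_buffer = max(self.observed_buffer, buf.len());
        Ok(bytes)
    }
}
```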
library/std/src/io/mod.rs (118 changes: 81 additions & 37 deletions)
@@ -397,12 +397,16 @@
 }
 }
 
-// This uses an adaptive system to extend the vector when it fills. We want to
-// avoid paying to allocate and zero a huge chunk of memory if the reader only
-// has 4 bytes while still making large reads if the reader does have a ton
-// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
-// time is 4,500 times (!) slower than a default reservation size of 32 if the
-// reader has a very small amount of data to return.
+// Here we must serve many masters with conflicting goals:
+//
+// - avoid allocating unless necessary
+// - avoid overallocating if we know the exact size (#89165)
+// - avoid passing large buffers to readers that always initialize the free capacity if they perform short reads (#23815, #23820)
+// - pass large buffers to readers that do not initialize the spare capacity. this can amortize per-call overheads
+// - and finally pass not-too-small and not-too-large buffers to Windows read APIs because they manage to suffer from both problems
+//   at the same time, i.e. small reads suffer from syscall overhead, all reads incur initialization cost
+//   proportional to buffer size (#110650)
+//
 pub(crate) fn default_read_to_end<R: Read + ?Sized>(
     r: &mut R,
     buf: &mut Vec<u8>,
@@ -412,20 +416,58 @@
     let start_cap = buf.capacity();
     // Optionally limit the maximum bytes read on each iteration.
     // This adds an arbitrary fiddle factor to allow for more data than we expect.
-    let max_read_size =
-        size_hint.and_then(|s| s.checked_add(1024)?.checked_next_multiple_of(DEFAULT_BUF_SIZE));
+    let mut max_read_size = size_hint
+        .and_then(|s| s.checked_add(1024)?.checked_next_multiple_of(DEFAULT_BUF_SIZE))
+        .unwrap_or(DEFAULT_BUF_SIZE);
 
     let mut initialized = 0; // Extra initialized bytes from previous loop iteration
+
+    const PROBE_SIZE: usize = 32;
+
+    fn small_probe_read<R: Read + ?Sized>(r: &mut R, buf: &mut Vec<u8>) -> Result<usize> {
+        let mut probe = [0u8; PROBE_SIZE];
+
+        loop {
+            match r.read(&mut probe) {
+                Ok(n) => {
+                    buf.extend_from_slice(&probe[..n]);
+                    return Ok(n);
+                }
+                Err(ref e) if e.is_interrupted() => continue,
+                Err(e) => return Err(e),
+            }
+        }
+    }
+
+    // avoid inflating empty/small vecs before we have determined that there's anything to read
+    if (size_hint.is_none() || size_hint == Some(0)) && buf.capacity() - buf.len() < PROBE_SIZE {
+        let read = small_probe_read(r, buf)?;
+
+        if read == 0 {
+            return Ok(0);
+        }
+    }
+
     loop {
+        if buf.len() == buf.capacity() && buf.capacity() == start_cap {
+            // The buffer might be an exact fit. Let's read into a probe buffer
+            // and see if it returns `Ok(0)`. If so, we've avoided an
+            // unnecessary doubling of the capacity. But if not, append the
+            // probe buffer to the primary buffer and let its capacity grow.
+            let read = small_probe_read(r, buf)?;
+
+            if read == 0 {
+                return Ok(buf.len() - start_len);
+            }
+        }
+
         if buf.len() == buf.capacity() {
-            buf.reserve(32); // buf is full, need more space
+            buf.reserve(PROBE_SIZE); // buf is full, need more space
         }
 
         let mut spare = buf.spare_capacity_mut();
-        if let Some(size) = max_read_size {
-            let len = cmp::min(spare.len(), size);
-            spare = &mut spare[..len]
-        }
+        let buf_len = cmp::min(spare.len(), max_read_size);
+        spare = &mut spare[..buf_len];
         let mut read_buf: BorrowedBuf<'_> = spare.into();
 
         // SAFETY: These bytes were initialized but not filled in the previous loop
@@ -434,42 +476,44 @@
         }
 
         let mut cursor = read_buf.unfilled();
-        match r.read_buf(cursor.reborrow()) {
-            Ok(()) => {}
-            Err(e) if e.is_interrupted() => continue,
-            Err(e) => return Err(e),
+        loop {
+            match r.read_buf(cursor.reborrow()) {
+                Ok(()) => break,
+                Err(e) if e.is_interrupted() => continue,
+                Err(e) => return Err(e),
+            }
         }
 
-        if cursor.written() == 0 {
+        let unfilled_but_initialized = cursor.init_ref().len();
+        let bytes_read = cursor.written();
+        let was_fully_initialized = read_buf.init_len() == buf_len;
+
+        if bytes_read == 0 {
             return Ok(buf.len() - start_len);
         }
 
         // store how much was initialized but not filled
-        initialized = cursor.init_ref().len();
+        initialized = unfilled_but_initialized;
 
         // SAFETY: BorrowedBuf's invariants mean this much memory is initialized.
         unsafe {
-            let new_len = read_buf.filled().len() + buf.len();
+            let new_len = bytes_read + buf.len();
             buf.set_len(new_len);
         }
 
-        if buf.len() == buf.capacity() && buf.capacity() == start_cap {
-            // The buffer might be an exact fit. Let's read into a probe buffer
-            // and see if it returns `Ok(0)`. If so, we've avoided an
-            // unnecessary doubling of the capacity. But if not, append the
-            // probe buffer to the primary buffer and let its capacity grow.
-            let mut probe = [0u8; 32];
-
-            loop {
-                match r.read(&mut probe) {
-                    Ok(0) => return Ok(buf.len() - start_len),
-                    Ok(n) => {
-                        buf.extend_from_slice(&probe[..n]);
-                        break;
-                    }
-                    Err(ref e) if e.is_interrupted() => continue,
-                    Err(e) => return Err(e),
-                }
+        // Use heuristics to determine the max read size if no initial size hint was provided
+        if size_hint.is_none() {
+            // The reader is returning short reads but it doesn't call ensure_init().
+            // In that case we no longer need to restrict read sizes to avoid
+            // initialization costs.
+            if !was_fully_initialized {
+                max_read_size = usize::MAX;
+            }
+
+            // we have passed a larger buffer than previously and the
+            // reader still hasn't returned a short read
+            if buf_len >= max_read_size && bytes_read == buf_len {
+                max_read_size = max_read_size.saturating_mul(2);
             }
         }
     }
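In sum, the rewritten `default_read_to_end` probes with 32-byte reads while the buffer might be an exact fit, then offers at most `max_read_size` bytes per iteration and doubles that cap as long as the reader keeps filling whole buffers without initializing extra capacity. A toy model of how the offered buffer sizes evolve for such a reader with no size hint (the constants mirror the diff; the loop bound and printout are illustrative only):

```rust
const DEFAULT_BUF_SIZE: usize = 8 * 1024; // std's value on most platforms
const PROBE_SIZE: usize = 32;

fn main() {
    let mut max_read_size = DEFAULT_BUF_SIZE; // no size hint provided
    let mut offered = vec![PROBE_SIZE]; // initial probe read

    for _ in 0..4 {
        // The reader filled the whole buffer without leaving extra
        // initialized capacity, so the cap doubles each round.
        offered.push(max_read_size);
        max_read_size = max_read_size.saturating_mul(2);
    }

    println!("{offered:?}"); // [32, 8192, 16384, 32768, 65536]
}
```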