Skip to content

Extend io::copy buffer reuse to BufReader too #112330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion library/std/src/io/buffered/bufreader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl<R: ?Sized> BufReader<R> {

/// Invalidates all data in the internal buffer.
#[inline]
fn discard_buffer(&mut self) {
pub(in crate::io) fn discard_buffer(&mut self) {
self.buf.discard_buffer()
}
}
Expand Down
110 changes: 97 additions & 13 deletions library/std/src/io/copy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use super::{BorrowedBuf, BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use super::{BorrowedBuf, BufReader, BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use crate::mem::MaybeUninit;

#[cfg(test)]
mod tests;

/// Copies the entire contents of a reader into a writer.
///
/// This function will continuously read data from `reader` and then
Expand Down Expand Up @@ -71,32 +74,113 @@ where
R: Read,
W: Write,
{
BufferedCopySpec::copy_to(reader, writer)
let read_buf = BufferedReaderSpec::buffer_size(reader);
let write_buf = BufferedWriterSpec::buffer_size(writer);

if read_buf >= DEFAULT_BUF_SIZE && read_buf >= write_buf {
return BufferedReaderSpec::copy_to(reader, writer);
}

BufferedWriterSpec::copy_from(writer, reader)
}

/// Specialization of the read-write loop that reuses the internal
/// buffer of a BufReader. If there's no buffer then the writer side
/// should be used intead.
trait BufferedReaderSpec {
fn buffer_size(&self) -> usize;

fn copy_to(&mut self, to: &mut (impl Write + ?Sized)) -> Result<u64>;
}

impl<T> BufferedReaderSpec for T
where
Self: Read,
T: ?Sized,
{
#[inline]
default fn buffer_size(&self) -> usize {
0
}

default fn copy_to(&mut self, _to: &mut (impl Write + ?Sized)) -> Result<u64> {
unimplemented!("only called from specializations");
}
}

impl<I> BufferedReaderSpec for BufReader<I>
where
Self: Read,
I: ?Sized,
{
fn buffer_size(&self) -> usize {
self.capacity()
}

fn copy_to(&mut self, to: &mut (impl Write + ?Sized)) -> Result<u64> {
let mut len = 0;

loop {
// Hack: this relies on `impl Read for BufReader` always calling fill_buf
// if the buffer is empty, even for empty slices.
// It can't be called directly here since specialization prevents us
// from adding I: Read
match self.read(&mut []) {
Comment on lines +124 to +128
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I could maybe get everything I want by adding a bunch of hidden helper functions to the Read trait and only implement them on BufReader (and provide stub default impls for everything else) but imo that would be an even bigger hack than this. And that would require combining associated type defaults with specialization which is uncharted territory.

Copy link
Member

@thomcc thomcc Jun 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is worth doing right now.

Ok(_) => {}
Err(e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
let buf = self.buffer();
if self.buffer().len() == 0 {
return Ok(len);
}

// In case the writer side is a BufWriter then its write_all
// implements an optimization that passes through large
// buffers to the underlying writer. That code path is #[cold]
// but we're still avoiding redundant memcopies when doing
// a copy between buffered inputs and outputs.
to.write_all(buf)?;
len += buf.len() as u64;
self.discard_buffer();
}
}
}

/// Specialization of the read-write loop that either uses a stack buffer
/// or reuses the internal buffer of a BufWriter
trait BufferedCopySpec: Write {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64>;
trait BufferedWriterSpec: Write {
fn buffer_size(&self) -> usize;

fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64>;
}

impl<W: Write + ?Sized> BufferedCopySpec for W {
default fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
stack_buffer_copy(reader, writer)
impl<W: Write + ?Sized> BufferedWriterSpec for W {
#[inline]
default fn buffer_size(&self) -> usize {
0
}

default fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
stack_buffer_copy(reader, self)
}
}

impl<I: ?Sized + Write> BufferedCopySpec for BufWriter<I> {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
if writer.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, writer);
impl<I: Write + ?Sized> BufferedWriterSpec for BufWriter<I> {
fn buffer_size(&self) -> usize {
self.capacity()
}

fn copy_from<R: Read + ?Sized>(&mut self, reader: &mut R) -> Result<u64> {
if self.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, self);
}

let mut len = 0;
let mut init = 0;

loop {
let buf = writer.buffer_mut();
let buf = self.buffer_mut();
let mut read_buf: BorrowedBuf<'_> = buf.spare_capacity_mut().into();

unsafe {
Expand Down Expand Up @@ -127,7 +211,7 @@ impl<I: ?Sized + Write> BufferedCopySpec for BufWriter<I> {
Err(e) => return Err(e),
}
} else {
writer.flush_buf()?;
self.flush_buf()?;
init = 0;
}
}
Expand Down
108 changes: 108 additions & 0 deletions library/std/src/io/copy/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use crate::cmp::{max, min};
use crate::io::*;

#[test]
fn copy_copies() {
let mut r = repeat(0).take(4);
let mut w = sink();
assert_eq!(copy(&mut r, &mut w).unwrap(), 4);

let mut r = repeat(0).take(1 << 17);
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}

struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}

impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}

struct WriteObserver {
observed_buffer: usize,
}

impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}

fn flush(&mut self) -> Result<()> {
Ok(())
}
}

#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}

#[test]
fn copy_specializes_bufreader() {
let mut source = vec![0; 768 * 1024];
source[1] = 42;
let mut buffered = BufReader::with_capacity(256 * 1024, Cursor::new(&mut source));

let mut sink = Vec::new();
assert_eq!(crate::io::copy(&mut buffered, &mut sink).unwrap(), source.len() as u64);
assert_eq!(source.as_slice(), sink.as_slice());

let buf_sz = 71 * 1024;
assert!(buf_sz > DEFAULT_BUF_SIZE, "test precondition");

let mut buffered = BufReader::with_capacity(buf_sz, Cursor::new(&mut source));
let mut sink = WriteObserver { observed_buffer: 0 };
assert_eq!(crate::io::copy(&mut buffered, &mut sink).unwrap(), source.len() as u64);
assert_eq!(
sink.observed_buffer, buf_sz,
"expected a large buffer to be provided to the writer"
);
}

#[cfg(unix)]
mod io_benches {
use crate::fs::File;
use crate::fs::OpenOptions;
use crate::io::prelude::*;
use crate::io::BufReader;

use test::Bencher;

#[bench]
fn bench_copy_buf_reader(b: &mut Bencher) {
let mut file_in = File::open("/dev/zero").expect("opening /dev/zero failed");
// use dyn to avoid specializations unrelated to readbuf
let dyn_in = &mut file_in as &mut dyn Read;
let mut reader = BufReader::with_capacity(256 * 1024, dyn_in.take(0));
let mut writer =
OpenOptions::new().write(true).open("/dev/null").expect("opening /dev/null failed");

const BYTES: u64 = 1024 * 1024;

b.bytes = BYTES;

b.iter(|| {
reader.get_mut().set_limit(BYTES);
crate::io::copy(&mut reader, &mut writer).unwrap()
});
}
}
61 changes: 1 addition & 60 deletions library/std/src/io/util/tests.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,8 @@
use crate::cmp::{max, min};
use crate::io::prelude::*;
use crate::io::{
copy, empty, repeat, sink, BorrowedBuf, BufWriter, Empty, Repeat, Result, SeekFrom, Sink,
DEFAULT_BUF_SIZE,
};
use crate::io::{empty, repeat, sink, BorrowedBuf, Empty, Repeat, SeekFrom, Sink};

use crate::mem::MaybeUninit;

#[test]
fn copy_copies() {
let mut r = repeat(0).take(4);
let mut w = sink();
assert_eq!(copy(&mut r, &mut w).unwrap(), 4);

let mut r = repeat(0).take(1 << 17);
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}

struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}

impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}

struct WriteObserver {
observed_buffer: usize,
}

impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}

fn flush(&mut self) -> Result<()> {
Ok(())
}
}

#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}

#[test]
fn sink_sinks() {
let mut s = sink();
Expand Down