forked from rust-lang/futures-rs
-
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
321 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,314 @@ | ||
use futures_core::task::{Context, Poll}; | ||
use futures_io::{AsyncWrite, IoVec}; | ||
use pin_utils::{unsafe_pinned, unsafe_unpinned}; | ||
use std::pin::Pin; | ||
use std::{fmt, io}; | ||
|
||
/// Wraps a writer and buffers its output. | ||
/// | ||
/// It can be excessively inefficient to work directly with something that | ||
/// implements [`Write`]. For example, every call to | ||
/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A | ||
/// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying | ||
/// writer in large, infrequent batches. | ||
/// | ||
/// `BufWriter` can improve the speed of programs that make *small* and | ||
/// *repeated* write calls to the same file or network socket. It does not | ||
/// help when writing very large amounts at once, or writing just one or a few | ||
/// times. It also provides no advantage when writing to a destination that is | ||
/// in memory, like a `Vec<u8>`. | ||
/// | ||
/// When the `BufWriter` is dropped, the contents of its buffer will be written | ||
/// out. However, any errors that happen in the process of flushing the buffer | ||
/// when the writer is dropped will be ignored. Code that wishes to handle such | ||
/// errors must manually call [`flush`] before the writer is dropped. | ||
/// | ||
/// # Examples | ||
/// | ||
/// Let's write the numbers one through ten to a [`TcpStream`]: | ||
/// | ||
/// ```no_run | ||
/// use std::io::prelude::*; | ||
/// use std::net::TcpStream; | ||
/// | ||
/// let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); | ||
/// | ||
/// for i in 0..10 { | ||
/// stream.write(&[i+1]).unwrap(); | ||
/// } | ||
/// ``` | ||
/// | ||
/// Because we're not buffering, we write each one in turn, incurring the | ||
/// overhead of a system call per byte written. We can fix this with a | ||
/// `BufWriter`: | ||
/// | ||
/// ```no_run | ||
/// use std::io::prelude::*; | ||
/// use std::io::BufWriter; | ||
/// use std::net::TcpStream; | ||
/// | ||
/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); | ||
/// | ||
/// for i in 0..10 { | ||
/// stream.write(&[i+1]).unwrap(); | ||
/// } | ||
/// ``` | ||
/// | ||
/// By wrapping the stream with a `BufWriter`, these ten writes are all grouped | ||
/// together by the buffer, and will all be written out in one system call when | ||
/// the `stream` is dropped. | ||
/// | ||
/// [`Write`]: ../../std/io/trait.Write.html | ||
/// [`TcpStream::write`]: ../../std/net/struct.TcpStream.html#method.write | ||
/// [`TcpStream`]: ../../std/net/struct.TcpStream.html | ||
/// [`flush`]: #method.flush | ||
pub struct BufWriter<W: AsyncWrite> { | ||
inner: Option<W>, | ||
buf: Vec<u8>, | ||
// rust-lang/rust#30888: If the inner writer panics in a call to write, we don't want to | ||
// write the buffered data a second time in BufWriter's destructor. This | ||
// flag tells the Drop impl if it should skip the flush. | ||
panicked: bool, | ||
} | ||
|
||
/* | ||
/// An error returned by `into_inner` which combines an error that | ||
/// happened while writing out the buffer, and the buffered writer object | ||
/// which may be used to recover from the condition. | ||
#[derive(Debug)] | ||
pub struct IntoInnerError<W>(W, io::Error); | ||
*/ | ||
|
||
impl<W: AsyncWrite> BufWriter<W> { | ||
unsafe_pinned!(inner: Option<W>); | ||
unsafe_pinned!(buf: Vec<u8>); | ||
unsafe_unpinned!(panicked: bool); | ||
|
||
/// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, | ||
/// but may change in the future. | ||
pub fn new(inner: W) -> Self { | ||
Self { | ||
inner: Some(inner), | ||
buf: Vec::new(), | ||
panicked: false, | ||
} | ||
} | ||
|
||
/// Creates a new `BufWriter` with the specified buffer capacity. | ||
pub fn with_capacity(cap: usize, inner: W) -> Self { | ||
Self { | ||
inner: Some(inner), | ||
buf: Vec::with_capacity(cap), | ||
panicked: false, | ||
} | ||
} | ||
|
||
fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
let Self { | ||
inner, | ||
buf, | ||
panicked, | ||
} = unsafe { Pin::get_unchecked_mut(self) }; | ||
let mut inner = unsafe { Pin::new_unchecked(inner) }; | ||
let mut buf = unsafe { Pin::new_unchecked(buf) }; | ||
|
||
let mut written = 0; | ||
let len = buf.len(); | ||
let mut ret = Ok(()); | ||
while written < len { | ||
*panicked = true; | ||
let r = ready!(inner | ||
.as_mut() | ||
.as_pin_mut() | ||
.unwrap() | ||
.poll_write(cx, &buf[written..])); | ||
*panicked = false; | ||
|
||
match r { | ||
Ok(0) => { | ||
ret = Err(io::Error::new( | ||
io::ErrorKind::WriteZero, | ||
"failed to write the buffered data", | ||
)); | ||
break; | ||
} | ||
Ok(n) => written += n, | ||
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} | ||
Err(e) => { | ||
ret = Err(e); | ||
break; | ||
} | ||
} | ||
} | ||
if written > 0 { | ||
buf.drain(..written); | ||
} | ||
Poll::Ready(ret) | ||
} | ||
|
||
/// Gets a reference to the underlying writer. | ||
pub fn get_ref(&self) -> &W { | ||
self.inner.as_ref().unwrap() | ||
} | ||
|
||
/// Gets a mutable reference to the underlying writer. | ||
/// | ||
/// It is inadvisable to directly write to the underlying writer. | ||
pub fn get_mut(&mut self) -> &mut W { | ||
self.inner.as_mut().unwrap() | ||
} | ||
|
||
/// Gets a pinned mutable reference to the underlying writer. | ||
/// | ||
/// It is inadvisable to directly write to the underlying writer. | ||
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut W> { | ||
self.inner().as_pin_mut().unwrap() | ||
} | ||
|
||
/* | ||
#[unstable(feature = "bufreader_buffer", issue = "45323")] | ||
/// Returns a reference to the internally buffered data. | ||
pub fn buffer(&self) -> &[u8] { | ||
&self.buf | ||
} | ||
*/ | ||
|
||
/* | ||
/// Unwraps this `BufWriter`, returning the underlying writer. | ||
/// | ||
/// The buffer is written out before returning the writer. | ||
/// | ||
/// # Errors | ||
/// | ||
/// An `Err` will be returned if an error occurs while flushing the buffer. | ||
pub fn into_inner(mut self) -> Result<W, IntoInnerError<BufWriter<W>>> { | ||
match self.flush_buf() { | ||
Err(e) => Err(IntoInnerError(self, e)), | ||
Ok(()) => Ok(self.inner.take().unwrap()) | ||
} | ||
} | ||
*/ | ||
} | ||
|
||
impl<W: AsyncWrite> AsyncWrite for BufWriter<W> { | ||
fn poll_write( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &[u8], | ||
) -> Poll<io::Result<usize>> { | ||
if self.buf.len() + buf.len() > self.buf.capacity() { | ||
try_ready!(self.as_mut().flush_buf(cx)); | ||
} | ||
if buf.len() >= self.buf.capacity() { | ||
*self.as_mut().panicked() = true; | ||
let r = self.as_mut().get_pin_mut().poll_write(cx, buf); | ||
*self.as_mut().panicked() = false; | ||
r | ||
} else { | ||
self.buf().poll_write(cx, buf) | ||
} | ||
} | ||
|
||
fn poll_vectored_write( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
bufs: &[&IoVec], | ||
) -> Poll<io::Result<usize>> { | ||
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); | ||
if self.buf.len() + total_len > self.buf.capacity() { | ||
try_ready!(self.as_mut().flush_buf(cx)); | ||
} | ||
if total_len >= self.buf.capacity() { | ||
*self.as_mut().panicked() = true; | ||
let r = self.as_mut().get_pin_mut().poll_vectored_write(cx, bufs); | ||
*self.as_mut().panicked() = false; | ||
r | ||
} else { | ||
self.buf().poll_vectored_write(cx, bufs) | ||
} | ||
} | ||
|
||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
try_ready!(self.as_mut().flush_buf(cx)); | ||
self.get_pin_mut().poll_flush(cx) | ||
} | ||
|
||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||
try_ready!(self.as_mut().flush_buf(cx)); | ||
self.get_pin_mut().poll_close(cx) | ||
} | ||
} | ||
|
||
impl<W: AsyncWrite> fmt::Debug for BufWriter<W> | ||
where | ||
W: fmt::Debug, | ||
{ | ||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
fmt.debug_struct("BufWriter") | ||
.field("writer", &self.inner.as_ref().unwrap()) | ||
.field("buffer", &format_args!("{}/{}", self.buf.len(), self.buf.capacity())) | ||
.finish() | ||
} | ||
} | ||
|
||
/* | ||
impl<W: AsyncWrite + Seek> Seek for BufWriter<W> { | ||
/// Seek to the offset, in bytes, in the underlying writer. | ||
/// | ||
/// Seeking always writes out the internal buffer before seeking. | ||
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { | ||
self.flush_buf().and_then(|_| self.get_mut().seek(pos)) | ||
} | ||
} | ||
*/ | ||
|
||
/* | ||
impl<W: AsyncWrite> Drop for BufWriter<W> { | ||
fn drop(&mut self) { | ||
if self.inner.is_some() && !self.panicked { | ||
if let Some(inner) = self.inner.take() { | ||
pin_mut!(inner); | ||
// dtors should not panic, so we ignore a failed flush | ||
let _r = inner.flush_buf(); | ||
} | ||
} | ||
} | ||
} | ||
*/ | ||
|
||
/* | ||
impl<W> IntoInnerError<W> { | ||
/// Returns the error which caused the call to `into_inner()` to fail. | ||
/// | ||
/// This error was returned when attempting to write the internal buffer. | ||
pub fn error(&self) -> &io::Error { | ||
&self.1 | ||
} | ||
/// Returns the buffered writer instance which generated the error. | ||
/// | ||
/// The returned object can be used for error recovery, such as | ||
/// re-inspecting the buffer. | ||
pub fn into_inner(self) -> W { | ||
self.0 | ||
} | ||
} | ||
impl<W> From<IntoInnerError<W>> for io::Error { | ||
fn from(iie: IntoInnerError<W>) -> Self { | ||
iie.1 | ||
} | ||
} | ||
impl<W: Send + fmt::Debug> error::Error for IntoInnerError<W> { | ||
fn description(&self) -> &str { | ||
error::Error::description(self.error()) | ||
} | ||
} | ||
impl<W> fmt::Display for IntoInnerError<W> { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
self.error().fmt(f) | ||
} | ||
} | ||
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters