Add futures-io #780

Merged 2 commits on Feb 21, 2018

1 change: 1 addition & 0 deletions Cargo.toml
@@ -4,6 +4,7 @@ members = [
"futures-core",
"futures-channel",
"futures-executor",
+ "futures-io",
"futures-util",
"futures-sink",
]
1 change: 1 addition & 0 deletions LICENSE-MIT
@@ -1,4 +1,5 @@
Copyright (c) 2016 Alex Crichton
+ Copyright (c) 2017 The Tokio Authors

Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
6 changes: 3 additions & 3 deletions futures-channel/src/mpsc/mod.rs
@@ -407,7 +407,7 @@ impl<T> Sender<T> {
}

// The channel has capacity to accept the message, so send it
- self.do_send(Some(msg), None)
+ self.do_send(None, Some(msg))
.map_err(|ChannelClosed(v)| {
TryChannelClosed {
kind: TryChannelClosedKind::Disconnected(v.unwrap()),
@@ -419,12 +419,12 @@ impl<T> Sender<T> {
/// This function should only be called after `poll_ready` has responded
/// that the channel is ready to receive a message.
pub fn start_send(&mut self, msg: T) -> Result<(), ChannelClosed<T>> {
- self.do_send(Some(msg), None)
+ self.do_send(None, Some(msg))
}

// Do the send without failing
// None means close
- fn do_send(&mut self, msg: Option<T>, cx: Option<&mut task::Context>)
+ fn do_send(&mut self, cx: Option<&mut task::Context>, msg: Option<T>)
-> Result<(), ChannelClosed<T>>
{
// First, increment the number of messages contained by the channel.
19 changes: 19 additions & 0 deletions futures-io/Cargo.toml
@@ -0,0 +1,19 @@
[package]
name = "futures-io"
version = "0.2.0"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT/Apache-2.0"
repository = "https://github.com/alexcrichton/futures-rs"
homepage = "https://github.com/alexcrichton/futures-rs"
documentation = "https://docs.rs/futures-io"
description = """
The `AsyncRead` and `AsyncWrite` traits for the futures-rs library.
"""

[features]
std = ["futures-core/std", "iovec"]
default = ["std"]

[dependencies]
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
iovec = { version = "0.1", optional = true }
305 changes: 305 additions & 0 deletions futures-io/src/lib.rs
@@ -0,0 +1,305 @@
//! Asynchronous IO
//!
//! This crate contains the `AsyncRead` and `AsyncWrite` traits which allow
//! data to be read and written asynchronously.

#![no_std]
#![deny(missing_docs, missing_debug_implementations)]
#![doc(html_root_url = "https://docs.rs/futures-io/0.2")]

macro_rules! if_std {
($($i:item)*) => ($(
#[cfg(feature = "std")]
$i
)*)
}

if_std! {
extern crate futures_core;
extern crate iovec;
extern crate std;

use futures_core::{Async, Poll, task};
use std::boxed::Box;
use std::io as StdIo;
use std::ptr;
use std::vec::Vec;

// Re-export IoVec for convenience
pub use iovec::IoVec;

// Re-export io::Error so that users don't have to deal
// with conflicts when `use`ing `futures::io` and `std::io`.
pub use StdIo::Error as Error;

/// A type used to conditionally initialize buffers passed to `AsyncRead`
/// methods.
#[derive(Debug)]
pub struct Initializer(bool);
Contributor

Looking at the std Read::initializer tracking issue, it seems there is doubt that this is the correct API. In std, the unstable attribute protects them.

It seems odd to be using that API here if it's thought to be inferior, especially since we can't rely on unstable to protect against breaking changes.

Member Author

It's true that some people have doubts about the API. I personally think that it's the best-designed interface for this that I've seen, and it's the only one I've seen that is sound in the presence of rust-lang/rfcs#2316.


impl Initializer {
/// Returns a new `Initializer` which will zero out buffers.
#[inline]
pub fn zeroing() -> Initializer {
Initializer(true)
}

/// Returns a new `Initializer` which will not zero out buffers.
///
/// # Safety
///
/// This method may only be called by `AsyncRead`ers which guarantee
/// that they will not read from the buffers passed to `AsyncRead`
/// methods, and that the return value of the method accurately reflects
/// the number of bytes that have been written to the head of the buffer.
#[inline]
pub unsafe fn nop() -> Initializer {
Contributor

What is nop? Why not use a descriptive name, like the alternate constructor, zeroing?

Member Author

This is done to match the current (unstable) std API. When that API is released, this Initializer will be changed to a type alias to that one: https://doc.rust-lang.org/std/io/struct.Initializer.html

Contributor

The name nop seems unfortunate. Regardless of the previous naming choice of an unstable std API, it seems better to pick a better name in a new library.

Initializer(false)
}

/// Indicates if a buffer should be initialized.
#[inline]
pub fn should_initialize(&self) -> bool {
self.0
}

/// Initializes a buffer if necessary.
#[inline]
pub fn initialize(&self, buf: &mut [u8]) {
if self.should_initialize() {
unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
}
}
}

/// Objects which can be read asynchronously.
pub trait AsyncRead {
/// Determines if this `AsyncRead`er can work with buffers of
/// uninitialized memory.
///
/// The default implementation returns an initializer which will zero
/// buffers.
///
/// # Safety
///
/// This method is `unsafe` because an `AsyncRead`er could otherwise
/// return a non-zeroing `Initializer` from another `AsyncRead` type
/// without an `unsafe` block.
#[inline]
unsafe fn initializer(&self) -> Initializer {
Contributor

I thought the point of the Initializer having an unsafe constructor was so that this method didn't need to be unsafe?

Member Author

Initializer::zeroing() is safe, Initializer::nop() is unsafe. It is safe to implement this method using Initializer::zeroing(), but unsafe to implement it using Initializer::nop().

This function is made unsafe so that users cannot implement it safely by delegating to the internally-unsafe implementation of the initializer function on another type.

All of this is copied from the std::io API. If you have comments, questions, or ideas about how to improve this design, please leave them in the tracking issue. If that API changes, we will update this one as necessary.

Contributor

Quoting from the 0.2 RFC:

This resolves the issue of the unsafe on prepare_uninitialized_buffer being unsafe to implement, not unsafe to use (the more common, and arguably the correct meaning of unsafe fn).

The implementation doesn't seem to match, since even if Initializer::zeroing() is returned from fn initializer(), it doesn't matter, because it's unsafe to call initializer().

I'm leaving comments here because this API is being proposed in futures, a library that I rely on, and a place where stability markers cannot protect it. The tracking issue in std hasn't seen much traction otherwise.

Member Author (@cramertj, Feb 21, 2018)

This PR does indeed address the issue I raised in the text you quoted. However, I think the wording I used may have been confusing.

Currently, unsafe fn prepare_uninitialized_buffer is unsafe to implement, but not unsafe to call. unsafe on functions is intended to mean "unsafe to call", not "unsafe to implement." If an RFC like rust-lang/rfcs#2316 were to be approved, the prepare_uninitialized_buffer API would be unsound.

This PR fixes this problem by moving the unsafety to the call to Initializer::nop(). Even if rust-lang/rfcs#2316 were approved, this API would be sound. The initializer function itself is still unsafe, but the reason is different: it isn't that initializer is unsafe to implement, but that calling initializer is unsafe because it could allow you to produce values of type Initializer::nop() even when your AsyncRead implementation read from the supplied buffer.

Contributor

Currently, unsafe fn prepare_uninitialized_buffer is unsafe to implement, but not unsafe to call.

I assume you mean that is the intent, since it definitely is unsafe to call the function with the current compilers.

I find the reasoning that you could get an Initializer::nop() back a little bit arbitrary: you don't find this sort of unsafe "poisoning" elsewhere. For instance, you can get a string back with str::from_utf8_unchecked(), but that shouldn't mean that any function that happens to wrap that is inherently unsafe.

Member Author

Consider this implementation:

struct MyUnsafePerformantRead;

impl AsyncRead for MyUnsafePerformantRead {
    fn initializer(&self) -> Initializer {
        unsafe { Initializer::nop() }
    }
    fn poll_read.... // Doesn't read from the buffer
}

struct MySafeButStillUninitRead;

impl AsyncRead for MySafeButStillUninitRead {
    fn initializer(&self) -> Initializer {
        MyUnsafePerformantRead.initializer()
    }
    fn poll_read.... // Does read from the buffer (uninitialized memory), causes UB
}

Contributor

Yes, I imagined that is the concern, but I don't see anything that adding an unsafe to the function will help with. In fact, adding the unsafe keywords doesn't protect it at all. A wrong implementation is still wrong.

Member Author (@cramertj, Feb 21, 2018)

unsafe prevents the unsound implementation of MySafeButStillUninitRead (undefined behavior without unsafe code). If MyUnsafePerformantRead::initializer() is unsafe, then MySafeButStillUninitRead::initializer would have to use unsafe in order to trigger the UB.

Contributor

I'm even more unconvinced now, since you can easily grab an Initializer from a completely unrelated reader, and carry it around, and safely use it via unrelated_initializer.initialize(my_buf) everywhere else, and trigger the UB.

As a not-arbitrary example, consider a Chain implementation that takes Box<AsyncRead>s and pushes them into a Vec. The naive implementation calls self.reads[0].initializer(), saves the result, and then uses it in later reads instead of remembering to ask for a new one.

I think the std API gives more of an illusion of protection against UB, while still quite easily allowing it to happen, even by mistake.

Member Author

since you can easily grab an Initializer from a completely unrelated reader, and carry it around

Not without using unsafe. If you're making unsafe calls and then randomly passing around the result, in explicit violation of the invariants stated in the docs, then you're right, it is easy to cause UB.
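
The disagreement in this thread can be reduced to a small self-contained sketch. The `Reader` trait and the `Fast`, `Wrapper`, and `Plain` types below are hypothetical stand-ins, not the futures-io API, and `Initializer` is a trimmed local copy. The sketch only illustrates the claim made above: once `initializer` is an `unsafe fn`, a wrapper cannot obtain another type's non-zeroing `Initializer` from purely safe code.

```rust
/// Trimmed local copy of the `Initializer` from this PR (stand-in only).
struct Initializer(bool);

impl Initializer {
    fn zeroing() -> Initializer {
        Initializer(true)
    }

    /// Safety: the caller promises never to read from buffers it is handed.
    unsafe fn nop() -> Initializer {
        Initializer(false)
    }

    fn should_initialize(&self) -> bool {
        self.0
    }
}

/// Hypothetical trait mirroring only the `initializer` method of `AsyncRead`.
trait Reader {
    unsafe fn initializer(&self) -> Initializer {
        Initializer::zeroing()
    }
}

/// A reader that promises not to read from the buffer it is given.
struct Fast;

impl Reader for Fast {
    unsafe fn initializer(&self) -> Initializer {
        // Sound only because `Fast` upholds the contract documented on `nop`.
        unsafe { Initializer::nop() }
    }
}

/// A wrapper that forwards `Fast`'s initializer.
struct Wrapper;

impl Reader for Wrapper {
    unsafe fn initializer(&self) -> Initializer {
        // The trait forces `unsafe fn` here, and the forwarded call is itself
        // an unsafe operation, so the author of `Wrapper` has to opt in to
        // the contract explicitly.
        unsafe { Fast.initializer() }
    }
}

/// A reader that keeps the default, zeroing initializer.
struct Plain;

impl Reader for Plain {}
```

The sketch does not address the objection about storing an `Initializer` and reusing it with a different reader. That case still depends on the caller honoring the documented invariant after making the unsafe call.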

Initializer::zeroing()
}

/// Attempt to read from the `AsyncRead` into `buf`.
///
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
///
/// If reading would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes readable or is closed.
fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
-> Poll<usize, Error>;

/// Attempt to read from the `AsyncRead` into `vec` using vectored
/// IO operations. This allows data to be read into multiple buffers
/// using a single operation.
///
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
///
/// By default, this method delegates to using `poll_read` on the first
/// buffer in `vec`. Objects which support vectored IO should override
/// this method.
///
/// If reading would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes readable or is closed.
fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
-> Poll<usize, Error>
{
if let Some(ref mut first_iovec) = vec.get_mut(0) {
self.poll_read(cx, first_iovec)
} else {
// `vec` is empty.
return Ok(Async::Ready(0));
}
}
}

/// Objects which can be written to asynchronously.
pub trait AsyncWrite {
/// Attempt to write bytes from `buf` into the object.
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
///
/// If writing would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes writable or is closed.
fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
-> Poll<usize, Error>;

/// Attempt to write bytes from `vec` into the object using vectored
/// IO operations. This allows data from multiple buffers to be written
/// using a single operation.
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
///
/// By default, this method delegates to using `poll_write` on the first
/// buffer in `vec`. Objects which support vectored IO should override
/// this method.
///
/// If writing would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes writable or is closed.
fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
-> Poll<usize, Error>
{
if let Some(ref first_iovec) = vec.get(0) {
self.poll_write(cx, &*first_iovec)
} else {
// `vec` is empty.
return Ok(Async::Ready(0));
}
}

/// Attempt to flush the object, ensuring that all intermediately
/// buffered contents reach their destination.
///
/// On success, returns `Ok(Async::Ready(()))`.
///
/// If flushing is incomplete, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object can make progress towards flushing.
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error>;

/// Attempt to close the object.
///
/// On success, returns `Ok(Async::Ready(()))`.
///
/// If closing is incomplete, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object can make progress towards closing.
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
}

macro_rules! deref_async_read {
() => {
unsafe fn initializer(&self) -> Initializer {
(**self).initializer()
}

fn poll_read(&mut self, cx: &mut task::Context, buf: &mut [u8])
-> Poll<usize, Error>
{
(**self).poll_read(cx, buf)
}

fn poll_vectored_read(&mut self, cx: &mut task::Context, vec: &mut [&mut IoVec])
-> Poll<usize, Error>
{
(**self).poll_vectored_read(cx, vec)
}
}
}

impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
deref_async_read!();
}

impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
deref_async_read!();
}

/// `unsafe` because the `StdIo::Read` type must not access the buffer
/// before reading data into it.
macro_rules! unsafe_delegate_async_read_to_stdio {
() => {
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}

fn poll_read(&mut self, _: &mut task::Context, buf: &mut [u8])
-> Poll<usize, Error>
{
Ok(Async::Ready(StdIo::Read::read(self, buf)?))
}
}
}

impl<'a> AsyncRead for &'a [u8] {
unsafe_delegate_async_read_to_stdio!();
}

impl AsyncRead for StdIo::Repeat {
unsafe_delegate_async_read_to_stdio!();
}

impl<T: AsRef<[u8]>> AsyncRead for StdIo::Cursor<T> {
unsafe_delegate_async_read_to_stdio!();
}

macro_rules! deref_async_write {
() => {
fn poll_write(&mut self, cx: &mut task::Context, buf: &[u8])
-> Poll<usize, Error>
{
(**self).poll_write(cx, buf)
}

fn poll_vectored_write(&mut self, cx: &mut task::Context, vec: &[&IoVec])
-> Poll<usize, Error>
{
(**self).poll_vectored_write(cx, vec)
}

fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
(**self).poll_flush(cx)
}

fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
(**self).poll_close(cx)
}
}
}

impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
deref_async_write!();
}

impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
deref_async_write!();
}

macro_rules! delegate_async_write_to_stdio {
() => {
fn poll_write(&mut self, _: &mut task::Context, buf: &[u8])
-> Poll<usize, Error>
{
Ok(Async::Ready(StdIo::Write::write(self, buf)?))
}

fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Error> {
Ok(Async::Ready(StdIo::Write::flush(self)?))
}

fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
self.poll_flush(cx)
}
}
}

impl<'a> AsyncWrite for StdIo::Cursor<&'a mut [u8]> {
delegate_async_write_to_stdio!();
}

impl AsyncWrite for StdIo::Cursor<Vec<u8>> {
delegate_async_write_to_stdio!();
}

impl AsyncWrite for StdIo::Cursor<Box<[u8]>> {
delegate_async_write_to_stdio!();
}

impl AsyncWrite for StdIo::Sink {
delegate_async_write_to_stdio!();
}
}
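
To make the contract between `initializer` and `poll_read` concrete, here is a minimal sketch of the caller's side. It does not depend on futures-core: `Async`, `Poll`, `Initializer`, `SimpleRead`, `SliceReader`, and `read_once` are local stand-ins introduced for illustration, and the `task::Context` parameter is omitted, so this is an approximation of the shape of the API rather than the crate itself.

```rust
use std::io;

/// Local stand-in for the futures-core 0.2 `Async` type (assumed shape).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    Pending,
}

type Poll<T, E> = Result<Async<T>, E>;

/// Simplified copy of the `Initializer` from this diff.
struct Initializer(bool);

impl Initializer {
    fn zeroing() -> Initializer {
        Initializer(true)
    }

    /// Zeroes `buf` if this initializer asks for it.
    fn initialize(&self, buf: &mut [u8]) {
        if self.0 {
            for byte in buf.iter_mut() {
                *byte = 0;
            }
        }
    }
}

/// Hypothetical, context-free analogue of `AsyncRead`.
trait SimpleRead {
    unsafe fn initializer(&self) -> Initializer {
        Initializer::zeroing()
    }

    fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error>;
}

/// In-memory reader that reports `Pending` once before yielding data.
struct SliceReader<'a> {
    data: &'a [u8],
    stall_once: bool,
}

impl<'a> SimpleRead for SliceReader<'a> {
    fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
        if self.stall_once {
            // A real implementation would arrange a wakeup via `cx.waker()`.
            self.stall_once = false;
            return Ok(Async::Pending);
        }
        let n = self.data.len().min(buf.len());
        buf[..n].copy_from_slice(&self.data[..n]);
        self.data = &self.data[n..];
        Ok(Async::Ready(n))
    }
}

/// Prepares `buf` as the reader requests, then makes one read attempt.
fn read_once<R: SimpleRead>(reader: &mut R, buf: &mut [u8]) -> Poll<usize, io::Error> {
    // The unsafe step: the returned value may only be used with buffers
    // that are passed to this same reader.
    let init = unsafe { reader.initializer() };
    init.initialize(buf);
    reader.poll_read(buf)
}
```

A caller would invoke `read_once` again after a wakeup whenever it sees `Async::Pending`. A result of `Async::Ready(0)` indicates that no more data is available.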
5 changes: 4 additions & 1 deletion futures-util/Cargo.toml
@@ -11,12 +11,15 @@ Common utilities and extension traits for the futures-rs library.
"""

[features]
- std = ["futures-core/std", "futures-sink/std"]
+ std = ["bytes", "log", "futures-core/std", "futures-io/std", "futures-sink/std"]
default = ["std"]

[dependencies]
+ bytes = { version = "0.4", optional = true }
+ log = { version = "0.4", optional = true }
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
+ futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}

[dev-dependencies]