Skip to content

Commit

Permalink
Add futures-io
Browse files Browse the repository at this point in the history
  • Loading branch information
cramertj committed Feb 20, 2018
1 parent 3e5aa4b commit fcb2fb9
Show file tree
Hide file tree
Showing 29 changed files with 3,718 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"futures-core",
"futures-channel",
"futures-executor",
"futures-io",
"futures-util",
"futures-sink",
]
19 changes: 19 additions & 0 deletions futures-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "futures-io"
version = "0.2.0"
authors = ["Alex Crichton <alex@alexcrichton.com>"]
license = "MIT/Apache-2.0"
repository = "https://github.com/alexcrichton/futures-rs"
homepage = "https://github.com/alexcrichton/futures-rs"
documentation = "https://docs.rs/futures-io"
description = """
The `AsyncRead` and `AsyncWrite` traits for the futures-rs library.
"""

[features]
std = ["futures-core/std", "iovec"]
default = ["std"]

[dependencies]
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
iovec = { version = "0.1", optional = true }
305 changes: 305 additions & 0 deletions futures-io/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
//! Asynchronous IO
//!
//! This crate contains the `AsyncRead` and `AsyncWrite` traits which allow
//! data to be read and written asynchronously.

#![no_std]
#![deny(missing_docs, missing_debug_implementations)]
#![doc(html_root_url = "https://docs.rs/futures-io/0.2")]

macro_rules! if_std {
($($i:item)*) => ($(
#[cfg(feature = "std")]
$i
)*)
}

if_std! {
extern crate futures_core;
extern crate iovec;
extern crate std;

use futures_core::{Async, Poll, task};
use std::boxed::Box;
use std::io as StdIo;
use std::ptr;
use std::vec::Vec;

// Re-export IoVec for convenience
pub use iovec::IoVec;

// Re-export io::Error so that users don't have to deal
// with conflicts when `use`ing `futures::io` and `std::io`.
pub use StdIo::Error as Error;

/// A type used to conditionally initialize buffers passed to `AsyncRead`
/// methods.
#[derive(Debug)]
pub struct Initializer(bool);

impl Initializer {
/// Returns a new `Initializer` which will zero out buffers.
#[inline]
pub fn zeroing() -> Initializer {
Initializer(true)
}

/// Returns a new `Initializer` which will not zero out buffers.
///
/// # Safety
///
/// This method may only be called by `AsyncRead`ers which guarantee
/// that they will not read from the buffers passed to `AsyncRead`
/// methods, and that the return value of the method accurately reflects
/// the number of bytes that have been written to the head of the buffer.
#[inline]
pub unsafe fn nop() -> Initializer {
Initializer(false)
}

/// Indicates if a buffer should be initialized.
#[inline]
pub fn should_initialize(&self) -> bool {
self.0
}

/// Initializes a buffer if necessary.
#[inline]
pub fn initialize(&self, buf: &mut [u8]) {
if self.should_initialize() {
unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
}
}
}

/// Objects which can be read asynchronously.
pub trait AsyncRead {
/// Determines if this `AsyncRead`er can work with buffers of
/// uninitialized memory.
///
/// The default implementation returns an initializer which will zero
/// buffers.
///
/// # Safety
///
/// This method is `unsafe` because and `AsyncRead`er could otherwise
/// return a non-zeroing `Initializer` from another `AsyncRead` type
/// without an `unsafe` block.
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::zeroing()
}

/// Attempt to read from the `AsyncRead` into `buf`.
///
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
///
/// If reading would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes readable or is closed.
fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
-> Poll<usize, Error>;

/// Attempt to read from the `AsyncRead` into `vec` using vectored
/// IO operations. This allows data to be read into multiple buffers
/// using a single operation.
///
/// On success, returns `Ok(Async::Ready(num_bytes_read))`.
///
/// By default, this method delegates to using `poll_read` on the first
/// buffer in `vec`. Objects which support vectored IO should override
/// this method.
///
/// If reading would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes readable or is closed.
fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
if let Some(first_iovec) = vec.get_mut(0) {
self.poll_read(&mut *first_iovec, cx)
} else {
// `vec` is empty.
return Ok(Async::Ready(0));
}
}
}

/// Objects which can be written to asynchronously.
pub trait AsyncWrite {
/// Attempt to write bytes from `buf` into the object.
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
///
/// If writing would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// the object becomes writable or is closed.
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
-> Poll<usize, Error>;

/// Attempt to write bytes from `vec` into the object using vectored
/// IO operations. This allows data from multiple buffers to be written
/// using a single operation.
///
/// On success, returns `Ok(Async::Ready(num_bytes_written))`.
///
/// By default, this method delegates to using `poll_write` on the first
/// buffer in `vec`. Objects which support vectored IO should override
/// this method.
///
/// If writing would block, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object becomes writable or is closed.
fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
if let Some(first_iovec) = vec.get(0) {
self.poll_write(&*first_iovec, cx)
} else {
// `vec` is empty.
return Ok(Async::Ready(0));
}
}

/// Attempt to flush the object, ensuring that all intermediately
/// buffered contents reach their destination.
///
/// On success, returns `Ok(Async::Ready(()))`.
///
/// If flushing is incomplete, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object can make progress towards flushing.
fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error>;

/// Attempt to close the object.
///
/// On success, returns `Ok(Async::Ready(()))`.
///
/// If closing is incomplete, this function returns `Ok(Async::Pending)`
/// and arranges for `cx.waker()` to receive a notification when the
/// object can make progress towards closing.
fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error>;
}

macro_rules! deref_async_read {
() => {
unsafe fn initializer(&self) -> Initializer {
(**self).initializer()
}

fn poll_read(&mut self, buf: &mut [u8], cx: &mut task::Context)
-> Poll<usize, Error>
{
(**self).poll_read(buf, cx)
}

fn poll_vectored_read(&mut self, vec: &mut [&mut IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
(**self).poll_vectored_read(vec, cx)
}
}
}

impl<T: ?Sized + AsyncRead> AsyncRead for Box<T> {
deref_async_read!();
}

impl<'a, T: ?Sized + AsyncRead> AsyncRead for &'a mut T {
deref_async_read!();
}

/// `unsafe` because the `StdIo::Read` type must not access the buffer
/// before reading data into it.
macro_rules! unsafe_delegate_async_read_to_stdio {
() => {
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}

fn poll_read(&mut self, buf: &mut [u8], _: &mut task::Context)
-> Poll<usize, Error>
{
Ok(Async::Ready(StdIo::Read::read(self, buf)?))
}
}
}

impl<'a> AsyncRead for &'a [u8] {
unsafe_delegate_async_read_to_stdio!();
}

impl AsyncRead for StdIo::Repeat {
unsafe_delegate_async_read_to_stdio!();
}

impl<T: AsRef<[u8]>> AsyncRead for StdIo::Cursor<T> {
unsafe_delegate_async_read_to_stdio!();
}

macro_rules! deref_async_write {
() => {
fn poll_write(&mut self, buf: &[u8], cx: &mut task::Context)
-> Poll<usize, Error>
{
(**self).poll_write(buf, cx)
}

fn poll_vectored_write(&mut self, vec: &[&IoVec], cx: &mut task::Context)
-> Poll<usize, Error>
{
(**self).poll_vectored_write(vec, cx)
}

fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
(**self).poll_flush(cx)
}

fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
(**self).poll_close(cx)
}
}
}

impl<T: ?Sized + AsyncWrite> AsyncWrite for Box<T> {
deref_async_write!();
}

impl<'a, T: ?Sized + AsyncWrite> AsyncWrite for &'a mut T {
deref_async_write!();
}

macro_rules! delegate_async_write_to_stdio {
() => {
fn poll_write(&mut self, buf: &[u8], _: &mut task::Context)
-> Poll<usize, Error>
{
Ok(Async::Ready(StdIo::Write::write(self, buf)?))
}

fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Error> {
Ok(Async::Ready(StdIo::Write::flush(self)?))
}

fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Error> {
self.poll_flush(cx)
}
}
}

impl<'a> AsyncWrite for StdIo::Cursor<&'a mut [u8]> {
delegate_async_write_to_stdio!();
}

impl AsyncWrite for StdIo::Cursor<Vec<u8>> {
delegate_async_write_to_stdio!();
}

impl AsyncWrite for StdIo::Cursor<Box<[u8]>> {
delegate_async_write_to_stdio!();
}

impl AsyncWrite for StdIo::Sink {
delegate_async_write_to_stdio!();
}
}
5 changes: 4 additions & 1 deletion futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ Common utilities and extension traits for the futures-rs library.
"""

[features]
std = ["futures-core/std", "futures-sink/std"]
std = ["bytes", "log", "futures-core/std", "futures-io/std", "futures-sink/std"]
default = ["std"]

[dependencies]
bytes = { version = "0.4", optional = true }
log = { version = "0.4", optional = true }
futures-core = { path = "../futures-core", version = "0.2.0", default-features = false }
futures-channel = { path = "../futures-channel", version = "0.2.0", default-features = false }
futures-io = { path = "../futures-io", version = "0.2.0", default-features = false }
futures-sink = { path = "../futures-sink", version = "0.2.0", default-features = false}

[dev-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions futures-util/benches/bilock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ extern crate futures;
extern crate futures_util;
extern crate test;

use futures::prelude::*;
use futures::task::{self, Wake, Waker};
use futures::executor::LocalPool;
use prelude::*;
use task::{self, Wake, Waker};
useexecutor::LocalPool;
use futures_util::lock::BiLock;
use futures_util::lock::BiLockAcquire;
use futures_util::lock::BiLockAcquired;
Expand Down
6 changes: 3 additions & 3 deletions futures-util/benches/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ extern crate futures_channel;
extern crate futures_executor;
extern crate test;

use futures::prelude::*;
use futures::future;
use futures::stream::FuturesUnordered;
use prelude::*;
use future;
use stream::FuturesUnordered;
use futures_channel::oneshot;
use futures_executor::block_on;

Expand Down
Loading

0 comments on commit fcb2fb9

Please sign in to comment.