Skip to content

Commit

Permalink
Limit futures dependency to Stream via feature flag (#1774)
Browse files Browse the repository at this point in the history
In an effort to reach API stability, the `tokio` crate is shedding its
_public_ dependencies on crates that are either a) do not provide a
stable (1.0+) release with longevity guarantees or b) match the `tokio`
release cadence. Of course, implementing `std` traits fits the
requirements.

The on exception, for now, is the `Stream` trait found in `futures_core`.
It is expected that this trait will not change much and be moved into `std.
Since Tokio is not yet going reaching 1.0, I feel that it is acceptable to maintain
a dependency on this trait given how foundational it is.

Since the `Stream` implementation is optional, types that are logically
streams provide `async fn next_*` functions to obtain the next value.
Avoiding the `next()` name prevents fn conflicts with `StreamExt::next()`.

Additionally, some misc cleanup is also done:

- `tokio::io::io` -> `tokio::io::util`.
- `delay` -> `delay_until`.
- `Timeout::new` -> `timeout(...)`.
- `signal::ctrl_c()` returns a future instead of a stream.
- `{tcp,unix}::Incoming` is removed (due to lack of `Stream` trait).
- `time::Throttle` is removed (due to lack of `Stream` trait).
-  Fix: `mpsc::UnboundedSender::send(&self)` (no more conflict with `Sink` fns).
  • Loading branch information
carllerche authored Nov 16, 2019
1 parent 9306795 commit 8a7e577
Show file tree
Hide file tree
Showing 130 changed files with 1,655 additions and 1,686 deletions.
16 changes: 5 additions & 11 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,12 @@ impl Shared {

/// Send a `LineCodec` encoded message to every peer, except
/// for the sender.
async fn broadcast(
&mut self,
sender: SocketAddr,
message: &str,
) -> Result<(), mpsc::error::UnboundedSendError> {
async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
for peer in self.peers.iter_mut() {
if *peer.0 != sender {
peer.1.send(message.into()).await?;
let _ = peer.1.send(message.into());
}
}

Ok(())
}
}

Expand Down Expand Up @@ -218,7 +212,7 @@ async fn process(
let mut state = state.lock().await;
let msg = format!("{} has joined the chat", username);
println!("{}", msg);
state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}

// Process incoming messages until our stream is exhausted by a disconnect.
Expand All @@ -230,7 +224,7 @@ async fn process(
let mut state = state.lock().await;
let msg = format!("{}: {}", username, msg);

state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}
// A message was received from a peer. Send it to the
// current user.
Expand All @@ -254,7 +248,7 @@ async fn process(

let msg = format!("{} has left the chat", username);
println!("{}", msg);
state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}

Ok(())
Expand Down
10 changes: 6 additions & 4 deletions examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::io;
use tokio::sync::{mpsc, oneshot};
use tokio_util::codec::{FramedRead, FramedWrite};

use futures::{SinkExt, Stream, StreamExt};
use futures::{Stream, StreamExt};
use std::env;
use std::error::Error;
use std::net::SocketAddr;
Expand Down Expand Up @@ -69,12 +69,14 @@ async fn run() -> Result<(), Box<dyn Error>> {

// Temporary work around for stdin blocking the stream
fn stdin() -> impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin {
let mut stdin = FramedRead::new(io::stdin(), codec::Bytes).map(Ok);
let mut stdin = FramedRead::new(io::stdin(), codec::Bytes);

let (mut tx, rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
tx.send_all(&mut stdin).await.unwrap();
while let Some(res) = stdin.next().await {
let _ = tx.send(res);
}
});

rx
Expand Down
2 changes: 1 addition & 1 deletion examples/print_each_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
#![warn(rust_2018_idioms)]

use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_util::codec::{BytesCodec, Decoder};

use futures::StreamExt;
use std::env;

#[tokio::main]
Expand Down
6 changes: 3 additions & 3 deletions examples/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#![warn(rust_2018_idioms)]

use futures::{future::try_join, FutureExt, StreamExt};
use futures::{future::try_join, FutureExt};
use std::{env, error::Error};
use tokio::{
io::AsyncReadExt,
Expand All @@ -37,9 +37,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);

let mut incoming = TcpListener::bind(listen_addr).await?.incoming();
let mut listener = TcpListener::bind(listen_addr).await?;

while let Some(Ok(inbound)) = incoming.next().await {
while let Ok((inbound, _)) = listener.accept().await {
let transfer = transfer(inbound, server_addr.clone()).map(|r| {
if let Err(e) = r {
println!("Failed to transfer; error={}", e);
Expand Down
5 changes: 2 additions & 3 deletions examples/udp-codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
#![warn(rust_2018_idioms)]

use tokio::future::FutureExt as TokioFutureExt;
use tokio::io;
use tokio::net::UdpSocket;
use tokio::{io, time};
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;

Expand Down Expand Up @@ -68,7 +67,7 @@ async fn ping(socket: &mut UdpFramed<BytesCodec>, b_addr: SocketAddr) -> Result<
async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
let timeout = Duration::from_millis(200);

while let Ok(Some(Ok((bytes, addr)))) = socket.next().timeout(timeout).await {
while let Ok(Some(Ok((bytes, addr)))) = time::timeout(timeout, socket.next()).await {
println!("[b] recv: {}", String::from_utf8_lossy(&bytes));

socket.send((Bytes::from(&b"PONG"[..]), addr)).await?;
Expand Down
5 changes: 2 additions & 3 deletions tests-integration/tests/process_stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use tokio::process::{Child, Command};
use tokio_test::assert_ok;

use futures::future::{self, FutureExt};
use futures::stream::StreamExt;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
Expand Down Expand Up @@ -47,9 +46,9 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
// (i.e. EOF is reached after `n` lines.
loop {
let data = reader
.next()
.next_line()
.await
.unwrap_or_else(|| Ok(String::new()))
.unwrap_or_else(|_| Some(String::new()))
.expect("failed to read line");

let num_read = data.len();
Expand Down
8 changes: 4 additions & 4 deletions tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Handle {
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.tx.try_send(Action::Read(buf.into())).unwrap();
self.tx.send(Action::Read(buf.into())).unwrap();
self
}

Expand All @@ -131,7 +131,7 @@ impl Handle {
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.tx.try_send(Action::Write(buf.into())).unwrap();
self.tx.send(Action::Write(buf.into())).unwrap();
self
}
}
Expand Down Expand Up @@ -298,7 +298,7 @@ impl AsyncRead for Mock {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(time::delay(until));
self.inner.sleep = Some(time::delay_until(until));
} else {
self.inner.read_wait = Some(cx.waker().clone());
return Poll::Pending;
Expand Down Expand Up @@ -340,7 +340,7 @@ impl AsyncWrite for Mock {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(time::delay(until));
self.inner.sleep = Some(time::delay_until(until));
} else {
panic!("unexpected WouldBlock");
}
Expand Down
4 changes: 2 additions & 2 deletions tokio-test/tests/block_on.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)]

use tokio::time::{delay, Duration, Instant};
use tokio::time::{delay_until, Duration, Instant};
use tokio_test::block_on;

#[test]
Expand All @@ -20,5 +20,5 @@ fn async_fn() {
#[test]
fn test_delay() {
let deadline = Instant::now() + Duration::from_millis(100);
assert_eq!((), block_on(delay(deadline)));
assert_eq!((), block_on(delay_until(deadline)));
}
6 changes: 3 additions & 3 deletions tokio-tls/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ async fn client_to_server() {
drop(env_logger::try_init());

// Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());

let (server_cx, client_cx) = contexts();
Expand Down Expand Up @@ -559,7 +559,7 @@ async fn server_to_client() {
drop(env_logger::try_init());

// Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());

let (server_cx, client_cx) = contexts();
Expand Down Expand Up @@ -590,7 +590,7 @@ async fn one_byte_at_a_time() {
const AMT: usize = 1024;
drop(env_logger::try_init());

let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());

let (server_cx, client_cx) = contexts();
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pin-project = "0.4"
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }

futures-util = "0.3.0"
futures = "0.3.0"

[package.metadata.docs.rs]
all-features = true
1 change: 1 addition & 0 deletions tokio-util/src/codec/length_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
//! use tokio::prelude::*;
//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
//!
//! use futures::SinkExt;
//! use bytes::Bytes;
//!
//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
Expand Down
1 change: 1 addition & 0 deletions tokio-util/tests/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio_test::assert_ok;
use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};

use bytes::{Buf, BufMut, BytesMut, IntoBuf};
use futures::StreamExt;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down
3 changes: 2 additions & 1 deletion tokio-util/tests/framed_read.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#![warn(rust_2018_idioms)]

use tokio::prelude::*;
use tokio::io::AsyncRead;
use tokio_test::assert_ready;
use tokio_test::task;
use tokio_util::codec::{Decoder, FramedRead};

use bytes::{Buf, BytesMut, IntoBuf};
use futures::Stream;
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
Expand Down
3 changes: 1 addition & 2 deletions tokio-util/tests/length_delimited.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#![warn(rust_2018_idioms)]

use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::*;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
use tokio_util::codec::*;

use bytes::{BufMut, Bytes, BytesMut};
use futures_util::pin_mut;
use futures::{pin_mut, Sink, Stream};
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
Expand Down
8 changes: 4 additions & 4 deletions tokio-util/tests/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use tokio_util::codec::{Decoder, Encoder};
use tokio_util::udp::UdpFramed;

use bytes::{BufMut, BytesMut};
use futures_util::future::try_join;
use futures_util::future::FutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures::future::try_join;
use futures::future::FutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::io;

#[tokio::test]
Expand Down
10 changes: 6 additions & 4 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ default = [
"process",
"rt-full",
"signal",
"stream",
"sync",
"time",
]
Expand All @@ -40,7 +41,7 @@ blocking = ["rt-core"]
dns = ["blocking"]
fs = ["blocking"]
io-driver = ["mio", "lazy_static", "sync"] # TODO: get rid of sync
io-util = ["pin-project", "memchr"]
io-util = ["pin-project", "pin-project-lite", "memchr"]
macros = ["tokio-macros"]
net = ["dns", "tcp", "udp", "uds"]
process = [
Expand All @@ -55,6 +56,7 @@ process = [
]
# Includes basic task execution capabilities
rt-core = []
# TODO: rename this -> `rt-threaded`
rt-full = [
"macros",
"num_cpus",
Expand All @@ -72,6 +74,7 @@ signal = [
"winapi/consoleapi",
"winapi/minwindef",
]
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["io-driver"]
Expand All @@ -84,18 +87,17 @@ uds = ["io-driver", "mio-uds", "libc"]
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }

bytes = "0.4"
futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-util = { version = "0.3.0", features = ["sink", "channel"] }
iovec = "0.1"

# Everything else is optional...
fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.14", optional = true }
num_cpus = { version = "1.8.0", optional = true }
pin-project = { version = "0.4", optional = true }
pin-project-lite = { version = "0.1", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }

Expand Down
1 change: 0 additions & 1 deletion tokio/src/fs/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::fs::sys;
use crate::io::{AsyncRead, AsyncWrite};

use futures_core::ready;
use std::cmp;
use std::future::Future;
use std::io;
Expand Down
3 changes: 1 addition & 2 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::fs::blocking::Buf;
use crate::fs::{asyncify, sys};
use crate::io::{AsyncRead, AsyncWrite};

use futures_core::ready;
use std::fmt;
use std::fs::{Metadata, Permissions};
use std::future::Future;
Expand Down Expand Up @@ -430,7 +429,7 @@ impl File {
}

async fn complete_inflight(&mut self) {
use futures_util::future::poll_fn;
use crate::future::poll_fn;

if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
self.last_write_err = Some(e.kind());
Expand Down
Loading

0 comments on commit 8a7e577

Please sign in to comment.