Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit futures dependency to Stream via feature flag #1774

Merged
merged 15 commits into from
Nov 16, 2019
16 changes: 5 additions & 11 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,12 @@ impl Shared {

/// Send a `LineCodec` encoded message to every peer, except
/// for the sender.
async fn broadcast(
&mut self,
sender: SocketAddr,
message: &str,
) -> Result<(), mpsc::error::UnboundedSendError> {
async fn broadcast(&mut self, sender: SocketAddr, message: &str) {
for peer in self.peers.iter_mut() {
if *peer.0 != sender {
peer.1.send(message.into()).await?;
let _ = peer.1.send(message.into());
}
}

Ok(())
}
}

Expand Down Expand Up @@ -218,7 +212,7 @@ async fn process(
let mut state = state.lock().await;
let msg = format!("{} has joined the chat", username);
println!("{}", msg);
state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}

// Process incoming messages until our stream is exhausted by a disconnect.
Expand All @@ -230,7 +224,7 @@ async fn process(
let mut state = state.lock().await;
let msg = format!("{}: {}", username, msg);

state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}
// A message was received from a peer. Send it to the
// current user.
Expand All @@ -254,7 +248,7 @@ async fn process(

let msg = format!("{} has left the chat", username);
println!("{}", msg);
state.broadcast(addr, &msg).await?;
state.broadcast(addr, &msg).await;
}

Ok(())
Expand Down
10 changes: 6 additions & 4 deletions examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::io;
use tokio::sync::{mpsc, oneshot};
use tokio_util::codec::{FramedRead, FramedWrite};

use futures::{SinkExt, Stream, StreamExt};
use futures::{Stream, StreamExt};
use std::env;
use std::error::Error;
use std::net::SocketAddr;
Expand Down Expand Up @@ -69,12 +69,14 @@ async fn run() -> Result<(), Box<dyn Error>> {

// Temporary work around for stdin blocking the stream
fn stdin() -> impl Stream<Item = Result<Vec<u8>, io::Error>> + Unpin {
let mut stdin = FramedRead::new(io::stdin(), codec::Bytes).map(Ok);
let mut stdin = FramedRead::new(io::stdin(), codec::Bytes);

let (mut tx, rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
tx.send_all(&mut stdin).await.unwrap();
while let Some(res) = stdin.next().await {
let _ = tx.send(res);
}
});

rx
Expand Down
2 changes: 1 addition & 1 deletion examples/print_each_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@
#![warn(rust_2018_idioms)]

use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_util::codec::{BytesCodec, Decoder};

use futures::StreamExt;
use std::env;

#[tokio::main]
Expand Down
6 changes: 3 additions & 3 deletions examples/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

#![warn(rust_2018_idioms)]

use futures::{future::try_join, FutureExt, StreamExt};
use futures::{future::try_join, FutureExt};
use std::{env, error::Error};
use tokio::{
io::AsyncReadExt,
Expand All @@ -37,9 +37,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);

let mut incoming = TcpListener::bind(listen_addr).await?.incoming();
let mut listener = TcpListener::bind(listen_addr).await?;

while let Some(Ok(inbound)) = incoming.next().await {
while let Ok((inbound, _)) = listener.accept().await {
let transfer = transfer(inbound, server_addr.clone()).map(|r| {
if let Err(e) = r {
println!("Failed to transfer; error={}", e);
Expand Down
5 changes: 2 additions & 3 deletions examples/udp-codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

#![warn(rust_2018_idioms)]

use tokio::future::FutureExt as TokioFutureExt;
use tokio::io;
use tokio::net::UdpSocket;
use tokio::{io, time};
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;

Expand Down Expand Up @@ -68,7 +67,7 @@ async fn ping(socket: &mut UdpFramed<BytesCodec>, b_addr: SocketAddr) -> Result<
async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
let timeout = Duration::from_millis(200);

while let Ok(Some(Ok((bytes, addr)))) = socket.next().timeout(timeout).await {
while let Ok(Some(Ok((bytes, addr)))) = time::timeout(timeout, socket.next()).await {
println!("[b] recv: {}", String::from_utf8_lossy(&bytes));

socket.send((Bytes::from(&b"PONG"[..]), addr)).await?;
Expand Down
5 changes: 2 additions & 3 deletions tests-integration/tests/process_stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use tokio::process::{Child, Command};
use tokio_test::assert_ok;

use futures::future::{self, FutureExt};
use futures::stream::StreamExt;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
Expand Down Expand Up @@ -47,9 +46,9 @@ async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
// (i.e. EOF is reached after `n` lines.
loop {
let data = reader
.next()
.next_line()
.await
.unwrap_or_else(|| Ok(String::new()))
.unwrap_or_else(|_| Some(String::new()))
.expect("failed to read line");

let num_read = data.len();
Expand Down
8 changes: 4 additions & 4 deletions tokio-test/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Handle {
/// The next operation in the mock's script will be to expect a `read` call
/// and return `buf`.
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.tx.try_send(Action::Read(buf.into())).unwrap();
self.tx.send(Action::Read(buf.into())).unwrap();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The unbounded mpsc send API changed to be more ergonomic.

self
}

Expand All @@ -131,7 +131,7 @@ impl Handle {
/// The next operation in the mock's script will be to expect a `write`
/// call.
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.tx.try_send(Action::Write(buf.into())).unwrap();
self.tx.send(Action::Write(buf.into())).unwrap();
self
}
}
Expand Down Expand Up @@ -298,7 +298,7 @@ impl AsyncRead for Mock {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(time::delay(until));
self.inner.sleep = Some(time::delay_until(until));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delay was renamed delay_until to further disambiguate from delay_for.

} else {
self.inner.read_wait = Some(cx.waker().clone());
return Poll::Pending;
Expand Down Expand Up @@ -340,7 +340,7 @@ impl AsyncWrite for Mock {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if let Some(rem) = self.inner.remaining_wait() {
let until = Instant::now() + rem;
self.inner.sleep = Some(time::delay(until));
self.inner.sleep = Some(time::delay_until(until));
} else {
panic!("unexpected WouldBlock");
}
Expand Down
4 changes: 2 additions & 2 deletions tokio-test/tests/block_on.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![warn(rust_2018_idioms)]

use tokio::time::{delay, Duration, Instant};
use tokio::time::{delay_until, Duration, Instant};
use tokio_test::block_on;

#[test]
Expand All @@ -20,5 +20,5 @@ fn async_fn() {
#[test]
fn test_delay() {
let deadline = Instant::now() + Duration::from_millis(100);
assert_eq!((), block_on(delay(deadline)));
assert_eq!((), block_on(delay_until(deadline)));
}
6 changes: 3 additions & 3 deletions tokio-tls/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ async fn client_to_server() {
drop(env_logger::try_init());

// Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

incoming now takes &mut self.

let addr = t!(srv.local_addr());

let (server_cx, client_cx) = contexts();
Expand Down Expand Up @@ -559,7 +559,7 @@ async fn server_to_client() {
drop(env_logger::try_init());

// Create a server listening on a port, then figure out what that port is
let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());

let (server_cx, client_cx) = contexts();
Expand Down Expand Up @@ -590,7 +590,7 @@ async fn one_byte_at_a_time() {
const AMT: usize = 1024;
drop(env_logger::try_init());

let srv = t!(TcpListener::bind("127.0.0.1:0").await);
let mut srv = t!(TcpListener::bind("127.0.0.1:0").await);
let addr = t!(srv.local_addr());

let (server_cx, client_cx) = contexts();
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ log = "0.4"
tokio = { version = "=0.2.0-alpha.6", path = "../tokio" }
tokio-test = { version = "=0.2.0-alpha.6", path = "../tokio-test" }

futures-util = "0.3.0"
futures = "0.3.0"

[package.metadata.docs.rs]
all-features = true
1 change: 1 addition & 0 deletions tokio-util/src/codec/length_delimited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
//! use tokio::prelude::*;
//! use tokio_util::codec::{Framed, LengthDelimitedCodec};
//!
//! use futures::SinkExt;
//! use bytes::Bytes;
//!
//! async fn write_frame<T>(io: T) -> Result<(), Box<dyn std::error::Error>>
Expand Down
1 change: 1 addition & 0 deletions tokio-util/tests/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use tokio_test::assert_ok;
use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts};

use bytes::{Buf, BufMut, BytesMut, IntoBuf};
use futures::StreamExt;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down
3 changes: 2 additions & 1 deletion tokio-util/tests/framed_read.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#![warn(rust_2018_idioms)]

use tokio::prelude::*;
use tokio::io::AsyncRead;
use tokio_test::assert_ready;
use tokio_test::task;
use tokio_util::codec::{Decoder, FramedRead};

use bytes::{Buf, BytesMut, IntoBuf};
use futures::Stream;
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
Expand Down
3 changes: 1 addition & 2 deletions tokio-util/tests/length_delimited.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
#![warn(rust_2018_idioms)]

use tokio::io::{AsyncRead, AsyncWrite};
use tokio::prelude::*;
use tokio_test::task;
use tokio_test::{
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
};
use tokio_util::codec::*;

use bytes::{BufMut, Bytes, BytesMut};
use futures_util::pin_mut;
use futures::{pin_mut, Sink, Stream};
use std::collections::VecDeque;
use std::io;
use std::pin::Pin;
Expand Down
8 changes: 4 additions & 4 deletions tokio-util/tests/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use tokio_util::codec::{Decoder, Encoder};
use tokio_util::udp::UdpFramed;

use bytes::{BufMut, BytesMut};
use futures_util::future::try_join;
use futures_util::future::FutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use futures::future::try_join;
use futures::future::FutureExt;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use std::io;

#[tokio::test]
Expand Down
10 changes: 6 additions & 4 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ default = [
"process",
"rt-full",
"signal",
"stream",
"sync",
"time",
]
Expand All @@ -40,7 +41,7 @@ blocking = ["rt-core"]
dns = ["blocking"]
fs = ["blocking"]
io-driver = ["mio", "lazy_static", "sync"] # TODO: get rid of sync
io-util = ["pin-project", "memchr"]
io-util = ["pin-project", "pin-project-lite", "memchr"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying out pin-project-lite, we probably should switch.

macros = ["tokio-macros"]
net = ["dns", "tcp", "udp", "uds"]
process = [
Expand All @@ -55,6 +56,7 @@ process = [
]
# Includes basic task execution capabilities
rt-core = []
# TODO: rename this -> `rt-threaded`
rt-full = [
"macros",
"num_cpus",
Expand All @@ -72,6 +74,7 @@ signal = [
"winapi/consoleapi",
"winapi/minwindef",
]
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["io-driver"]
Expand All @@ -84,18 +87,17 @@ uds = ["io-driver", "mio-uds", "libc"]
tokio-macros = { version = "=0.2.0-alpha.6", optional = true, path = "../tokio-macros" }

bytes = "0.4"
futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-util = { version = "0.3.0", features = ["sink", "channel"] }
iovec = "0.1"

# Everything else is optional...
fnv = { version = "1.0.6", optional = true }
futures-core = { version = "0.3.0", optional = true }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only the Stream trait is used.

lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.14", optional = true }
num_cpus = { version = "1.8.0", optional = true }
pin-project = { version = "0.4", optional = true }
pin-project-lite = { version = "0.1", optional = true }
# Backs `DelayQueue`
slab = { version = "0.4.1", optional = true }

Expand Down
1 change: 0 additions & 1 deletion tokio/src/fs/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::fs::sys;
use crate::io::{AsyncRead, AsyncWrite};

use futures_core::ready;
use std::cmp;
use std::future::Future;
use std::io;
Expand Down
3 changes: 1 addition & 2 deletions tokio/src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::fs::blocking::Buf;
use crate::fs::{asyncify, sys};
use crate::io::{AsyncRead, AsyncWrite};

use futures_core::ready;
use std::fmt;
use std::fs::{Metadata, Permissions};
use std::future::Future;
Expand Down Expand Up @@ -430,7 +429,7 @@ impl File {
}

async fn complete_inflight(&mut self) {
use futures_util::future::poll_fn;
use crate::future::poll_fn;

if let Err(e) = poll_fn(|cx| Pin::new(&mut *self).poll_flush(cx)).await {
self.last_write_err = Some(e.kind());
Expand Down
Loading