From 5155657406cbd2b5b0f2b0f2e58c31d450b43e78 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Mon, 14 Dec 2020 21:07:11 +0000 Subject: [PATCH] Move stream items into `tokio-stream` Closes #2870 --- CONTRIBUTING.md | 1 - Cargo.toml | 1 + examples/Cargo.toml | 5 +- examples/chat.rs | 15 +- examples/print_each_packet.rs | 2 +- examples/tinydb.rs | 2 +- examples/tinyhttp.rs | 2 +- examples/udp-codec.rs | 2 +- tokio-stream/Cargo.toml | 35 ++ {tokio/src/stream => tokio-stream/src}/all.rs | 2 +- {tokio/src/stream => tokio-stream/src}/any.rs | 2 +- .../src/stream => tokio-stream/src}/chain.rs | 2 +- .../stream => tokio-stream/src}/collect.rs | 2 +- .../src/stream => tokio-stream/src}/empty.rs | 4 +- .../src/stream => tokio-stream/src}/filter.rs | 2 +- .../stream => tokio-stream/src}/filter_map.rs | 2 +- .../src/stream => tokio-stream/src}/fold.rs | 2 +- .../src/stream => tokio-stream/src}/fuse.rs | 2 +- .../src/stream => tokio-stream/src}/iter.rs | 21 +- .../stream/mod.rs => tokio-stream/src/lib.rs | 73 ++- tokio-stream/src/macros.rs | 18 + {tokio/src/stream => tokio-stream/src}/map.rs | 2 +- .../src/stream => tokio-stream/src}/merge.rs | 2 +- .../src/stream => tokio-stream/src}/next.rs | 2 +- .../src/stream => tokio-stream/src}/once.rs | 6 +- .../stream => tokio-stream/src}/pending.rs | 4 +- .../src/stream => tokio-stream/src}/skip.rs | 2 +- .../stream => tokio-stream/src}/skip_while.rs | 2 +- .../stream => tokio-stream/src}/stream_map.rs | 151 ++++- .../src/stream => tokio-stream/src}/take.rs | 2 +- .../stream => tokio-stream/src}/take_while.rs | 2 +- .../stream => tokio-stream/src}/throttle.rs | 2 +- .../stream => tokio-stream/src}/timeout.rs | 2 +- .../stream => tokio-stream/src}/try_next.rs | 2 +- tokio-stream/tests/CONTRIBUTING.md | 562 ++++++++++++++++++ tokio-stream/tests/async_send_sync.rs | 105 ++++ {tokio => tokio-stream}/tests/stream_chain.rs | 15 +- .../tests/stream_collect.rs | 23 +- {tokio => tokio-stream}/tests/stream_empty.rs | 2 +- {tokio => tokio-stream}/tests/stream_fuse.rs | 2 +- {tokio => tokio-stream}/tests/stream_iter.rs | 2 +- {tokio => tokio-stream}/tests/stream_merge.rs | 15 +- {tokio => tokio-stream}/tests/stream_once.rs | 2 +- .../tests/stream_pending.rs | 2 +- .../tests/stream_stream_map.rs | 30 +- .../tests/stream_timeout.rs | 2 +- tokio-stream/tests/support/mpsc.rs | 15 + .../tests/time_throttle.rs | 2 +- tokio-test/Cargo.toml | 1 + tokio-test/src/io.rs | 19 +- tokio-test/src/task.rs | 2 +- tokio-util/Cargo.toml | 1 + tokio-util/src/codec/decoder.rs | 2 +- tokio-util/src/codec/framed.rs | 14 +- tokio-util/src/codec/framed_impl.rs | 6 +- tokio-util/src/codec/framed_read.rs | 5 +- tokio-util/src/codec/framed_write.rs | 3 +- tokio-util/src/codec/mod.rs | 2 +- tokio-util/src/either.rs | 6 +- tokio-util/src/io/read_buf.rs | 2 +- tokio-util/src/io/reader_stream.rs | 6 +- tokio-util/src/io/stream_reader.rs | 4 +- tokio-util/src/lib.rs | 2 +- tokio-util/src/udp/frame.rs | 5 +- tokio-util/tests/framed.rs | 3 +- tokio-util/tests/io_reader_stream.rs | 2 +- tokio-util/tests/io_stream_reader.rs | 2 +- tokio-util/tests/udp.rs | 3 +- tokio/Cargo.toml | 2 + tokio/src/coop.rs | 4 +- tokio/src/fs/read_dir.rs | 13 - tokio/src/io/util/async_buf_read_ext.rs | 11 +- tokio/src/io/util/lines.rs | 13 - tokio/src/io/util/split.rs | 13 - tokio/src/lib.rs | 4 - tokio/src/macros/pin.rs | 2 +- tokio/src/macros/select.rs | 6 +- tokio/src/net/tcp/listener.rs | 28 - tokio/src/net/unix/listener.rs | 19 +- tokio/src/signal/unix.rs | 10 - tokio/src/signal/windows.rs | 2 +- tokio/src/sync/broadcast.rs | 2 +- tokio/src/sync/mpsc/bounded.rs | 10 - tokio/src/sync/mpsc/unbounded.rs | 9 - tokio/src/time/interval.rs | 9 - tokio/tests/async_send_sync.rs | 14 +- tokio/tests/fs_dir.rs | 32 - tokio/tests/io_lines.rs | 16 - tokio/tests/rt_basic.rs | 15 +- tokio/tests/support/mpsc_stream.rs | 29 + tokio/tests/sync_broadcast.rs | 40 -- tokio/tests/sync_mpsc.rs | 15 +- tokio/tests/task_blocking.rs | 14 +- tokio/tests/tcp_accept.rs | 4 +- tokio/tests/time_interval.rs | 15 - 95 files changed, 1168 insertions(+), 430 deletions(-) create mode 100644 tokio-stream/Cargo.toml rename {tokio/src/stream => tokio-stream/src}/all.rs (98%) rename {tokio/src/stream => tokio-stream/src}/any.rs (98%) rename {tokio/src/stream => tokio-stream/src}/chain.rs (96%) rename {tokio/src/stream => tokio-stream/src}/collect.rs (99%) rename {tokio/src/stream => tokio-stream/src}/empty.rs (93%) rename {tokio/src/stream => tokio-stream/src}/filter.rs (98%) rename {tokio/src/stream => tokio-stream/src}/filter_map.rs (98%) rename {tokio/src/stream => tokio-stream/src}/fold.rs (98%) rename {tokio/src/stream => tokio-stream/src}/fuse.rs (97%) rename {tokio/src/stream => tokio-stream/src}/iter.rs (74%) rename tokio/src/stream/mod.rs => tokio-stream/src/lib.rs (93%) create mode 100644 tokio-stream/src/macros.rs rename {tokio/src/stream => tokio-stream/src}/map.rs (97%) rename {tokio/src/stream => tokio-stream/src}/merge.rs (98%) rename {tokio/src/stream => tokio-stream/src}/next.rs (97%) rename {tokio/src/stream => tokio-stream/src}/once.rs (88%) rename {tokio/src/stream => tokio-stream/src}/pending.rs (94%) rename {tokio/src/stream => tokio-stream/src}/skip.rs (98%) rename {tokio/src/stream => tokio-stream/src}/skip_while.rs (98%) rename {tokio/src/stream => tokio-stream/src}/stream_map.rs (77%) rename {tokio/src/stream => tokio-stream/src}/take.rs (98%) rename {tokio/src/stream => tokio-stream/src}/take_while.rs (98%) rename {tokio/src/stream => tokio-stream/src}/throttle.rs (98%) rename {tokio/src/stream => tokio-stream/src}/timeout.rs (98%) rename {tokio/src/stream => tokio-stream/src}/try_next.rs (96%) create mode 100644 tokio-stream/tests/CONTRIBUTING.md create mode 100644 tokio-stream/tests/async_send_sync.rs rename {tokio => tokio-stream}/tests/stream_chain.rs (89%) rename {tokio => tokio-stream}/tests/stream_collect.rs (89%) rename {tokio => tokio-stream}/tests/stream_empty.rs (79%) rename {tokio => tokio-stream}/tests/stream_fuse.rs (96%) rename {tokio => tokio-stream}/tests/stream_iter.rs (91%) rename {tokio => tokio-stream}/tests/stream_merge.rs (86%) rename {tokio => tokio-stream}/tests/stream_once.rs (82%) rename {tokio => tokio-stream}/tests/stream_pending.rs (85%) rename {tokio => tokio-stream}/tests/stream_stream_map.rs (93%) rename {tokio => tokio-stream}/tests/stream_timeout.rs (98%) create mode 100644 tokio-stream/tests/support/mpsc.rs rename {tokio => tokio-stream}/tests/time_throttle.rs (95%) create mode 100644 tokio/tests/support/mpsc_stream.rs diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 18e38e8087e..7de50fc6ac9 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -486,7 +486,6 @@ missing a difficulty rating, and you should feel free to add one. - **M-process** The `tokio::process` module. - **M-runtime** The `tokio::runtime` module. - **M-signal** The `tokio::signal` module. - - **M-stream** The `tokio::stream` module. - **M-sync** The `tokio::sync` module. - **M-task** The `tokio::task` module. - **M-time** The `tokio::time` module. diff --git a/Cargo.toml b/Cargo.toml index 1867acb7206..bc01f186281 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "tokio", "tokio-macros", "tokio-test", + "tokio-stream", "tokio-util", # Internal diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 4b736248239..fc35e60f303 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -8,9 +8,12 @@ edition = "2018" # [dependencies] instead. [dev-dependencies] tokio = { version = "1.0.0", path = "../tokio", features = ["full", "tracing"] } +tokio-util = { version = "0.6.0", path = "../tokio-util", features = ["full"] } +tokio-stream = { version = "0.1", path = "../tokio-stream" } + +async-stream = "0.3" tracing = "0.1" tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] } -tokio-util = { version = "0.6.0", path = "../tokio-util", features = ["full"] } bytes = "0.6" futures = "0.3.0" http = "0.2" diff --git a/examples/chat.rs b/examples/chat.rs index 3f945039232..821fd964b0b 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -27,8 +27,8 @@ #![warn(rust_2018_idioms)] use tokio::net::{TcpListener, TcpStream}; -use tokio::stream::{Stream, StreamExt}; use tokio::sync::{mpsc, Mutex}; +use tokio_stream::{Stream, StreamExt}; use tokio_util::codec::{Framed, LinesCodec, LinesCodecError}; use futures::SinkExt; @@ -101,9 +101,6 @@ async fn main() -> Result<(), Box> { /// Shorthand for the transmit half of the message channel. type Tx = mpsc::UnboundedSender; -/// Shorthand for the receive half of the message channel. -type Rx = mpsc::UnboundedReceiver; - /// Data that is shared between all peers in the chat server. /// /// This is the set of `Tx` handles for all connected clients. Whenever a @@ -127,7 +124,7 @@ struct Peer { /// /// This is used to receive messages from peers. When a message is received /// off of this `Rx`, it will be written to the socket. - rx: Rx, + rx: Pin + Send>>, } impl Shared { @@ -159,11 +156,17 @@ impl Peer { let addr = lines.get_ref().peer_addr()?; // Create a channel for this peer - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = mpsc::unbounded_channel(); // Add an entry for this `Peer` in the shared state map. state.lock().await.peers.insert(addr, tx); + let rx = Box::pin(async_stream::stream! { + while let Some(item) = rx.recv().await { + yield item; + } + }); + Ok(Peer { lines, rx }) } } diff --git a/examples/print_each_packet.rs b/examples/print_each_packet.rs index b3e1b17ecde..087f9cf03eb 100644 --- a/examples/print_each_packet.rs +++ b/examples/print_each_packet.rs @@ -55,7 +55,7 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; -use tokio::stream::StreamExt; +use tokio_stream::StreamExt; use tokio_util::codec::{BytesCodec, Decoder}; use std::env; diff --git a/examples/tinydb.rs b/examples/tinydb.rs index f0db7fa86d8..9da429ace69 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -42,7 +42,7 @@ #![warn(rust_2018_idioms)] use tokio::net::TcpListener; -use tokio::stream::StreamExt; +use tokio_stream::StreamExt; use tokio_util::codec::{Framed, LinesCodec}; use futures::SinkExt; diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index c561bbd31b1..e86305f367e 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -20,7 +20,7 @@ use http::{header::HeaderValue, Request, Response, StatusCode}; extern crate serde_derive; use std::{env, error::Error, fmt, io}; use tokio::net::{TcpListener, TcpStream}; -use tokio::stream::StreamExt; +use tokio_stream::StreamExt; use tokio_util::codec::{Decoder, Encoder, Framed}; #[tokio::main] diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index 8b64cbc3ab5..7c305245bce 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -9,8 +9,8 @@ #![warn(rust_2018_idioms)] use tokio::net::UdpSocket; -use tokio::stream::StreamExt; use tokio::{io, time}; +use tokio_stream::StreamExt; use tokio_util::codec::BytesCodec; use tokio_util::udp::UdpFramed; diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml new file mode 100644 index 00000000000..c6720320682 --- /dev/null +++ b/tokio-stream/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "tokio-stream" +# When releasing to crates.io: +# - Remove path dependencies +# - Update html_root_url. +# - Update doc url +# - Cargo.toml +# - Update CHANGELOG.md. +# - Create "tokio-stream-0.1.x" git tag. +version = "0.1.0" +edition = "2018" +authors = ["Tokio Contributors "] +license = "MIT" +repository = "https://github.com/tokio-rs/tokio" +homepage = "https://tokio.rs" +documentation = "https://docs.rs/tokio-stream/0.1.0/tokio_stream" +description = """ +Utilities to work with `Stream` and `tokio`. +""" +categories = ["asynchronous"] +publish = false + +[dependencies] +futures-core = { version = "0.3.0" } +pin-project-lite = "0.2.0" +rand = "0.7.3" +tokio = { version = "1.0", path = "../tokio", features = ["sync"] } +async-stream = "0.3" + +[dev-dependencies] +tokio = { version = "1.0", path = "../tokio", features = ["full"] } +tokio-test = { path = "../tokio-test" } +futures = { version = "0.3", default-features = false } + +proptest = "0.10.0" \ No newline at end of file diff --git a/tokio/src/stream/all.rs b/tokio-stream/src/all.rs similarity index 98% rename from tokio/src/stream/all.rs rename to tokio-stream/src/all.rs index 353d61a3b04..11573f9b973 100644 --- a/tokio/src/stream/all.rs +++ b/tokio-stream/src/all.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; diff --git a/tokio/src/stream/any.rs b/tokio-stream/src/any.rs similarity index 98% rename from tokio/src/stream/any.rs rename to tokio-stream/src/any.rs index aac0ec7569c..4c4c5939483 100644 --- a/tokio/src/stream/any.rs +++ b/tokio-stream/src/any.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; diff --git a/tokio/src/stream/chain.rs b/tokio-stream/src/chain.rs similarity index 96% rename from tokio/src/stream/chain.rs rename to tokio-stream/src/chain.rs index 6124c91e44f..cfdef83d73c 100644 --- a/tokio/src/stream/chain.rs +++ b/tokio-stream/src/chain.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, Stream}; +use crate::{Fuse, Stream}; use core::pin::Pin; use core::task::{Context, Poll}; diff --git a/tokio/src/stream/collect.rs b/tokio-stream/src/collect.rs similarity index 99% rename from tokio/src/stream/collect.rs rename to tokio-stream/src/collect.rs index 1aafc303d5f..9e20179f037 100644 --- a/tokio/src/stream/collect.rs +++ b/tokio-stream/src/collect.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; diff --git a/tokio/src/stream/empty.rs b/tokio-stream/src/empty.rs similarity index 93% rename from tokio/src/stream/empty.rs rename to tokio-stream/src/empty.rs index 2f56ac6cad3..965dcf5da7a 100644 --- a/tokio/src/stream/empty.rs +++ b/tokio-stream/src/empty.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::marker::PhantomData; use core::pin::Pin; @@ -24,7 +24,7 @@ unsafe impl Sync for Empty {} /// Basic usage: /// /// ``` -/// use tokio::stream::{self, StreamExt}; +/// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { diff --git a/tokio/src/stream/filter.rs b/tokio-stream/src/filter.rs similarity index 98% rename from tokio/src/stream/filter.rs rename to tokio-stream/src/filter.rs index 799630b2346..f3dd8716b48 100644 --- a/tokio/src/stream/filter.rs +++ b/tokio-stream/src/filter.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::fmt; use core::pin::Pin; diff --git a/tokio/src/stream/filter_map.rs b/tokio-stream/src/filter_map.rs similarity index 98% rename from tokio/src/stream/filter_map.rs rename to tokio-stream/src/filter_map.rs index 8dc05a54603..fe604a6f4b5 100644 --- a/tokio/src/stream/filter_map.rs +++ b/tokio-stream/src/filter_map.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::fmt; use core::pin::Pin; diff --git a/tokio/src/stream/fold.rs b/tokio-stream/src/fold.rs similarity index 98% rename from tokio/src/stream/fold.rs rename to tokio-stream/src/fold.rs index 5cf2bfafa8c..e2e97d8f375 100644 --- a/tokio/src/stream/fold.rs +++ b/tokio-stream/src/fold.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; diff --git a/tokio/src/stream/fuse.rs b/tokio-stream/src/fuse.rs similarity index 97% rename from tokio/src/stream/fuse.rs rename to tokio-stream/src/fuse.rs index 6c9e02d6643..2500641d95d 100644 --- a/tokio/src/stream/fuse.rs +++ b/tokio-stream/src/fuse.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use pin_project_lite::pin_project; use std::pin::Pin; diff --git a/tokio/src/stream/iter.rs b/tokio-stream/src/iter.rs similarity index 74% rename from tokio/src/stream/iter.rs rename to tokio-stream/src/iter.rs index bc0388a1442..77d065fc430 100644 --- a/tokio/src/stream/iter.rs +++ b/tokio-stream/src/iter.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::pin::Pin; use core::task::{Context, Poll}; @@ -8,6 +8,7 @@ use core::task::{Context, Poll}; #[must_use = "streams do nothing unless polled"] pub struct Iter { iter: I, + yield_amt: usize, } impl Unpin for Iter {} @@ -20,7 +21,7 @@ impl Unpin for Iter {} /// /// ``` /// # async fn dox() { -/// use tokio::stream::{self, StreamExt}; +/// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(vec![17, 19]); /// @@ -35,6 +36,7 @@ where { Iter { iter: i.into_iter(), + yield_amt: 0, } } @@ -45,9 +47,18 @@ where type Item = I::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let coop = ready!(crate::coop::poll_proceed(cx)); - coop.made_progress(); - Poll::Ready(self.iter.next()) + // TODO: add coop back + if self.yield_amt > 32 { + self.yield_amt = 0; + + cx.waker().wake_by_ref(); + + Poll::Pending + } else { + self.yield_amt += 1; + + Poll::Ready(self.iter.next()) + } } fn size_hint(&self) -> (usize, Option) { diff --git a/tokio/src/stream/mod.rs b/tokio-stream/src/lib.rs similarity index 93% rename from tokio/src/stream/mod.rs rename to tokio-stream/src/lib.rs index 81afe7aef13..ff93a18d7c4 100644 --- a/tokio/src/stream/mod.rs +++ b/tokio-stream/src/lib.rs @@ -15,7 +15,7 @@ //! `while let` loop as follows: //! //! ```rust -//! use tokio::stream::{self, StreamExt}; +//! use tokio_stream::{self as stream, StreamExt}; //! //! #[tokio::main] //! async fn main() { @@ -53,6 +53,9 @@ //! [`ReaderStream`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.ReaderStream.html //! [`StreamReader`]: https://docs.rs/tokio-util/0.4/tokio_util/io/struct.StreamReader.html +#[macro_use] +mod macros; + mod all; use all::AllFuture; @@ -122,7 +125,7 @@ cfg_time! { use timeout::Timeout; use crate::time::Duration; mod throttle; - use crate::stream::throttle::{throttle, Throttle}; + use crate::throttle::{throttle, Throttle}; } #[doc(no_inline)] @@ -146,11 +149,11 @@ pub use futures_core::Stream; /// # #[tokio::main(flavor = "current_thread")] /// # async fn main() { /// -/// let a = tokio::stream::iter(vec![1, 3, 5]); -/// let b = tokio::stream::iter(vec![2, 4, 6]); +/// let a = tokio_stream::iter(vec![1, 3, 5]); +/// let b = tokio_stream::iter(vec![2, 4, 6]); /// /// // use the fully qualified call syntax for the other trait: -/// let merged = tokio::stream::StreamExt::merge(a, b); +/// let merged = tokio_stream::StreamExt::merge(a, b); /// /// // use normal call notation for futures::stream::StreamExt::collect /// let output: Vec<_> = merged.collect().await; @@ -183,7 +186,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(1..=3); /// @@ -219,7 +222,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(vec![Ok(1), Ok(2), Err("nope")]); /// @@ -251,7 +254,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let stream = stream::iter(1..=3); /// let mut stream = stream.map(|x| x + 3); @@ -289,11 +292,12 @@ pub trait StreamExt: Stream { /// # Examples /// /// ``` - /// use tokio::stream::StreamExt; + /// use tokio_stream::{StreamExt, Stream}; /// use tokio::sync::mpsc; /// use tokio::time; /// /// use std::time::Duration; + /// use std::pin::Pin; /// /// # /* /// #[tokio::main] @@ -301,8 +305,21 @@ pub trait StreamExt: Stream { /// # #[tokio::main(flavor = "current_thread")] /// async fn main() { /// # time::pause(); - /// let (tx1, rx1) = mpsc::channel(10); - /// let (tx2, rx2) = mpsc::channel(10); + /// let (tx1, mut rx1) = mpsc::channel::(10); + /// let (tx2, mut rx2) = mpsc::channel::(10); + /// + /// // Convert the channels to a `Stream`. + /// let rx1 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx1.recv().await { + /// yield item; + /// } + /// }) as Pin + Send>>; + /// + /// let rx2 = Box::pin(async_stream::stream! { + /// while let Some(item) = rx2.recv().await { + /// yield item; + /// } + /// }) as Pin + Send>>; /// /// let mut rx = rx1.merge(rx2); /// @@ -365,7 +382,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let stream = stream::iter(1..=8); /// let mut evens = stream.filter(|x| x % 2 == 0); @@ -401,7 +418,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let stream = stream::iter(1..=8); /// let mut evens = stream.filter_map(|x| { @@ -433,7 +450,7 @@ pub trait StreamExt: Stream { /// # Examples /// /// ``` - /// use tokio::stream::{Stream, StreamExt}; + /// use tokio_stream::{Stream, StreamExt}; /// /// use std::pin::Pin; /// use std::task::{Context, Poll}; @@ -498,7 +515,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(1..=10).take(3); /// @@ -527,7 +544,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(1..=10).take_while(|x| *x <= 3); /// @@ -553,7 +570,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let mut stream = stream::iter(1..=10).skip(7); /// @@ -584,7 +601,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// let mut stream = stream::iter(vec![1,2,3,4,1]).skip_while(|x| *x < 3); /// /// assert_eq!(Some(3), stream.next().await); @@ -627,7 +644,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let a = [1, 2, 3]; /// @@ -642,7 +659,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let a = [1, 2, 3]; /// @@ -686,7 +703,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let a = [1, 2, 3]; /// @@ -701,7 +718,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// let a = [1, 2, 3]; /// @@ -730,7 +747,7 @@ pub trait StreamExt: Stream { /// # Examples /// /// ``` - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { @@ -770,7 +787,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, *}; + /// use tokio_stream::{self as stream, *}; /// /// let s = stream::iter(vec![1u8, 2, 3]); /// let sum = s.fold(0, |acc, x| acc + x).await; @@ -816,7 +833,7 @@ pub trait StreamExt: Stream { /// Basic usage: /// /// ``` - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { @@ -833,7 +850,7 @@ pub trait StreamExt: Stream { /// Collecting a stream of `Result` values /// /// ``` - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { @@ -889,7 +906,7 @@ pub trait StreamExt: Stream { /// ``` /// # #[tokio::main] /// # async fn main() { - /// use tokio::stream::{self, StreamExt}; + /// use tokio_stream::{self as stream, StreamExt}; /// use std::time::Duration; /// # let int_stream = stream::iter(1..=3); /// @@ -934,7 +951,7 @@ pub trait StreamExt: Stream { /// Create a throttled stream. /// ```rust,no_run /// use std::time::Duration; - /// use tokio::stream::StreamExt; + /// use tokio_stream::StreamExt; /// /// # async fn dox() { /// let mut item_stream = futures::stream::repeat("one").throttle(Duration::from_secs(2)); diff --git a/tokio-stream/src/macros.rs b/tokio-stream/src/macros.rs new file mode 100644 index 00000000000..0d493a85119 --- /dev/null +++ b/tokio-stream/src/macros.rs @@ -0,0 +1,18 @@ +macro_rules! cfg_time { + ($($item:item)*) => { + $( + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + $item + )* + } +} + +macro_rules! ready { + ($e:expr $(,)?) => { + match $e { + std::task::Poll::Ready(t) => t, + std::task::Poll::Pending => return std::task::Poll::Pending, + } + }; +} diff --git a/tokio/src/stream/map.rs b/tokio-stream/src/map.rs similarity index 97% rename from tokio/src/stream/map.rs rename to tokio-stream/src/map.rs index dfac5a2c942..e6b47cd2582 100644 --- a/tokio/src/stream/map.rs +++ b/tokio-stream/src/map.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::fmt; use core::pin::Pin; diff --git a/tokio/src/stream/merge.rs b/tokio-stream/src/merge.rs similarity index 98% rename from tokio/src/stream/merge.rs rename to tokio-stream/src/merge.rs index 50ba518ce39..ea0ace0e21e 100644 --- a/tokio/src/stream/merge.rs +++ b/tokio-stream/src/merge.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, Stream}; +use crate::{Fuse, Stream}; use core::pin::Pin; use core::task::{Context, Poll}; diff --git a/tokio/src/stream/next.rs b/tokio-stream/src/next.rs similarity index 97% rename from tokio/src/stream/next.rs rename to tokio-stream/src/next.rs index d9b1f920599..175490c488a 100644 --- a/tokio/src/stream/next.rs +++ b/tokio-stream/src/next.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::future::Future; use core::marker::PhantomPinned; diff --git a/tokio/src/stream/once.rs b/tokio-stream/src/once.rs similarity index 88% rename from tokio/src/stream/once.rs rename to tokio-stream/src/once.rs index 7fe204cc127..04b4c052b84 100644 --- a/tokio/src/stream/once.rs +++ b/tokio-stream/src/once.rs @@ -1,4 +1,4 @@ -use crate::stream::{self, Iter, Stream}; +use crate::{Iter, Stream}; use core::option; use core::pin::Pin; @@ -20,7 +20,7 @@ impl Unpin for Once {} /// # Examples /// /// ``` -/// use tokio::stream::{self, StreamExt}; +/// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { @@ -35,7 +35,7 @@ impl Unpin for Once {} /// ``` pub fn once(value: T) -> Once { Once { - iter: stream::iter(Some(value).into_iter()), + iter: crate::iter(Some(value).into_iter()), } } diff --git a/tokio/src/stream/pending.rs b/tokio-stream/src/pending.rs similarity index 94% rename from tokio/src/stream/pending.rs rename to tokio-stream/src/pending.rs index 21224c38596..76faec04092 100644 --- a/tokio/src/stream/pending.rs +++ b/tokio-stream/src/pending.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::marker::PhantomData; use core::pin::Pin; @@ -25,7 +25,7 @@ unsafe impl Sync for Pending {} /// Basic usage: /// /// ```no_run -/// use tokio::stream::{self, StreamExt}; +/// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { diff --git a/tokio/src/stream/skip.rs b/tokio-stream/src/skip.rs similarity index 98% rename from tokio/src/stream/skip.rs rename to tokio-stream/src/skip.rs index 39540cc984c..80a0a0aff0d 100644 --- a/tokio/src/stream/skip.rs +++ b/tokio-stream/src/skip.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::fmt; use core::pin::Pin; diff --git a/tokio/src/stream/skip_while.rs b/tokio-stream/src/skip_while.rs similarity index 98% rename from tokio/src/stream/skip_while.rs rename to tokio-stream/src/skip_while.rs index 4e0500701a3..985a92666e0 100644 --- a/tokio/src/stream/skip_while.rs +++ b/tokio-stream/src/skip_while.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::fmt; use core::pin::Pin; diff --git a/tokio/src/stream/stream_map.rs b/tokio-stream/src/stream_map.rs similarity index 77% rename from tokio/src/stream/stream_map.rs rename to tokio-stream/src/stream_map.rs index 9fed3c19679..44a4c83c965 100644 --- a/tokio/src/stream/stream_map.rs +++ b/tokio-stream/src/stream_map.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use std::borrow::Borrow; use std::hash::Hash; @@ -52,13 +52,27 @@ use std::task::{Context, Poll}; /// Merging two streams, then remove them after receiving the first value /// /// ``` -/// use tokio::stream::{StreamExt, StreamMap}; +/// use tokio_stream::{StreamExt, StreamMap, Stream}; /// use tokio::sync::mpsc; +/// use std::pin::Pin; /// /// #[tokio::main] /// async fn main() { -/// let (tx1, rx1) = mpsc::channel(10); -/// let (tx2, rx2) = mpsc::channel(10); +/// let (tx1, mut rx1) = mpsc::channel::(10); +/// let (tx2, mut rx2) = mpsc::channel::(10); +/// +/// // Convert the channels to a `Stream`. +/// let rx1 = Box::pin(async_stream::stream! { +/// while let Some(item) = rx1.recv().await { +/// yield item; +/// } +/// }) as Pin + Send>>; +/// +/// let rx2 = Box::pin(async_stream::stream! { +/// while let Some(item) = rx2.recv().await { +/// yield item; +/// } +/// }) as Pin + Send>>; /// /// tokio::spawn(async move { /// tx1.send(1).await.unwrap(); @@ -103,7 +117,7 @@ use std::task::{Context, Poll}; /// sent to the client over a socket. /// /// ```no_run -/// use tokio::stream::{Stream, StreamExt, StreamMap}; +/// use tokio_stream::{Stream, StreamExt, StreamMap}; /// /// enum Command { /// Join(String), @@ -112,13 +126,13 @@ use std::task::{Context, Poll}; /// /// fn commands() -> impl Stream { /// // Streams in user commands by parsing `stdin`. -/// # tokio::stream::pending() +/// # tokio_stream::pending() /// } /// /// // Join a channel, returns a stream of messages received on the channel. /// fn join(channel: &str) -> impl Stream + Unpin { /// // left as an exercise to the reader -/// # tokio::stream::pending() +/// # tokio_stream::pending() /// } /// /// #[tokio::main] @@ -170,7 +184,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// @@ -193,7 +207,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// @@ -217,7 +231,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, Pending}; + /// use tokio_stream::{StreamMap, Pending}; /// /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); /// ``` @@ -233,7 +247,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, Pending}; + /// use tokio_stream::{StreamMap, Pending}; /// /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); /// ``` @@ -250,7 +264,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// @@ -273,7 +287,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// @@ -296,7 +310,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// @@ -320,7 +334,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, Pending}; + /// use tokio_stream::{StreamMap, Pending}; /// /// let map: StreamMap> = StreamMap::with_capacity(100); /// assert!(map.capacity() >= 100); @@ -334,7 +348,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut a = StreamMap::new(); /// assert_eq!(a.len(), 0); @@ -367,7 +381,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut a = StreamMap::new(); /// a.insert(1, pending::()); @@ -388,7 +402,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// @@ -416,7 +430,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// map.insert(1, pending::()); @@ -445,7 +459,7 @@ impl StreamMap { /// # Examples /// /// ``` - /// use tokio::stream::{StreamMap, pending}; + /// use tokio_stream::{StreamMap, pending}; /// /// let mut map = StreamMap::new(); /// map.insert(1, pending::()); @@ -476,7 +490,7 @@ where fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll> { use Poll::*; - let start = crate::util::thread_rng_n(self.entries.len() as u32) as usize; + let start = rand::thread_rng_n(self.entries.len() as u32) as usize; let mut idx = start; for _ in 0..self.entries.len() { @@ -553,3 +567,98 @@ where ret } } + +mod rand { + use std::cell::Cell; + + mod loom { + #[cfg(not(loom))] + pub(crate) mod rand { + use std::collections::hash_map::RandomState; + use std::hash::{BuildHasher, Hash, Hasher}; + use std::sync::atomic::AtomicU32; + use std::sync::atomic::Ordering::Relaxed; + + static COUNTER: AtomicU32 = AtomicU32::new(1); + + pub(crate) fn seed() -> u64 { + let rand_state = RandomState::new(); + + let mut hasher = rand_state.build_hasher(); + + // Hash some unique-ish data to generate some new state + COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); + + // Get the seed + hasher.finish() + } + } + + #[cfg(loom)] + pub(crate) mod rand { + pub(crate) fn seed() -> u64 { + 1 + } + } + } + + /// Fast random number generate + /// + /// Implement xorshift64+: 2 32-bit xorshift sequences added together. + /// Shift triplet [17,7,16] was calculated as indicated in Marsaglia's + /// Xorshift paper: https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf + /// This generator passes the SmallCrush suite, part of TestU01 framework: + /// http://simul.iro.umontreal.ca/testu01/tu01.html + #[derive(Debug)] + pub(crate) struct FastRand { + one: Cell, + two: Cell, + } + + impl FastRand { + /// Initialize a new, thread-local, fast random number generator. + pub(crate) fn new(seed: u64) -> FastRand { + let one = (seed >> 32) as u32; + let mut two = seed as u32; + + if two == 0 { + // This value cannot be zero + two = 1; + } + + FastRand { + one: Cell::new(one), + two: Cell::new(two), + } + } + + pub(crate) fn fastrand_n(&self, n: u32) -> u32 { + // This is similar to fastrand() % n, but faster. + // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + let mul = (self.fastrand() as u64).wrapping_mul(n as u64); + (mul >> 32) as u32 + } + + fn fastrand(&self) -> u32 { + let mut s1 = self.one.get(); + let s0 = self.two.get(); + + s1 ^= s1 << 17; + s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; + + self.one.set(s0); + self.two.set(s1); + + s0.wrapping_add(s1) + } + } + + // Used by the select macro and `StreamMap` + pub(crate) fn thread_rng_n(n: u32) -> u32 { + thread_local! { + static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); + } + + THREAD_RNG.with(|rng| rng.fastrand_n(n)) + } +} diff --git a/tokio/src/stream/take.rs b/tokio-stream/src/take.rs similarity index 98% rename from tokio/src/stream/take.rs rename to tokio-stream/src/take.rs index a92430b77c8..c75648f6065 100644 --- a/tokio/src/stream/take.rs +++ b/tokio-stream/src/take.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::cmp; use core::fmt; diff --git a/tokio/src/stream/take_while.rs b/tokio-stream/src/take_while.rs similarity index 98% rename from tokio/src/stream/take_while.rs rename to tokio-stream/src/take_while.rs index cf1e1606131..5ce4dd98a98 100644 --- a/tokio/src/stream/take_while.rs +++ b/tokio-stream/src/take_while.rs @@ -1,4 +1,4 @@ -use crate::stream::Stream; +use crate::Stream; use core::fmt; use core::pin::Pin; diff --git a/tokio/src/stream/throttle.rs b/tokio-stream/src/throttle.rs similarity index 98% rename from tokio/src/stream/throttle.rs rename to tokio-stream/src/throttle.rs index ff1fbf01f90..a661001e9b8 100644 --- a/tokio/src/stream/throttle.rs +++ b/tokio-stream/src/throttle.rs @@ -1,7 +1,7 @@ //! Slow down a stream by enforcing a delay between items. -use crate::stream::Stream; use crate::time::{Duration, Instant, Sleep}; +use crate::Stream; use std::future::Future; use std::marker::Unpin; diff --git a/tokio/src/stream/timeout.rs b/tokio-stream/src/timeout.rs similarity index 98% rename from tokio/src/stream/timeout.rs rename to tokio-stream/src/timeout.rs index 61154da0581..441a76f8efe 100644 --- a/tokio/src/stream/timeout.rs +++ b/tokio-stream/src/timeout.rs @@ -1,5 +1,5 @@ -use crate::stream::{Fuse, Stream}; use crate::time::{error::Elapsed, Instant, Sleep}; +use crate::{Fuse, Stream}; use core::future::Future; use core::pin::Pin; diff --git a/tokio/src/stream/try_next.rs b/tokio-stream/src/try_next.rs similarity index 96% rename from tokio/src/stream/try_next.rs rename to tokio-stream/src/try_next.rs index b21d279af96..e91e8a429b5 100644 --- a/tokio/src/stream/try_next.rs +++ b/tokio-stream/src/try_next.rs @@ -1,4 +1,4 @@ -use crate::stream::{Next, Stream}; +use crate::{Next, Stream}; use core::future::Future; use core::marker::PhantomPinned; diff --git a/tokio-stream/tests/CONTRIBUTING.md b/tokio-stream/tests/CONTRIBUTING.md new file mode 100644 index 00000000000..7de50fc6ac9 --- /dev/null +++ b/tokio-stream/tests/CONTRIBUTING.md @@ -0,0 +1,562 @@ +# Contributing to Tokio + +:balloon: Thanks for your help improving the project! We are so happy to have +you! + +There are opportunities to contribute to Tokio at any level. It doesn't matter if +you are just getting started with Rust or are the most weathered expert, we can +use your help. + +**No contribution is too small and all contributions are valued.** + +This guide will help you get started. **Do not let this guide intimidate you**. +It should be considered a map to help you navigate the process. + +The [dev channel][dev] is available for any concerns not covered in this guide, please join +us! + +[dev]: https://discord.gg/tokio + +## Conduct + +The Tokio project adheres to the [Rust Code of Conduct][coc]. This describes +the _minimum_ behavior expected from all contributors. Instances of violations of the +Code of Conduct can be reported by contacting the project team at +[moderation@tokio.rs](mailto:moderation@tokio.rs). + +[coc]: https://github.com/rust-lang/rust/blob/master/CODE_OF_CONDUCT.md + +## Contributing in Issues + +For any issue, there are fundamentally three ways an individual can contribute: + +1. By opening the issue for discussion: For instance, if you believe that you + have discovered a bug in Tokio, creating a new issue in [the tokio-rs/tokio + issue tracker][issue] is the way to report it. + +2. By helping to triage the issue: This can be done by providing + supporting details (a test case that demonstrates a bug), providing + suggestions on how to address the issue, or ensuring that the issue is tagged + correctly. + +3. By helping to resolve the issue: Typically this is done either in the form of + demonstrating that the issue reported is not a problem after all, or more + often, by opening a Pull Request that changes some bit of something in + Tokio in a concrete and reviewable manner. + +[issue]: https://github.com/tokio-rs/tokio/issues + +**Anybody can participate in any stage of contribution**. We urge you to +participate in the discussion around bugs and participate in reviewing PRs. + +### Asking for General Help + +If you have reviewed existing documentation and still have questions or are +having problems, you can [open a discussion] asking for help. + +In exchange for receiving help, we ask that you contribute back a documentation +PR that helps others avoid the problems that you encountered. + +[open a discussion]: https://github.com/tokio-rs/tokio/discussions/new + +### Submitting a Bug Report + +When opening a new issue in the Tokio issue tracker, you will be presented +with a basic template that should be filled in. If you believe that you have +uncovered a bug, please fill out this form, following the template to the best +of your ability. Do not worry if you cannot answer every detail, just fill in +what you can. + +The two most important pieces of information we need in order to properly +evaluate the report is a description of the behavior you are seeing and a simple +test case we can use to recreate the problem on our own. If we cannot recreate +the issue, it becomes impossible for us to fix. + +In order to rule out the possibility of bugs introduced by userland code, test +cases should be limited, as much as possible, to using only Tokio APIs. + +See [How to create a Minimal, Complete, and Verifiable example][mcve]. + +[mcve]: https://stackoverflow.com/help/mcve + +### Triaging a Bug Report + +Once an issue has been opened, it is not uncommon for there to be discussion +around it. Some contributors may have differing opinions about the issue, +including whether the behavior being seen is a bug or a feature. This discussion +is part of the process and should be kept focused, helpful, and professional. + +Short, clipped responses—that provide neither additional context nor supporting +detail—are not helpful or professional. To many, such responses are simply +annoying and unfriendly. + +Contributors are encouraged to help one another make forward progress as much as +possible, empowering one another to solve issues collaboratively. If you choose +to comment on an issue that you feel either is not a problem that needs to be +fixed, or if you encounter information in an issue that you feel is incorrect, +explain why you feel that way with additional supporting context, and be willing +to be convinced that you may be wrong. By doing so, we can often reach the +correct outcome much faster. + +### Resolving a Bug Report + +In the majority of cases, issues are resolved by opening a Pull Request. The +process for opening and reviewing a Pull Request is similar to that of opening +and triaging issues, but carries with it a necessary review and approval +workflow that ensures that the proposed changes meet the minimal quality and +functional guidelines of the Tokio project. + +## Pull Requests + +Pull Requests are the way concrete changes are made to the code, documentation, +and dependencies in the Tokio repository. + +Even tiny pull requests (e.g., one character pull request fixing a typo in API +documentation) are greatly appreciated. Before making a large change, it is +usually a good idea to first open an issue describing the change to solicit +feedback and guidance. This will increase the likelihood of the PR getting +merged. + +### Cargo Commands + +Due to the extensive use of features in Tokio, you will often need to add extra +arguments to many common cargo commands. This section lists some commonly needed +commands. + +Some commands just need the `--all-features` argument: +``` +cargo build --all-features +cargo check --all-features +cargo test --all-features +``` +When building documentation normally, the markers that list the features +required for various parts of Tokio are missing. To build the documentation +correctly, use this command: +``` +RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --all-features +``` +The `cargo fmt` command does not work on the Tokio codebase. You can use the +command below instead: + +``` +# Mac or Linux +rustfmt --check --edition 2018 $(find . -name '*.rs' -print) + +# Powershell +Get-ChildItem . -Filter "*.rs" -Recurse | foreach { rustfmt --check --edition 2018 $_.FullName } +``` +The `--check` argument prints the things that need to be fixed. If you remove +it, `rustfmt` will update your files locally instead. + +You can run loom tests with +``` +cd tokio # tokio crate in workspace +LOOM_MAX_PREEMPTIONS=1 RUSTFLAGS="--cfg loom" \ + cargo test --lib --release --features full -- --test-threads=1 --nocapture +``` + +### Tests + +If the change being proposed alters code (as opposed to only documentation for +example), it is either adding new functionality to Tokio or it is fixing +existing, broken functionality. In both of these cases, the pull request should +include one or more tests to ensure that Tokio does not regress in the future. +There are two ways to write tests: integration tests and documentation tests +(Tokio avoids unit tests as much as possible). + +#### Integration tests + +Integration tests go in the same crate as the code they are testing. Each sub +crate should have a `dev-dependency` on `tokio` itself. This makes all Tokio +utilities available to use in tests, no matter the crate being tested. + +The best strategy for writing a new integration test is to look at existing +integration tests in the crate and follow the style. + +#### Documentation tests + +Ideally, every API has at least one [documentation test] that demonstrates how to +use the API. Documentation tests are run with `cargo test --doc`. This ensures +that the example is correct and provides additional test coverage. + +The trick to documentation tests is striking a balance between being succinct +for a reader to understand and actually testing the API. + +Same as with integration tests, when writing a documentation test, the full +`tokio` crate is available. This is especially useful for getting access to the +runtime to run the example. + +The documentation tests will be visible from both the crate specific +documentation **and** the `tokio` facade documentation via the re-export. The +example should be written from the point of view of a user that is using the +`tokio` crate. As such, the example should use the API via the facade and not by +directly referencing the crate. + +The type level example for `tokio_timer::Timeout` provides a good example of a +documentation test: + +``` +/// // import the `timeout` function, usually this is done +/// // with `use tokio::prelude::*` +/// use tokio::prelude::FutureExt; +/// use futures::Stream; +/// use futures::sync::mpsc; +/// use std::time::Duration; +/// +/// # fn main() { +/// let (tx, rx) = mpsc::unbounded(); +/// # tx.unbounded_send(()).unwrap(); +/// # drop(tx); +/// +/// let process = rx.for_each(|item| { +/// // do something with `item` +/// # drop(item); +/// # Ok(()) +/// }); +/// +/// # tokio::runtime::current_thread::block_on_all( +/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. +/// process.timeout(Duration::from_millis(10)) +/// # ).unwrap(); +/// # } +``` + +Given that this is a *type* level documentation test and the primary way users +of `tokio` will create an instance of `Timeout` is by using +`FutureExt::timeout`, this is how the documentation test is structured. + +Lines that start with `/// #` are removed when the documentation is generated. +They are only there to get the test to run. The `block_on_all` function is the +easiest way to execute a future from a test. + +If this were a documentation test for the `Timeout::new` function, then the +example would explicitly use `Timeout::new`. For example: + +``` +/// use tokio::timer::Timeout; +/// use futures::Future; +/// use futures::sync::oneshot; +/// use std::time::Duration; +/// +/// # fn main() { +/// let (tx, rx) = oneshot::channel(); +/// # tx.send(()).unwrap(); +/// +/// # tokio::runtime::current_thread::block_on_all( +/// // Wrap the future with a `Timeout` set to expire in 10 milliseconds. +/// Timeout::new(rx, Duration::from_millis(10)) +/// # ).unwrap(); +/// # } +``` + +### Commits + +It is a recommended best practice to keep your changes as logically grouped as +possible within individual commits. There is no limit to the number of commits +any single Pull Request may have, and many contributors find it easier to review +changes that are split across multiple commits. + +That said, if you have a number of commits that are "checkpoints" and don't +represent a single logical change, please squash those together. + +Note that multiple commits often get squashed when they are landed (see the +notes about [commit squashing](#commit-squashing)). + +#### Commit message guidelines + +A good commit message should describe what changed and why. + +1. The first line should: + + * contain a short description of the change (preferably 50 characters or less, + and no more than 72 characters) + * be entirely in lowercase with the exception of proper nouns, acronyms, and + the words that refer to code, like function/variable names + * be prefixed with the name of the sub crate being changed (without the `tokio-` + prefix) and start with an imperative verb. If modifying `tokio` proper, + omit the crate prefix. + + Examples: + + * timer: introduce `Timeout` and deprecate `Deadline` + * export `Encoder`, `Decoder`, `Framed*` from tokio_codec + +2. Keep the second line blank. +3. Wrap all other lines at 72 columns (except for long URLs). +4. If your patch fixes an open issue, you can add a reference to it at the end + of the log. Use the `Fixes: #` prefix and the issue number. For other + references use `Refs: #`. `Refs` may include multiple issues, separated by a + comma. + + Examples: + + - `Fixes: #1337` + - `Refs: #1234` + +Sample complete commit message: + +```txt +subcrate: explain the commit in one line + +Body of commit message is a few lines of text, explaining things +in more detail, possibly giving some background about the issue +being fixed, etc. + +The body of the commit message can be several paragraphs, and +please do proper word-wrap and keep columns shorter than about +72 characters or so. That way, `git log` will show things +nicely even when it is indented. + +Fixes: #1337 +Refs: #453, #154 +``` + +### Opening the Pull Request + +From within GitHub, opening a new Pull Request will present you with a +[template] that should be filled out. Please try to do your best at filling out +the details, but feel free to skip parts if you're not sure what to put. + +[template]: .github/PULL_REQUEST_TEMPLATE.md + +### Discuss and update + +You will probably get feedback or requests for changes to your Pull Request. +This is a big part of the submission process so don't be discouraged! Some +contributors may sign off on the Pull Request right away, others may have +more detailed comments or feedback. This is a necessary part of the process +in order to evaluate whether the changes are correct and necessary. + +**Any community member can review a PR and you might get conflicting feedback**. +Keep an eye out for comments from code owners to provide guidance on conflicting +feedback. + +**Once the PR is open, do not rebase the commits**. See [Commit Squashing](#commit-squashing) for +more details. + +### Commit Squashing + +In most cases, **do not squash commits that you add to your Pull Request during +the review process**. When the commits in your Pull Request land, they may be +squashed into one commit per logical change. Metadata will be added to the +commit message (including links to the Pull Request, links to relevant issues, +and the names of the reviewers). The commit history of your Pull Request, +however, will stay intact on the Pull Request page. + +## Reviewing Pull Requests + +**Any Tokio community member is welcome to review any pull request**. + +All Tokio contributors who choose to review and provide feedback on Pull +Requests have a responsibility to both the project and the individual making the +contribution. Reviews and feedback must be helpful, insightful, and geared +towards improving the contribution as opposed to simply blocking it. If there +are reasons why you feel the PR should not land, explain what those are. Do not +expect to be able to block a Pull Request from advancing simply because you say +"No" without giving an explanation. Be open to having your mind changed. Be open +to working with the contributor to make the Pull Request better. + +Reviews that are dismissive or disrespectful of the contributor or any other +reviewers are strictly counter to the Code of Conduct. + +When reviewing a Pull Request, the primary goals are for the codebase to improve +and for the person submitting the request to succeed. **Even if a Pull Request +does not land, the submitters should come away from the experience feeling like +their effort was not wasted or unappreciated**. Every Pull Request from a new +contributor is an opportunity to grow the community. + +### Review a bit at a time. + +Do not overwhelm new contributors. + +It is tempting to micro-optimize and make everything about relative performance, +perfect grammar, or exact style matches. Do not succumb to that temptation. + +Focus first on the most significant aspects of the change: + +1. Does this change make sense for Tokio? +2. Does this change make Tokio better, even if only incrementally? +3. Are there clear bugs or larger scale issues that need attending to? +4. Is the commit message readable and correct? If it contains a breaking change + is it clear enough? + +Note that only **incremental** improvement is needed to land a PR. This means +that the PR does not need to be perfect, only better than the status quo. Follow +up PRs may be opened to continue iterating. + +When changes are necessary, *request* them, do not *demand* them, and **do not +assume that the submitter already knows how to add a test or run a benchmark**. + +Specific performance optimization techniques, coding styles and conventions +change over time. The first impression you give to a new contributor never does. + +Nits (requests for small changes that are not essential) are fine, but try to +avoid stalling the Pull Request. Most nits can typically be fixed by the Tokio +Collaborator landing the Pull Request but they can also be an opportunity for +the contributor to learn a bit more about the project. + +It is always good to clearly indicate nits when you comment: e.g. +`Nit: change foo() to bar(). But this is not blocking.` + +If your comments were addressed but were not folded automatically after new +commits or if they proved to be mistaken, please, [hide them][hiding-a-comment] +with the appropriate reason to keep the conversation flow concise and relevant. + +### Be aware of the person behind the code + +Be aware that *how* you communicate requests and reviews in your feedback can +have a significant impact on the success of the Pull Request. Yes, we may land +a particular change that makes Tokio better, but the individual might just not +want to have anything to do with Tokio ever again. The goal is not just having +good code. + +### Abandoned or Stalled Pull Requests + +If a Pull Request appears to be abandoned or stalled, it is polite to first +check with the contributor to see if they intend to continue the work before +checking if they would mind if you took it over (especially if it just has nits +left). When doing so, it is courteous to give the original contributor credit +for the work they started (either by preserving their name and email address in +the commit log, or by using an `Author: ` meta-data tag in the commit. + +_Adapted from the [Node.js contributing guide][node]_. + +[node]: https://github.com/nodejs/node/blob/master/CONTRIBUTING.md +[hiding-a-comment]: https://help.github.com/articles/managing-disruptive-comments/#hiding-a-comment +[documentation test]: https://doc.rust-lang.org/rustdoc/documentation-tests.html + +## Keeping track of issues and PRs + +The Tokio GitHub repository has a lot of issues and PRs, which is not easy to +keep track of. This section explains the meaning of various labels, as well as +our [GitHub project][project]. The section is primarily targeted at maintainers. + +**Area.** The area label describes the crates relevant to this issue or PR. + + - **A-tokio** This issue concerns the main Tokio crate. + - **A-tokio-util** This issue concerns the `tokio-util` crate. + - **A-tokio-tls** This issue concerns the `tokio-tls` crate. Only used for + older issues, as the crate has been moved to another repository. + - **A-tokio-test** The issue concerns the `tokio-test` crate. + - **A-tokio-macros** This issue concerns the `tokio-macros` crate. Should only + be used for the procedural macros, and not `join!` or `select!`. + - **A-ci** This issue concerns our GitHub Actions setup. + +**Category.** The category label describes the category. + + - **C-bug** This is a bug-report. Bug-fix PRs use `C-enhancement` instead. + - **C-enhancement** This is a PR that adds a new features. + - **C-maintenance** This is an issue or PR about stuff such as documentation, + GitHub Actions or code quality. + - **C-feature-request** This is a feature request. Implementations of feature + requests use `C-enhancement` instead. + - **C-feature-accepted** If you submit a PR for this feature request, we wont + close it with the reason "we don't want this". Issues with this label should + also have the `C-feature-request` label. + - **C-musing** Stuff like tracking issues or roadmaps. "musings about a better + world" + - **C-proposal** A proposal of some kind, and a request for comments. + - **C-question** A user question. Large overlap with GitHub discussions. + - **C-request** A non-feature request, e.g. "please add deprecation notices to + `-alpha.*` versions of crates" + +**Call for participation.** I don't know why it's called `E-`. Many issues are +missing a difficulty rating, and you should feel free to add one. + + - **E-help-wanted** Stuff where we want help. Often seen together with `C-bug` + or `C-feature-accepted`. + - **E-easy** This is easy, ranging from quick documentation fixes to stuff you + can do after reading the tutorial on our website. + - **E-medium** This is not `E-easy` or `E-hard`. + - **E-hard** This either involves very tricky code, is something we don't know + how to solve, or is difficult for some other reason. + - **E-needs-mvce** This bug is missing a minimal complete and verifiable + example. + +**Module.** A more fine groaned categorization than area. + + - **M-blocking** Things relevant to `spawn_blocking`, `block_in_place`. + - **M-codec** The `tokio_util::codec` module. + - **M-compat** The `tokio_util::compat` module. + - **M-coop** Things relevant to coop. + - **M-fs** The `tokio::fs` module. + - **M-io** The `tokio::io` module. + - **M-macros** Issues about any kind of macro. + - **M-net** The `tokio::net` module. + - **M-process** The `tokio::process` module. + - **M-runtime** The `tokio::runtime` module. + - **M-signal** The `tokio::signal` module. + - **M-sync** The `tokio::sync` module. + - **M-task** The `tokio::task` module. + - **M-time** The `tokio::time` module. + - **M-tracing** Tracing support in Tokio. + +**Topic.** Some extra information. + + - **T-docs** This is about documentation. + - **T-performance** This is about performance. + - **T-v0.1.x** This is about old Tokio. + +Any label not listed here is not in active use. + +[project]: https://github.com/orgs/tokio-rs/projects/1 + +## Releasing + +Since the Tokio project consists of a number of crates, many of which depend on +each other, releasing new versions to crates.io can involve some complexities. +When releasing a new version of a crate, follow these steps: + +1. **Ensure that the release crate has no path dependencies.** When the HEAD + version of a Tokio crate requires unreleased changes in another Tokio crate, + the crates.io dependency on the second crate will be replaced with a path + dependency. Crates with path dependencies cannot be published, so before + publishing the dependent crate, any path dependencies must also be published. + This should be done through a form of depth-first tree traversal: + + 1. Starting with the first path dependency in the crate to be released, + inspect the `Cargo.toml` for the dependency. If the dependency has any + path dependencies of its own, repeat this step with the first such + dependency. + 2. Begin the release process for the path dependency. + 3. Once the path dependency has been published to crates.io, update the + dependent crate to depend on the crates.io version. + 4. When all path dependencies have been published, the dependent crate may + be published. + + To verify that a crate is ready to publish, run: + + ```bash + bin/publish --dry-run + ``` + +2. **Update Cargo metadata.** After releasing any path dependencies, update the + `version` field in `Cargo.toml` to the new version, and the `documentation` + field to the docs.rs URL of the new version. +3. **Update other documentation links.** Update the `#![doc(html_root_url)]` + attribute in the crate's `lib.rs` and the "Documentation" link in the crate's + `README.md` to point to the docs.rs URL of the new version. +4. **Update the changelog for the crate.** Each crate in the Tokio repository + has its own `CHANGELOG.md` in that crate's subdirectory. Any changes to that + crate since the last release should be added to the changelog. Change + descriptions may be taken from the Git history, but should be edited to + ensure a consistent format, based on [Keep A Changelog][keep-a-changelog]. + Other entries in that crate's changelog may also be used for reference. +5. **Perform a final audit for breaking changes.** Compare the HEAD version of + crate with the Git tag for the most recent release version. If there are any + breaking API changes, determine if those changes can be made without breaking + existing APIs. If so, resolve those issues. Otherwise, if it is necessary to + make a breaking release, update the version numbers to reflect this. +6. **Open a pull request with your changes.** Once that pull request has been + approved by a maintainer and the pull request has been merged, continue to + the next step. +7. **Release the crate.** Run the following command: + + ```bash + bin/publish + ``` + + Your editor and prompt you to edit a message for the tag. Copy the changelog + entry for that release version into your editor and close the window. + +[keep-a-changelog]: https://github.com/olivierlacan/keep-a-changelog/blob/master/CHANGELOG.md diff --git a/tokio-stream/tests/async_send_sync.rs b/tokio-stream/tests/async_send_sync.rs new file mode 100644 index 00000000000..c06bebd22e4 --- /dev/null +++ b/tokio-stream/tests/async_send_sync.rs @@ -0,0 +1,105 @@ +use std::rc::Rc; + +#[allow(dead_code)] +type BoxStream = std::pin::Pin>>; + +#[allow(dead_code)] +fn require_send(_t: &T) {} +#[allow(dead_code)] +fn require_sync(_t: &T) {} +#[allow(dead_code)] +fn require_unpin(_t: &T) {} + +#[allow(dead_code)] +struct Invalid; + +trait AmbiguousIfSend { + fn some_item(&self) {} +} +impl AmbiguousIfSend<()> for T {} +impl AmbiguousIfSend for T {} + +trait AmbiguousIfSync { + fn some_item(&self) {} +} +impl AmbiguousIfSync<()> for T {} +impl AmbiguousIfSync for T {} + +trait AmbiguousIfUnpin { + fn some_item(&self) {} +} +impl AmbiguousIfUnpin<()> for T {} +impl AmbiguousIfUnpin for T {} + +macro_rules! into_todo { + ($typ:ty) => {{ + let x: $typ = todo!(); + x + }}; +} + +macro_rules! async_assert_fn { + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_send(&f); + require_sync(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_send(&f); + AmbiguousIfSync::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfSend::some_item(&f); + require_sync(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfSend::some_item(&f); + AmbiguousIfSync::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + AmbiguousIfUnpin::some_item(&f); + }; + }; + ($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => { + #[allow(unreachable_code)] + #[allow(unused_variables)] + const _: fn() = || { + let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* ); + require_unpin(&f); + }; + }; +} + +async_assert_fn!(tokio_stream::empty>(): Send & Sync); +async_assert_fn!(tokio_stream::pending>(): Send & Sync); +async_assert_fn!(tokio_stream::iter(std::vec::IntoIter): Send & Sync); + +async_assert_fn!(tokio_stream::StreamExt::next(&mut BoxStream<()>): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::try_next(&mut BoxStream>): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin); +async_assert_fn!(tokio_stream::StreamExt::collect>(&mut BoxStream<()>): !Unpin); diff --git a/tokio/tests/stream_chain.rs b/tokio-stream/tests/stream_chain.rs similarity index 89% rename from tokio/tests/stream_chain.rs rename to tokio-stream/tests/stream_chain.rs index 98461a8ccb3..759de3068aa 100644 --- a/tokio/tests/stream_chain.rs +++ b/tokio-stream/tests/stream_chain.rs @@ -1,7 +1,12 @@ -use tokio::stream::{self, Stream, StreamExt}; -use tokio::sync::mpsc; +use tokio_stream::{self as stream, Stream, StreamExt}; use tokio_test::{assert_pending, assert_ready, task}; +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + #[tokio::test] async fn basic_usage() { let one = stream::iter(vec![1, 2, 3]); @@ -36,8 +41,8 @@ async fn basic_usage() { #[tokio::test] async fn pending_first() { - let (tx1, rx1) = mpsc::unbounded_channel(); - let (tx2, rx2) = mpsc::unbounded_channel(); + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); let mut stream = task::spawn(rx1.chain(rx2)); assert_eq!(stream.size_hint(), (0, None)); @@ -74,7 +79,7 @@ async fn pending_first() { fn size_overflow() { struct Monster; - impl tokio::stream::Stream for Monster { + impl tokio_stream::Stream for Monster { type Item = (); fn poll_next( self: std::pin::Pin<&mut Self>, diff --git a/tokio/tests/stream_collect.rs b/tokio-stream/tests/stream_collect.rs similarity index 89% rename from tokio/tests/stream_collect.rs rename to tokio-stream/tests/stream_collect.rs index 7ab1a34ed84..07659a1fc3d 100644 --- a/tokio/tests/stream_collect.rs +++ b/tokio-stream/tests/stream_collect.rs @@ -1,7 +1,12 @@ -use tokio::stream::{self, StreamExt}; -use tokio::sync::mpsc; +use tokio_stream::{self as stream, StreamExt}; use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task}; +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + #[allow(clippy::let_unit_value)] #[tokio::test] async fn empty_unit() { @@ -37,7 +42,7 @@ async fn empty_result() { #[tokio::test] async fn collect_vec_items() { - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel_stream(); let mut fut = task::spawn(rx.collect::>()); assert_pending!(fut.poll()); @@ -58,7 +63,8 @@ async fn collect_vec_items() { #[tokio::test] async fn collect_string_items() { - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel_stream(); + let mut fut = task::spawn(rx.collect::()); assert_pending!(fut.poll()); @@ -79,7 +85,8 @@ async fn collect_string_items() { #[tokio::test] async fn collect_str_items() { - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel_stream(); + let mut fut = task::spawn(rx.collect::()); assert_pending!(fut.poll()); @@ -100,7 +107,8 @@ async fn collect_str_items() { #[tokio::test] async fn collect_results_ok() { - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel_stream(); + let mut fut = task::spawn(rx.collect::>()); assert_pending!(fut.poll()); @@ -121,7 +129,8 @@ async fn collect_results_ok() { #[tokio::test] async fn collect_results_err() { - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel_stream(); + let mut fut = task::spawn(rx.collect::>()); assert_pending!(fut.poll()); diff --git a/tokio/tests/stream_empty.rs b/tokio-stream/tests/stream_empty.rs similarity index 79% rename from tokio/tests/stream_empty.rs rename to tokio-stream/tests/stream_empty.rs index f278076d1ae..c06f5c41c01 100644 --- a/tokio/tests/stream_empty.rs +++ b/tokio-stream/tests/stream_empty.rs @@ -1,4 +1,4 @@ -use tokio::stream::{self, Stream, StreamExt}; +use tokio_stream::{self as stream, Stream, StreamExt}; #[tokio::test] async fn basic_usage() { diff --git a/tokio/tests/stream_fuse.rs b/tokio-stream/tests/stream_fuse.rs similarity index 96% rename from tokio/tests/stream_fuse.rs rename to tokio-stream/tests/stream_fuse.rs index 9d7d969f8ba..9b6cf054cfd 100644 --- a/tokio/tests/stream_fuse.rs +++ b/tokio-stream/tests/stream_fuse.rs @@ -1,4 +1,4 @@ -use tokio::stream::{Stream, StreamExt}; +use tokio_stream::{Stream, StreamExt}; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tokio/tests/stream_iter.rs b/tokio-stream/tests/stream_iter.rs similarity index 91% rename from tokio/tests/stream_iter.rs rename to tokio-stream/tests/stream_iter.rs index 45148a7a8b2..8b9ee3ce5b7 100644 --- a/tokio/tests/stream_iter.rs +++ b/tokio-stream/tests/stream_iter.rs @@ -1,4 +1,4 @@ -use tokio::stream; +use tokio_stream as stream; use tokio_test::task; use std::iter; diff --git a/tokio/tests/stream_merge.rs b/tokio-stream/tests/stream_merge.rs similarity index 86% rename from tokio/tests/stream_merge.rs rename to tokio-stream/tests/stream_merge.rs index 45ecdcb6625..69cd568a6bb 100644 --- a/tokio/tests/stream_merge.rs +++ b/tokio-stream/tests/stream_merge.rs @@ -1,8 +1,13 @@ -use tokio::stream::{self, Stream, StreamExt}; -use tokio::sync::mpsc; +use tokio_stream::{self as stream, Stream, StreamExt}; use tokio_test::task; use tokio_test::{assert_pending, assert_ready}; +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + #[tokio::test] async fn merge_sync_streams() { let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5])); @@ -18,8 +23,8 @@ async fn merge_sync_streams() { #[tokio::test] async fn merge_async_streams() { - let (tx1, rx1) = mpsc::unbounded_channel(); - let (tx2, rx2) = mpsc::unbounded_channel(); + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); let mut rx = task::spawn(rx1.merge(rx2)); @@ -57,7 +62,7 @@ async fn merge_async_streams() { fn size_overflow() { struct Monster; - impl tokio::stream::Stream for Monster { + impl tokio_stream::Stream for Monster { type Item = (); fn poll_next( self: std::pin::Pin<&mut Self>, diff --git a/tokio/tests/stream_once.rs b/tokio-stream/tests/stream_once.rs similarity index 82% rename from tokio/tests/stream_once.rs rename to tokio-stream/tests/stream_once.rs index bb4635ac9ef..f32bad3a120 100644 --- a/tokio/tests/stream_once.rs +++ b/tokio-stream/tests/stream_once.rs @@ -1,4 +1,4 @@ -use tokio::stream::{self, Stream, StreamExt}; +use tokio_stream::{self as stream, Stream, StreamExt}; #[tokio::test] async fn basic_usage() { diff --git a/tokio/tests/stream_pending.rs b/tokio-stream/tests/stream_pending.rs similarity index 85% rename from tokio/tests/stream_pending.rs rename to tokio-stream/tests/stream_pending.rs index f4d3080de82..87b5d03bda2 100644 --- a/tokio/tests/stream_pending.rs +++ b/tokio-stream/tests/stream_pending.rs @@ -1,4 +1,4 @@ -use tokio::stream::{self, Stream, StreamExt}; +use tokio_stream::{self as stream, Stream, StreamExt}; use tokio_test::{assert_pending, task}; #[tokio::test] diff --git a/tokio/tests/stream_stream_map.rs b/tokio-stream/tests/stream_stream_map.rs similarity index 93% rename from tokio/tests/stream_stream_map.rs rename to tokio-stream/tests/stream_stream_map.rs index 38bb0c5d0ae..53f3d86c768 100644 --- a/tokio/tests/stream_stream_map.rs +++ b/tokio-stream/tests/stream_stream_map.rs @@ -1,7 +1,12 @@ -use tokio::stream::{self, pending, Stream, StreamExt, StreamMap}; -use tokio::sync::mpsc; +use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap}; use tokio_test::{assert_ok, assert_pending, assert_ready, task}; +mod support { + pub(crate) mod mpsc; +} + +use support::mpsc; + use std::pin::Pin; macro_rules! assert_ready_some { @@ -38,7 +43,8 @@ async fn empty() { #[tokio::test] async fn single_entry() { let mut map = task::spawn(StreamMap::new()); - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel_stream(); + let rx = Box::pin(rx); assert_ready_none!(map.poll_next()); @@ -76,8 +82,11 @@ async fn single_entry() { #[tokio::test] async fn multiple_entries() { let mut map = task::spawn(StreamMap::new()); - let (tx1, rx1) = mpsc::unbounded_channel(); - let (tx2, rx2) = mpsc::unbounded_channel(); + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let rx1 = Box::pin(rx1); + let rx2 = Box::pin(rx2); map.insert("foo", rx1); map.insert("bar", rx2); @@ -132,7 +141,9 @@ async fn multiple_entries() { #[tokio::test] async fn insert_remove() { let mut map = task::spawn(StreamMap::new()); - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel_stream(); + + let rx = Box::pin(rx); assert_ready_none!(map.poll_next()); @@ -160,8 +171,11 @@ async fn insert_remove() { #[tokio::test] async fn replace() { let mut map = task::spawn(StreamMap::new()); - let (tx1, rx1) = mpsc::unbounded_channel(); - let (tx2, rx2) = mpsc::unbounded_channel(); + let (tx1, rx1) = mpsc::unbounded_channel_stream(); + let (tx2, rx2) = mpsc::unbounded_channel_stream(); + + let rx1 = Box::pin(rx1); + let rx2 = Box::pin(rx2); assert!(map.insert("foo", rx1).is_none()); diff --git a/tokio/tests/stream_timeout.rs b/tokio-stream/tests/stream_timeout.rs similarity index 98% rename from tokio/tests/stream_timeout.rs rename to tokio-stream/tests/stream_timeout.rs index 216b5f75d1d..5697ace6923 100644 --- a/tokio/tests/stream_timeout.rs +++ b/tokio-stream/tests/stream_timeout.rs @@ -1,7 +1,7 @@ #![cfg(feature = "full")] -use tokio::stream::{self, StreamExt}; use tokio::time::{self, sleep, Duration}; +use tokio_stream::{self, StreamExt}; use tokio_test::*; use futures::StreamExt as _; diff --git a/tokio-stream/tests/support/mpsc.rs b/tokio-stream/tests/support/mpsc.rs new file mode 100644 index 00000000000..09dbe04215e --- /dev/null +++ b/tokio-stream/tests/support/mpsc.rs @@ -0,0 +1,15 @@ +use async_stream::stream; +use tokio::sync::mpsc::{self, UnboundedSender}; +use tokio_stream::Stream; + +pub fn unbounded_channel_stream() -> (UnboundedSender, impl Stream) { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let stream = stream! { + while let Some(item) = rx.recv().await { + yield item; + } + }; + + (tx, stream) +} diff --git a/tokio/tests/time_throttle.rs b/tokio-stream/tests/time_throttle.rs similarity index 95% rename from tokio/tests/time_throttle.rs rename to tokio-stream/tests/time_throttle.rs index c886319f6d0..42a643bfa5d 100644 --- a/tokio/tests/time_throttle.rs +++ b/tokio-stream/tests/time_throttle.rs @@ -1,8 +1,8 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::stream::StreamExt; use tokio::time; +use tokio_stream::StreamExt; use tokio_test::*; use std::time::Duration; diff --git a/tokio-test/Cargo.toml b/tokio-test/Cargo.toml index 79553c87159..43b5556b464 100644 --- a/tokio-test/Cargo.toml +++ b/tokio-test/Cargo.toml @@ -22,6 +22,7 @@ publish = false [dependencies] tokio = { version = "1.0.0", path = "../tokio", features = ["rt", "stream", "sync", "time", "test-util"] } +tokio-stream = { version = "0.1", path = "../tokio-stream" } bytes = "0.6.0" futures-core = "0.3.0" diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index b68929a84a3..5e73603bd58 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -1,4 +1,5 @@ #![cfg(not(loom))] +#![allow(dead_code)] //! A mock type implementing [`AsyncRead`] and [`AsyncWrite`]. //! @@ -24,6 +25,7 @@ use tokio::time::{self, Duration, Instant, Sleep}; use futures_core::ready; use std::collections::VecDeque; +use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -63,13 +65,13 @@ enum Action { WriteError(Option>), } -#[derive(Debug)] struct Inner { actions: VecDeque, waiting: Option, sleep: Option, read_wait: Option, rx: mpsc::UnboundedReceiver, + rx_fut: Option> + Send>>>, } impl Builder { @@ -191,6 +193,7 @@ impl Inner { sleep: None, read_wait: None, rx, + rx_fut: None, waiting: None, }; @@ -200,9 +203,11 @@ impl Inner { } fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll> { - use futures_core::stream::Stream; - - Pin::new(&mut self.rx).poll_next(cx) + if let Some(fut) = &mut self.rx_fut { + Pin::new(&mut *fut).poll(cx) + } else { + Poll::Ready(None) + } } fn read(&mut self, dst: &mut ReadBuf<'_>) -> io::Result<()> { @@ -485,3 +490,9 @@ fn is_task_ctx() -> bool { r } */ + +impl fmt::Debug for Inner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Inner {{...}}") + } +} diff --git a/tokio-test/src/task.rs b/tokio-test/src/task.rs index 6a73fd759e7..fa98bae0b01 100644 --- a/tokio-test/src/task.rs +++ b/tokio-test/src/task.rs @@ -9,7 +9,7 @@ use std::pin::Pin; use std::sync::{Arc, Condvar, Mutex}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -use tokio::stream::Stream; +use tokio_stream::Stream; /// TODO: dox pub fn spawn(task: T) -> Spawn { diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index f06bc0bae14..3bc7a064380 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -36,6 +36,7 @@ rt = ["tokio/rt"] [dependencies] tokio = { version = "1.0.0", path = "../tokio" } +tokio-stream = { version = "0.1", path = "../tokio-stream" } bytes = "0.6.0" futures-core = "0.3.0" diff --git a/tokio-util/src/codec/decoder.rs b/tokio-util/src/codec/decoder.rs index 84d27fbf14e..3d19332a221 100644 --- a/tokio-util/src/codec/decoder.rs +++ b/tokio-util/src/codec/decoder.rs @@ -153,7 +153,7 @@ pub trait Decoder { /// calling `split` on the [`Framed`] returned by this method, which will /// break them into separate objects, allowing them to interact more easily. /// - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream /// [`Sink`]: futures_sink::Sink /// [`Framed`]: crate::codec::Framed fn framed(self, io: T) -> Framed diff --git a/tokio-util/src/codec/framed.rs b/tokio-util/src/codec/framed.rs index 36370da2694..adfe06380e0 100644 --- a/tokio-util/src/codec/framed.rs +++ b/tokio-util/src/codec/framed.rs @@ -2,10 +2,8 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; use crate::codec::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame}; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - stream::Stream, -}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_stream::Stream; use bytes::BytesMut; use futures_sink::Sink; @@ -22,7 +20,7 @@ pin_project! { /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or /// by using the `new` function seen below. /// - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream /// [`Sink`]: futures_sink::Sink /// [`AsyncRead`]: tokio::io::AsyncRead /// [`Decoder::framed`]: crate::codec::Decoder::framed() @@ -54,7 +52,7 @@ where /// calling [`split`] on the `Framed` returned by this method, which will /// break them into separate objects, allowing them to interact more easily. /// - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream /// [`Sink`]: futures_sink::Sink /// [`Decode`]: crate::codec::Decoder /// [`Encoder`]: crate::codec::Encoder @@ -88,7 +86,7 @@ where /// calling [`split`] on the `Framed` returned by this method, which will /// break them into separate objects, allowing them to interact more easily. /// - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream /// [`Sink`]: futures_sink::Sink /// [`Decode`]: crate::codec::Decoder /// [`Encoder`]: crate::codec::Encoder @@ -133,7 +131,7 @@ impl Framed { /// calling [`split`] on the `Framed` returned by this method, which will /// break them into separate objects, allowing them to interact more easily. /// - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream /// [`Sink`]: futures_sink::Sink /// [`Decoder`]: crate::codec::Decoder /// [`Encoder`]: crate::codec::Encoder diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index 207e198d20a..69df6f2b15c 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -1,10 +1,8 @@ use crate::codec::decoder::Decoder; use crate::codec::encoder::Encoder; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - stream::Stream, -}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_stream::Stream; use bytes::BytesMut; use futures_core::ready; diff --git a/tokio-util/src/codec/framed_read.rs b/tokio-util/src/codec/framed_read.rs index 2077fbceec2..d95589f230f 100644 --- a/tokio-util/src/codec/framed_read.rs +++ b/tokio-util/src/codec/framed_read.rs @@ -1,7 +1,8 @@ use crate::codec::framed_impl::{FramedImpl, ReadFrame}; use crate::codec::Decoder; -use tokio::{io::AsyncRead, stream::Stream}; +use tokio::io::AsyncRead; +use tokio_stream::Stream; use bytes::BytesMut; use futures_sink::Sink; @@ -13,7 +14,7 @@ use std::task::{Context, Poll}; pin_project! { /// A [`Stream`] of messages decoded from an [`AsyncRead`]. /// - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream /// [`AsyncRead`]: tokio::io::AsyncRead pub struct FramedRead { #[pin] diff --git a/tokio-util/src/codec/framed_write.rs b/tokio-util/src/codec/framed_write.rs index 834eb6ed0f8..8bb9efe4109 100644 --- a/tokio-util/src/codec/framed_write.rs +++ b/tokio-util/src/codec/framed_write.rs @@ -1,7 +1,8 @@ use crate::codec::encoder::Encoder; use crate::codec::framed_impl::{FramedImpl, WriteFrame}; -use tokio::{io::AsyncWrite, stream::Stream}; +use tokio::io::AsyncWrite; +use tokio_stream::Stream; use futures_sink::Sink; use pin_project_lite::pin_project; diff --git a/tokio-util/src/codec/mod.rs b/tokio-util/src/codec/mod.rs index e89aa7c9ac8..12704eebfb0 100644 --- a/tokio-util/src/codec/mod.rs +++ b/tokio-util/src/codec/mod.rs @@ -11,7 +11,7 @@ //! //! [`AsyncRead`]: tokio::io::AsyncRead //! [`AsyncWrite`]: tokio::io::AsyncWrite -//! [`Stream`]: tokio::stream::Stream +//! [`Stream`]: tokio_stream::Stream //! [`Sink`]: futures_sink::Sink mod bytes_codec; diff --git a/tokio-util/src/either.rs b/tokio-util/src/either.rs index f5246af27b2..7f193de7a9b 100644 --- a/tokio-util/src/either.rs +++ b/tokio-util/src/either.rs @@ -167,10 +167,8 @@ where #[cfg(test)] mod tests { use super::*; - use tokio::{ - io::{repeat, AsyncReadExt, Repeat}, - stream::{once, Once, StreamExt}, - }; + use tokio::io::{repeat, AsyncReadExt, Repeat}; + use tokio_stream::{once, Once, StreamExt}; #[tokio::test] async fn either_is_stream() { diff --git a/tokio-util/src/io/read_buf.rs b/tokio-util/src/io/read_buf.rs index a5d46a7de24..d7938a3bc16 100644 --- a/tokio-util/src/io/read_buf.rs +++ b/tokio-util/src/io/read_buf.rs @@ -13,7 +13,7 @@ use tokio::io::AsyncRead; /// /// ``` /// use bytes::{Bytes, BytesMut}; -/// use tokio::stream; +/// use tokio_stream as stream; /// use tokio::io::Result; /// use tokio_util::io::{StreamReader, read_buf}; /// # #[tokio::main] diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index 3e6a05eff4d..7c25f3407e4 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -18,7 +18,7 @@ pin_project! { /// ``` /// # #[tokio::main] /// # async fn main() -> std::io::Result<()> { - /// use tokio::stream::StreamExt; + /// use tokio_stream::StreamExt; /// use tokio_util::io::ReaderStream; /// /// // Create a stream of data. @@ -40,7 +40,7 @@ pin_project! { /// /// [`AsyncRead`]: tokio::io::AsyncRead /// [`StreamReader`]: crate::io::StreamReader - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream #[derive(Debug)] pub struct ReaderStream { // Reader itself. @@ -58,7 +58,7 @@ impl ReaderStream { /// `Result`. /// /// [`AsyncRead`]: tokio::io::AsyncRead - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream pub fn new(reader: R) -> Self { ReaderStream { reader: Some(reader), diff --git a/tokio-util/src/io/stream_reader.rs b/tokio-util/src/io/stream_reader.rs index 99079c73d1a..12820de4af0 100644 --- a/tokio-util/src/io/stream_reader.rs +++ b/tokio-util/src/io/stream_reader.rs @@ -21,7 +21,7 @@ pin_project! { /// # async fn main() -> std::io::Result<()> { /// /// // Create a stream from an iterator. - /// let stream = tokio::stream::iter(vec![ + /// let stream = tokio_stream::iter(vec![ /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), /// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), @@ -51,7 +51,7 @@ pin_project! { /// ``` /// /// [`AsyncRead`]: tokio::io::AsyncRead - /// [`Stream`]: tokio::stream::Stream + /// [`Stream`]: tokio_stream::Stream /// [`ReaderStream`]: crate::io::ReaderStream #[derive(Debug)] pub struct StreamReader { diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 4ae9a13da70..999598c7a94 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -72,7 +72,7 @@ mod util { /// /// ``` /// use bytes::{Bytes, BytesMut}; - /// use tokio::stream; + /// use tokio_stream as stream; /// use tokio::io::Result; /// use tokio_util::io::{StreamReader, poll_read_buf}; /// use futures::future::poll_fn; diff --git a/tokio-util/src/udp/frame.rs b/tokio-util/src/udp/frame.rs index 249acb70607..b4ef3ca8c77 100644 --- a/tokio-util/src/udp/frame.rs +++ b/tokio-util/src/udp/frame.rs @@ -1,6 +1,7 @@ use crate::codec::{Decoder, Encoder}; -use tokio::{io::ReadBuf, net::UdpSocket, stream::Stream}; +use tokio::{io::ReadBuf, net::UdpSocket}; +use tokio_stream::Stream; use bytes::{BufMut, BytesMut}; use futures_core::ready; @@ -27,7 +28,7 @@ use std::{io, mem::MaybeUninit}; /// calling [`split`] on the `UdpFramed` returned by this method, which will break /// them into separate objects, allowing them to interact more easily. /// -/// [`Stream`]: tokio::stream::Stream +/// [`Stream`]: tokio_stream::Stream /// [`Sink`]: futures_sink::Sink /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split #[must_use = "sinks do nothing unless polled"] diff --git a/tokio-util/tests/framed.rs b/tokio-util/tests/framed.rs index 4c5f8418615..7e39e266a51 100644 --- a/tokio-util/tests/framed.rs +++ b/tokio-util/tests/framed.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] -use tokio::{prelude::*, stream::StreamExt}; +use tokio::prelude::*; +use tokio_stream::StreamExt; use tokio_test::assert_ok; use tokio_util::codec::{Decoder, Encoder, Framed, FramedParts}; diff --git a/tokio-util/tests/io_reader_stream.rs b/tokio-util/tests/io_reader_stream.rs index 91986c8e3b5..e30cd85164c 100644 --- a/tokio-util/tests/io_reader_stream.rs +++ b/tokio-util/tests/io_reader_stream.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, ReadBuf}; -use tokio::stream::StreamExt; +use tokio_stream::StreamExt; /// produces at most `remaining` zeros, that returns error. /// each time it reads at most 31 byte. diff --git a/tokio-util/tests/io_stream_reader.rs b/tokio-util/tests/io_stream_reader.rs index b0ed1d2d046..59759941c51 100644 --- a/tokio-util/tests/io_stream_reader.rs +++ b/tokio-util/tests/io_stream_reader.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use tokio::io::AsyncReadExt; -use tokio::stream::iter; +use tokio_stream::iter; use tokio_util::io::StreamReader; #[tokio::test] diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index 4820ac72d00..653d20deb51 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] -use tokio::{net::UdpSocket, stream::StreamExt}; +use tokio::net::UdpSocket; +use tokio_stream::StreamExt; use tokio_util::codec::{Decoder, Encoder, LinesCodec}; use tokio_util::udp::UdpFramed; diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 89b99f6dd39..1fb2a4713a4 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -119,9 +119,11 @@ optional = true [dev-dependencies] tokio-test = { version = "0.4.0", path = "../tokio-test" } +tokio-stream = { version = "0.1", path = "../tokio-stream" } futures = { version = "0.3.0", features = ["async-await"] } proptest = "0.10.0" tempfile = "3.1.0" +async-stream = "0.3" [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.3.5", features = ["futures", "checkpoint"] } diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 980cdf8c476..05b2ae83bdd 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -13,7 +13,7 @@ //! Consider a future like this one: //! //! ``` -//! # use tokio::stream::{Stream, StreamExt}; +//! # use tokio_stream::{Stream, StreamExt}; //! async fn drop_all(mut input: I) { //! while let Some(_) = input.next().await {} //! } @@ -25,7 +25,7 @@ //! opt-in yield points, this problem is alleviated: //! //! ```ignore -//! # use tokio::stream::{Stream, StreamExt}; +//! # use tokio_stream::{Stream, StreamExt}; //! async fn drop_all(mut input: I) { //! while let Some(_) = input.next().await { //! tokio::coop::proceed().await; diff --git a/tokio/src/fs/read_dir.rs b/tokio/src/fs/read_dir.rs index 8ca583bc2ff..8214ac75285 100644 --- a/tokio/src/fs/read_dir.rs +++ b/tokio/src/fs/read_dir.rs @@ -81,19 +81,6 @@ impl ReadDir { } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for ReadDir { - type Item = io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(self.poll_next_entry(cx)) { - Ok(Some(entry)) => Some(Ok(entry)), - Ok(None) => None, - Err(err) => Some(Err(err)), - }) - } -} - /// Entries returned by the [`ReadDir`] stream. /// /// [`ReadDir`]: struct@ReadDir diff --git a/tokio/src/io/util/async_buf_read_ext.rs b/tokio/src/io/util/async_buf_read_ext.rs index 9e87f2f0dcf..7977a0e8cc7 100644 --- a/tokio/src/io/util/async_buf_read_ext.rs +++ b/tokio/src/io/util/async_buf_read_ext.rs @@ -228,7 +228,6 @@ cfg_io_util! { /// /// ``` /// use tokio::io::AsyncBufReadExt; - /// use tokio::stream::StreamExt; /// /// use std::io::Cursor; /// @@ -236,12 +235,12 @@ cfg_io_util! { /// async fn main() { /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor"); /// - /// let mut lines = cursor.lines().map(|res| res.unwrap()); + /// let mut lines = cursor.lines(); /// - /// assert_eq!(lines.next().await, Some(String::from("lorem"))); - /// assert_eq!(lines.next().await, Some(String::from("ipsum"))); - /// assert_eq!(lines.next().await, Some(String::from("dolor"))); - /// assert_eq!(lines.next().await, None); + /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("lorem"))); + /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("ipsum"))); + /// assert_eq!(lines.next_line().await.unwrap(), Some(String::from("dolor"))); + /// assert_eq!(lines.next_line().await.unwrap(), None); /// } /// ``` /// diff --git a/tokio/src/io/util/lines.rs b/tokio/src/io/util/lines.rs index b41f04a807f..5ce249c0ed8 100644 --- a/tokio/src/io/util/lines.rs +++ b/tokio/src/io/util/lines.rs @@ -108,19 +108,6 @@ where } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for Lines { - type Item = io::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(self.poll_next_line(cx)) { - Ok(Some(line)) => Some(Ok(line)), - Ok(None) => None, - Err(err) => Some(Err(err)), - }) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/tokio/src/io/util/split.rs b/tokio/src/io/util/split.rs index 492e26a6305..75115aa08b9 100644 --- a/tokio/src/io/util/split.rs +++ b/tokio/src/io/util/split.rs @@ -89,19 +89,6 @@ where } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for Split { - type Item = io::Result>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(self.poll_next_segment(cx)) { - Ok(Some(segment)) => Some(Ok(segment)), - Ok(None) => None, - Err(err) => Some(Err(err)), - }) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 3ec654de338..9c6a9b15c49 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -378,10 +378,6 @@ cfg_signal_internal! { pub(crate) mod signal; } -cfg_stream! { - pub mod stream; -} - cfg_sync! { pub mod sync; } diff --git a/tokio/src/macros/pin.rs b/tokio/src/macros/pin.rs index ed844ef7d11..a32187e5a1d 100644 --- a/tokio/src/macros/pin.rs +++ b/tokio/src/macros/pin.rs @@ -71,7 +71,7 @@ /// /// ``` /// use tokio::{pin, select}; -/// use tokio::stream::{self, StreamExt}; +/// use tokio_stream::{self as stream, StreamExt}; /// /// async fn my_async_fn() { /// // async logic here diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs index b63abdd2df1..2131c8907b8 100644 --- a/tokio/src/macros/select.rs +++ b/tokio/src/macros/select.rs @@ -167,7 +167,7 @@ /// Basic stream selecting. /// /// ``` -/// use tokio::stream::{self, StreamExt}; +/// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { @@ -188,7 +188,7 @@ /// is complete, all calls to `next()` return `None`. /// /// ``` -/// use tokio::stream::{self, StreamExt}; +/// use tokio_stream::{self as stream, StreamExt}; /// /// #[tokio::main] /// async fn main() { @@ -220,7 +220,7 @@ /// Here, a stream is consumed for at most 1 second. /// /// ``` -/// use tokio::stream::{self, StreamExt}; +/// use tokio_stream::{self as stream, StreamExt}; /// use tokio::time::{self, Duration}; /// /// #[tokio::main] diff --git a/tokio/src/net/tcp/listener.rs b/tokio/src/net/tcp/listener.rs index 8b0a48036b2..7aa10f84c0f 100644 --- a/tokio/src/net/tcp/listener.rs +++ b/tokio/src/net/tcp/listener.rs @@ -47,24 +47,6 @@ cfg_net! { /// } /// } /// ``` - /// - /// Using `impl Stream`: - /// ```no_run - /// use tokio::{net::TcpListener, stream::StreamExt}; - /// - /// #[tokio::main] - /// async fn main() { - /// let mut listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); - /// while let Some(stream) = listener.next().await { - /// match stream { - /// Ok(stream) => { - /// println!("new client!"); - /// } - /// Err(e) => { /* connection failed */ } - /// } - /// } - /// } - /// ``` pub struct TcpListener { io: PollEvented, } @@ -323,16 +305,6 @@ impl TcpListener { } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for TcpListener { - type Item = io::Result; - - fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let (socket, _) = ready!(self.poll_accept(cx))?; - Poll::Ready(Some(Ok(socket))) - } -} - impl TryFrom for TcpListener { type Error = io::Error; diff --git a/tokio/src/net/unix/listener.rs b/tokio/src/net/unix/listener.rs index b1da0e3c511..c8468331b42 100644 --- a/tokio/src/net/unix/listener.rs +++ b/tokio/src/net/unix/listener.rs @@ -29,14 +29,13 @@ cfg_net_unix! { /// /// ```no_run /// use tokio::net::UnixListener; - /// use tokio::stream::StreamExt; /// /// #[tokio::main] /// async fn main() { - /// let mut listener = UnixListener::bind("/path/to/the/socket").unwrap(); - /// while let Some(stream) = listener.next().await { - /// match stream { - /// Ok(stream) => { + /// let listener = UnixListener::bind("/path/to/the/socket").unwrap(); + /// loop { + /// match listener.accept().await { + /// Ok((stream, _addr)) => { /// println!("new client!"); /// } /// Err(e) => { /* connection failed */ } @@ -127,16 +126,6 @@ impl UnixListener { } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for UnixListener { - type Item = io::Result; - - fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let (socket, _) = ready!(self.poll_accept(cx))?; - Poll::Ready(Some(Ok(socket))) - } -} - impl TryFrom for UnixListener { type Error = io::Error; diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index aaaa75edb74..fc0f16d4eb0 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -407,16 +407,6 @@ impl Signal { } } -cfg_stream! { - impl crate::stream::Stream for Signal { - type Item = (); - - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_recv(cx) - } - } -} - // Work around for abstracting streams internally pub(crate) trait InternalStream: Unpin { fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll>; diff --git a/tokio/src/signal/windows.rs b/tokio/src/signal/windows.rs index 7f2e4862ef9..9650461d0d6 100644 --- a/tokio/src/signal/windows.rs +++ b/tokio/src/signal/windows.rs @@ -351,7 +351,7 @@ pub fn ctrl_break() -> io::Result { mod tests { use super::*; use crate::runtime::Runtime; - use crate::stream::StreamExt; + use crate::StreamExt; use tokio_test::{assert_ok, assert_pending, assert_ready_ok, task}; diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index ee9aba07ac4..c98304e822d 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -949,7 +949,7 @@ impl Receiver { /// # Examples /// /// ``` - /// use tokio::stream::StreamExt; + /// use tokio_stream::StreamExt; /// use tokio::sync::broadcast; /// /// #[tokio::main] diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 06b371731cd..2a0eb5baaeb 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -250,16 +250,6 @@ impl fmt::Debug for Receiver { impl Unpin for Receiver {} -cfg_stream! { - impl crate::stream::Stream for Receiver { - type Item = T; - - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.chan.recv(cx) - } - } -} - impl Sender { pub(crate) fn new(chan: chan::Tx) -> Sender { Sender { chan } diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index fe882d5b5c0..80f665b8195 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -146,15 +146,6 @@ impl UnboundedReceiver { } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for UnboundedReceiver { - type Item = T; - - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_recv(cx) - } -} - impl UnboundedSender { pub(crate) fn new(chan: chan::Tx) -> UnboundedSender { UnboundedSender { chan } diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index c7c58e1797c..0ab6678065b 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -162,12 +162,3 @@ impl Interval { poll_fn(|cx| self.poll_tick(cx)).await } } - -#[cfg(feature = "stream")] -impl crate::stream::Stream for Interval { - type Item = Instant; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Some(ready!(self.poll_tick(cx)))) - } -} diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index 2ee38570025..c345d2acbd0 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -14,8 +14,7 @@ type BoxFutureSync = std::pin::Pin + type BoxFutureSend = std::pin::Pin + Send>>; #[allow(dead_code)] type BoxFuture = std::pin::Pin>>; -#[allow(dead_code)] -type BoxStream = std::pin::Pin>>; + #[allow(dead_code)] type BoxAsyncRead = std::pin::Pin>; #[allow(dead_code)] @@ -222,10 +221,6 @@ async_assert_fn!(tokio::signal::ctrl_c(): Send & Sync); #[cfg(unix)] async_assert_fn!(tokio::signal::unix::Signal::recv(_): Send & Sync); -async_assert_fn!(tokio::stream::empty>(): Send & Sync); -async_assert_fn!(tokio::stream::pending>(): Send & Sync); -async_assert_fn!(tokio::stream::iter(std::vec::IntoIter): Send & Sync); - async_assert_fn!(tokio::sync::Barrier::wait(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex::lock(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock(_): Send & Sync); @@ -285,13 +280,6 @@ async_assert_fn!(tokio::time::timeout_at(Instant, BoxFutureSend<()>): Send & !Sy async_assert_fn!(tokio::time::timeout_at(Instant, BoxFuture<()>): !Send & !Sync); async_assert_fn!(tokio::time::Interval::tick(_): Send & Sync); -async_assert_fn!(tokio::stream::StreamExt::next(&mut BoxStream<()>): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::try_next(&mut BoxStream>): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin); -async_assert_fn!(tokio::stream::StreamExt::collect>(&mut BoxStream<()>): !Unpin); - async_assert_fn!(tokio::io::AsyncBufReadExt::read_until(&mut BoxAsyncRead, u8, &mut Vec): !Unpin); async_assert_fn!(tokio::io::AsyncBufReadExt::read_line(&mut BoxAsyncRead, &mut String): !Unpin); async_assert_fn!(tokio::io::AsyncReadExt::read(&mut BoxAsyncRead, &mut [u8]): !Unpin); diff --git a/tokio/tests/fs_dir.rs b/tokio/tests/fs_dir.rs index 6355ef05fcb..21efe8c0eeb 100644 --- a/tokio/tests/fs_dir.rs +++ b/tokio/tests/fs_dir.rs @@ -85,35 +85,3 @@ async fn read_inherent() { vec!["aa".to_string(), "bb".to_string(), "cc".to_string()] ); } - -#[tokio::test] -async fn read_stream() { - use tokio::stream::StreamExt; - - let base_dir = tempdir().unwrap(); - - let p = base_dir.path(); - std::fs::create_dir(p.join("aa")).unwrap(); - std::fs::create_dir(p.join("bb")).unwrap(); - std::fs::create_dir(p.join("cc")).unwrap(); - - let files = Arc::new(Mutex::new(Vec::new())); - - let f = files.clone(); - let p = p.to_path_buf(); - - let mut entries = fs::read_dir(p).await.unwrap(); - - while let Some(res) = entries.next().await { - let e = assert_ok!(res); - let s = e.file_name().to_str().unwrap().to_string(); - f.lock().unwrap().push(s); - } - - let mut files = files.lock().unwrap(); - files.sort(); // because the order is not guaranteed - assert_eq!( - *files, - vec!["aa".to_string(), "bb".to_string(), "cc".to_string()] - ); -} diff --git a/tokio/tests/io_lines.rs b/tokio/tests/io_lines.rs index 2f6b3393b99..9996d81ca74 100644 --- a/tokio/tests/io_lines.rs +++ b/tokio/tests/io_lines.rs @@ -17,19 +17,3 @@ async fn lines_inherent() { assert_eq!(b, ""); assert!(assert_ok!(st.next_line().await).is_none()); } - -#[tokio::test] -async fn lines_stream() { - use tokio::stream::StreamExt; - - let rd: &[u8] = b"hello\r\nworld\n\n"; - let mut st = rd.lines(); - - let b = assert_ok!(st.next().await.unwrap()); - assert_eq!(b, "hello"); - let b = assert_ok!(st.next().await.unwrap()); - assert_eq!(b, "world"); - let b = assert_ok!(st.next().await.unwrap()); - assert_eq!(b, ""); - assert!(st.next().await.is_none()); -} diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 7b5b622b63f..977a838c208 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -2,12 +2,16 @@ #![cfg(feature = "full")] use tokio::runtime::Runtime; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; use std::thread; use std::time::Duration; +mod support { + pub(crate) mod mpsc_stream; +} + #[test] fn spawned_task_does_not_progress_without_block_on() { let (tx, mut rx) = oneshot::channel(); @@ -36,7 +40,7 @@ fn no_extra_poll() { Arc, }; use std::task::{Context, Poll}; - use tokio::stream::{Stream, StreamExt}; + use tokio_stream::{Stream, StreamExt}; pin_project! { struct TrackPolls { @@ -58,8 +62,8 @@ fn no_extra_poll() { } } - let (tx, rx) = mpsc::unbounded_channel(); - let mut rx = TrackPolls { + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::<()>(); + let rx = TrackPolls { npolls: Arc::new(AtomicUsize::new(0)), s: rx, }; @@ -67,6 +71,9 @@ fn no_extra_poll() { let rt = rt(); + // TODO: could probably avoid this, but why not. + let mut rx = Box::pin(rx); + rt.spawn(async move { while rx.next().await.is_some() {} }); rt.block_on(async { tokio::task::yield_now().await; diff --git a/tokio/tests/support/mpsc_stream.rs b/tokio/tests/support/mpsc_stream.rs new file mode 100644 index 00000000000..3df541ff75c --- /dev/null +++ b/tokio/tests/support/mpsc_stream.rs @@ -0,0 +1,29 @@ +#![allow(dead_code)] + +use async_stream::stream; +use tokio::sync::mpsc::{self, Sender, UnboundedSender}; +use tokio_stream::Stream; + +pub fn unbounded_channel_stream() -> (UnboundedSender, impl Stream) { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let stream = stream! { + while let Some(item) = rx.recv().await { + yield item; + } + }; + + (tx, stream) +} + +pub fn channel_stream(size: usize) -> (Sender, impl Stream) { + let (tx, mut rx) = mpsc::channel(size); + + let stream = stream! { + while let Some(item) = rx.recv().await { + yield item; + } + }; + + (tx, stream) +} diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 84c77a7760f..5f79800a77c 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -89,46 +89,6 @@ fn send_two_recv() { assert_empty!(rx2); } -#[tokio::test] -async fn send_recv_into_stream_ready() { - use tokio::stream::StreamExt; - - let (tx, rx) = broadcast::channel::(8); - tokio::pin! { - let rx = rx.into_stream(); - } - - assert_ok!(tx.send(1)); - assert_ok!(tx.send(2)); - - assert_eq!(Some(Ok(1)), rx.next().await); - assert_eq!(Some(Ok(2)), rx.next().await); - - drop(tx); - - assert_eq!(None, rx.next().await); -} - -#[tokio::test] -async fn send_recv_into_stream_pending() { - use tokio::stream::StreamExt; - - let (tx, rx) = broadcast::channel::(8); - - tokio::pin! { - let rx = rx.into_stream(); - } - - let mut recv = task::spawn(rx.next()); - assert_pending!(recv.poll()); - - assert_ok!(tx.send(1)); - - assert!(recv.is_woken()); - let val = assert_ready!(recv.poll()); - assert_eq!(val, Some(Ok(1))); -} - #[test] fn send_recv_bounded() { let (tx, mut rx) = broadcast::channel(16); diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index ddbbf9ea0a0..8798e6f097c 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -13,6 +13,10 @@ use tokio_test::{ use std::sync::Arc; +mod support { + pub(crate) mod mpsc_stream; +} + trait AssertSend: Send {} impl AssertSend for mpsc::Sender {} impl AssertSend for mpsc::Receiver {} @@ -80,9 +84,10 @@ async fn reserve_disarm() { #[tokio::test] async fn send_recv_stream_with_buffer() { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; - let (tx, mut rx) = mpsc::channel::(16); + let (tx, rx) = support::mpsc_stream::channel_stream::(16); + let mut rx = Box::pin(rx); tokio::spawn(async move { assert_ok!(tx.send(1).await); @@ -178,9 +183,11 @@ async fn async_send_recv_unbounded() { #[tokio::test] async fn send_recv_stream_unbounded() { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream::(); + + let mut rx = Box::pin(rx); tokio::spawn(async move { assert_ok!(tx.send(1)); diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index eec19cc16d8..82bef8a1d58 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -7,6 +7,10 @@ use tokio_test::assert_ok; use std::thread; use std::time::Duration; +mod support { + pub(crate) mod mpsc_stream; +} + #[tokio::test] async fn basic_blocking() { // Run a few times @@ -165,7 +169,8 @@ fn coop_disabled_in_block_in_place() { .build() .unwrap(); - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream(); + for i in 0..200 { tx.send(i).unwrap(); } @@ -175,7 +180,7 @@ fn coop_disabled_in_block_in_place() { let jh = tokio::spawn(async move { tokio::task::block_in_place(move || { futures::executor::block_on(async move { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); }) }) @@ -195,7 +200,8 @@ fn coop_disabled_in_block_in_place_in_block_on() { thread::spawn(move || { let outer = tokio::runtime::Runtime::new().unwrap(); - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + let (tx, rx) = support::mpsc_stream::unbounded_channel_stream(); + for i in 0..200 { tx.send(i).unwrap(); } @@ -204,7 +210,7 @@ fn coop_disabled_in_block_in_place_in_block_on() { outer.block_on(async move { tokio::task::block_in_place(move || { futures::executor::block_on(async move { - use tokio::stream::StreamExt; + use tokio_stream::StreamExt; assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); }) }) diff --git a/tokio/tests/tcp_accept.rs b/tokio/tests/tcp_accept.rs index 4c0d6822dd5..5ffb946f345 100644 --- a/tokio/tests/tcp_accept.rs +++ b/tokio/tests/tcp_accept.rs @@ -46,7 +46,7 @@ use std::sync::{ Arc, }; use std::task::{Context, Poll}; -use tokio::stream::{Stream, StreamExt}; +use tokio_stream::{Stream, StreamExt}; struct TrackPolls<'a> { npolls: Arc, @@ -88,7 +88,7 @@ async fn no_extra_poll() { assert_eq!(npolls.load(SeqCst), 1); let _ = assert_ok!(TcpStream::connect(&addr).await); - accepted_rx.next().await.unwrap(); + accepted_rx.recv().await.unwrap(); // should have been polled twice more: once to yield Some(), then once to yield Pending assert_eq!(npolls.load(SeqCst), 1 + 2); diff --git a/tokio/tests/time_interval.rs b/tokio/tests/time_interval.rs index a0787157677..a3c7f0874c6 100644 --- a/tokio/tests/time_interval.rs +++ b/tokio/tests/time_interval.rs @@ -44,21 +44,6 @@ async fn usage() { assert_pending!(poll_next(&mut i)); } -#[tokio::test] -async fn usage_stream() { - use tokio::stream::StreamExt; - - let start = Instant::now(); - let interval = time::interval(ms(10)); - tokio::pin!(interval); - - for _ in 0..3 { - interval.next().await.unwrap(); - } - - assert!(start.elapsed() > ms(20)); -} - fn poll_next(interval: &mut task::Spawn) -> Poll { interval.enter(|cx, mut interval| { tokio::pin! {