From 25cab70f862060a499b6cd0481259a85dc9b4b2d Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 27 Feb 2021 00:58:23 +0900 Subject: [PATCH] Revert import changes in tests After #2216, these redundant imports are unneeded. This reverts almost all of #2101. This also includes some minor cleanup of imports in tests. --- futures/tests/abortable.rs | 18 +- futures/tests/arc_wake.rs | 98 ++--- futures/tests/async_await_macros.rs | 125 +----- futures/tests/atomic_waker.rs | 18 +- futures/tests/buffer_unordered.rs | 14 +- futures/tests/eager_drop.rs | 179 ++++---- futures/tests/future_obj.rs | 4 +- futures/tests/future_try_flatten_stream.rs | 29 +- futures/tests/futures_ordered.rs | 55 ++- futures/tests/futures_unordered.rs | 68 +-- futures/tests/inspect.rs | 10 +- futures/tests/io_buf_reader.rs | 206 +++++---- futures/tests/io_buf_writer.rs | 148 ++++--- futures/tests/io_cursor.rs | 63 ++- futures/tests/io_lines.rs | 60 ++- futures/tests/io_read.rs | 55 +-- futures/tests/io_read_exact.rs | 8 +- futures/tests/io_read_line.rs | 39 +- futures/tests/io_read_to_end.rs | 7 +- futures/tests/io_read_to_string.rs | 20 +- futures/tests/io_read_until.rs | 39 +- futures/tests/io_write.rs | 69 ++-- futures/tests/join_all.rs | 39 +- futures/tests/macro_comma_support.rs | 32 +- futures/tests/mutex.rs | 33 +- futures/tests/oneshot.rs | 49 +-- futures/tests/ready_queue.rs | 55 +-- futures/tests/recurse.rs | 14 +- futures/tests/select_all.rs | 14 +- futures/tests/select_ok.rs | 22 +- futures/tests/shared.rs | 70 +--- futures/tests/sink.rs | 460 ++++++++------------- futures/tests/sink_fanout.rs | 12 +- futures/tests/split.rs | 43 +- futures/tests/stream.rs | 66 ++- futures/tests/stream_catch_unwind.rs | 9 +- futures/tests/stream_into_async_read.rs | 96 ++--- futures/tests/stream_peekable.rs | 8 +- futures/tests/stream_select_all.rs | 19 +- futures/tests/stream_select_next_some.rs | 26 +- futures/tests/try_join.rs | 3 +- futures/tests/try_join_all.rs | 53 ++- futures/tests/unfold.rs | 5 +- 43 files changed, 968 insertions(+), 1492 deletions(-) diff --git a/futures/tests/abortable.rs b/futures/tests/abortable.rs index 6b5a25c365..5925c9a27b 100644 --- a/futures/tests/abortable.rs +++ b/futures/tests/abortable.rs @@ -1,9 +1,11 @@ +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future::{abortable, Aborted, FutureExt}; +use futures::task::{Context, Poll}; +use futures_test::task::new_count_waker; + #[test] fn abortable_works() { - use futures::channel::oneshot; - use futures::future::{abortable, Aborted}; - use futures::executor::block_on; - let (_tx, a_rx) = oneshot::channel::<()>(); let (abortable_rx, abort_handle) = abortable(a_rx); @@ -13,11 +15,6 @@ fn abortable_works() { #[test] fn abortable_awakens() { - use futures::channel::oneshot; - use futures::future::{abortable, Aborted, FutureExt}; - use futures::task::{Context, Poll}; - use futures_test::task::new_count_waker; - let (_tx, a_rx) = oneshot::channel::<()>(); let (mut abortable_rx, abort_handle) = abortable(a_rx); @@ -33,9 +30,6 @@ fn abortable_awakens() { #[test] fn abortable_resolves() { - use futures::channel::oneshot; - use futures::future::abortable; - use futures::executor::block_on; let (tx, a_rx) = oneshot::channel::<()>(); let (abortable_rx, _abort_handle) = abortable(a_rx); diff --git a/futures/tests/arc_wake.rs b/futures/tests/arc_wake.rs index d19a83dfac..3c5724a239 100644 --- a/futures/tests/arc_wake.rs +++ b/futures/tests/arc_wake.rs @@ -1,69 +1,65 @@ -mod countingwaker { - use futures::task::{self, ArcWake, Waker}; - use std::sync::{Arc, Mutex}; +use futures::task::{self, ArcWake, Waker}; +use std::panic; +use std::sync::{Arc, Mutex}; - struct CountingWaker { - nr_wake: Mutex, - } +struct CountingWaker { + nr_wake: Mutex, +} - impl CountingWaker { - fn new() -> Self { - Self { - nr_wake: Mutex::new(0), - } +impl CountingWaker { + fn new() -> Self { + Self { + nr_wake: Mutex::new(0), } + } - fn wakes(&self) -> i32 { - *self.nr_wake.lock().unwrap() - } + fn wakes(&self) -> i32 { + *self.nr_wake.lock().unwrap() } +} - impl ArcWake for CountingWaker { - fn wake_by_ref(arc_self: &Arc) { - let mut lock = arc_self.nr_wake.lock().unwrap(); - *lock += 1; - } +impl ArcWake for CountingWaker { + fn wake_by_ref(arc_self: &Arc) { + let mut lock = arc_self.nr_wake.lock().unwrap(); + *lock += 1; } +} - #[test] - fn create_from_arc() { - let some_w = Arc::new(CountingWaker::new()); +#[test] +fn create_from_arc() { + let some_w = Arc::new(CountingWaker::new()); - let w1: Waker = task::waker(some_w.clone()); - assert_eq!(2, Arc::strong_count(&some_w)); - w1.wake_by_ref(); - assert_eq!(1, some_w.wakes()); + let w1: Waker = task::waker(some_w.clone()); + assert_eq!(2, Arc::strong_count(&some_w)); + w1.wake_by_ref(); + assert_eq!(1, some_w.wakes()); - let w2 = w1.clone(); - assert_eq!(3, Arc::strong_count(&some_w)); + let w2 = w1.clone(); + assert_eq!(3, Arc::strong_count(&some_w)); - w2.wake_by_ref(); - assert_eq!(2, some_w.wakes()); + w2.wake_by_ref(); + assert_eq!(2, some_w.wakes()); - drop(w2); - assert_eq!(2, Arc::strong_count(&some_w)); - drop(w1); - assert_eq!(1, Arc::strong_count(&some_w)); - } + drop(w2); + assert_eq!(2, Arc::strong_count(&some_w)); + drop(w1); + assert_eq!(1, Arc::strong_count(&some_w)); +} - #[test] - fn ref_wake_same() { - let some_w = Arc::new(CountingWaker::new()); +#[test] +fn ref_wake_same() { + let some_w = Arc::new(CountingWaker::new()); - let w1: Waker = task::waker(some_w.clone()); - let w2 = task::waker_ref(&some_w); - let w3 = w2.clone(); + let w1: Waker = task::waker(some_w.clone()); + let w2 = task::waker_ref(&some_w); + let w3 = w2.clone(); - assert!(w1.will_wake(&w2)); - assert!(w2.will_wake(&w3)); - } + assert!(w1.will_wake(&w2)); + assert!(w2.will_wake(&w3)); } #[test] fn proper_refcount_on_wake_panic() { - use futures::task::{self, ArcWake, Waker}; - use std::sync::Arc; - struct PanicWaker; impl ArcWake for PanicWaker { @@ -75,7 +71,13 @@ fn proper_refcount_on_wake_panic() { let some_w = Arc::new(PanicWaker); let w1: Waker = task::waker(some_w.clone()); - assert_eq!("WAKE UP", *std::panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap()); + assert_eq!( + "WAKE UP", + *panic::catch_unwind(|| w1.wake_by_ref()) + .unwrap_err() + .downcast::<&str>() + .unwrap() + ); assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1 drop(w1); assert_eq!(1, Arc::strong_count(&some_w)); // some_w diff --git a/futures/tests/async_await_macros.rs b/futures/tests/async_await_macros.rs index bd586d6e52..3e461b50eb 100644 --- a/futures/tests/async_await_macros.rs +++ b/futures/tests/async_await_macros.rs @@ -1,9 +1,14 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, FutureExt}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use futures::task::{Context, Poll}; +use futures::{join, pending, pin_mut, poll, select, select_biased, try_join}; +use std::mem; + #[test] fn poll_and_pending() { - use futures::{pending, pin_mut, poll}; - use futures::executor::block_on; - use futures::task::Poll; - let pending_once = async { pending!() }; block_on(async { pin_mut!(pending_once); @@ -14,11 +19,6 @@ fn poll_and_pending() { #[test] fn join() { - use futures::{pin_mut, poll, join}; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::task::Poll; - let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = oneshot::channel::(); @@ -39,11 +39,6 @@ fn join() { #[test] fn select() { - use futures::select; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx1, rx1) = oneshot::channel::(); let (_tx2, rx2) = oneshot::channel::(); tx1.send(1).unwrap(); @@ -62,11 +57,6 @@ fn select() { #[test] fn select_biased() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::select_biased; - let (tx1, rx1) = oneshot::channel::(); let (_tx2, rx2) = oneshot::channel::(); tx1.send(1).unwrap(); @@ -85,12 +75,6 @@ fn select_biased() { #[test] fn select_streams() { - use futures::select; - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - let (mut tx1, rx1) = mpsc::channel::(1); let (mut tx2, rx2) = mpsc::channel::(1); let mut rx1 = rx1.fuse(); @@ -134,11 +118,6 @@ fn select_streams() { #[test] fn select_can_move_uncompleted_futures() { - use futures::select; - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = oneshot::channel::(); tx1.send(1).unwrap(); @@ -165,10 +144,6 @@ fn select_can_move_uncompleted_futures() { #[test] fn select_nested() { - use futures::select; - use futures::executor::block_on; - use futures::future; - let mut outer_fut = future::ready(1); let mut inner_fut = future::ready(2); let res = block_on(async { @@ -185,16 +160,13 @@ fn select_nested() { #[test] fn select_size() { - use futures::select; - use futures::future; - let fut = async { let mut ready = future::ready(0i32); select! { _ = ready => {}, } }; - assert_eq!(::std::mem::size_of_val(&fut), 24); + assert_eq!(mem::size_of_val(&fut), 24); let fut = async { let mut ready1 = future::ready(0i32); @@ -204,19 +176,13 @@ fn select_size() { _ = ready2 => {}, } }; - assert_eq!(::std::mem::size_of_val(&fut), 40); + assert_eq!(mem::size_of_val(&fut), 40); } #[test] fn select_on_non_unpin_expressions() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let res = block_on(async { let select_res; @@ -231,14 +197,8 @@ fn select_on_non_unpin_expressions() { #[test] fn select_on_non_unpin_expressions_with_default() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let res = block_on(async { let select_res; @@ -254,13 +214,8 @@ fn select_on_non_unpin_expressions_with_default() { #[test] fn select_on_non_unpin_size() { - use futures::select; - use futures::future::FutureExt; - // The returned Future is !Unpin - let make_non_unpin_fut = || { async { - 5 - }}; + let make_non_unpin_fut = || async { 5 }; let fut = async { let select_res; @@ -271,15 +226,11 @@ fn select_on_non_unpin_size() { select_res }; - assert_eq!(32, std::mem::size_of_val(&fut)); + assert_eq!(32, mem::size_of_val(&fut)); } #[test] fn select_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future; - block_on(async { let res = select! { x = future::ready(7) => x, @@ -291,11 +242,6 @@ fn select_can_be_used_as_expression() { #[test] fn select_with_default_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future::{FutureExt, poll_fn}; - use futures::task::{Context, Poll}; - fn poll_always_pending(_cx: &mut Context<'_>) -> Poll { Poll::Pending } @@ -312,10 +258,6 @@ fn select_with_default_can_be_used_as_expression() { #[test] fn select_with_complete_can_be_used_as_expression() { - use futures::select; - use futures::executor::block_on; - use futures::future; - block_on(async { let res = select! { x = future::pending::() => x, @@ -330,10 +272,6 @@ fn select_with_complete_can_be_used_as_expression() { #[test] #[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - async fn require_mutable(_: &mut i32) {} async fn async_noop() {} @@ -351,10 +289,6 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block() { #[test] #[allow(unused_assignments)] fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { - use futures::select; - use futures::executor::block_on; - use futures::future::FutureExt; - async fn require_mutable(_: &mut i32) {} async fn async_noop() {} @@ -374,59 +308,42 @@ fn select_on_mutable_borrowing_future_with_same_borrow_in_block_and_default() { #[test] fn join_size() { - use futures::join; - use futures::future; - let fut = async { let ready = future::ready(0i32); join!(ready) }; - assert_eq!(::std::mem::size_of_val(&fut), 16); + assert_eq!(mem::size_of_val(&fut), 16); let fut = async { let ready1 = future::ready(0i32); let ready2 = future::ready(0i32); join!(ready1, ready2) }; - assert_eq!(::std::mem::size_of_val(&fut), 28); + assert_eq!(mem::size_of_val(&fut), 28); } #[test] fn try_join_size() { - use futures::try_join; - use futures::future; - let fut = async { let ready = future::ready(Ok::(0)); try_join!(ready) }; - assert_eq!(::std::mem::size_of_val(&fut), 16); + assert_eq!(mem::size_of_val(&fut), 16); let fut = async { let ready1 = future::ready(Ok::(0)); let ready2 = future::ready(Ok::(0)); try_join!(ready1, ready2) }; - assert_eq!(::std::mem::size_of_val(&fut), 28); + assert_eq!(mem::size_of_val(&fut), 28); } #[test] fn join_doesnt_require_unpin() { - use futures::join; - - let _ = async { - join!(async {}, async {}) - }; + let _ = async { join!(async {}, async {}) }; } #[test] fn try_join_doesnt_require_unpin() { - use futures::try_join; - - let _ = async { - try_join!( - async { Ok::<(), ()>(()) }, - async { Ok::<(), ()>(()) }, - ) - }; + let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) }; } diff --git a/futures/tests/atomic_waker.rs b/futures/tests/atomic_waker.rs index bf15d0f0c7..299270e63d 100644 --- a/futures/tests/atomic_waker.rs +++ b/futures/tests/atomic_waker.rs @@ -1,14 +1,14 @@ -#[test] -fn basic() { - use std::sync::atomic::AtomicUsize; - use std::sync::atomic::Ordering; - use std::sync::Arc; - use std::thread; - use futures::executor::block_on; - use futures::future::poll_fn; - use futures::task::{AtomicWaker, Poll}; +use futures::executor::block_on; +use futures::future::poll_fn; +use futures::task::{AtomicWaker, Poll}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::thread; +#[test] +fn basic() { let atomic_waker = Arc::new(AtomicWaker::new()); let atomic_waker_copy = atomic_waker.clone(); diff --git a/futures/tests/buffer_unordered.rs b/futures/tests/buffer_unordered.rs index 5c8b8bf7e5..9a2ee174ed 100644 --- a/futures/tests/buffer_unordered.rs +++ b/futures/tests/buffer_unordered.rs @@ -1,13 +1,13 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::{block_on, block_on_stream}; +use futures::sink::SinkExt; +use futures::stream::StreamExt; +use std::sync::mpsc as std_mpsc; +use std::thread; + #[test] #[ignore] // FIXME: https://github.com/rust-lang/futures-rs/issues/1790 fn works() { - use futures::channel::{oneshot, mpsc}; - use futures::executor::{block_on, block_on_stream}; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - use std::sync::mpsc as std_mpsc; - use std::thread; - const N: usize = 4; let (mut tx, rx) = mpsc::channel(1); diff --git a/futures/tests/eager_drop.rs b/futures/tests/eager_drop.rs index 11edb1b6de..45cf191d20 100644 --- a/futures/tests/eager_drop.rs +++ b/futures/tests/eager_drop.rs @@ -1,16 +1,23 @@ +use futures::channel::oneshot; +use futures::future::{self, Future, FutureExt, TryFutureExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use pin_project::pin_project; +use std::pin::Pin; +use std::sync::mpsc; + #[test] fn map_ok() { - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - // The closure given to `map_ok` should have been dropped by the time `map` // runs. let (tx1, rx1) = mpsc::channel::<()>(); let (tx2, rx2) = mpsc::channel::<()>(); future::ready::>(Err(1)) - .map_ok(move |_| { let _tx1 = tx1; panic!("should not run"); }) + .map_ok(move |_| { + let _tx1 = tx1; + panic!("should not run"); + }) .map(move |_| { assert!(rx1.recv().is_err()); tx2.send(()).unwrap() @@ -22,17 +29,16 @@ fn map_ok() { #[test] fn map_err() { - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - // The closure given to `map_err` should have been dropped by the time `map` // runs. let (tx1, rx1) = mpsc::channel::<()>(); let (tx2, rx2) = mpsc::channel::<()>(); future::ready::>(Ok(1)) - .map_err(move |_| { let _tx1 = tx1; panic!("should not run"); }) + .map_err(move |_| { + let _tx1 = tx1; + panic!("should not run"); + }) .map(move |_| { assert!(rx1.recv().is_err()); tx2.send(()).unwrap() @@ -42,96 +48,83 @@ fn map_err() { rx2.recv().unwrap(); } -mod channelled { - use futures::future::Future; - use futures::task::{Context,Poll}; - use pin_project::pin_project; - use std::pin::Pin; - - #[pin_project] - struct FutureData { - _data: T, - #[pin] - future: F, - } +#[pin_project] +struct FutureData { + _data: T, + #[pin] + future: F, +} - impl Future for FutureData { - type Output = F::Output; +impl Future for FutureData { + type Output = F::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().future.poll(cx) - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().future.poll(cx) } +} + +#[test] +fn then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::<()>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); - #[test] - fn then_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, FutureExt, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::<()>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .then(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready(()) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(()).unwrap(); - rx2.recv().unwrap(); + FutureData { + _data: tx1, + future: rx0.unwrap_or_else(|_| panic!()), } + .then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(()) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(()).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn and_then_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); - #[test] - fn and_then_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .and_then(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready(Ok(())) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(Ok(())).unwrap(); - rx2.recv().unwrap(); + FutureData { + _data: tx1, + future: rx0.unwrap_or_else(|_| panic!()), } + .and_then(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Ok(())).unwrap(); + rx2.recv().unwrap(); +} + +#[test] +fn or_else_drops_eagerly() { + let (tx0, rx0) = oneshot::channel::>(); + let (tx1, rx1) = mpsc::channel::<()>(); + let (tx2, rx2) = mpsc::channel::<()>(); - #[test] - fn or_else_drops_eagerly() { - use futures::channel::oneshot; - use futures::future::{self, TryFutureExt}; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - - let (tx0, rx0) = oneshot::channel::>(); - let (tx1, rx1) = mpsc::channel::<()>(); - let (tx2, rx2) = mpsc::channel::<()>(); - - FutureData { _data: tx1, future: rx0.unwrap_or_else(|_| { panic!() }) } - .or_else(move |_| { - assert!(rx1.recv().is_err()); // tx1 should have been dropped - tx2.send(()).unwrap(); - future::ready::>(Ok(())) - }) - .run_in_background(); - - assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); - tx0.send(Err(())).unwrap(); - rx2.recv().unwrap(); + FutureData { + _data: tx1, + future: rx0.unwrap_or_else(|_| panic!()), } + .or_else(move |_| { + assert!(rx1.recv().is_err()); // tx1 should have been dropped + tx2.send(()).unwrap(); + future::ready::>(Ok(())) + }) + .run_in_background(); + + assert_eq!(Err(mpsc::TryRecvError::Empty), rx2.try_recv()); + tx0.send(Err(())).unwrap(); + rx2.recv().unwrap(); } diff --git a/futures/tests/future_obj.rs b/futures/tests/future_obj.rs index c6b18fc85c..0e5253464e 100644 --- a/futures/tests/future_obj.rs +++ b/futures/tests/future_obj.rs @@ -1,6 +1,6 @@ -use futures::future::{Future, FutureObj, FutureExt}; -use std::pin::Pin; +use futures::future::{Future, FutureExt, FutureObj}; use futures::task::{Context, Poll}; +use std::pin::Pin; #[test] fn dropping_does_not_segfault() { diff --git a/futures/tests/future_try_flatten_stream.rs b/futures/tests/future_try_flatten_stream.rs index 4a614f9c14..82ae1baf2c 100644 --- a/futures/tests/future_try_flatten_stream.rs +++ b/futures/tests/future_try_flatten_stream.rs @@ -1,9 +1,14 @@ +use futures::executor::block_on_stream; +use futures::future::{err, ok, TryFutureExt}; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::stream::{self, StreamExt}; +use futures::task::{Context, Poll}; +use std::marker::PhantomData; +use std::pin::Pin; + #[test] fn successful_future() { - use futures::executor::block_on_stream; - use futures::future::{ok, TryFutureExt}; - use futures::stream::{self, StreamExt}; - let stream_items = vec![17, 19]; let future_of_a_stream = ok::<_, bool>(stream::iter(stream_items).map(Ok)); @@ -17,15 +22,8 @@ fn successful_future() { #[test] fn failed_future() { - use core::marker::PhantomData; - use core::pin::Pin; - use futures::executor::block_on_stream; - use futures::future::{err, TryFutureExt}; - use futures::stream::Stream; - use futures::task::{Context, Poll}; - struct PanickingStream { - _marker: PhantomData<(T, E)> + _marker: PhantomData<(T, E)>, } impl Stream for PanickingStream { @@ -45,13 +43,6 @@ fn failed_future() { #[test] fn assert_impls() { - use core::marker::PhantomData; - use core::pin::Pin; - use futures::sink::Sink; - use futures::stream::Stream; - use futures::task::{Context, Poll}; - use futures::future::{ok, TryFutureExt}; - struct StreamSink(PhantomData<(T, E, Item)>); impl Stream for StreamSink { diff --git a/futures/tests/futures_ordered.rs b/futures/tests/futures_ordered.rs index 7f21c829e3..1cdb197da4 100644 --- a/futures/tests/futures_ordered.rs +++ b/futures/tests/futures_ordered.rs @@ -1,15 +1,19 @@ +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt, TryFutureExt}; +use futures::stream::{FuturesOrdered, StreamExt}; +use futures_test::task::noop_context; +use std::any::Any; + #[test] fn works_1() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); - let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::>(); + let mut stream = vec![a_rx, b_rx, c_rx] + .into_iter() + .collect::>(); b_tx.send(99).unwrap(); assert!(stream.poll_next_unpin(&mut noop_context()).is_pending()); @@ -26,11 +30,6 @@ fn works_1() { #[test] fn works_2() { - use futures::channel::oneshot; - use futures::future::{join, FutureExt}; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -38,7 +37,9 @@ fn works_2() { let mut stream = vec![ a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed(), - ].into_iter().collect::>(); + ] + .into_iter() + .collect::>(); let mut cx = noop_context(); a_tx.send(33).unwrap(); @@ -51,37 +52,33 @@ fn works_2() { #[test] fn from_iterator() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{StreamExt, FuturesOrdered}; - let stream = vec![ future::ready::(1), future::ready::(2), - future::ready::(3) - ].into_iter().collect::>(); + future::ready::(3), + ] + .into_iter() + .collect::>(); assert_eq!(stream.len(), 3); - assert_eq!(block_on(stream.collect::>()), vec![1,2,3]); + assert_eq!(block_on(stream.collect::>()), vec![1, 2, 3]); } #[test] fn queue_never_unblocked() { - use futures::channel::oneshot; - use futures::future::{self, Future, TryFutureExt}; - use futures::stream::{StreamExt, FuturesOrdered}; - use futures_test::task::noop_context; - use std::any::Any; - let (_a_tx, a_rx) = oneshot::channel::>(); let (b_tx, b_rx) = oneshot::channel::>(); let (c_tx, c_rx) = oneshot::channel::>(); let mut stream = vec![ Box::new(a_rx) as Box + Unpin>, - Box::new(future::try_select(b_rx, c_rx) - .map_err(|e| e.factor_first().0) - .and_then(|e| future::ok(Box::new(e) as Box))) as _, - ].into_iter().collect::>(); + Box::new( + future::try_select(b_rx, c_rx) + .map_err(|e| e.factor_first().0) + .and_then(|e| future::ok(Box::new(e) as Box)), + ) as _, + ] + .into_iter() + .collect::>(); let cx = &mut noop_context(); for _ in 0..10 { diff --git a/futures/tests/futures_unordered.rs b/futures/tests/futures_unordered.rs index ac0817e079..e55a91bdd4 100644 --- a/futures/tests/futures_unordered.rs +++ b/futures/tests/futures_unordered.rs @@ -1,17 +1,17 @@ -use futures::future::Future; -use futures::stream::{FuturesUnordered, StreamExt}; +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{self, join, Future, FutureExt}; +use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; use futures_test::task::noop_context; +use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; use std::iter::FromIterator; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; #[test] fn is_terminated() { - use futures::future; - use futures::stream::{FusedStream, FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let mut cx = noop_context(); let mut tasks = FuturesUnordered::new(); @@ -39,10 +39,6 @@ fn is_terminated() { #[test] fn works_1() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -65,12 +61,6 @@ fn works_1() { #[test] fn works_2() { - use futures::channel::oneshot; - use futures::future::{join, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -94,10 +84,6 @@ fn works_2() { #[test] fn from_iterator() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - let stream = vec![ future::ready::(1), future::ready::(2), @@ -111,12 +97,6 @@ fn from_iterator() { #[test] fn finished_future() { - use std::marker::Unpin; - use futures::channel::oneshot; - use futures::future::{self, Future, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::task::noop_context; - let (_a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -142,10 +122,6 @@ fn finished_future() { #[test] fn iter_mut_cancel() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - let (a_tx, a_rx) = oneshot::channel::(); let (b_tx, b_rx) = oneshot::channel::(); let (c_tx, c_rx) = oneshot::channel::(); @@ -172,9 +148,6 @@ fn iter_mut_cancel() { #[test] fn iter_mut_len() { - use futures::future; - use futures::stream::FuturesUnordered; - let mut stream = vec![ future::pending::<()>(), future::pending::<()>(), @@ -196,15 +169,6 @@ fn iter_mut_len() { #[test] fn iter_cancel() { - use std::marker::Unpin; - use std::pin::Pin; - use std::sync::atomic::{AtomicBool, Ordering}; - - use futures::executor::block_on_stream; - use futures::future::{self, Future, FutureExt}; - use futures::stream::FuturesUnordered; - use futures::task::{Context, Poll}; - struct AtomicCancel { future: F, cancel: AtomicBool, @@ -224,7 +188,10 @@ fn iter_cancel() { impl AtomicCancel { fn new(future: F) -> Self { - Self { future, cancel: AtomicBool::new(false) } + Self { + future, + cancel: AtomicBool::new(false), + } } } @@ -250,9 +217,6 @@ fn iter_cancel() { #[test] fn iter_len() { - use futures::future; - use futures::stream::FuturesUnordered; - let stream = vec![ future::pending::<()>(), future::pending::<()>(), @@ -274,11 +238,6 @@ fn iter_len() { #[test] fn futures_not_moved_after_poll() { - use futures::future; - use futures::stream::FuturesUnordered; - use futures_test::future::FutureTestExt; - use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; - // Future that will be ready after being polled twice, // asserting that it does not move. let fut = future::ready(()).pending_once().assert_unmoved(); @@ -292,11 +251,6 @@ fn futures_not_moved_after_poll() { #[test] fn len_valid_during_out_of_order_completion() { - use futures::channel::oneshot; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - // Complete futures out-of-order and add new futures afterwards to ensure // length values remain correct. let (a_tx, a_rx) = oneshot::channel::(); diff --git a/futures/tests/inspect.rs b/futures/tests/inspect.rs index 375778b63d..eacd1f78a2 100644 --- a/futures/tests/inspect.rs +++ b/futures/tests/inspect.rs @@ -1,12 +1,14 @@ +use futures::executor::block_on; +use futures::future::{self, FutureExt}; + #[test] fn smoke() { - use futures::executor::block_on; - use futures::future::{self, FutureExt}; - let mut counter = 0; { - let work = future::ready::(40).inspect(|val| { counter += *val; }); + let work = future::ready::(40).inspect(|val| { + counter += *val; + }); assert_eq!(block_on(work), 40); } diff --git a/futures/tests/io_buf_reader.rs b/futures/tests/io_buf_reader.rs index f8f9d140e3..f7d6de1cf9 100644 --- a/futures/tests/io_buf_reader.rs +++ b/futures/tests/io_buf_reader.rs @@ -1,9 +1,17 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AllowStdIo, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, + BufReader, Cursor, SeekFrom, +}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::cmp; +use std::io; +use std::pin::Pin; + macro_rules! run_fill_buf { ($reader:expr) => {{ - use futures_test::task::noop_context; - use futures::task::Poll; - use std::pin::Pin; - let mut cx = noop_context(); loop { if let Poll::Ready(x) = Pin::new(&mut $reader).poll_fill_buf(&mut cx) { @@ -13,80 +21,69 @@ macro_rules! run_fill_buf { }}; } -mod util { - use futures::future::Future; - pub fn run(mut f: F) -> F::Output { - use futures_test::task::noop_context; - use futures::task::Poll; - use futures::future::FutureExt; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } } } -mod maybe_pending { - use futures::task::{Context,Poll}; - use std::{cmp,io}; - use std::pin::Pin; - use futures::io::{AsyncRead,AsyncBufRead}; - - pub struct MaybePending<'a> { - inner: &'a [u8], - ready_read: bool, - ready_fill_buf: bool, - } +struct MaybePending<'a> { + inner: &'a [u8], + ready_read: bool, + ready_fill_buf: bool, +} - impl<'a> MaybePending<'a> { - pub fn new(inner: &'a [u8]) -> Self { - Self { inner, ready_read: false, ready_fill_buf: false } +impl<'a> MaybePending<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { + inner, + ready_read: false, + ready_fill_buf: false, } } +} - impl AsyncRead for MaybePending<'_> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll> - { - if self.ready_read { - self.ready_read = false; - Pin::new(&mut self.inner).poll_read(cx, buf) - } else { - self.ready_read = true; - Poll::Pending - } +impl AsyncRead for MaybePending<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if self.ready_read { + self.ready_read = false; + Pin::new(&mut self.inner).poll_read(cx, buf) + } else { + self.ready_read = true; + Poll::Pending } } +} - impl AsyncBufRead for MaybePending<'_> { - fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) - -> Poll> - { - if self.ready_fill_buf { - self.ready_fill_buf = false; - if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } - let len = cmp::min(2, self.inner.len()); - Poll::Ready(Ok(&self.inner[0..len])) - } else { - self.ready_fill_buf = true; - Poll::Pending +impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + if self.ready_fill_buf { + self.ready_fill_buf = false; + if self.inner.is_empty() { + return Poll::Ready(Ok(&[])); } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready_fill_buf = true; + Poll::Pending } + } - fn consume(mut self: Pin<&mut Self>, amt: usize) { - self.inner = &self.inner[amt..]; - } + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; } } #[test] fn test_buffered_reader() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, BufReader}; - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, inner); @@ -124,17 +121,15 @@ fn test_buffered_reader() { #[test] fn test_buffered_reader_seek() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncBufRead, BufReader, Cursor, SeekFrom}; - use std::pin::Pin; - use util::run; - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, Cursor::new(inner)); assert_eq!(block_on(reader.seek(SeekFrom::Start(3))).ok(), Some(3)); assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); + assert_eq!( + run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), + None + ); assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); assert_eq!(block_on(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); @@ -144,13 +139,9 @@ fn test_buffered_reader_seek() { #[test] fn test_buffered_reader_seek_underflow() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncBufRead, AllowStdIo, BufReader, SeekFrom}; - use std::io; - // gimmick reader that yields its position modulo 256 for each byte struct PositionReader { - pos: u64 + pos: u64, } impl io::Read for PositionReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { @@ -181,23 +172,28 @@ fn test_buffered_reader_seek_underflow() { let mut reader = BufReader::with_capacity(5, AllowStdIo::new(PositionReader { pos: 0 })); assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1, 2, 3, 4][..])); - assert_eq!(block_on(reader.seek(SeekFrom::End(-5))).ok(), Some(u64::max_value()-5)); + assert_eq!( + block_on(reader.seek(SeekFrom::End(-5))).ok(), + Some(u64::max_value() - 5) + ); assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); // the following seek will require two underlying seeks let expected = 9_223_372_036_854_775_802; - assert_eq!(block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), Some(expected)); + assert_eq!( + block_on(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), + Some(expected) + ); assert_eq!(run_fill_buf!(reader).ok().map(|s| s.len()), Some(5)); // seeking to 0 should empty the buffer. - assert_eq!(block_on(reader.seek(SeekFrom::Current(0))).ok(), Some(expected)); + assert_eq!( + block_on(reader.seek(SeekFrom::Current(0))).ok(), + Some(expected) + ); assert_eq!(reader.get_ref().get_ref().pos, expected); } #[test] fn test_short_reads() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, AllowStdIo, BufReader}; - use std::io; - /// A dummy reader intended at testing short-reads propagation. struct ShortReader { lengths: Vec, @@ -213,7 +209,9 @@ fn test_short_reads() { } } - let inner = ShortReader { lengths: vec![0, 1, 2, 0, 1, 0] }; + let inner = ShortReader { + lengths: vec![0, 1, 2, 0, 1, 0], + }; let mut reader = BufReader::new(AllowStdIo::new(inner)); let mut buf = [0, 0]; assert_eq!(block_on(reader.read(&mut buf)).unwrap(), 0); @@ -227,10 +225,6 @@ fn test_short_reads() { #[test] fn maybe_pending() { - use futures::io::{AsyncReadExt, BufReader}; - use util::run; - use maybe_pending::MaybePending; - let inner: &[u8] = &[5, 6, 7, 0, 1, 2, 3, 4]; let mut reader = BufReader::with_capacity(2, MaybePending::new(inner)); @@ -268,10 +262,6 @@ fn maybe_pending() { #[test] fn maybe_pending_buf_read() { - use futures::io::{AsyncBufReadExt, BufReader}; - use util::run; - use maybe_pending::MaybePending; - let inner = MaybePending::new(&[0, 1, 2, 3, 1, 0]); let mut reader = BufReader::with_capacity(2, inner); let mut v = Vec::new(); @@ -291,36 +281,35 @@ fn maybe_pending_buf_read() { // https://github.com/rust-lang/futures-rs/pull/1573#discussion_r281162309 #[test] fn maybe_pending_seek() { - use futures::io::{AsyncBufRead, AsyncSeek, AsyncSeekExt, AsyncRead, BufReader, - Cursor, SeekFrom - }; - use futures::task::{Context,Poll}; - use std::io; - use std::pin::Pin; - use util::run; - pub struct MaybePendingSeek<'a> { + struct MaybePendingSeek<'a> { inner: Cursor<&'a [u8]>, ready: bool, } impl<'a> MaybePendingSeek<'a> { - pub fn new(inner: &'a [u8]) -> Self { - Self { inner: Cursor::new(inner), ready: true } + fn new(inner: &'a [u8]) -> Self { + Self { + inner: Cursor::new(inner), + ready: true, + } } } impl AsyncRead for MaybePendingSeek<'_> { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) - -> Poll> - { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { Pin::new(&mut self.inner).poll_read(cx, buf) } } impl AsyncBufRead for MaybePendingSeek<'_> { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) - -> Poll> - { + fn poll_fill_buf( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let this: *mut Self = &mut *self as *mut _; Pin::new(&mut unsafe { &mut *this }.inner).poll_fill_buf(cx) } @@ -331,9 +320,11 @@ fn maybe_pending_seek() { } impl AsyncSeek for MaybePendingSeek<'_> { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll> - { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { if self.ready { self.ready = false; Pin::new(&mut self.inner).poll_seek(cx, pos) @@ -349,7 +340,10 @@ fn maybe_pending_seek() { assert_eq!(run(reader.seek(SeekFrom::Current(3))).ok(), Some(3)); assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); - assert_eq!(run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), None); + assert_eq!( + run(reader.seek(SeekFrom::Current(i64::min_value()))).ok(), + None + ); assert_eq!(run_fill_buf!(reader).ok(), Some(&[0, 1][..])); assert_eq!(run(reader.seek(SeekFrom::Current(1))).ok(), Some(4)); assert_eq!(run_fill_buf!(reader).ok(), Some(&[1, 2][..])); diff --git a/futures/tests/io_buf_writer.rs b/futures/tests/io_buf_writer.rs index d58a6d801f..dec4026fae 100644 --- a/futures/tests/io_buf_writer.rs +++ b/futures/tests/io_buf_writer.rs @@ -1,67 +1,62 @@ -mod maybe_pending { - use futures::io::AsyncWrite; - use futures::task::{Context, Poll}; - use std::io; - use std::pin::Pin; - - pub struct MaybePending { - pub inner: Vec, - ready: bool, - } +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{ + AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom, +}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::io; +use std::pin::Pin; + +struct MaybePending { + inner: Vec, + ready: bool, +} - impl MaybePending { - pub fn new(inner: Vec) -> Self { - Self { inner, ready: false } +impl MaybePending { + fn new(inner: Vec) -> Self { + Self { + inner, + ready: false, } } +} - impl AsyncWrite for MaybePending { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.ready { - self.ready = false; - Pin::new(&mut self.inner).poll_write(cx, buf) - } else { - self.ready = true; - Poll::Pending - } +impl AsyncWrite for MaybePending { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.ready { + self.ready = false; + Pin::new(&mut self.inner).poll_write(cx, buf) + } else { + self.ready = true; + Poll::Pending } + } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_flush(cx) - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.inner).poll_close(cx) - } + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) } } -mod util { - use futures::future::Future; - - pub fn run(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures::task::Poll; - use futures_test::task::noop_context; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } } } #[test] fn buf_writer() { - use futures::executor::block_on; - use futures::io::{AsyncWriteExt, BufWriter}; - let mut writer = BufWriter::with_capacity(2, Vec::new()); block_on(writer.write(&[0, 1])).unwrap(); @@ -104,9 +99,6 @@ fn buf_writer() { #[test] fn buf_writer_inner_flushes() { - use futures::executor::block_on; - use futures::io::{AsyncWriteExt, BufWriter}; - let mut w = BufWriter::with_capacity(3, Vec::new()); block_on(w.write(&[0, 1])).unwrap(); assert_eq!(*w.get_ref(), []); @@ -117,9 +109,6 @@ fn buf_writer_inner_flushes() { #[test] fn buf_writer_seek() { - use futures::executor::block_on; - use futures::io::{AsyncSeekExt, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; - // FIXME: when https://github.com/rust-lang/futures-rs/issues/1510 fixed, // use `Vec::new` instead of `vec![0; 8]`. let mut w = BufWriter::with_capacity(3, Cursor::new(vec![0; 8])); @@ -135,11 +124,6 @@ fn buf_writer_seek() { #[test] fn maybe_pending_buf_writer() { - use futures::io::{AsyncWriteExt, BufWriter}; - - use maybe_pending::MaybePending; - use util::run; - let mut writer = BufWriter::with_capacity(2, MaybePending::new(Vec::new())); run(writer.write(&[0, 1])).unwrap(); @@ -173,20 +157,21 @@ fn maybe_pending_buf_writer() { run(writer.write(&[9, 10, 11])).unwrap(); assert_eq!(writer.buffer(), []); - assert_eq!(writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + assert_eq!( + writer.get_ref().inner, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + ); run(writer.flush()).unwrap(); assert_eq!(writer.buffer(), []); - assert_eq!(&writer.get_ref().inner, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + assert_eq!( + &writer.get_ref().inner, + &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] + ); } #[test] fn maybe_pending_buf_writer_inner_flushes() { - use futures::io::{AsyncWriteExt, BufWriter}; - - use maybe_pending::MaybePending; - use util::run; - let mut w = BufWriter::with_capacity(3, MaybePending::new(Vec::new())); run(w.write(&[0, 1])).unwrap(); assert_eq!(&w.get_ref().inner, &[]); @@ -197,13 +182,6 @@ fn maybe_pending_buf_writer_inner_flushes() { #[test] fn maybe_pending_buf_writer_seek() { - use futures::io::{AsyncSeek, AsyncSeekExt, AsyncWrite, AsyncWriteExt, BufWriter, Cursor, SeekFrom}; - use futures::task::{Context, Poll}; - use std::io; - use std::pin::Pin; - - use util::run; - struct MaybePendingSeek { inner: Cursor>, ready_write: bool, @@ -212,7 +190,11 @@ fn maybe_pending_buf_writer_seek() { impl MaybePendingSeek { fn new(inner: Vec) -> Self { - Self { inner: Cursor::new(inner), ready_write: false, ready_seek: false } + Self { + inner: Cursor::new(inner), + ready_write: false, + ready_seek: false, + } } } @@ -241,9 +223,11 @@ fn maybe_pending_buf_writer_seek() { } impl AsyncSeek for MaybePendingSeek { - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) - -> Poll> - { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { if self.ready_seek { self.ready_seek = false; Pin::new(&mut self.inner).poll_seek(cx, pos) @@ -260,9 +244,15 @@ fn maybe_pending_buf_writer_seek() { run(w.write_all(&[0, 1, 2, 3, 4, 5])).unwrap(); run(w.write_all(&[6, 7])).unwrap(); assert_eq!(run(w.seek(SeekFrom::Current(0))).ok(), Some(8)); - assert_eq!(&w.get_ref().inner.get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!( + &w.get_ref().inner.get_ref()[..], + &[0, 1, 2, 3, 4, 5, 6, 7][..] + ); assert_eq!(run(w.seek(SeekFrom::Start(2))).ok(), Some(2)); run(w.write_all(&[8, 9])).unwrap(); run(w.flush()).unwrap(); - assert_eq!(&w.into_inner().inner.into_inner()[..], &[0, 1, 8, 9, 4, 5, 6, 7]); + assert_eq!( + &w.into_inner().inner.into_inner()[..], + &[0, 1, 8, 9, 4, 5, 6, 7] + ); } diff --git a/futures/tests/io_cursor.rs b/futures/tests/io_cursor.rs index 4ba6342525..906e00d1be 100644 --- a/futures/tests/io_cursor.rs +++ b/futures/tests/io_cursor.rs @@ -1,35 +1,54 @@ +use assert_matches::assert_matches; +use futures::executor::block_on; +use futures::future::lazy; +use futures::io::{AsyncWrite, Cursor}; +use futures::task::Poll; +use std::pin::Pin; + #[test] fn cursor_asyncwrite_vec() { - use assert_matches::assert_matches; - use futures::future::lazy; - use futures::io::{AsyncWrite, Cursor}; - use futures::task::Poll; - use std::pin::Pin; - let mut cursor = Cursor::new(vec![0; 5]); - futures::executor::block_on(lazy(|cx| { - assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); - assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); - assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(2))); - assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(2))); + block_on(lazy(|cx| { + assert_matches!( + Pin::new(&mut cursor).poll_write(cx, &[1, 2]), + Poll::Ready(Ok(2)) + ); + assert_matches!( + Pin::new(&mut cursor).poll_write(cx, &[3, 4]), + Poll::Ready(Ok(2)) + ); + assert_matches!( + Pin::new(&mut cursor).poll_write(cx, &[5, 6]), + Poll::Ready(Ok(2)) + ); + assert_matches!( + Pin::new(&mut cursor).poll_write(cx, &[6, 7]), + Poll::Ready(Ok(2)) + ); })); assert_eq!(cursor.into_inner(), [1, 2, 3, 4, 5, 6, 6, 7]); } #[test] fn cursor_asyncwrite_box() { - use assert_matches::assert_matches; - use futures::future::lazy; - use futures::io::{AsyncWrite, Cursor}; - use futures::task::Poll; - use std::pin::Pin; - let mut cursor = Cursor::new(vec![0; 5].into_boxed_slice()); - futures::executor::block_on(lazy(|cx| { - assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[1, 2]), Poll::Ready(Ok(2))); - assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[3, 4]), Poll::Ready(Ok(2))); - assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[5, 6]), Poll::Ready(Ok(1))); - assert_matches!(Pin::new(&mut cursor).poll_write(cx, &[6, 7]), Poll::Ready(Ok(0))); + block_on(lazy(|cx| { + assert_matches!( + Pin::new(&mut cursor).poll_write(cx, &[1, 2]), + Poll::Ready(Ok(2)) + ); + assert_matches!( + Pin::new(&mut cursor).poll_write(cx, &[3, 4]), + Poll::Ready(Ok(2)) + ); + assert_matches!( + Pin::new(&mut cursor).poll_write(cx, &[5, 6]), + Poll::Ready(Ok(1)) + ); + assert_matches!( + Pin::new(&mut cursor).poll_write(cx, &[6, 7]), + Poll::Ready(Ok(0)) + ); })); assert_eq!(&*cursor.into_inner(), [1, 2, 3, 4, 5]); } diff --git a/futures/tests/io_lines.rs b/futures/tests/io_lines.rs index 2552c7c40a..06e1ac9b14 100644 --- a/futures/tests/io_lines.rs +++ b/futures/tests/io_lines.rs @@ -1,32 +1,34 @@ -mod util { - use futures::future::Future; - - pub fn run(mut f: F) -> F::Output { - use futures_test::task::noop_context; - use futures::task::Poll; - use futures::future::FutureExt; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; } } } -#[test] -fn lines() { - use futures::executor::block_on; - use futures::stream::StreamExt; - use futures::io::{AsyncBufReadExt, Cursor}; +macro_rules! block_on_next { + ($expr:expr) => { + block_on($expr.next()).unwrap().unwrap() + }; +} - macro_rules! block_on_next { - ($expr:expr) => { - block_on($expr.next()).unwrap().unwrap() - }; - } +macro_rules! run_next { + ($expr:expr) => { + run($expr.next()).unwrap().unwrap() + }; +} +#[test] +fn lines() { let buf = Cursor::new(&b"12\r"[..]); let mut s = buf.lines(); assert_eq!(block_on_next!(s), "12\r".to_string()); @@ -41,18 +43,6 @@ fn lines() { #[test] fn maybe_pending() { - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - - use util::run; - - macro_rules! run_next { - ($expr:expr) => { - run($expr.next()).unwrap().unwrap() - }; - } - let buf = stream::iter(vec![&b"12"[..], &b"\r"[..]]) .map(Ok) .into_async_read() diff --git a/futures/tests/io_read.rs b/futures/tests/io_read.rs index 5902ad0ed9..d39a6ea790 100644 --- a/futures/tests/io_read.rs +++ b/futures/tests/io_read.rs @@ -1,27 +1,26 @@ -mod mock_reader { - use futures::io::AsyncRead; - use std::io; - use std::pin::Pin; - use std::task::{Context, Poll}; +use futures::io::AsyncRead; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; - pub struct MockReader { - fun: Box Poll>>, - } +struct MockReader { + fun: Box Poll>>, +} - impl MockReader { - pub fn new(fun: impl FnMut(&mut [u8]) -> Poll> + 'static) -> Self { - Self { fun: Box::new(fun) } - } +impl MockReader { + fn new(fun: impl FnMut(&mut [u8]) -> Poll> + 'static) -> Self { + Self { fun: Box::new(fun) } } +} - impl AsyncRead for MockReader { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut [u8] - ) -> Poll> { - (self.get_mut().fun)(buf) - } +impl AsyncRead for MockReader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + (self.get_mut().fun)(buf) } } @@ -29,14 +28,6 @@ mod mock_reader { /// calls `poll_read` with an empty slice if no buffers are provided. #[test] fn read_vectored_no_buffers() { - use futures::io::AsyncRead; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_reader::MockReader; - let mut reader = MockReader::new(|buf| { assert_eq!(buf, b""); Err(io::ErrorKind::BrokenPipe.into()).into() @@ -53,14 +44,6 @@ fn read_vectored_no_buffers() { /// calls `poll_read` with the first non-empty buffer. #[test] fn read_vectored_first_non_empty() { - use futures::io::AsyncRead; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_reader::MockReader; - let mut reader = MockReader::new(|buf| { assert_eq!(buf.len(), 4); buf.copy_from_slice(b"four"); diff --git a/futures/tests/io_read_exact.rs b/futures/tests/io_read_exact.rs index bd4b36deaf..6582e50b80 100644 --- a/futures/tests/io_read_exact.rs +++ b/futures/tests/io_read_exact.rs @@ -1,14 +1,14 @@ +use futures::executor::block_on; +use futures::io::AsyncReadExt; + #[test] fn read_exact() { - use futures::executor::block_on; - use futures::io::AsyncReadExt; - let mut reader: &[u8] = &[1, 2, 3, 4, 5]; let mut out = [0u8; 3]; let res = block_on(reader.read_exact(&mut out)); // read 3 bytes out assert!(res.is_ok()); - assert_eq!(out, [1,2,3]); + assert_eq!(out, [1, 2, 3]); assert_eq!(reader.len(), 2); let res = block_on(reader.read_exact(&mut out)); // read another 3 bytes, but only 2 bytes left diff --git a/futures/tests/io_read_line.rs b/futures/tests/io_read_line.rs index 51e8126ada..6d82497495 100644 --- a/futures/tests/io_read_line.rs +++ b/futures/tests/io_read_line.rs @@ -1,8 +1,22 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + #[test] fn read_line() { - use futures::executor::block_on; - use futures::io::{AsyncBufReadExt, Cursor}; - let mut buf = Cursor::new(b"12"); let mut v = String::new(); assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); @@ -22,25 +36,6 @@ fn read_line() { #[test] fn maybe_pending() { - use futures::future::Future; - - fn run(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures::task::Poll; - use futures_test::task::noop_context; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } - } - - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - let mut buf = b"12".interleave_pending(); let mut v = String::new(); assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); diff --git a/futures/tests/io_read_to_end.rs b/futures/tests/io_read_to_end.rs index 892d463c2d..7122511fcb 100644 --- a/futures/tests/io_read_to_end.rs +++ b/futures/tests/io_read_to_end.rs @@ -1,4 +1,5 @@ use futures::{ + executor::block_on, io::{self, AsyncRead, AsyncReadExt}, task::{Context, Poll}, }; @@ -12,7 +13,7 @@ fn issue2310() { } impl MyRead { - pub fn new() -> Self { + fn new() -> Self { MyRead { first: false } } } @@ -39,7 +40,7 @@ fn issue2310() { } impl VecWrapper { - pub fn new() -> Self { + fn new() -> Self { VecWrapper { inner: Vec::new() } } } @@ -55,7 +56,7 @@ fn issue2310() { } } - futures::executor::block_on(async { + block_on(async { let mut vec = VecWrapper::new(); let mut read = MyRead::new(); diff --git a/futures/tests/io_read_to_string.rs b/futures/tests/io_read_to_string.rs index 2e9c00a138..ae6aaa21d8 100644 --- a/futures/tests/io_read_to_string.rs +++ b/futures/tests/io_read_to_string.rs @@ -1,8 +1,13 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + #[test] fn read_to_string() { - use futures::executor::block_on; - use futures::io::{AsyncReadExt, Cursor}; - let mut c = Cursor::new(&b""[..]); let mut v = String::new(); assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0); @@ -20,16 +25,7 @@ fn read_to_string() { #[test] fn interleave_pending() { - use futures::future::Future; - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncReadExt; - use futures_test::io::AsyncReadTestExt; - fn run(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures_test::task::noop_context; - use futures::task::Poll; - let mut cx = noop_context(); loop { if let Poll::Ready(x) = f.poll_unpin(&mut cx) { diff --git a/futures/tests/io_read_until.rs b/futures/tests/io_read_until.rs index 6fa22eee65..71f857f4b0 100644 --- a/futures/tests/io_read_until.rs +++ b/futures/tests/io_read_until.rs @@ -1,8 +1,22 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::io::{AsyncBufReadExt, Cursor}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + #[test] fn read_until() { - use futures::executor::block_on; - use futures::io::{AsyncBufReadExt, Cursor}; - let mut buf = Cursor::new(b"12"); let mut v = Vec::new(); assert_eq!(block_on(buf.read_until(b'3', &mut v)).unwrap(), 2); @@ -22,25 +36,6 @@ fn read_until() { #[test] fn maybe_pending() { - use futures::future::Future; - - fn run(mut f: F) -> F::Output { - use futures::future::FutureExt; - use futures_test::task::noop_context; - use futures::task::Poll; - - let mut cx = noop_context(); - loop { - if let Poll::Ready(x) = f.poll_unpin(&mut cx) { - return x; - } - } - } - - use futures::stream::{self, StreamExt, TryStreamExt}; - use futures::io::AsyncBufReadExt; - use futures_test::io::AsyncReadTestExt; - let mut buf = b"12".interleave_pending(); let mut v = Vec::new(); assert_eq!(run(buf.read_until(b'3', &mut v)).unwrap(), 2); diff --git a/futures/tests/io_write.rs b/futures/tests/io_write.rs index 363f32b1a6..227b4f9670 100644 --- a/futures/tests/io_write.rs +++ b/futures/tests/io_write.rs @@ -1,35 +1,34 @@ -mod mock_writer { - use futures::io::AsyncWrite; - use std::io; - use std::pin::Pin; - use std::task::{Context, Poll}; +use futures::io::AsyncWrite; +use futures_test::task::panic_context; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; - pub struct MockWriter { - fun: Box Poll>>, - } +struct MockWriter { + fun: Box Poll>>, +} - impl MockWriter { - pub fn new(fun: impl FnMut(&[u8]) -> Poll> + 'static) -> Self { - Self { fun: Box::new(fun) } - } +impl MockWriter { + fn new(fun: impl FnMut(&[u8]) -> Poll> + 'static) -> Self { + Self { fun: Box::new(fun) } } +} - impl AsyncWrite for MockWriter { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - (self.get_mut().fun)(buf) - } +impl AsyncWrite for MockWriter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + (self.get_mut().fun)(buf) + } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - panic!() - } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + panic!() + } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - panic!() - } + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + panic!() } } @@ -37,14 +36,6 @@ mod mock_writer { /// calls `poll_write` with an empty slice if no buffers are provided. #[test] fn write_vectored_no_buffers() { - use futures::io::AsyncWrite; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_writer::MockWriter; - let mut writer = MockWriter::new(|buf| { assert_eq!(buf, b""); Err(io::ErrorKind::BrokenPipe.into()).into() @@ -61,14 +52,6 @@ fn write_vectored_no_buffers() { /// calls `poll_write` with the first non-empty buffer. #[test] fn write_vectored_first_non_empty() { - use futures::io::AsyncWrite; - use futures_test::task::panic_context; - use std::io; - use std::pin::Pin; - use std::task::Poll; - - use mock_writer::MockWriter; - let mut writer = MockWriter::new(|buf| { assert_eq!(buf, b"four"); Poll::Ready(Ok(4)) @@ -77,7 +60,7 @@ fn write_vectored_first_non_empty() { let bufs = &mut [ io::IoSlice::new(&[]), io::IoSlice::new(&[]), - io::IoSlice::new(b"four") + io::IoSlice::new(b"four"), ]; let res = Pin::new(&mut writer).poll_write_vectored(cx, bufs); diff --git a/futures/tests/join_all.rs b/futures/tests/join_all.rs index c322e58a13..ae05a21b7c 100644 --- a/futures/tests/join_all.rs +++ b/futures/tests/join_all.rs @@ -1,25 +1,20 @@ -mod util { - use std::future::Future; - use std::fmt::Debug; - - pub fn assert_done(actual_fut: F, expected: T) - where - T: PartialEq + Debug, - F: FnOnce() -> Box + Unpin>, - { - use futures::executor::block_on; - - let output = block_on(actual_fut()); - assert_eq!(output, expected); - } +use futures::executor::block_on; +use futures::future::{join_all, ready, Future, JoinAll}; +use std::fmt::Debug; + +fn assert_done(actual_fut: F, expected: T) +where + T: PartialEq + Debug, + F: FnOnce() -> Box + Unpin>, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); } #[test] fn collect_collects() { - use futures_util::future::{join_all,ready}; - - util::assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); - util::assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); + assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]); + assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]); // REVIEW: should this be implemented? // assert_done(|| Box::new(join_all(Vec::::new())), vec![]); @@ -28,8 +23,6 @@ fn collect_collects() { #[test] fn join_all_iter_lifetime() { - use futures_util::future::{join_all,ready}; - use std::future::Future; // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `JoinAll`. fn sizes(bufs: Vec<&[u8]>) -> Box> + Unpin> { @@ -37,14 +30,12 @@ fn join_all_iter_lifetime() { Box::new(join_all(iter)) } - util::assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), vec![3_usize, 0, 1]); + assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]); } #[test] fn join_all_from_iter() { - use futures_util::future::{JoinAll,ready}; - - util::assert_done( + assert_done( || Box::new(vec![ready(1), ready(2)].into_iter().collect::>()), vec![1, 2], ) diff --git a/futures/tests/macro_comma_support.rs b/futures/tests/macro_comma_support.rs index ca131639dd..85871e98be 100644 --- a/futures/tests/macro_comma_support.rs +++ b/futures/tests/macro_comma_support.rs @@ -1,12 +1,13 @@ +use futures::{ + executor::block_on, + future::{self, FutureExt}, + join, ready, + task::Poll, + try_join, +}; + #[test] fn ready() { - use futures::{ - executor::block_on, - future, - task::Poll, - ready, - }; - block_on(future::poll_fn(|_| { ready!(Poll::Ready(()),); Poll::Ready(()) @@ -15,11 +16,7 @@ fn ready() { #[test] fn poll() { - use futures::{ - executor::block_on, - future::FutureExt, - poll, - }; + use futures::poll; block_on(async { let _ = poll!(async {}.boxed(),); @@ -28,11 +25,6 @@ fn poll() { #[test] fn join() { - use futures::{ - executor::block_on, - join - }; - block_on(async { let future1 = async { 1 }; let future2 = async { 2 }; @@ -42,12 +34,6 @@ fn join() { #[test] fn try_join() { - use futures::{ - executor::block_on, - future::FutureExt, - try_join, - }; - block_on(async { let future1 = async { 1 }.never_error(); let future2 = async { 2 }.never_error(); diff --git a/futures/tests/mutex.rs b/futures/tests/mutex.rs index 68e0301426..7c33864c76 100644 --- a/futures/tests/mutex.rs +++ b/futures/tests/mutex.rs @@ -1,9 +1,15 @@ +use futures::channel::mpsc; +use futures::executor::{block_on, ThreadPool}; +use futures::future::{ready, FutureExt}; +use futures::lock::Mutex; +use futures::stream::StreamExt; +use futures::task::{Context, SpawnExt}; +use futures_test::future::FutureTestExt; +use futures_test::task::{new_count_waker, panic_context}; +use std::sync::Arc; + #[test] fn mutex_acquire_uncontested() { - use futures::future::FutureExt; - use futures::lock::Mutex; - use futures_test::task::panic_context; - let mutex = Mutex::new(()); for _ in 0..10 { assert!(mutex.lock().poll_unpin(&mut panic_context()).is_ready()); @@ -12,11 +18,6 @@ fn mutex_acquire_uncontested() { #[test] fn mutex_wakes_waiters() { - use futures::future::FutureExt; - use futures::lock::Mutex; - use futures::task::Context; - use futures_test::task::{new_count_waker, panic_context}; - let mutex = Mutex::new(()); let (waker, counter) = new_count_waker(); let lock = mutex.lock().poll_unpin(&mut panic_context()); @@ -35,20 +36,8 @@ fn mutex_wakes_waiters() { #[test] fn mutex_contested() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::ready; - use futures::lock::Mutex; - use futures::stream::StreamExt; - use futures::task::SpawnExt; - use futures_test::future::FutureTestExt; - use std::sync::Arc; - let (tx, mut rx) = mpsc::unbounded(); - let pool = futures::executor::ThreadPool::builder() - .pool_size(16) - .create() - .unwrap(); + let pool = ThreadPool::builder().pool_size(16).create().unwrap(); let tx = Arc::new(tx); let mutex = Arc::new(Mutex::new(0)); diff --git a/futures/tests/oneshot.rs b/futures/tests/oneshot.rs index 2494306fad..867cb45ff1 100644 --- a/futures/tests/oneshot.rs +++ b/futures/tests/oneshot.rs @@ -1,11 +1,11 @@ +use futures::channel::oneshot; +use futures::future::{FutureExt, TryFutureExt}; +use futures_test::future::FutureTestExt; +use std::sync::mpsc; +use std::thread; + #[test] fn oneshot_send1() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); @@ -17,65 +17,46 @@ fn oneshot_send1() { #[test] fn oneshot_send2() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); thread::spawn(|| tx1.send(1).unwrap()).join().unwrap(); - rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background(); + rx1.map_ok(move |x| tx2.send(x).unwrap()) + .run_in_background(); assert_eq!(1, rx2.recv().unwrap()); } #[test] fn oneshot_send3() { - use futures::channel::oneshot; - use futures::future::TryFutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); - rx1.map_ok(move |x| tx2.send(x).unwrap()).run_in_background(); + rx1.map_ok(move |x| tx2.send(x).unwrap()) + .run_in_background(); thread::spawn(|| tx1.send(1).unwrap()).join().unwrap(); assert_eq!(1, rx2.recv().unwrap()); } #[test] fn oneshot_drop_tx1() { - use futures::channel::oneshot; - use futures::future::FutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); drop(tx1); - rx1.map(move |result| tx2.send(result).unwrap()).run_in_background(); + rx1.map(move |result| tx2.send(result).unwrap()) + .run_in_background(); assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap()); } #[test] fn oneshot_drop_tx2() { - use futures::channel::oneshot; - use futures::future::FutureExt; - use futures_test::future::FutureTestExt; - use std::sync::mpsc; - use std::thread; - let (tx1, rx1) = oneshot::channel::(); let (tx2, rx2) = mpsc::channel(); let t = thread::spawn(|| drop(tx1)); - rx1.map(move |result| tx2.send(result).unwrap()).run_in_background(); + rx1.map(move |result| tx2.send(result).unwrap()) + .run_in_background(); t.join().unwrap(); assert_eq!(Err(oneshot::Canceled), rx2.recv().unwrap()); @@ -83,8 +64,6 @@ fn oneshot_drop_tx2() { #[test] fn oneshot_drop_rx() { - use futures::channel::oneshot; - let (tx, rx) = oneshot::channel::(); drop(rx); assert_eq!(Err(2), tx.send(2)); diff --git a/futures/tests/ready_queue.rs b/futures/tests/ready_queue.rs index 9aa36362d0..b2b9bec461 100644 --- a/futures/tests/ready_queue.rs +++ b/futures/tests/ready_queue.rs @@ -1,18 +1,15 @@ -mod assert_send_sync { - use futures::stream::FuturesUnordered; - - pub trait AssertSendSync: Send + Sync {} - impl AssertSendSync for FuturesUnordered<()> {} -} +use futures::channel::oneshot; +use futures::executor::{block_on, block_on_stream}; +use futures::future; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; +use std::panic::{self, AssertUnwindSafe}; +use std::sync::{Arc, Barrier}; +use std::thread; #[test] fn basic_usage() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); @@ -41,12 +38,6 @@ fn basic_usage() { #[test] fn resolving_errors() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); let (tx1, rx1) = oneshot::channel(); @@ -61,13 +52,19 @@ fn resolving_errors() { drop(tx2); - assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert_eq!( + Poll::Ready(Some(Err(oneshot::Canceled))), + queue.poll_next_unpin(cx) + ); assert!(!queue.poll_next_unpin(cx).is_ready()); drop(tx1); tx3.send("world2").unwrap(); - assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert_eq!( + Poll::Ready(Some(Err(oneshot::Canceled))), + queue.poll_next_unpin(cx) + ); assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); })); @@ -75,12 +72,6 @@ fn resolving_errors() { #[test] fn dropping_ready_queue() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future; - use futures::stream::FuturesUnordered; - use futures_test::task::noop_context; - block_on(future::lazy(move |_| { let queue = FuturesUnordered::new(); let (mut tx1, rx1) = oneshot::channel::<()>(); @@ -108,12 +99,6 @@ fn dropping_ready_queue() { #[test] fn stress() { - use futures::channel::oneshot; - use futures::executor::block_on_stream; - use futures::stream::FuturesUnordered; - use std::sync::{Arc, Barrier}; - use std::thread; - const ITER: usize = 300; for i in 0..ITER { @@ -157,12 +142,6 @@ fn stress() { #[test] fn panicking_future_dropped() { - use futures::executor::block_on; - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::Poll; - use std::panic::{self, AssertUnwindSafe}; - block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); queue.push(future::poll_fn(|_| -> Poll> { panic!() })); diff --git a/futures/tests/recurse.rs b/futures/tests/recurse.rs index a151f1b1d4..d81753c9d7 100644 --- a/futures/tests/recurse.rs +++ b/futures/tests/recurse.rs @@ -1,10 +1,10 @@ +use futures::executor::block_on; +use futures::future::{self, BoxFuture, FutureExt}; +use std::sync::mpsc; +use std::thread; + #[test] fn lots() { - use futures::executor::block_on; - use futures::future::{self, FutureExt, BoxFuture}; - use std::sync::mpsc; - use std::thread; - #[cfg(not(futures_sanitizer))] const N: i32 = 1_000; #[cfg(futures_sanitizer)] // If N is many, asan reports stack-overflow: https://gist.github.com/taiki-e/099446d21cbec69d4acbacf7a9646136 @@ -20,8 +20,6 @@ fn lots() { } let (tx, rx) = mpsc::channel(); - thread::spawn(|| { - block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap())) - }); + thread::spawn(|| block_on(do_it((N, 0)).map(move |x| tx.send(x).unwrap()))); assert_eq!((0..=N).sum::(), rx.recv().unwrap()); } diff --git a/futures/tests/select_all.rs b/futures/tests/select_all.rs index 540db2c410..299b479044 100644 --- a/futures/tests/select_all.rs +++ b/futures/tests/select_all.rs @@ -1,14 +1,10 @@ +use futures::executor::block_on; +use futures::future::{ready, select_all}; +use std::collections::HashSet; + #[test] fn smoke() { - use futures::executor::block_on; - use futures::future::{ready, select_all}; - use std::collections::HashSet; - - let v = vec![ - ready(1), - ready(2), - ready(3), - ]; + let v = vec![ready(1), ready(2), ready(3)]; let mut c = vec![1, 2, 3].into_iter().collect::>(); diff --git a/futures/tests/select_ok.rs b/futures/tests/select_ok.rs index 81cadb7bae..8aec00362d 100644 --- a/futures/tests/select_ok.rs +++ b/futures/tests/select_ok.rs @@ -1,14 +1,9 @@ +use futures::executor::block_on; +use futures::future::{err, ok, select_ok}; + #[test] fn ignore_err() { - use futures::executor::block_on; - use futures::future::{err, ok, select_ok}; - - let v = vec![ - err(1), - err(2), - ok(3), - ok(4), - ]; + let v = vec![err(1), err(2), ok(3), ok(4)]; let (i, v) = block_on(select_ok(v)).ok().unwrap(); assert_eq!(i, 3); @@ -23,14 +18,7 @@ fn ignore_err() { #[test] fn last_err() { - use futures::executor::block_on; - use futures::future::{err, ok, select_ok}; - - let v = vec![ - ok(1), - err(2), - err(3), - ]; + let v = vec![ok(1), err(2), err(3)]; let (i, v) = block_on(select_ok(v)).ok().unwrap(); assert_eq!(i, 1); diff --git a/futures/tests/shared.rs b/futures/tests/shared.rs index cc0c6a20cf..662798bff4 100644 --- a/futures/tests/shared.rs +++ b/futures/tests/shared.rs @@ -1,22 +1,22 @@ -mod count_clone { - use std::cell::Cell; - use std::rc::Rc; - - pub struct CountClone(pub Rc>); - - impl Clone for CountClone { - fn clone(&self) -> Self { - self.0.set(self.0.get() + 1); - Self(self.0.clone()) - } +use futures::channel::oneshot; +use futures::executor::{block_on, LocalPool}; +use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt}; +use futures::task::LocalSpawn; +use std::cell::{Cell, RefCell}; +use std::rc::Rc; +use std::task::Poll; +use std::thread; + +struct CountClone(Rc>); + +impl Clone for CountClone { + fn clone(&self) -> Self { + self.0.set(self.0.get() + 1); + Self(self.0.clone()) } } fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::thread; let (tx, rx) = oneshot::channel::(); let f = rx.shared(); let join_handles = (0..threads_number) @@ -53,11 +53,6 @@ fn many_threads() { #[test] fn drop_on_one_task_ok() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::{self, FutureExt, TryFutureExt}; - use std::thread; - let (tx, rx) = oneshot::channel::(); let f1 = rx.shared(); let f2 = f1.clone(); @@ -86,11 +81,6 @@ fn drop_on_one_task_ok() { #[test] fn drop_in_poll() { - use futures::executor::block_on; - use futures::future::{self, FutureExt, LocalFutureObj}; - use std::cell::RefCell; - use std::rc::Rc; - let slot1 = Rc::new(RefCell::new(None)); let slot2 = slot1.clone(); @@ -108,11 +98,6 @@ fn drop_in_poll() { #[test] fn peek() { - use futures::channel::oneshot; - use futures::executor::LocalPool; - use futures::future::{FutureExt, LocalFutureObj}; - use futures::task::LocalSpawn; - let mut local_pool = LocalPool::new(); let spawn = &mut local_pool.spawner(); @@ -145,10 +130,6 @@ fn peek() { #[test] fn downgrade() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - let (tx, rx) = oneshot::channel::(); let shared = rx.shared(); // Since there are outstanding `Shared`s, we can get a `WeakShared`. @@ -173,14 +154,6 @@ fn downgrade() { #[test] fn dont_clone_in_single_owner_shared_future() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::rc::Rc; - - use count_clone::CountClone; - let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -193,14 +166,6 @@ fn dont_clone_in_single_owner_shared_future() { #[test] fn dont_do_unnecessary_clones_on_output() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::rc::Rc; - - use count_clone::CountClone; - let counter = CountClone(Rc::new(Cell::new(0))); let (tx, rx) = oneshot::channel(); @@ -215,11 +180,6 @@ fn dont_do_unnecessary_clones_on_output() { #[test] fn shared_future_that_wakes_itself_until_pending_is_returned() { - use futures::executor::block_on; - use futures::future::FutureExt; - use std::cell::Cell; - use std::task::Poll; - let proceed = Cell::new(false); let fut = futures::future::poll_fn(|cx| { if proceed.get() { diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index 597ed34c7a..6c27e9ae1f 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -1,249 +1,222 @@ -mod sassert_next { - use futures::stream::{Stream, StreamExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::fmt; - - pub fn sassert_next(s: &mut S, item: S::Item) - where - S: Stream + Unpin, - S::Item: Eq + fmt::Debug, - { - match s.poll_next_unpin(&mut panic_context()) { - Poll::Ready(None) => panic!("stream is at its end"), - Poll::Ready(Some(e)) => assert_eq!(e, item), - Poll::Pending => panic!("stream wasn't ready"), - } +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt}; +use futures::never::Never; +use futures::ready; +use futures::sink::{self, Sink, SinkErrInto, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures_test::task::panic_context; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +fn sassert_next(s: &mut S, item: S::Item) +where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, +{ + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), } } -mod unwrap { - use futures::task::Poll; - use std::fmt; - - pub fn unwrap(x: Poll>) -> T { - match x { - Poll::Ready(Ok(x)) => x, - Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), - Poll::Pending => panic!("Poll::Pending"), - } +fn unwrap(x: Poll>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"), + Poll::Pending => panic!("Poll::Pending"), } } -mod flag_cx { - use futures::task::{self, ArcWake, Context}; - use std::sync::Arc; - use std::sync::atomic::{AtomicBool, Ordering}; - - // An Unpark struct that records unpark events for inspection - pub struct Flag(AtomicBool); +// An Unpark struct that records unpark events for inspection +struct Flag(AtomicBool); - impl Flag { - pub fn new() -> Arc { - Arc::new(Self(AtomicBool::new(false))) - } - - pub fn take(&self) -> bool { - self.0.swap(false, Ordering::SeqCst) - } +impl Flag { + fn new() -> Arc { + Arc::new(Self(AtomicBool::new(false))) + } - pub fn set(&self, v: bool) { - self.0.store(v, Ordering::SeqCst) - } + fn take(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) } - impl ArcWake for Flag { - fn wake_by_ref(arc_self: &Arc) { - arc_self.set(true) - } + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) } +} - pub fn flag_cx(f: F) -> R - where - F: FnOnce(Arc, &mut Context<'_>) -> R, - { - let flag = Flag::new(); - let waker = task::waker_ref(&flag); - let cx = &mut Context::from_waker(&waker); - f(flag.clone(), cx) +impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc) { + arc_self.set(true) } } -mod start_send_fut { - use futures::future::Future; - use futures::ready; - use futures::sink::Sink; - use futures::task::{Context, Poll}; - use std::pin::Pin; +fn flag_cx(f: F) -> R +where + F: FnOnce(Arc, &mut Context<'_>) -> R, +{ + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) +} - // Sends a value on an i32 channel sink - pub struct StartSendFut + Unpin, Item: Unpin>(Option, Option); +// Sends a value on an i32 channel sink +struct StartSendFut + Unpin, Item: Unpin>(Option, Option); - impl + Unpin, Item: Unpin> StartSendFut { - pub fn new(sink: S, item: Item) -> Self { - Self(Some(sink), Some(item)) - } +impl + Unpin, Item: Unpin> StartSendFut { + fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) } +} - impl + Unpin, Item: Unpin> Future for StartSendFut { - type Output = Result; +impl + Unpin, Item: Unpin> Future for StartSendFut { + type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self(inner, item) = self.get_mut(); - { - let mut inner = inner.as_mut().unwrap(); - ready!(Pin::new(&mut inner).poll_ready(cx))?; - Pin::new(&mut inner).start_send(item.take().unwrap())?; - } - Poll::Ready(Ok(inner.take().unwrap())) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; } + Poll::Ready(Ok(inner.take().unwrap())) } } -mod manual_flush { - use futures::sink::Sink; - use futures::task::{Context, Poll, Waker}; - use std::mem; - use std::pin::Pin; - - // Immediately accepts all requests to start pushing, but completion is managed - // by manually flushing - pub struct ManualFlush { - data: Vec, - waiting_tasks: Vec, - } +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush { + data: Vec, + waiting_tasks: Vec, +} - impl Sink> for ManualFlush { - type Error = (); +impl Sink> for ManualFlush { + type Error = (); - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } - fn start_send(mut self: Pin<&mut Self>, item: Option) -> Result<(), Self::Error> { - if let Some(item) = item { - self.data.push(item); - } else { - self.force_flush(); - } - Ok(()) + fn start_send(mut self: Pin<&mut Self>, item: Option) -> Result<(), Self::Error> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); } + Ok(()) + } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.data.is_empty() { - Poll::Ready(Ok(())) - } else { - self.waiting_tasks.push(cx.waker().clone()); - Poll::Pending - } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending } + } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_flush(cx) - } + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) } +} - impl ManualFlush { - pub fn new() -> Self { - Self { - data: Vec::new(), - waiting_tasks: Vec::new(), - } +impl ManualFlush { + fn new() -> Self { + Self { + data: Vec::new(), + waiting_tasks: Vec::new(), } + } - pub fn force_flush(&mut self) -> Vec { - for task in self.waiting_tasks.drain(..) { - task.wake() - } - mem::replace(&mut self.data, Vec::new()) + fn force_flush(&mut self) -> Vec { + for task in self.waiting_tasks.drain(..) { + task.wake() } + mem::replace(&mut self.data, Vec::new()) } } -mod allowance { - use futures::sink::Sink; - use futures::task::{Context, Poll, Waker}; - use std::cell::{Cell, RefCell}; - use std::pin::Pin; - use std::rc::Rc; - - pub struct ManualAllow { - pub data: Vec, - allow: Rc, - } +struct ManualAllow { + data: Vec, + allow: Rc, +} - pub struct Allow { - flag: Cell, - tasks: RefCell>, - } +struct Allow { + flag: Cell, + tasks: RefCell>, +} - impl Allow { - pub fn new() -> Self { - Self { - flag: Cell::new(false), - tasks: RefCell::new(Vec::new()), - } - } - - pub fn check(&self, cx: &mut Context<'_>) -> bool { - if self.flag.get() { - true - } else { - self.tasks.borrow_mut().push(cx.waker().clone()); - false - } +impl Allow { + fn new() -> Self { + Self { + flag: Cell::new(false), + tasks: RefCell::new(Vec::new()), } + } - pub fn start(&self) { - self.flag.set(true); - let mut tasks = self.tasks.borrow_mut(); - for task in tasks.drain(..) { - task.wake(); - } + fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false } } - impl Sink for ManualAllow { - type Error = (); - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.allow.check(cx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); } + } +} - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - self.data.push(item); - Ok(()) - } +impl Sink for ManualAllow { + type Error = (); - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.allow.check(cx) { Poll::Ready(Ok(())) + } else { + Poll::Pending } + } - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + self.data.push(item); + Ok(()) } - pub fn manual_allow() -> (ManualAllow, Rc) { - let allow = Rc::new(Allow::new()); - let manual_allow = ManualAllow { - data: Vec::new(), - allow: allow.clone(), - }; - (manual_allow, allow) + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } } +fn manual_allow() -> (ManualAllow, Rc) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { + data: Vec::new(), + allow: allow.clone(), + }; + (manual_allow, allow) +} + #[test] fn either_sink() { - use futures::sink::{Sink, SinkExt}; - use std::collections::VecDeque; - use std::pin::Pin; - let mut s = if true { Vec::::new().left_sink() } else { @@ -255,10 +228,6 @@ fn either_sink() { #[test] fn vec_sink() { - use futures::executor::block_on; - use futures::sink::{Sink, SinkExt}; - use std::pin::Pin; - let mut v = Vec::new(); Pin::new(&mut v).start_send(0).unwrap(); Pin::new(&mut v).start_send(1).unwrap(); @@ -269,10 +238,6 @@ fn vec_sink() { #[test] fn vecdeque_sink() { - use futures::sink::Sink; - use std::collections::VecDeque; - use std::pin::Pin; - let mut deque = VecDeque::new(); Pin::new(&mut deque).start_send(2).unwrap(); Pin::new(&mut deque).start_send(3).unwrap(); @@ -284,9 +249,6 @@ fn vecdeque_sink() { #[test] fn send() { - use futures::executor::block_on; - use futures::sink::SinkExt; - let mut v = Vec::new(); block_on(v.send(0)).unwrap(); @@ -301,10 +263,6 @@ fn send() { #[test] fn send_all() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let mut v = Vec::new(); block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap(); @@ -321,15 +279,6 @@ fn send_all() { // channel is full #[test] fn mpsc_blocking_start_send() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::{self, FutureExt}; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use sassert_next::sassert_next; - use unwrap::unwrap; - let (mut tx, mut rx) = mpsc::channel::(0); block_on(future::lazy(|_| { @@ -353,17 +302,6 @@ fn mpsc_blocking_start_send() { // until a oneshot is completed #[test] fn with_flush() { - use futures::channel::oneshot; - use futures::executor::block_on; - use futures::future::{self, FutureExt, TryFutureExt}; - use futures::never::Never; - use futures::sink::{Sink, SinkExt}; - use std::mem; - use std::pin::Pin; - - use flag_cx::flag_cx; - use unwrap::unwrap; - let (tx, rx) = oneshot::channel(); let mut block = rx.boxed(); let mut sink = Vec::new().with(|elem| { @@ -390,11 +328,6 @@ fn with_flush() { // test simple use of with to change data #[test] fn with_as_map() { - use futures::executor::block_on; - use futures::future; - use futures::never::Never; - use futures::sink::SinkExt; - let mut sink = Vec::new().with(|item| future::ok::(item * 2)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -405,10 +338,6 @@ fn with_as_map() { // test simple use of with_flat_map #[test] fn with_flat_map() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -421,16 +350,6 @@ fn with_flat_map() { // Regression test for the issue #1834. #[test] fn with_propagates_poll_ready() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future; - use futures::sink::{Sink, SinkExt}; - use futures::task::Poll; - use std::pin::Pin; - - use flag_cx::flag_cx; - use sassert_next::sassert_next; - let (tx, mut rx) = mpsc::channel::(0); let mut tx = tx.with(|item: i32| future::ok::(item + 10)); @@ -457,14 +376,6 @@ fn with_propagates_poll_ready() { // but doesn't claim to be flushed until the underlying sink is #[test] fn with_flush_propagate() { - use futures::future::{self, FutureExt}; - use futures::sink::{Sink, SinkExt}; - use std::pin::Pin; - - use manual_flush::ManualFlush; - use flag_cx::flag_cx; - use unwrap::unwrap; - let mut sink = ManualFlush::new().with(future::ok::, ()>); flag_cx(|flag, cx| { unwrap(Pin::new(&mut sink).poll_ready(cx)); @@ -486,11 +397,6 @@ fn with_flush_propagate() { // test that `Clone` is implemented on `with` sinks #[test] fn with_implements_clone() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future; - use futures::{SinkExt, StreamExt}; - let (mut tx, rx) = mpsc::channel(5); { @@ -521,9 +427,6 @@ fn with_implements_clone() { // test that a buffer is a no-nop around a sink that always accepts sends #[test] fn buffer_noop() { - use futures::executor::block_on; - use futures::sink::SinkExt; - let mut sink = Vec::new().buffer(0); block_on(sink.send(0)).unwrap(); block_on(sink.send(1)).unwrap(); @@ -539,15 +442,6 @@ fn buffer_noop() { // and writing out when the underlying sink is ready #[test] fn buffer() { - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::sink::SinkExt; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use unwrap::unwrap; - use allowance::manual_allow; - let (sink, allow) = manual_allow::(); let sink = sink.buffer(2); @@ -567,10 +461,6 @@ fn buffer() { #[test] fn fanout_smoke() { - use futures::executor::block_on; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let sink1 = Vec::new(); let sink2 = Vec::new(); let mut sink = sink1.fanout(sink2); @@ -582,16 +472,6 @@ fn fanout_smoke() { #[test] fn fanout_backpressure() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::FutureExt; - use futures::sink::SinkExt; - use futures::stream::StreamExt; - - use start_send_fut::StartSendFut; - use flag_cx::flag_cx; - use unwrap::unwrap; - let (left_send, mut left_recv) = mpsc::channel(0); let (right_send, mut right_recv) = mpsc::channel(0); let sink = left_send.fanout(right_send); @@ -624,12 +504,6 @@ fn fanout_backpressure() { #[test] fn sink_map_err() { - use futures::channel::mpsc; - use futures::sink::{Sink, SinkExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::pin::Pin; - { let cx = &mut panic_context(); let (tx, _rx) = mpsc::channel(1); @@ -647,12 +521,6 @@ fn sink_map_err() { #[test] fn sink_unfold() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::poll_fn; - use futures::sink::{self, Sink, SinkExt}; - use futures::task::Poll; - block_on(poll_fn(|cx| { let (tx, mut rx) = mpsc::channel(1); let unfold = sink::unfold((), |(), i: i32| { @@ -685,14 +553,8 @@ fn sink_unfold() { #[test] fn err_into() { - use futures::channel::mpsc; - use futures::sink::{Sink, SinkErrInto, SinkExt}; - use futures::task::Poll; - use futures_test::task::panic_context; - use std::pin::Pin; - #[derive(Copy, Clone, Debug, PartialEq, Eq)] - pub struct ErrIntoTest; + struct ErrIntoTest; impl From for ErrIntoTest { fn from(_: mpsc::SendError) -> Self { diff --git a/futures/tests/sink_fanout.rs b/futures/tests/sink_fanout.rs index 7d1fa43790..e57b2d8c7b 100644 --- a/futures/tests/sink_fanout.rs +++ b/futures/tests/sink_fanout.rs @@ -1,11 +1,11 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::join3; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; + #[test] fn it_works() { - use futures::channel::mpsc; - use futures::executor::block_on; - use futures::future::join3; - use futures::sink::SinkExt; - use futures::stream::{self, StreamExt}; - let (tx1, rx1) = mpsc::channel(1); let (tx2, rx2) = mpsc::channel(2); let tx = tx1.fanout(tx2).sink_map_err(|_| ()); diff --git a/futures/tests/split.rs b/futures/tests/split.rs index 86c2fc6b82..7cee9ba6a2 100644 --- a/futures/tests/split.rs +++ b/futures/tests/split.rs @@ -1,12 +1,12 @@ +use futures::executor::block_on; +use futures::sink::{Sink, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{Context, Poll}; +use pin_project::pin_project; +use std::pin::Pin; + #[test] fn test_split() { - use futures::executor::block_on; - use futures::sink::{Sink, SinkExt}; - use futures::stream::{self, Stream, StreamExt}; - use futures::task::{Context, Poll}; - use pin_project::pin_project; - use std::pin::Pin; - #[pin_project] struct Join { #[pin] @@ -18,10 +18,7 @@ fn test_split() { impl Stream for Join { type Item = T::Item; - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().stream.poll_next(cx) } } @@ -29,40 +26,28 @@ fn test_split() { impl, Item> Sink for Join { type Error = U::Error; - fn poll_ready( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().sink.poll_ready(cx) } - fn start_send( - self: Pin<&mut Self>, - item: Item, - ) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { self.project().sink.start_send(item) } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().sink.poll_flush(cx) } - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().sink.poll_close(cx) } } let mut dest: Vec = Vec::new(); { - let join = Join { + let join = Join { stream: stream::iter(vec![10, 20, 30]), - sink: &mut dest + sink: &mut dest, }; let (sink, stream) = join.split(); diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index 14b283db7a..f2027b5d58 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -1,8 +1,14 @@ +use futures::channel::mpsc; +use futures::executor::block_on; +use futures::future::{self, Future}; +use futures::sink::SinkExt; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use futures::FutureExt; +use futures_test::task::noop_context; + #[test] fn select() { - use futures::executor::block_on; - use futures::stream::{self, StreamExt}; - fn select_and_compare(a: Vec, b: Vec, expected: Vec) { let a = stream::iter(a); let b = stream::iter(b); @@ -17,9 +23,7 @@ fn select() { #[test] fn flat_map() { - use futures::stream::{self, StreamExt}; - - futures::executor::block_on(async { + block_on(async { let st = stream::iter(vec![ stream::iter(0..=4u8), stream::iter(6..=10), @@ -37,28 +41,21 @@ fn flat_map() { #[test] fn scan() { - use futures::stream::{self, StreamExt}; - - futures::executor::block_on(async { - assert_eq!( - stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) - .scan(1, |state, e| { - *state += 1; - futures::future::ready(if e < *state { Some(e) } else { None }) - }) - .collect::>() - .await, - vec![1u8, 2, 3, 4] - ); + block_on(async { + let values = stream::iter(vec![1u8, 2, 3, 4, 6, 8, 2]) + .scan(1, |state, e| { + *state += 1; + futures::future::ready(if e < *state { Some(e) } else { None }) + }) + .collect::>() + .await; + + assert_eq!(values, vec![1u8, 2, 3, 4]); }); } #[test] fn take_until() { - use futures::future::{self, Future}; - use futures::stream::{self, StreamExt}; - use futures::task::Poll; - fn make_stop_fut(stop_on: u32) -> impl Future { let mut i = 0; future::poll_fn(move |_cx| { @@ -71,7 +68,7 @@ fn take_until() { }) } - futures::executor::block_on(async { + block_on(async { // Verify stopping works: let stream = stream::iter(1u32..=10); let stop_fut = make_stop_fut(5); @@ -123,10 +120,15 @@ fn take_until() { #[test] #[should_panic] -fn ready_chunks_panic_on_cap_zero() { - use futures::channel::mpsc; - use futures::stream::StreamExt; +fn chunks_panic_on_cap_zero() { + let (_, rx1) = mpsc::channel::<()>(1); + + let _ = rx1.chunks(0); +} +#[test] +#[should_panic] +fn ready_chunks_panic_on_cap_zero() { let (_, rx1) = mpsc::channel::<()>(1); let _ = rx1.ready_chunks(0); @@ -134,12 +136,6 @@ fn ready_chunks_panic_on_cap_zero() { #[test] fn ready_chunks() { - use futures::channel::mpsc; - use futures::stream::StreamExt; - use futures::sink::SinkExt; - use futures::FutureExt; - use futures_test::task::noop_context; - let (mut tx, rx1) = mpsc::channel::(16); let mut s = rx1.ready_chunks(2); @@ -147,14 +143,14 @@ fn ready_chunks() { let mut cx = noop_context(); assert!(s.next().poll_unpin(&mut cx).is_pending()); - futures::executor::block_on(async { + block_on(async { tx.send(1).await.unwrap(); assert_eq!(s.next().await.unwrap(), vec![1]); tx.send(2).await.unwrap(); tx.send(3).await.unwrap(); tx.send(4).await.unwrap(); - assert_eq!(s.next().await.unwrap(), vec![2,3]); + assert_eq!(s.next().await.unwrap(), vec![2, 3]); assert_eq!(s.next().await.unwrap(), vec![4]); }); } diff --git a/futures/tests/stream_catch_unwind.rs b/futures/tests/stream_catch_unwind.rs index 272558cc61..8b23a0a7ef 100644 --- a/futures/tests/stream_catch_unwind.rs +++ b/futures/tests/stream_catch_unwind.rs @@ -1,8 +1,8 @@ +use futures::executor::block_on_stream; +use futures::stream::{self, StreamExt}; + #[test] fn panic_in_the_middle_of_the_stream() { - use futures::executor::block_on_stream; - use futures::stream::{self, StreamExt}; - let stream = stream::iter(vec![Some(10), None, Some(11)]); // panic on second element @@ -16,9 +16,6 @@ fn panic_in_the_middle_of_the_stream() { #[test] fn no_panic() { - use futures::executor::block_on_stream; - use futures::stream::{self, StreamExt}; - let stream = stream::iter(vec![10, 11, 12]); let mut iter = block_on_stream(stream.catch_unwind()); diff --git a/futures/tests/stream_into_async_read.rs b/futures/tests/stream_into_async_read.rs index 222c985706..60188d3e58 100644 --- a/futures/tests/stream_into_async_read.rs +++ b/futures/tests/stream_into_async_read.rs @@ -1,31 +1,51 @@ -#[test] -fn test_into_async_read() { - use core::pin::Pin; - use futures::io::AsyncRead; - use futures::stream::{self, TryStreamExt}; - use futures::task::Poll; - use futures_test::{task::noop_context, stream::StreamTestExt}; - - macro_rules! assert_read { - ($reader:expr, $buf:expr, $item:expr) => { - let mut cx = noop_context(); - loop { - match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { - Poll::Ready(Ok(x)) => { - assert_eq!(x, $item); - break; - } - Poll::Ready(Err(err)) => { - panic!("assertion failed: expected value but got {}", err); - } - Poll::Pending => { - continue; - } +use core::pin::Pin; +use futures::io::{AsyncBufRead, AsyncRead}; +use futures::stream::{self, TryStreamExt}; +use futures::task::Poll; +use futures_test::{stream::StreamTestExt, task::noop_context}; + +macro_rules! assert_read { + ($reader:expr, $buf:expr, $item:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_read(&mut cx, $buf) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $item); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; + } + } + } + }; +} + +macro_rules! assert_fill_buf { + ($reader:expr, $buf:expr) => { + let mut cx = noop_context(); + loop { + match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { + Poll::Ready(Ok(x)) => { + assert_eq!(x, $buf); + break; + } + Poll::Ready(Err(err)) => { + panic!("assertion failed: expected value but got {}", err); + } + Poll::Pending => { + continue; } } - }; - } + } + }; +} +#[test] +fn test_into_async_read() { let stream = stream::iter((1..=3).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); let mut reader = stream.interleave_pending().into_async_read(); let mut buf = vec![0; 3]; @@ -53,32 +73,6 @@ fn test_into_async_read() { #[test] fn test_into_async_bufread() { - use core::pin::Pin; - use futures::io::AsyncBufRead; - use futures::stream::{self, TryStreamExt}; - use futures::task::Poll; - use futures_test::{task::noop_context, stream::StreamTestExt}; - - macro_rules! assert_fill_buf { - ($reader:expr, $buf:expr) => { - let mut cx = noop_context(); - loop { - match Pin::new(&mut $reader).poll_fill_buf(&mut cx) { - Poll::Ready(Ok(x)) => { - assert_eq!(x, $buf); - break; - } - Poll::Ready(Err(err)) => { - panic!("assertion failed: expected value but got {}", err); - } - Poll::Pending => { - continue; - } - } - } - }; - } - let stream = stream::iter((1..=2).flat_map(|_| vec![Ok(vec![]), Ok(vec![1, 2, 3, 4, 5])])); let mut reader = stream.interleave_pending().into_async_read(); diff --git a/futures/tests/stream_peekable.rs b/futures/tests/stream_peekable.rs index 66a7385ae9..b65a0572cb 100644 --- a/futures/tests/stream_peekable.rs +++ b/futures/tests/stream_peekable.rs @@ -1,9 +1,9 @@ +use futures::executor::block_on; +use futures::pin_mut; +use futures::stream::{self, Peekable, StreamExt}; + #[test] fn peekable() { - use futures::executor::block_on; - use futures::pin_mut; - use futures::stream::{self, Peekable, StreamExt}; - block_on(async { let peekable: Peekable<_> = stream::iter(vec![1u8, 2, 3]).peekable(); pin_mut!(peekable); diff --git a/futures/tests/stream_select_all.rs b/futures/tests/stream_select_all.rs index 6178412f4d..eb711dda0c 100644 --- a/futures/tests/stream_select_all.rs +++ b/futures/tests/stream_select_all.rs @@ -1,10 +1,12 @@ +use futures::channel::mpsc; +use futures::executor::block_on_stream; +use futures::future::{self, FutureExt}; +use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; + #[test] fn is_terminated() { - use futures::future::{self, FutureExt}; - use futures::stream::{FusedStream, SelectAll, StreamExt}; - use futures::task::Poll; - use futures_test::task::noop_context; - let mut cx = noop_context(); let mut tasks = SelectAll::new(); @@ -30,9 +32,6 @@ fn is_terminated() { #[test] fn issue_1626() { - use futures::executor::block_on_stream; - use futures::stream; - let a = stream::iter(0..=2); let b = stream::iter(10..=14); @@ -51,10 +50,6 @@ fn issue_1626() { #[test] fn works_1() { - use futures::channel::mpsc; - use futures::executor::block_on_stream; - use futures::stream::select_all; - let (a_tx, a_rx) = mpsc::unbounded::(); let (b_tx, b_rx) = mpsc::unbounded::(); let (c_tx, c_rx) = mpsc::unbounded::(); diff --git a/futures/tests/stream_select_next_some.rs b/futures/tests/stream_select_next_some.rs index bec5262c1d..8252ad7b54 100644 --- a/futures/tests/stream_select_next_some.rs +++ b/futures/tests/stream_select_next_some.rs @@ -1,11 +1,13 @@ +use futures::executor::block_on; +use futures::future::{self, FusedFuture, FutureExt}; +use futures::select; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use futures_test::future::FutureTestExt; +use futures_test::task::new_count_waker; + #[test] fn is_terminated() { - use futures::future; - use futures::future::{FusedFuture, FutureExt}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures::task::{Context, Poll}; - use futures_test::task::new_count_waker; - let (waker, counter) = new_count_waker(); let mut cx = Context::from_waker(&waker); @@ -30,15 +32,11 @@ fn is_terminated() { #[test] fn select() { - use futures::{future, select}; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::future::FutureTestExt; - // Checks that even though `async_tasks` will yield a `None` and return // `is_terminated() == true` during the first poll, it manages to toggle // back to having items after a future is pushed into it during the second // poll (after pending_once completes). - futures::executor::block_on(async { + block_on(async { let mut fut = future::ready(1).pending_once(); let mut async_tasks = FuturesUnordered::new(); let mut total = 0; @@ -61,17 +59,13 @@ fn select() { // Check that `select!` macro does not fail when importing from `futures_util`. #[test] fn futures_util_select() { - use futures::future; - use futures::stream::{FuturesUnordered, StreamExt}; - use futures_test::future::FutureTestExt; - use futures_util::select; // Checks that even though `async_tasks` will yield a `None` and return // `is_terminated() == true` during the first poll, it manages to toggle // back to having items after a future is pushed into it during the second // poll (after pending_once completes). - futures::executor::block_on(async { + block_on(async { let mut fut = future::ready(1).pending_once(); let mut async_tasks = FuturesUnordered::new(); let mut total = 0; diff --git a/futures/tests/try_join.rs b/futures/tests/try_join.rs index 6c6d0843d5..8b0b38c1a3 100644 --- a/futures/tests/try_join.rs +++ b/futures/tests/try_join.rs @@ -1,6 +1,6 @@ #![deny(unreachable_code)] -use futures::{try_join, executor::block_on}; +use futures::{executor::block_on, try_join}; // TODO: This abuses https://github.com/rust-lang/rust/issues/58733 in order to // test behaviour of the `try_join!` macro with the never type before it is @@ -14,7 +14,6 @@ impl MyTrait for fn() -> T { } type Never = ! as MyTrait>::Output; - #[test] fn try_join_never_error() { block_on(async { diff --git a/futures/tests/try_join_all.rs b/futures/tests/try_join_all.rs index 8e579a2800..d80a2c634a 100644 --- a/futures/tests/try_join_all.rs +++ b/futures/tests/try_join_all.rs @@ -1,27 +1,28 @@ -mod util { - use std::future::Future; - use futures::executor::block_on; - use std::fmt::Debug; - - pub fn assert_done(actual_fut: F, expected: T) - where - T: PartialEq + Debug, - F: FnOnce() -> Box + Unpin>, - { - let output = block_on(actual_fut()); - assert_eq!(output, expected); - } +use futures::executor::block_on; +use futures_util::future::{err, ok, try_join_all, TryJoinAll}; +use std::fmt::Debug; +use std::future::Future; + +fn assert_done(actual_fut: F, expected: T) +where + T: PartialEq + Debug, + F: FnOnce() -> Box + Unpin>, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); } #[test] fn collect_collects() { - use futures_util::future::{err, ok, try_join_all}; - - use util::assert_done; - - assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2])); + assert_done( + || Box::new(try_join_all(vec![ok(1), ok(2)])), + Ok::<_, usize>(vec![1, 2]), + ); assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2)); - assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1])); + assert_done( + || Box::new(try_join_all(vec![ok(1)])), + Ok::<_, usize>(vec![1]), + ); // REVIEW: should this be implemented? // assert_done(|| Box::new(try_join_all(Vec::::new())), Ok(vec![])); @@ -30,11 +31,6 @@ fn collect_collects() { #[test] fn try_join_all_iter_lifetime() { - use futures_util::future::{ok, try_join_all}; - use std::future::Future; - - use util::assert_done; - // In futures-rs version 0.1, this function would fail to typecheck due to an overly // conservative type parameterization of `TryJoinAll`. fn sizes(bufs: Vec<&[u8]>) -> Box, ()>> + Unpin> { @@ -42,15 +38,14 @@ fn try_join_all_iter_lifetime() { Box::new(try_join_all(iter)) } - assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3_usize, 0, 1])); + assert_done( + || sizes(vec![&[1, 2, 3], &[], &[0]]), + Ok(vec![3_usize, 0, 1]), + ); } #[test] fn try_join_all_from_iter() { - use futures_util::future::{ok, TryJoinAll}; - - use util::assert_done; - assert_done( || Box::new(vec![ok(1), ok(2)].into_iter().collect::>()), Ok::<_, usize>(vec![1, 2]), diff --git a/futures/tests/unfold.rs b/futures/tests/unfold.rs index 95722cf8a6..16b10813b1 100644 --- a/futures/tests/unfold.rs +++ b/futures/tests/unfold.rs @@ -1,10 +1,7 @@ use futures::future; use futures::stream; - use futures_test::future::FutureTestExt; -use futures_test::{ - assert_stream_done, assert_stream_next, assert_stream_pending, -}; +use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending}; #[test] fn unfold1() {