From f8e8506499df291846ae203333918ec6e802fa3c Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Wed, 3 Jul 2019 19:11:22 +0900 Subject: [PATCH] [WIP] Re-add disabled tests --- futures/tests/all.rs | 461 +++++++++++++++++ .../buffer_unordered.rs | 8 +- futures/{tests_disabled => tests}/eventual.rs | 65 ++- .../{tests_disabled => tests}/ready_queue.rs | 84 ++- futures/tests/sink.rs | 487 ++++++++++++++++++ futures/tests/stream.rs | 393 +++++++++++++- .../stream_catch_unwind.rs | 14 +- futures/tests/stream_select_all.rs | 32 +- futures/tests_disabled/all.rs | 351 ------------- futures/tests_disabled/sink.rs | 469 ----------------- futures/tests_disabled/stream.rs | 393 -------------- futures/tests_disabled/stream_select_all.rs | 35 -- 12 files changed, 1460 insertions(+), 1332 deletions(-) create mode 100644 futures/tests/all.rs rename futures/{tests_disabled => tests}/buffer_unordered.rs (91%) rename futures/{tests_disabled => tests}/eventual.rs (69%) rename futures/{tests_disabled => tests}/ready_queue.rs (58%) create mode 100644 futures/tests/sink.rs rename futures/{tests_disabled => tests}/stream_catch_unwind.rs (53%) delete mode 100644 futures/tests_disabled/all.rs delete mode 100644 futures/tests_disabled/sink.rs delete mode 100644 futures/tests_disabled/stream.rs delete mode 100644 futures/tests_disabled/stream_select_all.rs diff --git a/futures/tests/all.rs b/futures/tests/all.rs new file mode 100644 index 0000000000..54a815fc15 --- /dev/null +++ b/futures/tests/all.rs @@ -0,0 +1,461 @@ +use futures::channel::oneshot::{self, Canceled}; +use futures::executor::block_on; +use futures::future::{ + self, err, join, ok, try_join, try_select, Either, Future, FutureExt, TryFutureExt, +}; +use futures::task::Poll; +use futures_test::task::{noop_context, panic_context}; +use std::fmt; +use std::sync::mpsc::{channel, TryRecvError}; + +fn assert_done(actual_fut: F, expected: T) +where + T: PartialEq + fmt::Debug, + F: FnOnce() -> Fut, + Fut: Future + Unpin, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); +} + +fn assert_pending(mut f: F) +where + F: FnMut() -> Fut, + Fut: Future + Unpin, +{ + assert!(f().poll_unpin(&mut panic_context()).is_pending()); +} + +fn unwrap(x: Poll>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + _ => panic!(), + } +} + +fn unwrap_err(x: Poll>) -> E { + match x { + Poll::Ready(Err(x)) => x, + _ => panic!(), + } +} + +fn f_ok(a: i32) -> future::Ready> { + ok(a) +} +fn f_err(a: u32) -> future::Ready> { + err(a) +} +fn r_ok(a: i32) -> Result { + Ok(a) +} +fn r_err(a: u32) -> Result { + Err(a) +} + +fn unselect( + r: Result, Either<(E, B), (E, A)>>, +) -> future::Ready> { + match r { + Ok(Either::Left((t, _))) | Ok(Either::Right((t, _))) => ok(t), + Err(Either::Left((e, _))) | Err(Either::Right((e, _))) => err(e), + } +} + +#[test] +fn result_smoke() { + fn is_future_v(_: C) + where + A: Send + 'static, + B: Send + 'static, + C: Future>, + { + } + + is_future_v::(f_ok(1).map_ok(|a| a + 1)); + is_future_v::(f_ok(1).map_err(|a| a + 1)); + is_future_v::(f_ok(1).and_then(ok)); + is_future_v::(f_ok(1).or_else(err)); + is_future_v::<(i32, i32), u32, _>(try_join(f_ok(1), err(3))); + // is_future_v::(f_ok(1).map_ok(f_ok).flatten()); + + assert_done(|| f_ok(1), r_ok(1)); + assert_done(|| f_err(1), r_err(1)); + assert_done(|| future::ready(Ok(1)), r_ok(1)); + assert_done(|| future::ready(Err(1)), r_err(1)); + assert_done(|| ok(1), r_ok(1)); + assert_done(|| err(1), r_err(1)); + assert_done(|| f_ok(1).map_ok(|a| a + 2), r_ok(3)); + assert_done(|| f_err(1).map_ok(|a| a + 2), r_err(1)); + assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1)); + assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3)); + assert_done(|| f_ok(1).and_then(|a| ok(a + 2)), r_ok(3)); + assert_done(|| f_err(1).and_then(|a| ok(a + 2)), r_err(1)); + assert_done(|| f_ok(1).and_then(|a| err(a as u32 + 3)), r_err(4)); + assert_done(|| f_err(1).and_then(|a| err(a as u32 + 4)), r_err(1)); + assert_done(|| f_ok(1).or_else(|a| ok(a as i32 + 2)), r_ok(1)); + assert_done(|| f_err(1).or_else(|a| ok(a as i32 + 2)), r_ok(3)); + assert_done(|| f_ok(1).or_else(|a| err(a + 3)), r_ok(1)); + assert_done(|| f_err(1).or_else(|a| err(a + 4)), r_err(5)); + assert_done(|| try_select(f_ok(1), f_err(2)).then(unselect), r_ok(1)); + assert_done(|| try_select(f_ok(1), ok(2)).then(unselect), r_ok(1)); + assert_done(|| try_select(f_err(1), f_ok(1)).then(unselect), r_err(1)); + assert_done( + || try_select(f_ok(1), future::pending()).then(unselect), + Ok(1), + ); + assert_done( + || try_select(future::pending(), f_ok(1)).then(unselect), + Ok(1), + ); + assert_done(|| try_join(f_ok(1), f_err(1)), Err(1)); + assert_done(|| try_join(f_ok(1), ok(2)), Ok((1, 2))); + assert_done(|| try_join(f_err(1), f_ok(1)), Err(1)); + assert_done(|| f_ok(1).then(|_| ok(2)), r_ok(2)); + assert_done(|| f_ok(1).then(|_| err(2)), r_err(2)); + assert_done(|| f_err(1).then(|_| ok(2)), r_ok(2)); + assert_done(|| f_err(1).then(|_| err(2)), r_err(2)); +} + +#[test] +fn test_pending() { + fn pending() -> future::Pending> { + future::pending() + } + + assert_pending(|| pending()); + assert_pending(|| try_select(pending(), pending())); + assert_pending(|| join(pending(), pending())); + assert_pending(|| join(pending(), f_ok(1))); + assert_pending(|| join(f_ok(1), pending())); + assert_pending(|| pending().or_else(move |_| pending())); + assert_pending(|| pending().and_then(move |_| pending())); + assert_pending(|| f_err(1).or_else(move |_| pending())); + assert_pending(|| f_ok(1).and_then(move |_| pending())); + assert_pending(|| pending().map_ok(|a| a + 1)); + assert_pending(|| pending().map_err(|a| a + 1)); + assert_pending(|| pending().then(|a| future::ready(a))); +} + +#[test] +fn test_ok() { + assert_done(|| ok(1), r_ok(1)); + assert_done(|| err(1), r_err(1)); +} + +/* TODO: TryFutureExt::try_flatten +#[test] +fn flatten() { + fn ok(a: T) -> future::Ready> { + future::ok(a) + } + fn err(b: E) -> future::Ready> { + future::err(b) + } + + assert_done(|| ok(ok(1)).flatten(), r_ok(1)); + assert_done(|| ok(err(1)).flatten(), r_err(1)); + assert_done(|| err(1u32).map_ok(ok).flatten(), r_err(1)); + assert_done(|| ok(ok(1)).flatten(), r_ok(1)); + assert_pending(|| ok(future::pending::>()).flatten()); + assert_pending(|| future::pending::>().map_ok(ok).flatten()); +} +*/ + +#[test] +fn smoke_oneshot() { + assert_done( + || { + let (c, p) = oneshot::channel(); + c.send(1).unwrap(); + p + }, + Ok(1), + ); + assert_done( + || { + let (c, p) = oneshot::channel::(); + drop(c); + p + }, + Err(Canceled), + ); + let mut completes = Vec::new(); + assert_pending(|| { + let (a, b) = oneshot::channel::(); + completes.push(a); + b + }); + + let (c, mut p) = oneshot::channel::(); + drop(c); + unwrap_err(p.poll_unpin(&mut panic_context())); + let (c, p) = oneshot::channel::(); + drop(c); + let (tx, rx) = channel(); + p.then(move |_| tx.send(())); //.forget(); + rx.recv().unwrap(); +} + +#[test] +fn select_cancels() { + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map_ok(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map_ok(move |d| { + dtx.send(d).unwrap(); + d + }); + + let mut f = try_select(b, d).then(unselect); + // assert!(f.poll(&mut Task::new()).is_pending()); + assert!(brx.try_recv().is_err()); + assert!(drx.try_recv().is_err()); + a.send(1).unwrap(); + let cx = &mut noop_context(); + unwrap(f.poll_unpin(cx)); + assert_eq!(brx.recv().unwrap(), 1); + drop(c); + assert!(drx.recv().is_err()); + + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map_ok(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map_ok(move |d| { + dtx.send(d).unwrap(); + d + }); + + let mut f = try_select(b, d).then(unselect); + assert!(f.poll_unpin(cx).is_pending()); + assert!(f.poll_unpin(cx).is_pending()); + a.send(1).unwrap(); + unwrap(f.poll_unpin(cx)); + drop((c, f)); + assert!(drx.recv().is_err()); +} + +#[test] +fn join_cancels() { + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); + + let mut f = try_join(b, d); + drop(a); + unwrap_err(f.poll_unpin(&mut panic_context())); + drop(c); + assert!(drx.recv().is_err()); + + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, _brx), (dtx, drx)) = (channel(), channel()); + let b = b.map(move |b| { + btx.send(b).unwrap(); + b + }); + let d = d.map(move |d| { + dtx.send(d).unwrap(); + d + }); + + let (tx, rx) = channel(); + let f = try_join(b, d); + f.then(move |_| { + tx.send(()).unwrap(); + ok(()) + }); // .forget(); + assert!(rx.try_recv().is_err()); + drop(a); + rx.recv().unwrap(); + drop(c); + assert!(drx.recv().is_err()); +} + +#[test] +fn join_incomplete() { + let (a, b) = oneshot::channel::(); + let (tx, rx) = channel(); + let cx = &mut noop_context(); + let mut f = try_join(ok(1), b).map_ok(move |r| tx.send(r).unwrap()); + assert!(f.poll_unpin(cx).is_pending()); + assert!(rx.try_recv().is_err()); + a.send(2).unwrap(); + unwrap(f.poll_unpin(cx)); + assert_eq!(rx.recv().unwrap(), (1, 2)); + + let (a, b) = oneshot::channel::(); + let (tx, rx) = channel(); + let mut f = try_join(b, ok(2)).map_ok(move |r| tx.send(r).unwrap()); + assert!(f.poll_unpin(cx).is_pending()); + assert!(rx.try_recv().is_err()); + a.send(1).unwrap(); + unwrap(f.poll_unpin(cx)); + assert_eq!(rx.recv().unwrap(), (1, 2)); + + let (a, b) = oneshot::channel::(); + let (tx, rx) = channel(); + let mut f = try_join(ok(1), b).map_err(move |_r| tx.send(2).unwrap()); + assert!(f.poll_unpin(cx).is_pending()); + assert!(rx.try_recv().is_err()); + drop(a); + unwrap_err(f.poll_unpin(cx)); + assert_eq!(rx.recv().unwrap(), 2); + + let (a, b) = oneshot::channel::(); + let (tx, rx) = channel(); + let mut f = try_join(b, ok(2)).map_err(move |_r| tx.send(1).unwrap()); + assert!(f.poll_unpin(cx).is_pending()); + assert!(rx.try_recv().is_err()); + drop(a); + unwrap_err(f.poll_unpin(cx)); + assert_eq!(rx.recv().unwrap(), 1); +} + +#[test] +fn select() { + assert_done( + || try_select(f_ok(2), future::pending()).then(unselect), + Ok(2), + ); + assert_done( + || try_select(future::pending(), f_ok(2)).then(unselect), + Ok(2), + ); + assert_done( + || try_select(f_err(2), future::pending()).then(unselect), + Err(2), + ); + assert_done( + || try_select(future::pending(), f_err(2)).then(unselect), + Err(2), + ); + + assert_done( + || { + try_select(f_ok(1), f_ok(2)) + .map_err(|_| 0) + .and_then(|either_tup| { + let (a, b) = either_tup.into_inner(); + b.map_ok(move |b| a + b) + }) + }, + Ok(3), + ); + + // Finish one half of a select and then fail the second, ensuring that we + // get the notification of the second one. + { + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let f = try_select(b, d); + let (tx, rx) = channel(); + f.map_ok(move |r| tx.send(r).unwrap()); + //.forget(); + a.send(1).unwrap(); + let val = rx.recv().unwrap(); + assert_eq!(val, 1); + let (tx, rx) = channel(); + rx.map_err(move |_r| tx.send(2).unwrap()); //.forget(); + assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); + drop(c); + assert_eq!(rx.recv().unwrap(), 2); + } + + // Fail the second half and ensure that we see the first one finish + { + let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); + let f = try_select(b, d); + let (tx, rx) = channel(); + f.map_err(move |r| tx.send((1, r.into_inner().1)).unwrap()); // .forget(); + drop(c); + assert_eq!(rx.recv().unwrap(), 1); + let (tx2, rx2) = channel(); + rx.map_ok(move |r| tx2.send(r).unwrap()); //.forget(); + assert_eq!(rx2.try_recv().err().unwrap(), TryRecvError::Empty); + a.send(2).unwrap(); + assert_eq!(rx2.recv().unwrap(), 2); + } + + // Cancelling the first half should cancel the second + { + let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map_ok(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map_ok(move |v| { + dtx.send(v).unwrap(); + v + }); + let f = try_select(b, d); + drop(f); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + } + + // Cancel after a schedule + { + let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map_ok(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map_ok(move |v| { + dtx.send(v).unwrap(); + v + }); + let mut f = try_select(b, d); + let _res = f.poll_unpin(&mut noop_context()); + drop(f); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + } + + // Cancel propagates + { + let ((a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); + let ((btx, brx), (dtx, drx)) = (channel(), channel()); + let b = b.map_ok(move |v| { + btx.send(v).unwrap(); + v + }); + let d = d.map_ok(move |v| { + dtx.send(v).unwrap(); + v + }); + let (tx, rx) = channel(); + try_select(b, d).map_ok(move |_| tx.send(()).unwrap()); //.forget(); + drop(a); + assert!(drx.recv().is_err()); + assert!(brx.recv().is_err()); + assert!(rx.recv().is_err()); + } + + // Cancel on early drop + { + let (tx, rx) = channel(); + let f = try_select( + f_ok(1), + future::pending::>().map_ok(move |()| { + tx.send(()).unwrap(); + 1 + }), + ); + drop(f); + assert!(rx.recv().is_err()); + } +} diff --git a/futures/tests_disabled/buffer_unordered.rs b/futures/tests/buffer_unordered.rs similarity index 91% rename from futures/tests_disabled/buffer_unordered.rs rename to futures/tests/buffer_unordered.rs index bcc012fed3..0aba0c5ca9 100644 --- a/futures/tests_disabled/buffer_unordered.rs +++ b/futures/tests/buffer_unordered.rs @@ -1,6 +1,6 @@ -use futures::SinkExt; +use futures::sink::SinkExt; use futures::executor::{block_on, block_on_stream}; -use futures::stream::StreamExt; +use futures::stream::{StreamExt, TryStreamExt}; use futures::channel::oneshot; use futures::channel::mpsc; use std::sync::mpsc as std_mpsc; @@ -17,13 +17,13 @@ fn works() { let t1 = thread::spawn(move || { for _ in 0..N+1 { let (mytx, myrx) = oneshot::channel(); - tx = block_on(tx.send(myrx)).unwrap(); + block_on(tx.send(myrx)).unwrap(); tx3.send(mytx).unwrap(); } rx2.recv().unwrap(); for _ in 0..N { let (mytx, myrx) = oneshot::channel(); - tx = block_on(tx.send(myrx)).unwrap(); + block_on(tx.send(myrx)).unwrap(); tx3.send(mytx).unwrap(); } }); diff --git a/futures/tests_disabled/eventual.rs b/futures/tests/eventual.rs similarity index 69% rename from futures/tests_disabled/eventual.rs rename to futures/tests/eventual.rs index c68828ec12..5779849470 100644 --- a/futures/tests_disabled/eventual.rs +++ b/futures/tests/eventual.rs @@ -1,18 +1,26 @@ use futures::channel::oneshot; -use futures::future::{ok, err}; +use futures::future::{self, ok, Future,FutureExt, TryFutureExt}; +use futures::executor::block_on; use std::sync::mpsc; use std::thread; - -mod support; -use support::*; - +use std::fmt; + +fn assert_done(actual_fut: F, expected: T) +where + T: PartialEq + fmt::Debug, + F: FnOnce() -> Fut, + Fut: Future + Unpin, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); +} #[test] fn join1() { let (tx, rx) = mpsc::channel(); - ok::(1).join(ok(2)) - .map(move |v| tx.send(v).unwrap()) - .forget(); + future::try_join(ok::(1), ok(2)) + .map(move |v| tx.send(v).unwrap()) + ;//.forget(); assert_eq!(rx.recv(), Ok((1, 2))); assert!(rx.recv().is_err()); } @@ -22,7 +30,9 @@ fn join2() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.join(p2).map(move |v| tx.send(v).unwrap()).forget(); + future::try_join(p1, p2) + .map(move |v| tx.send(v).unwrap()) + ;//.forget(); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); assert!(rx.try_recv().is_err()); @@ -36,7 +46,9 @@ fn join3() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.join(p2).map_err(move |_v| tx.send(1).unwrap()).forget(); + future::try_join(p1, p2) + .map_err(move |_v| tx.send(1).unwrap()) + ;//.forget(); assert!(rx.try_recv().is_err()); drop(c1); assert_eq!(rx.recv(), Ok(1)); @@ -49,7 +61,9 @@ fn join4() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.join(p2).map_err(move |v| tx.send(v).unwrap()).forget(); + future::try_join(p1, p2) + .map_err(move |v| tx.send(v).unwrap()) + ;//.forget(); assert!(rx.try_recv().is_err()); drop(c1); assert!(rx.recv().is_ok()); @@ -63,7 +77,9 @@ fn join5() { let (c2, p2) = oneshot::channel::(); let (c3, p3) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.join(p2).join(p3).map(move |v| tx.send(v).unwrap()).forget(); + future::try_join(future::try_join(p1, p2), p3) + .map(move |v| tx.send(v).unwrap()) + ;//.forget(); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); assert!(rx.try_recv().is_err()); @@ -79,7 +95,9 @@ fn select1() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.select(p2).map(move |v| tx.send(v).unwrap()).forget(); + future::try_select(p1, p2) + .map(move |v| tx.send(v).unwrap()) + ;//.forget(); assert!(rx.try_recv().is_err()); c1.send(1).unwrap(); let (v, p2) = rx.recv().unwrap().into_inner(); @@ -87,7 +105,8 @@ fn select1() { assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); - p2.map(move |v| tx.send(v).unwrap()).forget(); + p2.map(move |v| tx.send(v).unwrap()) + ;//.forget(); c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); @@ -98,7 +117,9 @@ fn select2() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.select(p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()).forget(); + future::try_select(p1, p2) + .map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()) + ;//.forget(); assert!(rx.try_recv().is_err()); drop(c1); let (v, p2) = rx.recv().unwrap(); @@ -106,7 +127,8 @@ fn select2() { assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); - p2.map(move |v| tx.send(v).unwrap()).forget(); + p2.map(move |v| tx.send(v).unwrap()) + ;//.forget(); c2.send(2).unwrap(); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); @@ -117,7 +139,9 @@ fn select3() { let (c1, p1) = oneshot::channel::(); let (c2, p2) = oneshot::channel::(); let (tx, rx) = mpsc::channel(); - p1.select(p2).map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()).forget(); + future::try_select(p1, p2) + .map_err(move |v| tx.send((1, v.into_inner().1)).unwrap()) + ;//.forget(); assert!(rx.try_recv().is_err()); drop(c1); let (v, p2) = rx.recv().unwrap(); @@ -125,7 +149,8 @@ fn select3() { assert!(rx.recv().is_err()); let (tx, rx) = mpsc::channel(); - p2.map_err(move |_v| tx.send(2).unwrap()).forget(); + p2.map_err(move |_v| tx.send(2).unwrap()) + ;//.forget(); drop(c2); assert_eq!(rx.recv(), Ok(2)); assert!(rx.recv().is_err()); @@ -147,7 +172,9 @@ fn select4() { let (c2, p2) = oneshot::channel::(); let tx3 = tx2.clone(); - p1.select(p2).map(move |_| tx3.send(()).unwrap()).forget(); + future::try_select(p1, p2) + .map(move |_| tx3.send(()).unwrap()) + ;//.forget(); tx.send(c1).unwrap(); rx2.recv().unwrap(); drop(c2); diff --git a/futures/tests_disabled/ready_queue.rs b/futures/tests/ready_queue.rs similarity index 58% rename from futures/tests_disabled/ready_queue.rs rename to futures/tests/ready_queue.rs index c3e07baabf..ae0b7c356d 100644 --- a/futures/tests_disabled/ready_queue.rs +++ b/futures/tests/ready_queue.rs @@ -1,11 +1,12 @@ +use futures::channel::oneshot; use futures::executor::{block_on, block_on_stream}; -use futures::Async::*; use futures::future; -use futures::stream::FuturesUnordered; -use futures::channel::oneshot; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::Poll; +use futures_test::task::noop_context; use std::panic::{self, AssertUnwindSafe}; - -mod support; +use std::sync::{Arc, Barrier}; +use std::thread; trait AssertSendSync: Send + Sync {} impl AssertSendSync for FuturesUnordered<()> {} @@ -22,22 +23,20 @@ fn basic_usage() { queue.push(rx2); queue.push(rx3); - assert!(!queue.poll_next(cx).unwrap().is_ready()); + assert!(!queue.poll_next_unpin(cx).is_ready()); tx2.send("hello").unwrap(); - assert_eq!(Ready(Some("hello")), queue.poll_next(cx).unwrap()); - assert!(!queue.poll_next(cx).unwrap().is_ready()); + assert_eq!(Poll::Ready(Some(Ok("hello"))), queue.poll_next_unpin(cx)); + assert!(!queue.poll_next_unpin(cx).is_ready()); tx1.send("world").unwrap(); tx3.send("world2").unwrap(); - assert_eq!(Ready(Some("world")), queue.poll_next(cx).unwrap()); - assert_eq!(Ready(Some("world2")), queue.poll_next(cx).unwrap()); - assert_eq!(Ready(None), queue.poll_next(cx).unwrap()); - - Ok::<_, ()>(()) - })).unwrap(); + assert_eq!(Poll::Ready(Some(Ok("world"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); } #[test] @@ -52,22 +51,20 @@ fn resolving_errors() { queue.push(rx2); queue.push(rx3); - assert!(!queue.poll_next(cx).unwrap().is_ready()); + assert!(!queue.poll_next_unpin(cx).is_ready()); drop(tx2); - assert!(queue.poll_next(cx).is_err()); - assert!(!queue.poll_next(cx).unwrap().is_ready()); + assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert!(!queue.poll_next_unpin(cx).is_ready()); drop(tx1); tx3.send("world2").unwrap(); - assert!(queue.poll_next(cx).is_err()); - assert_eq!(Ready(Some("world2")), queue.poll_next(cx).unwrap()); - assert_eq!(Ready(None), queue.poll_next(cx).unwrap()); - - Ok::<_, ()>(()) - })).unwrap(); + assert_eq!(Poll::Ready(Some(Err(oneshot::Canceled))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(Some(Ok("world2"))), queue.poll_next_unpin(cx)); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); } #[test] @@ -82,29 +79,25 @@ fn dropping_ready_queue() { queue.push(rx2); queue.push(rx3); - support::noop_waker_lw(|cx| { - assert!(!tx1.poll_cancel(cx).unwrap().is_ready()); - assert!(!tx2.poll_cancel(cx).unwrap().is_ready()); - assert!(!tx3.poll_cancel(cx).unwrap().is_ready()); + { + let cx = &mut noop_context(); + assert!(!tx1.poll_cancel(cx).is_ready()); + assert!(!tx2.poll_cancel(cx).is_ready()); + assert!(!tx3.poll_cancel(cx).is_ready()); drop(queue); - assert!(tx1.poll_cancel(cx).unwrap().is_ready()); - assert!(tx2.poll_cancel(cx).unwrap().is_ready()); - assert!(tx3.poll_cancel(cx).unwrap().is_ready()); - }); - - Ok::<_, ()>(()).into_future() - })).unwrap(); + assert!(tx1.poll_cancel(cx).is_ready()); + assert!(tx2.poll_cancel(cx).is_ready()); + assert!(tx3.poll_cancel(cx).is_ready()); + } + })); } #[test] fn stress() { const ITER: usize = 300; - use std::sync::{Arc, Barrier}; - use std::thread; - for i in 0..ITER { let n = (i % 10) + 1; @@ -129,10 +122,7 @@ fn stress() { let mut sync = block_on_stream(queue); - let mut rx: Vec<_> = (&mut sync) - .take(n) - .map(|res| res.unwrap()) - .collect(); + let mut rx: Vec<_> = (&mut sync).take(n).map(|res| res.unwrap()).collect(); assert_eq!(rx.len(), n); @@ -151,15 +141,11 @@ fn stress() { fn panicking_future_dropped() { block_on(future::lazy(move |cx| { let mut queue = FuturesUnordered::new(); - queue.push(future::poll_fn(|_| -> Poll { - panic!() - })); + queue.push(future::poll_fn(|_| -> Poll> { panic!() })); - let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next(cx))); + let r = panic::catch_unwind(AssertUnwindSafe(|| queue.poll_next_unpin(cx))); assert!(r.is_err()); assert!(queue.is_empty()); - assert_eq!(Ready(None), queue.poll_next(cx).unwrap()); - - Ok::<_, ()>(()) - })).unwrap(); + assert_eq!(Poll::Ready(None), queue.poll_next_unpin(cx)); + })); } diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs new file mode 100644 index 0000000000..46af6bff60 --- /dev/null +++ b/futures/tests/sink.rs @@ -0,0 +1,487 @@ +use futures::channel::{mpsc, oneshot}; +use futures::executor::block_on; +use futures::future::{self, Future, FutureExt, TryFutureExt}; +use futures::ready; +use futures::never::Never; +use futures::sink::{Sink, SinkErrInto, SinkExt}; +use futures::stream::{self, Stream, StreamExt}; +use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures_test::task::panic_context; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::fmt; +use std::mem; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +fn sassert_next(s: &mut S, item: S::Item) +where + S: Stream + Unpin, + S::Item: Eq + fmt::Debug, +{ + match s.poll_next_unpin(&mut panic_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(e)) => assert_eq!(e, item), + Poll::Pending => panic!("stream wasn't ready"), + } +} + +fn unwrap(x: Poll>) -> T { + match x { + Poll::Ready(Ok(x)) => x, + _ => panic!(), + } +} + +#[test] +fn either_sink() { + let mut s = if true { + Vec::::new().left_sink() + } else { + VecDeque::::new().right_sink() + }; + + Pin::new(&mut s).start_send(0).unwrap(); +} + +#[test] +fn vec_sink() { + let mut v = Vec::new(); + Pin::new(&mut v).start_send(0).unwrap(); + Pin::new(&mut v).start_send(1).unwrap(); + assert_eq!(v, vec![0, 1]); + block_on(v.flush()).unwrap(); + assert_eq!(v, vec![0, 1]); +} + +#[test] +fn vecdeque_sink() { + let mut deque = VecDeque::new(); + Pin::new(&mut deque).start_send(2).unwrap(); + Pin::new(&mut deque).start_send(3).unwrap(); + + assert_eq!(deque.pop_front(), Some(2)); + assert_eq!(deque.pop_front(), Some(3)); + assert_eq!(deque.pop_front(), None); +} + +#[test] +fn send() { + let mut v = Vec::new(); + + block_on(v.send(0)).unwrap(); + assert_eq!(v, vec![0]); + + block_on(v.send(1)).unwrap(); + assert_eq!(v, vec![0, 1]); + + block_on(v.send(2)).unwrap(); + assert_eq!(v, vec![0, 1, 2]); +} + +#[test] +fn send_all() { + let mut v = Vec::new(); + + block_on(v.send_all(&mut stream::iter(vec![0, 1]))).unwrap(); + assert_eq!(v, vec![0, 1]); + + block_on(v.send_all(&mut stream::iter(vec![2, 3]))).unwrap(); + assert_eq!(v, vec![0, 1, 2, 3]); + + block_on(v.send_all(&mut stream::iter(vec![4, 5]))).unwrap(); + assert_eq!(v, vec![0, 1, 2, 3, 4, 5]); +} + +// An Unpark struct that records unpark events for inspection +struct Flag(AtomicBool); + +impl Flag { + fn new() -> Arc { + Arc::new(Self(AtomicBool::new(false))) + } + + fn get(&self) -> bool { + self.0.load(Ordering::SeqCst) + } + + fn set(&self, v: bool) { + self.0.store(v, Ordering::SeqCst) + } +} + +impl ArcWake for Flag { + fn wake_by_ref(arc_self: &Arc) { + arc_self.set(true) + } +} + +fn flag_cx(f: F) -> R +where + F: FnOnce(Arc, &mut Context<'_>) -> R, +{ + let flag = Flag::new(); + let waker = task::waker_ref(&flag); + let cx = &mut Context::from_waker(&waker); + f(flag.clone(), cx) +} + +// Sends a value on an i32 channel sink +struct StartSendFut + Unpin, Item: Unpin>(Option, Option); + +impl + Unpin, Item: Unpin> StartSendFut { + fn new(sink: S, item: Item) -> Self { + Self(Some(sink), Some(item)) + } +} + +impl + Unpin, Item: Unpin> Future for StartSendFut { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self(inner, item) = self.get_mut(); + { + let mut inner = inner.as_mut().unwrap(); + ready!(Pin::new(&mut inner).poll_ready(cx))?; + Pin::new(&mut inner).start_send(item.take().unwrap())?; + } + Poll::Ready(Ok(inner.take().unwrap())) + } +} + +// Test that `start_send` on an `mpsc` channel does indeed block when the +// channel is full +#[test] +fn mpsc_blocking_start_send() { + let (mut tx, mut rx) = mpsc::channel::(0); + + block_on(future::lazy(|_| { + tx.start_send(0).unwrap(); + + flag_cx(|flag, cx| { + let mut task = StartSendFut::new(tx, 1); + + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.get()); + sassert_next(&mut rx, 0); + assert!(flag.get()); + flag.set(false); + unwrap(task.poll_unpin(cx)); + assert!(!flag.get()); + sassert_next(&mut rx, 1); + }) + })); +} + +// test `flush` by using `with` to make the first insertion into a sink block +// until a oneshot is completed +#[test] +fn with_flush() { + let (tx, rx) = oneshot::channel(); + let mut block = rx.boxed(); + let mut sink = Vec::new().with(|elem| { + mem::replace(&mut block, future::ok(()).boxed()) + .map_ok(move |()| elem + 1) + .map_err(|_| -> Never { panic!() }) + }); + + assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(())); + + flag_cx(|flag, cx| { + let mut task = sink.flush(); + assert!(task.poll_unpin(cx).is_pending()); + tx.send(()).unwrap(); + assert!(flag.get()); + + unwrap(task.poll_unpin(cx)); + + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[1, 2]); + }) +} + +// test simple use of with to change data +#[test] +fn with_as_map() { + let mut sink = Vec::new().with(|item| future::ok::(item * 2)); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + block_on(sink.send(2)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 2, 4]); +} + +// test simple use of with_flat_map +#[test] +fn with_flat_map() { + let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok)); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + block_on(sink.send(2)).unwrap(); + block_on(sink.send(3)).unwrap(); + assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]); +} + +// Immediately accepts all requests to start pushing, but completion is managed +// by manually flushing +struct ManualFlush { + data: Vec, + waiting_tasks: Vec, +} + +impl Sink> for ManualFlush { + type SinkError = (); + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(mut self: Pin<&mut Self>, item: Option) -> Result<(), Self::SinkError> { + if let Some(item) = item { + self.data.push(item); + } else { + self.force_flush(); + } + Ok(()) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + if self.data.is_empty() { + Poll::Ready(Ok(())) + } else { + self.waiting_tasks.push(cx.waker().clone()); + Poll::Pending + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} + +impl ManualFlush { + fn new() -> Self { + Self { + data: Vec::new(), + waiting_tasks: Vec::new(), + } + } + + fn force_flush(&mut self) -> Vec { + for task in self.waiting_tasks.drain(..) { + task.wake() + } + mem::replace(&mut self.data, Vec::new()) + } +} + +// test that the `with` sink doesn't require the underlying sink to flush, +// but doesn't claim to be flushed until the underlying sink is +#[test] +fn with_flush_propagate() { + let mut sink = ManualFlush::new().with(|x| future::ok::, ()>(x)); + flag_cx(|flag, cx| { + unwrap(Pin::new(&mut sink).poll_ready(cx)); + Pin::new(&mut sink).start_send(Some(0)).unwrap(); + unwrap(Pin::new(&mut sink).poll_ready(cx)); + Pin::new(&mut sink).start_send(Some(1)).unwrap(); + + { + let mut task = sink.flush(); + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.get()); + } + assert_eq!(sink.get_mut().force_flush(), vec![0, 1]); + assert!(flag.get()); + unwrap(sink.flush().poll_unpin(cx)); + }) +} + +// test that a buffer is a no-nop around a sink that always accepts sends +#[test] +fn buffer_noop() { + let mut sink = Vec::new().buffer(0); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); + + let mut sink = Vec::new().buffer(1); + block_on(sink.send(0)).unwrap(); + block_on(sink.send(1)).unwrap(); + assert_eq!(sink.get_ref(), &[0, 1]); +} + +struct ManualAllow { + data: Vec, + allow: Rc, +} + +struct Allow { + flag: Cell, + tasks: RefCell>, +} + +impl Allow { + fn new() -> Self { + Self { + flag: Cell::new(false), + tasks: RefCell::new(Vec::new()), + } + } + + fn check(&self, cx: &mut Context<'_>) -> bool { + if self.flag.get() { + true + } else { + self.tasks.borrow_mut().push(cx.waker().clone()); + false + } + } + + fn start(&self) { + self.flag.set(true); + let mut tasks = self.tasks.borrow_mut(); + for task in tasks.drain(..) { + task.wake(); + } + } +} + +impl Sink for ManualAllow { + type SinkError = (); + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.allow.check(cx) { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::SinkError> { + self.data.push(item); + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +fn manual_allow() -> (ManualAllow, Rc) { + let allow = Rc::new(Allow::new()); + let manual_allow = ManualAllow { + data: Vec::new(), + allow: allow.clone(), + }; + (manual_allow, allow) +} + +// test basic buffer functionality, including both filling up to capacity, +// and writing out when the underlying sink is ready +#[test] +fn buffer() { + let (sink, allow) = manual_allow::(); + let sink = sink.buffer(2); + + let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); + let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap(); + + flag_cx(|flag, cx| { + let mut task = sink.send(2); + assert!(task.poll_unpin(cx).is_pending()); + assert!(!flag.get()); + allow.start(); + assert!(flag.get()); + unwrap(task.poll_unpin(cx)); + assert_eq!(sink.get_ref().data, vec![0, 1, 2]); + }) +} + +#[test] +fn fanout_smoke() { + let sink1 = Vec::new(); + let sink2 = Vec::new(); + let mut sink = sink1.fanout(sink2); + block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]))).unwrap(); + let (sink1, sink2) = sink.into_inner(); + assert_eq!(sink1, vec![1, 2, 3]); + assert_eq!(sink2, vec![1, 2, 3]); +} + +#[test] +fn fanout_backpressure() { + let (left_send, mut left_recv) = mpsc::channel(0); + let (right_send, mut right_recv) = mpsc::channel(0); + let sink = left_send.fanout(right_send); + + let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap(); + + flag_cx(|flag, cx| { + let mut task = sink.send(2); + assert!(!flag.get()); + assert!(task.poll_unpin(cx).is_pending()); + let item = block_on(left_recv.next()); + assert_eq!(item, Some(0)); + assert!(flag.get()); + assert!(task.poll_unpin(cx).is_pending()); + let item = block_on(right_recv.next()); + assert_eq!(item, Some(0)); + assert!(flag.get()); + unwrap(task.poll_unpin(cx)); + // make sure receivers live until end of test to prevent send errors + drop(left_recv); + drop(right_recv); + }) +} + +#[test] +fn sink_map_err() { + { + let cx = &mut panic_context(); + let (tx, _rx) = mpsc::channel(1); + let mut tx = tx.sink_map_err(|_| ()); + assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); + assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!( + Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), + Err(()) + ); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct FromErrTest; + +impl From for FromErrTest { + fn from(_: mpsc::SendError) -> Self { + Self + } +} + +#[test] +fn from_err() { + { + let cx = &mut panic_context(); + let (tx, _rx) = mpsc::channel(1); + let mut tx: SinkErrInto, _, FromErrTest> = tx.sink_err_into(); + assert_eq!(Pin::new(&mut tx).start_send(()), Ok(())); + assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(()))); + } + + let tx = mpsc::channel(0).0; + assert_eq!( + Pin::new(&mut tx.sink_err_into()).start_send(()), + Err(FromErrTest) + ); +} diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index ae9cdfa56e..04c01aebd3 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -1,8 +1,393 @@ -#![feature(async_await)] +use futures::channel::{mpsc, oneshot}; +use futures::executor::{block_on, block_on_stream}; +use futures::future::{ok, err, ready, Future, FutureExt,TryFutureExt}; +use futures::sink::SinkExt; +use futures::stream::{iter, empty, poll_fn, Peekable, Stream, StreamExt, TryStreamExt}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::fmt::Debug; +use std::pin::Pin; -use futures::executor::block_on; -use futures::stream; -use futures_util::StreamExt; +fn assert_done(actual_fut: F, expected: T) +where + T: PartialEq + Debug, + F: FnOnce() -> Fut, + Fut: Future + Unpin, +{ + let output = block_on(actual_fut()); + assert_eq!(output, expected); +} + +fn sassert_empty(s: &mut S) { + match s.poll_next_unpin(&mut noop_context()) { + Poll::Ready(None) => panic!("stream is at its end"), + Poll::Ready(Some(_)) => panic!("stream had more elements"), + Poll::Pending => {} + } +} + +fn ok_list() -> impl Stream> + Send { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)) + .and_then(|tx| tx.send(Ok(2))) + .and_then(|tx| tx.send(Ok(3))) + .forget(); + rx.map(|r| r.unwrap()) +} + +fn err_list() -> impl Stream> + Send { + let (tx, rx) = mpsc::channel(1); + tx.send(Ok(1)) + .and_then(|tx| tx.send(Ok(2))) + .and_then(|tx| tx.send(Err(3))) + .forget(); + rx.map(|r| r.unwrap()) +} + +// #[test] +// fn map() { +// assert_done(|| ok_list().map_ok(|a| a + 1).collect(), Ok(vec![2, 3, 4])); +// } + +#[test] +fn map_ok() { + assert_done(|| ok_list().map_ok(|a| a + 1).collect(), Ok(vec![2, 3, 4])); +} + +#[test] +fn map_err() { + assert_done(|| err_list().map_err(|a| a + 1).collect::>(), Err(4)); +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +struct FromErrTest(u32); + +impl From for FromErrTest { + fn from(i: u32) -> FromErrTest { + FromErrTest(i) + } +} + +#[test] +fn from_err() { + assert_done(|| err_list().err_into().collect::>(), Err(FromErrTest(3))); +} + +#[test] +fn fold() { + assert_done(|| list().fold(0, |a, b| ok::(a + b)), Ok(6)); + assert_done(|| err_list().fold(0, |a, b| ok::(a + b)), Err(3)); +} + +#[test] +fn filter() { + assert_done(|| list().filter(|a| ok(*a % 2 == 0)).collect(), Ok(vec![2])); +} + +#[test] +fn filter_map() { + assert_done(|| list().filter_map(|x| { + ok(if x % 2 == 0 { + Some(x + 10) + } else { + None + }) + }).collect(), Ok(vec![12])); +} + +#[test] +fn and_then() { + assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); + assert_done(|| list().and_then(|a| err::(a as u32)).collect::>(), + Err(1)); +} + +#[test] +fn then() { + assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); + +} + +#[test] +fn or_else() { + assert_done(|| err_list().or_else(|a| { + ok::(a as i32) + }).collect(), Ok(vec![1, 2, 3])); +} + +#[test] +fn flatten() { + assert_done(|| list().map(|_| list()).flatten().collect(), + Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); + +} + +#[test] +fn skip() { + assert_done(|| list().skip(2).collect(), Ok(vec![3])); +} + +#[test] +fn skip_passes_errors_through() { + let mut s = block_on_stream( + iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1) + ); + assert_eq!(s.next(), Some(Err(1))); + assert_eq!(s.next(), Some(Err(2))); + assert_eq!(s.next(), Some(Ok(4))); + assert_eq!(s.next(), Some(Ok(5))); + assert_eq!(s.next(), None); +} + +#[test] +fn skip_while() { + assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), + Ok(vec![2, 3])); +} +#[test] +fn take() { + assert_done(|| list().take(2).collect(), Ok(vec![1, 2])); +} + +#[test] +fn take_while() { + assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), + Ok(vec![1, 2])); +} + +#[test] +fn take_passes_errors_through() { + let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]).take(1)); + assert_eq!(s.next(), Some(Err(1))); + assert_eq!(s.next(), Some(Err(2))); + assert_eq!(s.next(), Some(Ok(3))); + assert_eq!(s.next(), None); + + let mut s = block_on_stream(iter(vec![Ok(1), Err(2)]).take(1)); + assert_eq!(s.next(), Some(Ok(1))); + assert_eq!(s.next(), None); +} + +#[test] +fn peekable() { + assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3])); +} + +#[test] +fn fuse() { + let mut stream = block_on_stream(list().fuse()); + assert_eq!(stream.next(), Some(Ok(1))); + assert_eq!(stream.next(), Some(Ok(2))); + assert_eq!(stream.next(), Some(Ok(3))); + assert_eq!(stream.next(), None); + assert_eq!(stream.next(), None); + assert_eq!(stream.next(), None); +} + +#[test] +fn buffered() { + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::(); + let (c, d) = oneshot::channel::(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); + + let mut rx = rx.buffered(2); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); + + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::(); + let (c, d) = oneshot::channel::(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) + .forget(); + + let mut rx = rx.buffered(1); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); +} + +#[test] +fn unordered() { + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::(); + let (c, d) = oneshot::channel::(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); + + let mut rx = rx.buffer_unordered(2); + sassert_empty(&mut rx); + let mut rx = block_on_stream(rx); + c.send(3).unwrap(); + assert_eq!(rx.next(), Some(Ok(3))); + a.send(5).unwrap(); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), None); + + let (tx, rx) = mpsc::channel(1); + let (a, b) = oneshot::channel::(); + let (c, d) = oneshot::channel::(); + + tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) + .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) + .forget(); + + // We don't even get to see `c` until `a` completes. + let mut rx = rx.buffer_unordered(1); + sassert_empty(&mut rx); + c.send(3).unwrap(); + sassert_empty(&mut rx); + a.send(5).unwrap(); + let mut rx = block_on_stream(rx); + assert_eq!(rx.next(), Some(Ok(5))); + assert_eq!(rx.next(), Some(Ok(3))); + assert_eq!(rx.next(), None); +} + +#[test] +fn zip() { + assert_done(|| list().zip(list()).collect(), + Ok(vec![(1, 1), (2, 2), (3, 3)])); + assert_done(|| list().zip(list().take(2)).collect(), + Ok(vec![(1, 1), (2, 2)])); + assert_done(|| list().take(2).zip(list()).collect(), + Ok(vec![(1, 1), (2, 2)])); + assert_done(|| err_list().zip(list()).collect::>(), Err(3)); + assert_done(|| list().zip(list().map(|x| x + 1)).collect(), + Ok(vec![(1, 2), (2, 3), (3, 4)])); +} + +#[test] +fn peek() { + struct Peek { + inner: Peekable + Send>> + } + + impl Future for Peek { + type Item = (); + type Error = u32; + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(), u32> { + { + let res = ready!(self.inner.peek(cx))?; + assert_eq!(res, Some(&1)); + } + assert_eq!(self.inner.peek(cx).unwrap(), Some(&1).into()); + assert_eq!(self.inner.poll_next(cx).unwrap(), Some(1).into()); + Ok(Poll::Ready(())) + } + } + + block_on(Peek { + inner: list().peekable(), + }).unwrap() +} + +#[test] +fn wait() { + assert_eq!(block_on_stream(list()).collect::, _>>(), + Ok(vec![1, 2, 3])); +} + +#[test] +fn chunks() { + assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]])); + assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); + assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]])); + let mut list = block_on_stream(err_list().chunks(3)); + let i = list.next().unwrap().unwrap(); + assert_eq!(i, vec![1, 2]); + let i = list.next().unwrap().unwrap_err(); + assert_eq!(i, 3); +} + +#[test] +#[should_panic] +fn chunks_panic_on_cap_zero() { + let _ = list().chunks(0); +} + +#[test] +fn forward() { + let v = Vec::new(); + let v = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(v)).unwrap().1; + assert_eq!(v, vec![0, 1]); + + let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1; + assert_eq!(v, vec![0, 1, 2, 3]); + + assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), + Ok(vec![0, 1, 2, 3, 4, 5])); +} + +#[test] +#[allow(deprecated)] +fn concat() { + let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); + assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + + let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); + assert_done(move || b.concat(), Err(())); +} + +#[test] +fn concat2() { + let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); + assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); + + let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); + assert_done(move || b.concat(), Err(())); + + let c = empty::, ()>(); + assert_done(move || c.concat(), Ok(vec![])) +} + +#[test] +fn stream_poll_fn() { + let mut counter = 5usize; + + let read_stream = poll_fn(move |_| -> Poll, std::io::Error> { + if counter == 0 { + return Ok(Poll::Ready(None)); + } + counter -= 1; + Ok(Poll::Ready(Some(counter))) + }); + + assert_eq!(block_on_stream(read_stream).count(), 5); +} + +#[test] +fn inspect() { + let mut seen = vec![]; + assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3])); + assert_eq!(seen, [1, 2, 3]); +} + +#[test] +fn inspect_err() { + let mut seen = vec![]; + assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect::>(), Err(3)); + assert_eq!(seen, [3]); +} #[test] fn select() { diff --git a/futures/tests_disabled/stream_catch_unwind.rs b/futures/tests/stream_catch_unwind.rs similarity index 53% rename from futures/tests_disabled/stream_catch_unwind.rs rename to futures/tests/stream_catch_unwind.rs index 54d7cf4b0c..8b23a0a7ef 100644 --- a/futures/tests_disabled/stream_catch_unwind.rs +++ b/futures/tests/stream_catch_unwind.rs @@ -1,27 +1,27 @@ use futures::executor::block_on_stream; -use futures::stream; +use futures::stream::{self, StreamExt}; #[test] fn panic_in_the_middle_of_the_stream() { - let stream = stream::iter_ok::<_, bool>(vec![Some(10), None, Some(11)]); + let stream = stream::iter(vec![Some(10), None, Some(11)]); // panic on second element let stream_panicking = stream.map(|o| o.unwrap()); let mut iter = block_on_stream(stream_panicking.catch_unwind()); - assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap()); + assert_eq!(10, iter.next().unwrap().ok().unwrap()); assert!(iter.next().unwrap().is_err()); assert!(iter.next().is_none()); } #[test] fn no_panic() { - let stream = stream::iter_ok::<_, bool>(vec![10, 11, 12]); + let stream = stream::iter(vec![10, 11, 12]); let mut iter = block_on_stream(stream.catch_unwind()); - assert_eq!(Ok(10), iter.next().unwrap().ok().unwrap()); - assert_eq!(Ok(11), iter.next().unwrap().ok().unwrap()); - assert_eq!(Ok(12), iter.next().unwrap().ok().unwrap()); + assert_eq!(10, iter.next().unwrap().ok().unwrap()); + assert_eq!(11, iter.next().unwrap().ok().unwrap()); + assert_eq!(12, iter.next().unwrap().ok().unwrap()); assert!(iter.next().is_none()); } diff --git a/futures/tests/stream_select_all.rs b/futures/tests/stream_select_all.rs index c6db7084e9..667929011f 100644 --- a/futures/tests/stream_select_all.rs +++ b/futures/tests/stream_select_all.rs @@ -1,8 +1,9 @@ #![feature(async_await)] +use futures::channel::mpsc; use futures::executor::block_on_stream; use futures::future::{self, FutureExt}; -use futures::stream::{self, FusedStream, SelectAll, StreamExt}; +use futures::stream::{self, select_all, FusedStream, SelectAll, StreamExt}; use futures::task::Poll; use futures_test::task::noop_context; @@ -48,3 +49,32 @@ fn issue_1626() { assert_eq!(s.next(), Some(14)); assert_eq!(s.next(), None); } + +#[test] +fn works_1() { + let (a_tx, a_rx) = mpsc::unbounded::(); + let (b_tx, b_rx) = mpsc::unbounded::(); + let (c_tx, c_rx) = mpsc::unbounded::(); + + let streams = vec![a_rx, b_rx, c_rx]; + + let mut stream = block_on_stream(select_all(streams)); + + b_tx.unbounded_send(99).unwrap(); + a_tx.unbounded_send(33).unwrap(); + assert_eq!(Some(33), stream.next()); + assert_eq!(Some(99), stream.next()); + + b_tx.unbounded_send(99).unwrap(); + a_tx.unbounded_send(33).unwrap(); + assert_eq!(Some(33), stream.next()); + assert_eq!(Some(99), stream.next()); + + c_tx.unbounded_send(42).unwrap(); + assert_eq!(Some(42), stream.next()); + a_tx.unbounded_send(43).unwrap(); + assert_eq!(Some(43), stream.next()); + + drop((a_tx, b_tx, c_tx)); + assert_eq!(None, stream.next()); +} diff --git a/futures/tests_disabled/all.rs b/futures/tests_disabled/all.rs deleted file mode 100644 index 6c7e11cf7b..0000000000 --- a/futures/tests_disabled/all.rs +++ /dev/null @@ -1,351 +0,0 @@ -use futures::future; -use futures::executor::block_on; -use futures::channel::oneshot::{self, Canceled}; -use std::sync::mpsc::{channel, TryRecvError}; - -mod support; -use support::*; - -fn unselect(r: Result, Either<(E, B), (E, A)>>) -> Result { - match r { - Ok(Either::Left((t, _))) | - Ok(Either::Right((t, _))) => Ok(t), - Err(Either::Left((e, _))) | - Err(Either::Right((e, _))) => Err(e), - } -} - -#[test] -fn result_smoke() { - fn is_future_v(_: C) - where A: Send + 'static, - B: Send + 'static, - C: Future - {} - - is_future_v::(f_ok(1).map(|a| a + 1)); - is_future_v::(f_ok(1).map_err(|a| a + 1)); - is_future_v::(f_ok(1).and_then(Ok)); - is_future_v::(f_ok(1).or_else(Err)); - is_future_v::<(i32, i32), u32, _>(f_ok(1).join(Err(3))); - is_future_v::(f_ok(1).map(f_ok).flatten()); - - assert_done(|| f_ok(1), r_ok(1)); - assert_done(|| f_err(1), r_err(1)); - assert_done(|| result(Ok(1)), r_ok(1)); - assert_done(|| result(Err(1)), r_err(1)); - assert_done(|| ok(1), r_ok(1)); - assert_done(|| err(1), r_err(1)); - assert_done(|| f_ok(1).map(|a| a + 2), r_ok(3)); - assert_done(|| f_err(1).map(|a| a + 2), r_err(1)); - assert_done(|| f_ok(1).map_err(|a| a + 2), r_ok(1)); - assert_done(|| f_err(1).map_err(|a| a + 2), r_err(3)); - assert_done(|| f_ok(1).and_then(|a| Ok(a + 2)), r_ok(3)); - assert_done(|| f_err(1).and_then(|a| Ok(a + 2)), r_err(1)); - assert_done(|| f_ok(1).and_then(|a| Err(a as u32 + 3)), r_err(4)); - assert_done(|| f_err(1).and_then(|a| Err(a as u32 + 4)), r_err(1)); - assert_done(|| f_ok(1).or_else(|a| Ok(a as i32 + 2)), r_ok(1)); - assert_done(|| f_err(1).or_else(|a| Ok(a as i32 + 2)), r_ok(3)); - assert_done(|| f_ok(1).or_else(|a| Err(a + 3)), r_ok(1)); - assert_done(|| f_err(1).or_else(|a| Err(a + 4)), r_err(5)); - assert_done(|| f_ok(1).select(f_err(2)).then(unselect), r_ok(1)); - assert_done(|| f_ok(1).select(Ok(2)).then(unselect), r_ok(1)); - assert_done(|| f_err(1).select(f_ok(1)).then(unselect), r_err(1)); - assert_done(|| f_ok(1).select(empty()).then(unselect), Ok(1)); - assert_done(|| empty().select(f_ok(1)).then(unselect), Ok(1)); - assert_done(|| f_ok(1).join(f_err(1)), Err(1)); - assert_done(|| f_ok(1).join(Ok(2)), Ok((1, 2))); - assert_done(|| f_err(1).join(f_ok(1)), Err(1)); - assert_done(|| f_ok(1).then(|_| Ok(2)), r_ok(2)); - assert_done(|| f_ok(1).then(|_| Err(2)), r_err(2)); - assert_done(|| f_err(1).then(|_| Ok(2)), r_ok(2)); - assert_done(|| f_err(1).then(|_| Err(2)), r_err(2)); -} - -#[test] -fn test_empty() { - fn empty() -> Empty { future::empty() } - - assert_empty(|| empty()); - assert_empty(|| empty().select(empty())); - assert_empty(|| empty().join(empty())); - assert_empty(|| empty().join(f_ok(1))); - assert_empty(|| f_ok(1).join(empty())); - assert_empty(|| empty().or_else(move |_| empty())); - assert_empty(|| empty().and_then(move |_| empty())); - assert_empty(|| f_err(1).or_else(move |_| empty())); - assert_empty(|| f_ok(1).and_then(move |_| empty())); - assert_empty(|| empty().map(|a| a + 1)); - assert_empty(|| empty().map_err(|a| a + 1)); - assert_empty(|| empty().then(|a| a)); -} - -#[test] -fn test_ok() { - assert_done(|| ok(1), r_ok(1)); - assert_done(|| err(1), r_err(1)); -} - -#[test] -fn flatten() { - fn ok(a: T) -> FutureResult { - future::ok(a) - } - fn err(b: E) -> FutureResult { - future::err(b) - } - - assert_done(|| ok(ok(1)).flatten(), r_ok(1)); - assert_done(|| ok(err(1)).flatten(), r_err(1)); - assert_done(|| err(1u32).map(ok).flatten(), r_err(1)); - assert_done(|| future::ok(future::ok(1)).flatten(), r_ok(1)); - assert_empty(|| ok(empty::()).flatten()); - assert_empty(|| empty::().map(ok).flatten()); -} - -#[test] -fn smoke_oneshot() { - assert_done(|| { - let (c, p) = oneshot::channel(); - c.send(1).unwrap(); - p - }, Ok(1)); - assert_done(|| { - let (c, p) = oneshot::channel::(); - drop(c); - p - }, Err(Canceled)); - let mut completes = Vec::new(); - assert_empty(|| { - let (a, b) = oneshot::channel::(); - completes.push(a); - b - }); - - let (c, mut p) = oneshot::channel::(); - drop(c); - let res = panic_waker_lw(|lw| p.poll(lw)); - assert!(res.is_err()); - let (c, p) = oneshot::channel::(); - drop(c); - let (tx, rx) = channel(); - p.then(move |_| { - tx.send(()) - }).forget(); - rx.recv().unwrap(); -} - -#[test] -fn select_cancels() { - let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); - let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); - - let mut f = b.select(d).then(unselect); - // assert!(f.poll(&mut Task::new()).is_pending()); - assert!(brx.try_recv().is_err()); - assert!(drx.try_recv().is_err()); - a.send(1).unwrap(); - noop_waker_lw(|lw| { - let res = f.poll(lw); - assert!(res.ok().unwrap().is_ready()); - assert_eq!(brx.recv().unwrap(), 1); - drop(c); - assert!(drx.recv().is_err()); - - let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); - let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); - - let mut f = b.select(d).then(unselect); - assert!(f.poll(lw).ok().unwrap().is_pending()); - assert!(f.poll(lw).ok().unwrap().is_pending()); - a.send(1).unwrap(); - assert!(f.poll(lw).ok().unwrap().is_ready()); - drop((c, f)); - assert!(drx.recv().is_err()); - }) -} - -#[test] -fn join_cancels() { - let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); - let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); - - let mut f = b.join(d); - drop(a); - let res = panic_waker_lw(|lw| f.poll(lw)); - assert!(res.is_err()); - drop(c); - assert!(drx.recv().is_err()); - - let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); - let ((btx, _brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |b| { btx.send(b).unwrap(); b }); - let d = d.map(move |d| { dtx.send(d).unwrap(); d }); - - let (tx, rx) = channel(); - let f = b.join(d); - f.then(move |_| { - tx.send(()).unwrap(); - let res: Result<(), ()> = Ok(()); - res - }).forget(); - assert!(rx.try_recv().is_err()); - drop(a); - rx.recv().unwrap(); - drop(c); - assert!(drx.recv().is_err()); -} - -#[test] -fn join_incomplete() { - let (a, b) = oneshot::channel::(); - let (tx, rx) = channel(); - noop_waker_lw(|lw| { - let mut f = ok(1).join(b).map(move |r| tx.send(r).unwrap()); - assert!(f.poll(lw).ok().unwrap().is_pending()); - assert!(rx.try_recv().is_err()); - a.send(2).unwrap(); - assert!(f.poll(lw).ok().unwrap().is_ready()); - assert_eq!(rx.recv().unwrap(), (1, 2)); - - let (a, b) = oneshot::channel::(); - let (tx, rx) = channel(); - let mut f = b.join(Ok(2)).map(move |r| tx.send(r).unwrap()); - assert!(f.poll(lw).ok().unwrap().is_pending()); - assert!(rx.try_recv().is_err()); - a.send(1).unwrap(); - assert!(f.poll(lw).ok().unwrap().is_ready()); - assert_eq!(rx.recv().unwrap(), (1, 2)); - - let (a, b) = oneshot::channel::(); - let (tx, rx) = channel(); - let mut f = ok(1).join(b).map_err(move |_r| tx.send(2).unwrap()); - assert!(f.poll(lw).ok().unwrap().is_pending()); - assert!(rx.try_recv().is_err()); - drop(a); - assert!(f.poll(lw).is_err()); - assert_eq!(rx.recv().unwrap(), 2); - - let (a, b) = oneshot::channel::(); - let (tx, rx) = channel(); - let mut f = b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap()); - assert!(f.poll(lw).ok().unwrap().is_pending()); - assert!(rx.try_recv().is_err()); - drop(a); - assert!(f.poll(lw).is_err()); - assert_eq!(rx.recv().unwrap(), 1); - }) -} - - -#[test] -fn select2() { - assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2)); - assert_done(|| empty().select(f_ok(2)).then(unselect), Ok(2)); - assert_done(|| f_err(2).select(empty()).then(unselect), Err(2)); - assert_done(|| empty().select(f_err(2)).then(unselect), Err(2)); - - assert_done(|| { - f_ok(1).select(f_ok(2)) - .map_err(|_| 0) - .and_then(|either_tup| { - let (a, b) = either_tup.into_inner(); - b.map(move |b| a + b) - }) - }, Ok(3)); - - // Finish one half of a select and then fail the second, ensuring that we - // get the notification of the second one. - { - let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); - let f = b.select(d); - let (tx, rx) = channel(); - f.map(move |r| tx.send(r).unwrap()).forget(); - a.send(1).unwrap(); - let (val, next) = rx.recv().unwrap().into_inner(); - assert_eq!(val, 1); - let (tx, rx) = channel(); - next.map_err(move |_r| tx.send(2).unwrap()).forget(); - assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); - drop(c); - assert_eq!(rx.recv().unwrap(), 2); - } - - // Fail the second half and ensure that we see the first one finish - { - let ((a, b), (c, d)) = (oneshot::channel::(), oneshot::channel::()); - let f = b.select(d); - let (tx, rx) = channel(); - f.map_err(move |r| tx.send((1, r.into_inner().1)).unwrap()).forget(); - drop(c); - let (val, next) = rx.recv().unwrap(); - assert_eq!(val, 1); - let (tx, rx) = channel(); - next.map(move |r| tx.send(r).unwrap()).forget(); - assert_eq!(rx.try_recv().err().unwrap(), TryRecvError::Empty); - a.send(2).unwrap(); - assert_eq!(rx.recv().unwrap(), 2); - } - - // Cancelling the first half should cancel the second - { - let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); - let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); - let f = b.select(d); - drop(f); - assert!(drx.recv().is_err()); - assert!(brx.recv().is_err()); - } - - // Cancel after a schedule - { - let ((_a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); - let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); - let mut f = b.select(d); - let _res = noop_waker_lw(|lw| f.poll(lw)); - drop(f); - assert!(drx.recv().is_err()); - assert!(brx.recv().is_err()); - } - - // Cancel propagates - { - let ((a, b), (_c, d)) = (oneshot::channel::(), oneshot::channel::()); - let ((btx, brx), (dtx, drx)) = (channel(), channel()); - let b = b.map(move |v| { btx.send(v).unwrap(); v }); - let d = d.map(move |v| { dtx.send(v).unwrap(); v }); - let (tx, rx) = channel(); - b.select(d).map(move |_| tx.send(()).unwrap()).forget(); - drop(a); - assert!(drx.recv().is_err()); - assert!(brx.recv().is_err()); - assert!(rx.recv().is_err()); - } - - // Cancel on early drop - { - let (tx, rx) = channel(); - let f = f_ok(1).select(empty::<_, ()>().map(move |()| { - tx.send(()).unwrap(); - 1 - })); - drop(f); - assert!(rx.recv().is_err()); - } -} - -#[test] -fn option() { - assert_eq!(Ok(Some(())), block_on(Some(ok::<(), ()>(())).into_future())); - assert_eq!(Ok::<_, ()>(None::<()>), block_on(None::>.into_future())); -} diff --git a/futures/tests_disabled/sink.rs b/futures/tests_disabled/sink.rs deleted file mode 100644 index b4d69e39ca..0000000000 --- a/futures/tests_disabled/sink.rs +++ /dev/null @@ -1,469 +0,0 @@ -use futures::future::ok; -use futures::stream; -use futures::channel::{oneshot, mpsc}; -use futures::task::{self, Wake, Waker}; -use futures::executor::block_on; -use futures::sink::SinkErrInto; -use std::cell::{Cell, RefCell}; -use std::collections::VecDeque; -use std::mem; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::atomic::{Ordering, AtomicBool}; - -mod support; -use support::*; - -#[test] -fn either_sink() { - let mut s = if true { - Vec::::new().left_sink() - } else { - VecDeque::::new().right_sink() - }; - - s.start_send(0).unwrap(); -} - -#[test] -fn vec_sink() { - let mut v = Vec::new(); - v.start_send(0).unwrap(); - v.start_send(1).unwrap(); - assert_eq!(v, vec![0, 1]); - assert_done(move || v.flush(), Ok(vec![0, 1])); -} - -#[test] -fn vecdeque_sink() { - let mut deque = VecDeque::new(); - deque.start_send(2).unwrap(); - deque.start_send(3).unwrap(); - - assert_eq!(deque.pop_front(), Some(2)); - assert_eq!(deque.pop_front(), Some(3)); - assert_eq!(deque.pop_front(), None); -} - -#[test] -fn send() { - let v = Vec::new(); - - let v = block_on(v.send(0)).unwrap(); - assert_eq!(v, vec![0]); - - let v = block_on(v.send(1)).unwrap(); - assert_eq!(v, vec![0, 1]); - - assert_done(move || v.send(2), - Ok(vec![0, 1, 2])); -} - -#[test] -fn send_all() { - let v = Vec::new(); - - let (v, _) = block_on(v.send_all(stream::iter_ok(vec![0, 1]))).unwrap(); - assert_eq!(v, vec![0, 1]); - - let (v, _) = block_on(v.send_all(stream::iter_ok(vec![2, 3]))).unwrap(); - assert_eq!(v, vec![0, 1, 2, 3]); - - assert_done( - move || v.send_all(stream::iter_ok(vec![4, 5])).map(|(v, _)| v), - Ok(vec![0, 1, 2, 3, 4, 5])); -} - -// An Unpark struct that records unpark events for inspection -struct Flag(pub AtomicBool); - -impl Flag { - fn new() -> Arc { - Arc::new(Flag(AtomicBool::new(false))) - } - - fn get(&self) -> bool { - self.0.load(Ordering::SeqCst) - } - - fn set(&self, v: bool) { - self.0.store(v, Ordering::SeqCst) - } -} - -impl Wake for Flag { - fn wake(arc_self: &Arc) { - arc_self.set(true) - } -} - -fn flag_cx(f: F) -> R - where F: FnOnce(Arc, &mut Context<'_>) -> R -{ - let flag = Flag::new(); - let map = &mut task::LocalMap::new(); - let waker = Waker::from(flag.clone()); - let exec = &mut support::PanicExec; - - let cx = &mut Context::new(map, &waker, exec); - f(flag, cx) -} - -// Sends a value on an i32 channel sink -struct StartSendFut(Option, Option); - -impl StartSendFut { - fn new(sink: S, item: S::SinkItem) -> StartSendFut { - StartSendFut(Some(sink), Some(item)) - } -} - -impl Future for StartSendFut { - type Item = S; - type Error = S::SinkError; - - fn poll(&mut self, cx: &mut Context<'_>) -> Poll { - { - let inner = self.0.as_mut().unwrap(); - ready!(inner.poll_ready(cx))?; - inner.start_send(self.1.take().unwrap())?; - } - Ok(Poll::Ready(self.0.take().unwrap())) - } -} - -#[test] -// Test that `start_send` on an `mpsc` channel does indeed block when the -// channel is full -fn mpsc_blocking_start_send() { - let (mut tx, mut rx) = mpsc::channel::(0); - - block_on(futures::future::lazy(|_| { - tx.start_send(0).unwrap(); - - flag_cx(|flag, cx| { - let mut task = StartSendFut::new(tx, 1); - - assert!(task.poll(cx).unwrap().is_pending()); - assert!(!flag.get()); - sassert_next(&mut rx, 0); - assert!(flag.get()); - flag.set(false); - assert!(task.poll(cx).unwrap().is_ready()); - assert!(!flag.get()); - sassert_next(&mut rx, 1); - - Ok::<(), ()>(()) - }) - })).unwrap(); -} - -#[test] -// test `flush` by using `with` to make the first insertion into a sink block -// until a oneshot is completed -fn with_flush() { - let (tx, rx) = oneshot::channel(); - let mut block = Box::new(rx) as Box>; - let mut sink = Vec::new().with(|elem| { - mem::replace(&mut block, Box::new(ok(()))) - .map(move |_| elem + 1).map_err(|_| -> Never { panic!() }) - }); - - assert_eq!(sink.start_send(0), Ok(())); - - flag_cx(|flag, cx| { - let mut task = sink.flush(); - assert!(task.poll(cx).unwrap().is_pending()); - tx.send(()).unwrap(); - assert!(flag.get()); - - let sink = match task.poll(cx).unwrap() { - Poll::Ready(sink) => sink, - _ => panic!() - }; - - assert_eq!(block_on(sink.send(1)).unwrap().get_ref(), &[1, 2]); - }) -} - -#[test] -// test simple use of with to change data -fn with_as_map() { - let sink = Vec::new().with(|item| -> Result { - Ok(item * 2) - }); - let sink = block_on(sink.send(0)).unwrap(); - let sink = block_on(sink.send(1)).unwrap(); - let sink = block_on(sink.send(2)).unwrap(); - assert_eq!(sink.get_ref(), &[0, 2, 4]); -} - -#[test] -// test simple use of with_flat_map -fn with_flat_map() { - let sink = Vec::new().with_flat_map(|item| { - stream::iter_ok(vec![item; item]) - }); - let sink = block_on(sink.send(0)).unwrap(); - let sink = block_on(sink.send(1)).unwrap(); - let sink = block_on(sink.send(2)).unwrap(); - let sink = block_on(sink.send(3)).unwrap(); - assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]); -} - -// Immediately accepts all requests to start pushing, but completion is managed -// by manually flushing -struct ManualFlush { - data: Vec, - waiting_tasks: Vec, -} - -impl Sink for ManualFlush { - type SinkItem = Option; // Pass None to flush - type SinkError = (); - - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<(), Self::SinkError> { - Ok(Poll::Ready(())) - } - - fn start_send(&mut self, f: Self::SinkItem) -> Result<(), Self::SinkError> { - if let Some(item) = f { - self.data.push(item); - } else { - self.force_flush(); - } - Ok(()) - } - - fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<(), Self::SinkError> { - if self.data.is_empty() { - Ok(Poll::Ready(())) - } else { - self.waiting_tasks.push(cx.waker().clone()); - Ok(Poll::Pending) - } - } - - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<(), Self::SinkError> { - self.poll_flush(cx) - } -} - -impl ManualFlush { - fn new() -> ManualFlush { - ManualFlush { - data: Vec::new(), - waiting_tasks: Vec::new() - } - } - - fn force_flush(&mut self) -> Vec { - for task in self.waiting_tasks.drain(..) { - task.wake() - } - mem::replace(&mut self.data, Vec::new()) - } -} - -#[test] -// test that the `with` sink doesn't require the underlying sink to flush, -// but doesn't claim to be flushed until the underlying sink is -fn with_flush_propagate() { - let mut sink = ManualFlush::new().with(|x| -> Result, ()> { Ok(x) }); - flag_cx(|flag, cx| { - assert!(sink.poll_ready(cx).unwrap().is_ready()); - sink.start_send(Some(0)).unwrap(); - assert!(sink.poll_ready(cx).unwrap().is_ready()); - sink.start_send(Some(1)).unwrap(); - - let mut task = sink.flush(); - assert!(task.poll(cx).unwrap().is_pending()); - assert!(!flag.get()); - assert_eq!(task.get_mut().unwrap().get_mut().force_flush(), vec![0, 1]); - assert!(flag.get()); - assert!(task.poll(cx).unwrap().is_ready()); - }) -} - -#[test] -// test that a buffer is a no-nop around a sink that always accepts sends -fn buffer_noop() { - let sink = Vec::new().buffer(0); - let sink = block_on(sink.send(0)).unwrap(); - let sink = block_on(sink.send(1)).unwrap(); - assert_eq!(sink.get_ref(), &[0, 1]); - - let sink = Vec::new().buffer(1); - let sink = block_on(sink.send(0)).unwrap(); - let sink = block_on(sink.send(1)).unwrap(); - assert_eq!(sink.get_ref(), &[0, 1]); -} - -struct ManualAllow { - data: Vec, - allow: Rc, -} - -struct Allow { - flag: Cell, - tasks: RefCell>, -} - -impl Allow { - fn new() -> Allow { - Allow { - flag: Cell::new(false), - tasks: RefCell::new(Vec::new()), - } - } - - fn check(&self, cx: &mut Context<'_>) -> bool { - if self.flag.get() { - true - } else { - self.tasks.borrow_mut().push(cx.waker().clone()); - false - } - } - - fn start(&self) { - self.flag.set(true); - let mut tasks = self.tasks.borrow_mut(); - for task in tasks.drain(..) { - task.wake(); - } - } -} - -impl Sink for ManualAllow { - type SinkItem = T; - type SinkError = Never; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<(), Self::SinkError> { - if self.allow.check(cx) { - Ok(Poll::Ready(())) - } else { - Ok(Poll::Pending) - } - } - - fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> { - self.data.push(item); - Ok(()) - } - - fn poll_flush(&mut self, _: &mut Context<'_>) -> Poll<(), Self::SinkError> { - Ok(Poll::Ready(())) - } - - fn poll_close(&mut self, _: &mut Context<'_>) -> Poll<(), Self::SinkError> { - Ok(Poll::Ready(())) - } -} - -fn manual_allow() -> (ManualAllow, Rc) { - let allow = Rc::new(Allow::new()); - let manual_allow = ManualAllow { - data: Vec::new(), - allow: allow.clone(), - }; - (manual_allow, allow) -} - -#[test] -// test basic buffer functionality, including both filling up to capacity, -// and writing out when the underlying sink is ready -fn buffer() { - let (sink, allow) = manual_allow::(); - let sink = sink.buffer(2); - - let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); - let sink = block_on(StartSendFut::new(sink, 1)).unwrap(); - - flag_cx(|flag, cx| { - let mut task = sink.send(2); - assert!(task.poll(cx).unwrap().is_pending()); - assert!(!flag.get()); - allow.start(); - assert!(flag.get()); - match task.poll(cx).unwrap() { - Poll::Ready(sink) => { - assert_eq!(sink.get_ref().data, vec![0, 1, 2]); - } - _ => panic!() - } - }) -} - -#[test] -fn fanout_smoke() { - let sink1 = Vec::new(); - let sink2 = Vec::new(); - let sink = sink1.fanout(sink2); - let stream = futures::stream::iter_ok(vec![1,2,3]); - let (sink, _) = block_on(sink.send_all(stream)).unwrap(); - let (sink1, sink2) = sink.into_inner(); - assert_eq!(sink1, vec![1,2,3]); - assert_eq!(sink2, vec![1,2,3]); -} - -#[test] -fn fanout_backpressure() { - let (left_send, left_recv) = mpsc::channel(0); - let (right_send, right_recv) = mpsc::channel(0); - let sink = left_send.fanout(right_send); - - let sink = block_on(StartSendFut::new(sink, 0)).unwrap(); - - flag_cx(|flag, cx| { - let mut task = sink.send(2); - assert!(!flag.get()); - assert!(task.poll(cx).unwrap().is_pending()); - let (item, left_recv) = block_on(left_recv.next()).unwrap(); - assert_eq!(item, Some(0)); - assert!(flag.get()); - assert!(task.poll(cx).unwrap().is_pending()); - let (item, right_recv) = block_on(right_recv.next()).unwrap(); - assert_eq!(item, Some(0)); - assert!(flag.get()); - assert!(task.poll(cx).unwrap().is_ready()); - // make sure receivers live until end of test to prevent send errors - drop(left_recv); - drop(right_recv); - }) -} - -#[test] -fn map_err() { - panic_waker_cx(|cx| { - let (tx, _rx) = mpsc::channel(1); - let mut tx = tx.sink_map_err(|_| ()); - assert_eq!(tx.start_send(()), Ok(())); - assert_eq!(tx.poll_flush(cx), Ok(Poll::Ready(()))); - }); - - let tx = mpsc::channel(0).0; - assert_eq!(tx.sink_map_err(|_| ()).start_send(()), Err(())); -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -struct FromErrTest; - -impl From for FromErrTest { - fn from(_: mpsc::SendError) -> FromErrTest { - FromErrTest - } -} - -#[test] -fn from_err() { - panic_waker_cx(|cx| { - let (tx, _rx) = mpsc::channel(1); - let mut tx: SinkErrInto, FromErrTest> = tx.sink_err_into(); - assert_eq!(tx.start_send(()), Ok(())); - assert_eq!(tx.poll_flush(cx), Ok(Poll::Ready(()))); - }); - - let tx = mpsc::channel(0).0; - assert_eq!(tx.sink_err_into().start_send(()), Err(FromErrTest)); -} diff --git a/futures/tests_disabled/stream.rs b/futures/tests_disabled/stream.rs deleted file mode 100644 index 4eaf12e0d9..0000000000 --- a/futures/tests_disabled/stream.rs +++ /dev/null @@ -1,393 +0,0 @@ -use futures::executor::{block_on, block_on_stream}; -use futures::future::{err, ok}; -use futures::stream::{empty, iter_ok, poll_fn, Peekable}; -use futures::channel::oneshot; -use futures::channel::mpsc; - -mod support; -use support::*; - -pub struct Iter { - iter: I, -} - -pub fn iter(i: J) -> Iter - where J: IntoIterator>, -{ - Iter { - iter: i.into_iter(), - } -} - -impl Stream for Iter - where I: Iterator>, -{ - type Item = T; - type Error = E; - - fn poll_next(&mut self, _: &mut Context<'_>) -> Poll, E> { - match self.iter.next() { - Some(Ok(e)) => Ok(Poll::Ready(Some(e))), - Some(Err(e)) => Err(e), - None => Ok(Poll::Ready(None)), - } - } -} - -fn list() -> Box + Send> { - let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Ok(3))) - .forget(); - Box::new(rx.then(|r| r.unwrap())) -} - -fn err_list() -> Box + Send> { - let (tx, rx) = mpsc::channel(1); - tx.send(Ok(1)) - .and_then(|tx| tx.send(Ok(2))) - .and_then(|tx| tx.send(Err(3))) - .forget(); - Box::new(rx.then(|r| r.unwrap())) -} - -#[test] -fn map() { - assert_done(|| list().map(|a| a + 1).collect(), Ok(vec![2, 3, 4])); -} - -#[test] -fn map_err() { - assert_done(|| err_list().map_err(|a| a + 1).collect::>(), Err(4)); -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -struct FromErrTest(u32); - -impl From for FromErrTest { - fn from(i: u32) -> FromErrTest { - FromErrTest(i) - } -} - -#[test] -fn from_err() { - assert_done(|| err_list().err_into().collect::>(), Err(FromErrTest(3))); -} - -#[test] -fn fold() { - assert_done(|| list().fold(0, |a, b| ok::(a + b)), Ok(6)); - assert_done(|| err_list().fold(0, |a, b| ok::(a + b)), Err(3)); -} - -#[test] -fn filter() { - assert_done(|| list().filter(|a| ok(*a % 2 == 0)).collect(), Ok(vec![2])); -} - -#[test] -fn filter_map() { - assert_done(|| list().filter_map(|x| { - ok(if x % 2 == 0 { - Some(x + 10) - } else { - None - }) - }).collect(), Ok(vec![12])); -} - -#[test] -fn and_then() { - assert_done(|| list().and_then(|a| Ok(a + 1)).collect(), Ok(vec![2, 3, 4])); - assert_done(|| list().and_then(|a| err::(a as u32)).collect::>(), - Err(1)); -} - -#[test] -fn then() { - assert_done(|| list().then(|a| a.map(|e| e + 1)).collect(), Ok(vec![2, 3, 4])); - -} - -#[test] -fn or_else() { - assert_done(|| err_list().or_else(|a| { - ok::(a as i32) - }).collect(), Ok(vec![1, 2, 3])); -} - -#[test] -fn flatten() { - assert_done(|| list().map(|_| list()).flatten().collect(), - Ok(vec![1, 2, 3, 1, 2, 3, 1, 2, 3])); - -} - -#[test] -fn skip() { - assert_done(|| list().skip(2).collect(), Ok(vec![3])); -} - -#[test] -fn skip_passes_errors_through() { - let mut s = block_on_stream( - iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)]).skip(1) - ); - assert_eq!(s.next(), Some(Err(1))); - assert_eq!(s.next(), Some(Err(2))); - assert_eq!(s.next(), Some(Ok(4))); - assert_eq!(s.next(), Some(Ok(5))); - assert_eq!(s.next(), None); -} - -#[test] -fn skip_while() { - assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(), - Ok(vec![2, 3])); -} -#[test] -fn take() { - assert_done(|| list().take(2).collect(), Ok(vec![1, 2])); -} - -#[test] -fn take_while() { - assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(), - Ok(vec![1, 2])); -} - -#[test] -fn take_passes_errors_through() { - let mut s = block_on_stream(iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)]).take(1)); - assert_eq!(s.next(), Some(Err(1))); - assert_eq!(s.next(), Some(Err(2))); - assert_eq!(s.next(), Some(Ok(3))); - assert_eq!(s.next(), None); - - let mut s = block_on_stream(iter(vec![Ok(1), Err(2)]).take(1)); - assert_eq!(s.next(), Some(Ok(1))); - assert_eq!(s.next(), None); -} - -#[test] -fn peekable() { - assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3])); -} - -#[test] -fn fuse() { - let mut stream = block_on_stream(list().fuse()); - assert_eq!(stream.next(), Some(Ok(1))); - assert_eq!(stream.next(), Some(Ok(2))); - assert_eq!(stream.next(), Some(Ok(3))); - assert_eq!(stream.next(), None); - assert_eq!(stream.next(), None); - assert_eq!(stream.next(), None); -} - -#[test] -fn buffered() { - let (tx, rx) = mpsc::channel(1); - let (a, b) = oneshot::channel::(); - let (c, d) = oneshot::channel::(); - - tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); - - let mut rx = rx.buffered(2); - sassert_empty(&mut rx); - c.send(3).unwrap(); - sassert_empty(&mut rx); - a.send(5).unwrap(); - let mut rx = block_on_stream(rx); - assert_eq!(rx.next(), Some(Ok(5))); - assert_eq!(rx.next(), Some(Ok(3))); - assert_eq!(rx.next(), None); - - let (tx, rx) = mpsc::channel(1); - let (a, b) = oneshot::channel::(); - let (c, d) = oneshot::channel::(); - - tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.map_err(|_| panic!())))) - .forget(); - - let mut rx = rx.buffered(1); - sassert_empty(&mut rx); - c.send(3).unwrap(); - sassert_empty(&mut rx); - a.send(5).unwrap(); - let mut rx = block_on_stream(rx); - assert_eq!(rx.next(), Some(Ok(5))); - assert_eq!(rx.next(), Some(Ok(3))); - assert_eq!(rx.next(), None); -} - -#[test] -fn unordered() { - let (tx, rx) = mpsc::channel(1); - let (a, b) = oneshot::channel::(); - let (c, d) = oneshot::channel::(); - - tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); - - let mut rx = rx.buffer_unordered(2); - sassert_empty(&mut rx); - let mut rx = block_on_stream(rx); - c.send(3).unwrap(); - assert_eq!(rx.next(), Some(Ok(3))); - a.send(5).unwrap(); - assert_eq!(rx.next(), Some(Ok(5))); - assert_eq!(rx.next(), None); - - let (tx, rx) = mpsc::channel(1); - let (a, b) = oneshot::channel::(); - let (c, d) = oneshot::channel::(); - - tx.send(Box::new(b.recover(|_| panic!())) as Box + Send>) - .and_then(|tx| tx.send(Box::new(d.recover(|_| panic!())))) - .forget(); - - // We don't even get to see `c` until `a` completes. - let mut rx = rx.buffer_unordered(1); - sassert_empty(&mut rx); - c.send(3).unwrap(); - sassert_empty(&mut rx); - a.send(5).unwrap(); - let mut rx = block_on_stream(rx); - assert_eq!(rx.next(), Some(Ok(5))); - assert_eq!(rx.next(), Some(Ok(3))); - assert_eq!(rx.next(), None); -} - -#[test] -fn zip() { - assert_done(|| list().zip(list()).collect(), - Ok(vec![(1, 1), (2, 2), (3, 3)])); - assert_done(|| list().zip(list().take(2)).collect(), - Ok(vec![(1, 1), (2, 2)])); - assert_done(|| list().take(2).zip(list()).collect(), - Ok(vec![(1, 1), (2, 2)])); - assert_done(|| err_list().zip(list()).collect::>(), Err(3)); - assert_done(|| list().zip(list().map(|x| x + 1)).collect(), - Ok(vec![(1, 2), (2, 3), (3, 4)])); -} - -#[test] -fn peek() { - struct Peek { - inner: Peekable + Send>> - } - - impl Future for Peek { - type Item = (); - type Error = u32; - - fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(), u32> { - { - let res = ready!(self.inner.peek(cx))?; - assert_eq!(res, Some(&1)); - } - assert_eq!(self.inner.peek(cx).unwrap(), Some(&1).into()); - assert_eq!(self.inner.poll_next(cx).unwrap(), Some(1).into()); - Ok(Poll::Ready(())) - } - } - - block_on(Peek { - inner: list().peekable(), - }).unwrap() -} - -#[test] -fn wait() { - assert_eq!(block_on_stream(list()).collect::, _>>(), - Ok(vec![1, 2, 3])); -} - -#[test] -fn chunks() { - assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]])); - assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); - assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]])); - let mut list = block_on_stream(err_list().chunks(3)); - let i = list.next().unwrap().unwrap(); - assert_eq!(i, vec![1, 2]); - let i = list.next().unwrap().unwrap_err(); - assert_eq!(i, 3); -} - -#[test] -#[should_panic] -fn chunks_panic_on_cap_zero() { - let _ = list().chunks(0); -} - -#[test] -fn forward() { - let v = Vec::new(); - let v = block_on(iter_ok::<_, Never>(vec![0, 1]).forward(v)).unwrap().1; - assert_eq!(v, vec![0, 1]); - - let v = block_on(iter_ok::<_, Never>(vec![2, 3]).forward(v)).unwrap().1; - assert_eq!(v, vec![0, 1, 2, 3]); - - assert_done(move || iter_ok::<_, Never>(vec![4, 5]).forward(v).map(|(_, s)| s), - Ok(vec![0, 1, 2, 3, 4, 5])); -} - -#[test] -#[allow(deprecated)] -fn concat() { - let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); - assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); - - let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); - assert_done(move || b.concat(), Err(())); -} - -#[test] -fn concat2() { - let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]); - assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9])); - - let b = iter(vec![Ok::<_, ()>(vec![1, 2, 3]), Err(()), Ok(vec![7, 8, 9])]); - assert_done(move || b.concat(), Err(())); - - let c = empty::, ()>(); - assert_done(move || c.concat(), Ok(vec![])) -} - -#[test] -fn stream_poll_fn() { - let mut counter = 5usize; - - let read_stream = poll_fn(move |_| -> Poll, std::io::Error> { - if counter == 0 { - return Ok(Poll::Ready(None)); - } - counter -= 1; - Ok(Poll::Ready(Some(counter))) - }); - - assert_eq!(block_on_stream(read_stream).count(), 5); -} - -#[test] -fn inspect() { - let mut seen = vec![]; - assert_done(|| list().inspect(|&a| seen.push(a)).collect(), Ok(vec![1, 2, 3])); - assert_eq!(seen, [1, 2, 3]); -} - -#[test] -fn inspect_err() { - let mut seen = vec![]; - assert_done(|| err_list().inspect_err(|&a| seen.push(a)).collect::>(), Err(3)); - assert_eq!(seen, [3]); -} diff --git a/futures/tests_disabled/stream_select_all.rs b/futures/tests_disabled/stream_select_all.rs deleted file mode 100644 index b728492b41..0000000000 --- a/futures/tests_disabled/stream_select_all.rs +++ /dev/null @@ -1,35 +0,0 @@ -use futures::executor::block_on_stream; -use futures::channel::mpsc; -use futures::stream::select_all; -use std::mem; - -mod support; - -#[test] -fn works_1() { - let (a_tx, a_rx) = mpsc::unbounded::(); - let (b_tx, b_rx) = mpsc::unbounded::(); - let (c_tx, c_rx) = mpsc::unbounded::(); - - let streams = vec![a_rx, b_rx, c_rx]; - - let mut stream = block_on_stream(select_all(streams)); - - b_tx.unbounded_send(99).unwrap(); - a_tx.unbounded_send(33).unwrap(); - assert_eq!(Some(Ok(33)), stream.next()); - assert_eq!(Some(Ok(99)), stream.next()); - - b_tx.unbounded_send(99).unwrap(); - a_tx.unbounded_send(33).unwrap(); - assert_eq!(Some(Ok(33)), stream.next()); - assert_eq!(Some(Ok(99)), stream.next()); - - c_tx.unbounded_send(42).unwrap(); - assert_eq!(Some(Ok(42)), stream.next()); - a_tx.unbounded_send(43).unwrap(); - assert_eq!(Some(Ok(43)), stream.next()); - - mem::drop((a_tx, b_tx, c_tx)); - assert_eq!(None, stream.next()); -}