Skip to content

Commit

Permalink
Re-add tests that needed future::[try_]select
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e authored and cramertj committed May 20, 2019
1 parent a496d51 commit 6bb6881
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 38 deletions.
37 changes: 19 additions & 18 deletions futures/tests/futures_ordered.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, FutureExt};
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
use futures::stream::{StreamExt, FuturesOrdered};
use futures_test::task::noop_context;
use std::any::Any;

#[test]
fn works_1() {
Expand Down Expand Up @@ -56,27 +57,27 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
}

/* ToDo: This requires FutureExt::select to be implemented
#[test]
fn queue_never_unblocked() {
let (_a_tx, a_rx) = oneshot::channel::<Box<Any+Send>>();
let (b_tx, b_rx) = oneshot::channel::<Box<Any+Send>>();
let (c_tx, c_rx) = oneshot::channel::<Box<Any+Send>>();
let (_a_tx, a_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let (b_tx, b_rx) = oneshot::channel::<Box<dyn Any + Send>>();
let (c_tx, c_rx) = oneshot::channel::<Box<dyn Any + Send>>();

let mut stream = vec![
Box::new(a_rx) as Box<Future<Item = _, Error = _>>,
Box::new(b_rx.select(c_rx).then(|res| Ok(Box::new(res) as Box<Any+Send>))) as _,
Box::new(a_rx) as Box<dyn Future<Output = _> + Unpin>,
Box::new(future::try_select(b_rx, c_rx)
.map_err(|e| e.factor_first().0)
.and_then(|e| future::ok(Box::new(e) as Box<dyn Any + Send>))) as _,
].into_iter().collect::<FuturesOrdered<_>>();

with_no_spawn_context(|cx| {
for _ in 0..10 {
assert!(stream.poll_next(cx).unwrap().is_pending());
}
let cx = &mut noop_context();
for _ in 0..10 {
assert!(stream.poll_next_unpin(cx).is_pending());
}

b_tx.send(Box::new(())).unwrap();
assert!(stream.poll_next(cx).unwrap().is_pending());
c_tx.send(Box::new(())).unwrap();
assert!(stream.poll_next(cx).unwrap().is_pending());
assert!(stream.poll_next(cx).unwrap().is_pending());
})
}*/
b_tx.send(Box::new(())).unwrap();
assert!(stream.poll_next_unpin(cx).is_pending());
c_tx.send(Box::new(())).unwrap();
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}
28 changes: 13 additions & 15 deletions futures/tests/futures_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, FutureExt};
use futures::future::{self, join, Future, FutureExt};
use futures::stream::{StreamExt, FuturesUnordered};
use futures::task::Poll;
use futures_test::{assert_stream_done, assert_stream_next};
Expand Down Expand Up @@ -57,30 +57,28 @@ fn from_iterator() {
assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1,2,3]);
}

/* ToDo: This requires FutureExt::select to be implemented
#[test]
fn finished_future() {
let (_a_tx, a_rx) = oneshot::channel::<i32>();
let (b_tx, b_rx) = oneshot::channel::<i32>();
let (c_tx, c_rx) = oneshot::channel::<i32>();

let mut stream = vec![
a_rx.boxed(),
b_rx.select(c_rx).boxed(),
Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _,
].into_iter().collect::<FuturesUnordered<_>>();

support::with_noop_waker_context(f)(|cx| {
for _ in 0..10 {
assert!(stream.poll_next_unpin(cx).is_pending());
}
b_tx.send(12).unwrap();
assert!(stream.poll_next_unpin(cx).is_ready());
c_tx.send(3).unwrap();
assert!(stream.poll_next_unpin(cx).is_pending());
let cx = &mut noop_context();
for _ in 0..10 {
assert!(stream.poll_next_unpin(cx).is_pending());
})
}*/
}

b_tx.send(12).unwrap();
c_tx.send(3).unwrap();
assert!(stream.poll_next_unpin(cx).is_ready());
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}

#[test]
fn iter_mut_cancel() {
Expand Down
9 changes: 4 additions & 5 deletions futures/tests/shared.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::channel::oneshot;
use futures::executor::{block_on, LocalPool};
use futures::future::{self, FutureExt, LocalFutureObj};
use futures::future::{self, FutureExt, TryFutureExt, LocalFutureObj};
use futures::task::LocalSpawn;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
Expand Down Expand Up @@ -41,7 +41,6 @@ fn many_threads() {
send_shared_oneshot_and_wait_on_multiple_threads(1000);
}

/* ToDo: This requires FutureExt::select to be implemented
#[test]
fn drop_on_one_task_ok() {
let (tx, rx) = oneshot::channel::<u32>();
Expand All @@ -51,14 +50,14 @@ fn drop_on_one_task_ok() {
let (tx2, rx2) = oneshot::channel::<u32>();

let t1 = thread::spawn(|| {
let f = f1.map_err(|_| ()).map(|x| *x).select(rx2.map_err(|_| ()));
let f = future::try_select(f1.map_err(|_| ()), rx2.map_err(|_| ()));
drop(block_on(f));
});

let (tx3, rx3) = oneshot::channel::<u32>();

let t2 = thread::spawn(|| {
let _ = block_on(f2.map(|x| tx3.send(*x).unwrap()).map_err(|_| ()));
let _ = block_on(f2.map_ok(|x| tx3.send(x).unwrap()).map_err(|_| ()));
});

tx2.send(11).unwrap(); // cancel `f1`
Expand All @@ -68,7 +67,7 @@ fn drop_on_one_task_ok() {
let result = block_on(rx3).unwrap();
assert_eq!(result, 42);
t2.join().unwrap();
}*/
}

#[test]
fn drop_in_poll() {
Expand Down

0 comments on commit 6bb6881

Please sign in to comment.