Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/task_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ impl<F: Future> Spawn<F> {
/// scheduled to receive a notification when poll can be called again.
/// Otherwise if `Ready` or `Err` is returned, the `Spawn` task can be
/// safely destroyed.
pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> {
self.enter(&unpark, |f| f.poll())
pub fn poll_future(&mut self, unpark: &Arc<Unpark>) -> Poll<F::Item, F::Error> {
self.enter(unpark, |f| f.poll())
}

/// Waits for the internal future to complete, blocking this thread's
Expand All @@ -344,10 +344,11 @@ impl<F: Future> Spawn<F> {
/// to complete. When a future cannot make progress it will use
/// `thread::park` to block the current thread.
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
let thread_unpark = Arc::new(ThreadUnpark::new(thread::current()));
let unpark : Arc<Unpark> = thread_unpark.clone();
loop {
match try!(self.poll_future(unpark.clone())) {
Async::NotReady => unpark.park(),
match try!(self.poll_future(&unpark)) {
Async::NotReady => thread_unpark.park(),
Async::Ready(e) => return Ok(e),
}
}
Expand Down Expand Up @@ -391,18 +392,19 @@ impl<F: Future> Spawn<F> {

impl<S: Stream> Spawn<S> {
/// Like `poll_future`, except polls the underlying stream.
pub fn poll_stream(&mut self, unpark: Arc<Unpark>)
pub fn poll_stream(&mut self, unpark: &Arc<Unpark>)
-> Poll<Option<S::Item>, S::Error> {
self.enter(&unpark, |stream| stream.poll())
self.enter(unpark, |stream| stream.poll())
}

/// Like `wait_future`, except only waits for the next element to arrive on
/// the underlying stream.
pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
let thread_unpark = Arc::new(ThreadUnpark::new(thread::current()));
let unpark : Arc<Unpark> = thread_unpark.clone();
loop {
match self.poll_stream(unpark.clone()) {
Ok(Async::NotReady) => unpark.park(),
match self.poll_stream(&unpark) {
Ok(Async::NotReady) => thread_unpark.park(),
Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
Ok(Async::Ready(None)) => return None,
Err(e) => return Some(Err(e)),
Expand Down Expand Up @@ -569,9 +571,9 @@ impl Run {
// we are in the `POLLING`/`REPOLL` state for the mutex.
unsafe {
inner.mutex.start_poll();

let unpark : Arc<Unpark> = inner.clone();
loop {
match spawn.poll_future(inner.clone()) {
match spawn.poll_future(&unpark) {
Ok(Async::NotReady) => {}
Ok(Async::Ready(())) |
Err(()) => return inner.mutex.complete(),
Expand Down
30 changes: 15 additions & 15 deletions tests/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ fn smoke_oneshot() {

let (c, p) = oneshot::channel::<i32>();
drop(c);
let res = executor::spawn(p).poll_future(unpark_panic());
let res = executor::spawn(p).poll_future(&{unpark_panic()});
assert!(res.is_err());
let (c, p) = oneshot::channel::<i32>();
drop(c);
Expand All @@ -150,7 +150,7 @@ fn select_cancels() {
assert!(brx.try_recv().is_err());
assert!(drx.try_recv().is_err());
a.send(1).unwrap();
let res = executor::spawn(f).poll_future(unpark_panic());
let res = executor::spawn(f).poll_future(&{unpark_panic()});
assert!(res.ok().unwrap().is_ready());
assert_eq!(brx.recv().unwrap(), 1);
drop(c);
Expand All @@ -162,10 +162,10 @@ fn select_cancels() {
let d = d.map(move |d| { dtx.send(d).unwrap(); d });

let mut f = executor::spawn(b.select(d).then(unselect));
assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready());
assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready());
assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready());
assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready());
a.send(1).unwrap();
assert!(f.poll_future(unpark_panic()).ok().unwrap().is_ready());
assert!(f.poll_future(&{unpark_panic()}).ok().unwrap().is_ready());
drop((c, f));
assert!(drx.recv().is_err());
}
Expand All @@ -179,7 +179,7 @@ fn join_cancels() {

let f = b.join(d);
drop(a);
let res = executor::spawn(f).poll_future(unpark_panic());
let res = executor::spawn(f).poll_future(&{unpark_panic()});
assert!(res.is_err());
drop(c);
assert!(drx.recv().is_err());
Expand Down Expand Up @@ -208,37 +208,37 @@ fn join_incomplete() {
let (a, b) = oneshot::channel::<i32>();
let (tx, rx) = channel();
let mut f = executor::spawn(ok(1).join(b).map(move |r| tx.send(r).unwrap()));
assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready());
assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready());
assert!(rx.try_recv().is_err());
a.send(2).unwrap();
assert!(f.poll_future(unpark_noop()).ok().unwrap().is_ready());
assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_ready());
assert_eq!(rx.recv().unwrap(), (1, 2));

let (a, b) = oneshot::channel::<i32>();
let (tx, rx) = channel();
let mut f = executor::spawn(b.join(Ok(2)).map(move |r| tx.send(r).unwrap()));
assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready());
assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready());
assert!(rx.try_recv().is_err());
a.send(1).unwrap();
assert!(f.poll_future(unpark_noop()).ok().unwrap().is_ready());
assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_ready());
assert_eq!(rx.recv().unwrap(), (1, 2));

let (a, b) = oneshot::channel::<i32>();
let (tx, rx) = channel();
let mut f = executor::spawn(ok(1).join(b).map_err(move |_r| tx.send(2).unwrap()));
assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready());
assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready());
assert!(rx.try_recv().is_err());
drop(a);
assert!(f.poll_future(unpark_noop()).is_err());
assert!(f.poll_future(&{unpark_noop()}).is_err());
assert_eq!(rx.recv().unwrap(), 2);

let (a, b) = oneshot::channel::<i32>();
let (tx, rx) = channel();
let mut f = executor::spawn(b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap()));
assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready());
assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready());
assert!(rx.try_recv().is_err());
drop(a);
assert!(f.poll_future(unpark_noop()).is_err());
assert!(f.poll_future(&{unpark_noop()}).is_err());
assert_eq!(rx.recv().unwrap(), 1);
}

Expand Down Expand Up @@ -323,7 +323,7 @@ fn select2() {
let b = b.map(move |v| { btx.send(v).unwrap(); v });
let d = d.map(move |v| { dtx.send(v).unwrap(); v });
let f = b.select(d);
drop(executor::spawn(f).poll_future(support::unpark_noop()));
drop(executor::spawn(f).poll_future(&{support::unpark_noop()}));
assert!(drx.recv().is_err());
assert!(brx.recv().is_err());
}
Expand Down
2 changes: 1 addition & 1 deletion tests/bilock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn smoke() {
});

assert!(executor::spawn(future)
.poll_future(unpark_noop())
.poll_future(&{unpark_noop()})
.expect("failure in poll")
.is_ready());
}
Expand Down
4 changes: 2 additions & 2 deletions tests/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ use support::*;
#[test]
fn fuse() {
let mut future = executor::spawn(ok::<i32, u32>(2).fuse());
assert!(future.poll_future(unpark_panic()).unwrap().is_ready());
assert!(future.poll_future(unpark_panic()).unwrap().is_not_ready());
assert!(future.poll_future(&{unpark_panic()}).unwrap().is_ready());
assert!(future.poll_future(&{unpark_panic()}).unwrap().is_not_ready());
}
12 changes: 6 additions & 6 deletions tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ fn works_2() {
let mut spawn = futures::executor::spawn(stream);
a_tx.send(33).unwrap();
b_tx.send(33).unwrap();
assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_ready());
assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_ready());
c_tx.send(33).unwrap();
assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_ready());
assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_ready());
}

#[test]
Expand All @@ -56,13 +56,13 @@ fn finished_future_ok() {

let mut spawn = futures::executor::spawn(stream);
for _ in 0..10 {
assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_not_ready());
assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_not_ready());
}

b_tx.send(Box::new(())).unwrap();
let next = spawn.poll_stream(support::unpark_noop()).unwrap();
let next = spawn.poll_stream(&{support::unpark_noop()}).unwrap();
assert!(next.is_ready());
c_tx.send(Box::new(())).unwrap();
assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_not_ready());
assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_not_ready());
assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_not_ready());
assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_not_ready());
}
2 changes: 1 addition & 1 deletion tests/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn smoke_poll() {
assert!(tx.poll_cancel().unwrap().is_ready());
ok::<(), ()>(())
}));
assert!(task.poll_future(unpark_noop()).unwrap().is_ready());
assert!(task.poll_future(&{unpark_noop()}).unwrap().is_ready());
}

#[test]
Expand Down
20 changes: 12 additions & 8 deletions tests/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,15 @@ fn mpsc_blocking_start_send() {
assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready);

let flag = Flag::new();
let unpark : Arc<Unpark> = flag.clone();
let mut task = executor::spawn(StartSendFut::new(tx, 1));

assert!(task.poll_future(flag.clone()).unwrap().is_not_ready());
assert!(task.poll_future(&unpark).unwrap().is_not_ready());
assert!(!flag.get());
sassert_next(&mut rx, 0);
assert!(flag.get());
flag.set(false);
assert!(task.poll_future(flag.clone()).unwrap().is_ready());
assert!(task.poll_future(&unpark).unwrap().is_ready());
assert!(!flag.get());
sassert_next(&mut rx, 1);

Expand All @@ -142,12 +143,13 @@ fn with_flush() {
assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready));

let flag = Flag::new();
let unpark : Arc<Unpark> = flag.clone();
let mut task = executor::spawn(sink.flush());
assert!(task.poll_future(flag.clone()).unwrap().is_not_ready());
assert!(task.poll_future(&unpark).unwrap().is_not_ready());
tx.send(()).unwrap();
assert!(flag.get());

let sink = match task.poll_future(flag.clone()).unwrap() {
let sink = match task.poll_future(&unpark).unwrap() {
Async::Ready(sink) => sink,
_ => panic!()
};
Expand Down Expand Up @@ -226,12 +228,13 @@ fn with_flush_propagate() {
assert_eq!(sink.start_send(Some(1)).unwrap(), AsyncSink::Ready);

let flag = Flag::new();
let unpark : Arc<Unpark> = flag.clone();
let mut task = executor::spawn(sink.flush());
assert!(task.poll_future(flag.clone()).unwrap().is_not_ready());
assert!(task.poll_future(&unpark).unwrap().is_not_ready());
assert!(!flag.get());
assert_eq!(task.get_mut().get_mut().get_mut().force_flush(), vec![0, 1]);
assert!(flag.get());
assert!(task.poll_future(flag.clone()).unwrap().is_ready());
assert!(task.poll_future(&unpark).unwrap().is_ready());
}

#[test]
Expand Down Expand Up @@ -326,12 +329,13 @@ fn buffer() {
let sink = StartSendFut::new(sink, 1).wait().unwrap();

let flag = Flag::new();
let unpark : Arc<Unpark> = flag.clone();
let mut task = executor::spawn(sink.send(2));
assert!(task.poll_future(flag.clone()).unwrap().is_not_ready());
assert!(task.poll_future(&unpark).unwrap().is_not_ready());
assert!(!flag.get());
allow.start();
assert!(flag.get());
match task.poll_future(flag.clone()).unwrap() {
match task.poll_future(&unpark).unwrap() {
Async::Ready(sink) => {
assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/support/local_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Core {
let task = self.unpark.recv().unwrap(); // Safe to unwrap because self.unpark_send keeps the channel alive
let unpark = Arc::new(Unpark { task: task, send: Mutex::new(self.unpark_send.clone()), });
let mut task = if let hash_map::Entry::Occupied(x) = self.live.entry(task) { x } else { return };
let result = task.get_mut().poll_future(unpark);
let result = task.get_mut().poll_future(&{unpark});
match result {
Ok(Async::Ready(())) => { task.remove(); }
Err(()) => { task.remove(); }
Expand Down
11 changes: 5 additions & 6 deletions tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ pub fn assert_done<T, F>(f: F, result: Result<T::Item, T::Error>)
}

pub fn assert_empty<T: Future, F: FnMut() -> T>(mut f: F) {
assert!(executor::spawn(f()).poll_future(unpark_panic()).ok().unwrap().is_not_ready());
assert!(executor::spawn(f()).poll_future(&{unpark_panic()}).ok().unwrap().is_not_ready());
}

pub fn sassert_done<S: Stream>(s: &mut S) {
match executor::spawn(s).poll_stream(unpark_panic()) {
match executor::spawn(s).poll_stream(&{unpark_panic()}) {
Ok(Async::Ready(None)) => {}
Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
Ok(Async::NotReady) => panic!("stream wasn't ready"),
Expand All @@ -40,7 +40,7 @@ pub fn sassert_done<S: Stream>(s: &mut S) {
}

pub fn sassert_empty<S: Stream>(s: &mut S) {
match executor::spawn(s).poll_stream(unpark_noop()) {
match executor::spawn(s).poll_stream(&{unpark_noop()}) {
Ok(Async::Ready(None)) => panic!("stream is at its end"),
Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
Ok(Async::NotReady) => {}
Expand All @@ -51,7 +51,7 @@ pub fn sassert_empty<S: Stream>(s: &mut S) {
pub fn sassert_next<S: Stream>(s: &mut S, item: S::Item)
where S::Item: Eq + fmt::Debug
{
match executor::spawn(s).poll_stream(unpark_panic()) {
match executor::spawn(s).poll_stream(&{unpark_panic()}) {
Ok(Async::Ready(None)) => panic!("stream is at its end"),
Ok(Async::Ready(Some(e))) => assert_eq!(e, item),
Ok(Async::NotReady) => panic!("stream wasn't ready"),
Expand All @@ -62,7 +62,7 @@ pub fn sassert_next<S: Stream>(s: &mut S, item: S::Item)
pub fn sassert_err<S: Stream>(s: &mut S, err: S::Error)
where S::Error: Eq + fmt::Debug
{
match executor::spawn(s).poll_stream(unpark_panic()) {
match executor::spawn(s).poll_stream(&{unpark_panic()}) {
Ok(Async::Ready(None)) => panic!("stream is at its end"),
Ok(Async::Ready(Some(_))) => panic!("stream had more elements"),
Ok(Async::NotReady) => panic!("stream wasn't ready"),
Expand Down Expand Up @@ -129,4 +129,3 @@ pub fn delay_future<F>(f: F) -> DelayFuture<F::Future>
{
DelayFuture(f.into_future(), false)
}