diff --git a/src/task_impl/mod.rs b/src/task_impl/mod.rs index a261a76aa8..5e6b97ba65 100644 --- a/src/task_impl/mod.rs +++ b/src/task_impl/mod.rs @@ -333,8 +333,8 @@ impl Spawn { /// scheduled to receive a notification when poll can be called again. /// Otherwise if `Ready` or `Err` is returned, the `Spawn` task can be /// safely destroyed. - pub fn poll_future(&mut self, unpark: Arc) -> Poll { - self.enter(&unpark, |f| f.poll()) + pub fn poll_future(&mut self, unpark: &Arc) -> Poll { + self.enter(unpark, |f| f.poll()) } /// Waits for the internal future to complete, blocking this thread's @@ -344,10 +344,11 @@ impl Spawn { /// to complete. When a future cannot make progress it will use /// `thread::park` to block the current thread. pub fn wait_future(&mut self) -> Result { - let unpark = Arc::new(ThreadUnpark::new(thread::current())); + let thread_unpark = Arc::new(ThreadUnpark::new(thread::current())); + let unpark : Arc = thread_unpark.clone(); loop { - match try!(self.poll_future(unpark.clone())) { - Async::NotReady => unpark.park(), + match try!(self.poll_future(&unpark)) { + Async::NotReady => thread_unpark.park(), Async::Ready(e) => return Ok(e), } } @@ -391,18 +392,19 @@ impl Spawn { impl Spawn { /// Like `poll_future`, except polls the underlying stream. - pub fn poll_stream(&mut self, unpark: Arc) + pub fn poll_stream(&mut self, unpark: &Arc) -> Poll, S::Error> { - self.enter(&unpark, |stream| stream.poll()) + self.enter(unpark, |stream| stream.poll()) } /// Like `wait_future`, except only waits for the next element to arrive on /// the underlying stream. pub fn wait_stream(&mut self) -> Option> { - let unpark = Arc::new(ThreadUnpark::new(thread::current())); + let thread_unpark = Arc::new(ThreadUnpark::new(thread::current())); + let unpark : Arc = thread_unpark.clone(); loop { - match self.poll_stream(unpark.clone()) { - Ok(Async::NotReady) => unpark.park(), + match self.poll_stream(&unpark) { + Ok(Async::NotReady) => thread_unpark.park(), Ok(Async::Ready(Some(e))) => return Some(Ok(e)), Ok(Async::Ready(None)) => return None, Err(e) => return Some(Err(e)), @@ -569,9 +571,9 @@ impl Run { // we are in the `POLLING`/`REPOLL` state for the mutex. unsafe { inner.mutex.start_poll(); - + let unpark : Arc = inner.clone(); loop { - match spawn.poll_future(inner.clone()) { + match spawn.poll_future(&unpark) { Ok(Async::NotReady) => {} Ok(Async::Ready(())) | Err(()) => return inner.mutex.complete(), diff --git a/tests/all.rs b/tests/all.rs index bc872bb3bb..28167e8f81 100644 --- a/tests/all.rs +++ b/tests/all.rs @@ -127,7 +127,7 @@ fn smoke_oneshot() { let (c, p) = oneshot::channel::(); drop(c); - let res = executor::spawn(p).poll_future(unpark_panic()); + let res = executor::spawn(p).poll_future(&{unpark_panic()}); assert!(res.is_err()); let (c, p) = oneshot::channel::(); drop(c); @@ -150,7 +150,7 @@ fn select_cancels() { assert!(brx.try_recv().is_err()); assert!(drx.try_recv().is_err()); a.send(1).unwrap(); - let res = executor::spawn(f).poll_future(unpark_panic()); + let res = executor::spawn(f).poll_future(&{unpark_panic()}); assert!(res.ok().unwrap().is_ready()); assert_eq!(brx.recv().unwrap(), 1); drop(c); @@ -162,10 +162,10 @@ fn select_cancels() { let d = d.map(move |d| { dtx.send(d).unwrap(); d }); let mut f = executor::spawn(b.select(d).then(unselect)); - assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready()); - assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready()); + assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready()); + assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready()); a.send(1).unwrap(); - assert!(f.poll_future(unpark_panic()).ok().unwrap().is_ready()); + assert!(f.poll_future(&{unpark_panic()}).ok().unwrap().is_ready()); drop((c, f)); assert!(drx.recv().is_err()); } @@ -179,7 +179,7 @@ fn join_cancels() { let f = b.join(d); drop(a); - let res = executor::spawn(f).poll_future(unpark_panic()); + let res = executor::spawn(f).poll_future(&{unpark_panic()}); assert!(res.is_err()); drop(c); assert!(drx.recv().is_err()); @@ -208,37 +208,37 @@ fn join_incomplete() { let (a, b) = oneshot::channel::(); let (tx, rx) = channel(); let mut f = executor::spawn(ok(1).join(b).map(move |r| tx.send(r).unwrap())); - assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready()); + assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready()); assert!(rx.try_recv().is_err()); a.send(2).unwrap(); - assert!(f.poll_future(unpark_noop()).ok().unwrap().is_ready()); + assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_ready()); assert_eq!(rx.recv().unwrap(), (1, 2)); let (a, b) = oneshot::channel::(); let (tx, rx) = channel(); let mut f = executor::spawn(b.join(Ok(2)).map(move |r| tx.send(r).unwrap())); - assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready()); + assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready()); assert!(rx.try_recv().is_err()); a.send(1).unwrap(); - assert!(f.poll_future(unpark_noop()).ok().unwrap().is_ready()); + assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_ready()); assert_eq!(rx.recv().unwrap(), (1, 2)); let (a, b) = oneshot::channel::(); let (tx, rx) = channel(); let mut f = executor::spawn(ok(1).join(b).map_err(move |_r| tx.send(2).unwrap())); - assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready()); + assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready()); assert!(rx.try_recv().is_err()); drop(a); - assert!(f.poll_future(unpark_noop()).is_err()); + assert!(f.poll_future(&{unpark_noop()}).is_err()); assert_eq!(rx.recv().unwrap(), 2); let (a, b) = oneshot::channel::(); let (tx, rx) = channel(); let mut f = executor::spawn(b.join(Ok(2)).map_err(move |_r| tx.send(1).unwrap())); - assert!(f.poll_future(unpark_noop()).ok().unwrap().is_not_ready()); + assert!(f.poll_future(&{unpark_noop()}).ok().unwrap().is_not_ready()); assert!(rx.try_recv().is_err()); drop(a); - assert!(f.poll_future(unpark_noop()).is_err()); + assert!(f.poll_future(&{unpark_noop()}).is_err()); assert_eq!(rx.recv().unwrap(), 1); } @@ -323,7 +323,7 @@ fn select2() { let b = b.map(move |v| { btx.send(v).unwrap(); v }); let d = d.map(move |v| { dtx.send(v).unwrap(); v }); let f = b.select(d); - drop(executor::spawn(f).poll_future(support::unpark_noop())); + drop(executor::spawn(f).poll_future(&{support::unpark_noop()})); assert!(drx.recv().is_err()); assert!(brx.recv().is_err()); } diff --git a/tests/bilock.rs b/tests/bilock.rs index 1f94397ec5..43e77485b1 100644 --- a/tests/bilock.rs +++ b/tests/bilock.rs @@ -39,7 +39,7 @@ fn smoke() { }); assert!(executor::spawn(future) - .poll_future(unpark_noop()) + .poll_future(&{unpark_noop()}) .expect("failure in poll") .is_ready()); } diff --git a/tests/fuse.rs b/tests/fuse.rs index cffb8be92d..905837d3a0 100644 --- a/tests/fuse.rs +++ b/tests/fuse.rs @@ -9,6 +9,6 @@ use support::*; #[test] fn fuse() { let mut future = executor::spawn(ok::(2).fuse()); - assert!(future.poll_future(unpark_panic()).unwrap().is_ready()); - assert!(future.poll_future(unpark_panic()).unwrap().is_not_ready()); + assert!(future.poll_future(&{unpark_panic()}).unwrap().is_ready()); + assert!(future.poll_future(&{unpark_panic()}).unwrap().is_not_ready()); } diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs index d877f03b25..c1a35263d1 100644 --- a/tests/futures_unordered.rs +++ b/tests/futures_unordered.rs @@ -38,9 +38,9 @@ fn works_2() { let mut spawn = futures::executor::spawn(stream); a_tx.send(33).unwrap(); b_tx.send(33).unwrap(); - assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_ready()); + assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_ready()); c_tx.send(33).unwrap(); - assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_ready()); + assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_ready()); } #[test] @@ -56,13 +56,13 @@ fn finished_future_ok() { let mut spawn = futures::executor::spawn(stream); for _ in 0..10 { - assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_not_ready()); + assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_not_ready()); } b_tx.send(Box::new(())).unwrap(); - let next = spawn.poll_stream(support::unpark_noop()).unwrap(); + let next = spawn.poll_stream(&{support::unpark_noop()}).unwrap(); assert!(next.is_ready()); c_tx.send(Box::new(())).unwrap(); - assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_not_ready()); - assert!(spawn.poll_stream(support::unpark_noop()).unwrap().is_not_ready()); + assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_not_ready()); + assert!(spawn.poll_stream(&{support::unpark_noop()}).unwrap().is_not_ready()); } diff --git a/tests/oneshot.rs b/tests/oneshot.rs index 405906c106..e0fb57947a 100644 --- a/tests/oneshot.rs +++ b/tests/oneshot.rs @@ -21,7 +21,7 @@ fn smoke_poll() { assert!(tx.poll_cancel().unwrap().is_ready()); ok::<(), ()>(()) })); - assert!(task.poll_future(unpark_noop()).unwrap().is_ready()); + assert!(task.poll_future(&{unpark_noop()}).unwrap().is_ready()); } #[test] diff --git a/tests/sink.rs b/tests/sink.rs index 351cfca76e..d04097f9b5 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -113,14 +113,15 @@ fn mpsc_blocking_start_send() { assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready); let flag = Flag::new(); + let unpark : Arc = flag.clone(); let mut task = executor::spawn(StartSendFut::new(tx, 1)); - assert!(task.poll_future(flag.clone()).unwrap().is_not_ready()); + assert!(task.poll_future(&unpark).unwrap().is_not_ready()); assert!(!flag.get()); sassert_next(&mut rx, 0); assert!(flag.get()); flag.set(false); - assert!(task.poll_future(flag.clone()).unwrap().is_ready()); + assert!(task.poll_future(&unpark).unwrap().is_ready()); assert!(!flag.get()); sassert_next(&mut rx, 1); @@ -142,12 +143,13 @@ fn with_flush() { assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready)); let flag = Flag::new(); + let unpark : Arc = flag.clone(); let mut task = executor::spawn(sink.flush()); - assert!(task.poll_future(flag.clone()).unwrap().is_not_ready()); + assert!(task.poll_future(&unpark).unwrap().is_not_ready()); tx.send(()).unwrap(); assert!(flag.get()); - let sink = match task.poll_future(flag.clone()).unwrap() { + let sink = match task.poll_future(&unpark).unwrap() { Async::Ready(sink) => sink, _ => panic!() }; @@ -226,12 +228,13 @@ fn with_flush_propagate() { assert_eq!(sink.start_send(Some(1)).unwrap(), AsyncSink::Ready); let flag = Flag::new(); + let unpark : Arc = flag.clone(); let mut task = executor::spawn(sink.flush()); - assert!(task.poll_future(flag.clone()).unwrap().is_not_ready()); + assert!(task.poll_future(&unpark).unwrap().is_not_ready()); assert!(!flag.get()); assert_eq!(task.get_mut().get_mut().get_mut().force_flush(), vec![0, 1]); assert!(flag.get()); - assert!(task.poll_future(flag.clone()).unwrap().is_ready()); + assert!(task.poll_future(&unpark).unwrap().is_ready()); } #[test] @@ -326,12 +329,13 @@ fn buffer() { let sink = StartSendFut::new(sink, 1).wait().unwrap(); let flag = Flag::new(); + let unpark : Arc = flag.clone(); let mut task = executor::spawn(sink.send(2)); - assert!(task.poll_future(flag.clone()).unwrap().is_not_ready()); + assert!(task.poll_future(&unpark).unwrap().is_not_ready()); assert!(!flag.get()); allow.start(); assert!(flag.get()); - match task.poll_future(flag.clone()).unwrap() { + match task.poll_future(&unpark).unwrap() { Async::Ready(sink) => { assert_eq!(sink.get_ref().data, vec![0, 1, 2]); } diff --git a/tests/support/local_executor.rs b/tests/support/local_executor.rs index 8fc0d977c4..bd4dffb153 100644 --- a/tests/support/local_executor.rs +++ b/tests/support/local_executor.rs @@ -72,7 +72,7 @@ impl Core { let task = self.unpark.recv().unwrap(); // Safe to unwrap because self.unpark_send keeps the channel alive let unpark = Arc::new(Unpark { task: task, send: Mutex::new(self.unpark_send.clone()), }); let mut task = if let hash_map::Entry::Occupied(x) = self.live.entry(task) { x } else { return }; - let result = task.get_mut().poll_future(unpark); + let result = task.get_mut().poll_future(&{unpark}); match result { Ok(Async::Ready(())) => { task.remove(); } Err(()) => { task.remove(); } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index df4c8c60ac..16d076d79e 100755 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -27,11 +27,11 @@ pub fn assert_done(f: F, result: Result) } pub fn assert_empty T>(mut f: F) { - assert!(executor::spawn(f()).poll_future(unpark_panic()).ok().unwrap().is_not_ready()); + assert!(executor::spawn(f()).poll_future(&{unpark_panic()}).ok().unwrap().is_not_ready()); } pub fn sassert_done(s: &mut S) { - match executor::spawn(s).poll_stream(unpark_panic()) { + match executor::spawn(s).poll_stream(&{unpark_panic()}) { Ok(Async::Ready(None)) => {} Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), Ok(Async::NotReady) => panic!("stream wasn't ready"), @@ -40,7 +40,7 @@ pub fn sassert_done(s: &mut S) { } pub fn sassert_empty(s: &mut S) { - match executor::spawn(s).poll_stream(unpark_noop()) { + match executor::spawn(s).poll_stream(&{unpark_noop()}) { Ok(Async::Ready(None)) => panic!("stream is at its end"), Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), Ok(Async::NotReady) => {} @@ -51,7 +51,7 @@ pub fn sassert_empty(s: &mut S) { pub fn sassert_next(s: &mut S, item: S::Item) where S::Item: Eq + fmt::Debug { - match executor::spawn(s).poll_stream(unpark_panic()) { + match executor::spawn(s).poll_stream(&{unpark_panic()}) { Ok(Async::Ready(None)) => panic!("stream is at its end"), Ok(Async::Ready(Some(e))) => assert_eq!(e, item), Ok(Async::NotReady) => panic!("stream wasn't ready"), @@ -62,7 +62,7 @@ pub fn sassert_next(s: &mut S, item: S::Item) pub fn sassert_err(s: &mut S, err: S::Error) where S::Error: Eq + fmt::Debug { - match executor::spawn(s).poll_stream(unpark_panic()) { + match executor::spawn(s).poll_stream(&{unpark_panic()}) { Ok(Async::Ready(None)) => panic!("stream is at its end"), Ok(Async::Ready(Some(_))) => panic!("stream had more elements"), Ok(Async::NotReady) => panic!("stream wasn't ready"), @@ -129,4 +129,3 @@ pub fn delay_future(f: F) -> DelayFuture { DelayFuture(f.into_future(), false) } -