Skip to content

Commit ba1182c

Browse files
committed
Take &Unpark + Clone in poll methods
Hide UnparkHandle from the public API. Take &Unpark + Clone in poll methods of Spawn. Move the 'static bound to the Unpark trait. Also remove MaxUnparkBytes512 and MaxUnparkBytes1024, it's unlikely that someone will use those.
1 parent d588ffc commit ba1182c

File tree

13 files changed

+152
-141
lines changed

13 files changed

+152
-141
lines changed

Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ with-deprecated = []
2626
default = ["use_std", "with-deprecated"]
2727
MaxUnparkBytes128 = []
2828
MaxUnparkBytes256 = []
29-
MaxUnparkBytes512 = []
30-
MaxUnparkBytes1024 = []
3129

3230
[workspace]
3331
members = ["futures-cpupool"]

src/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
//!
88
//! [online]: https://tokio.rs/docs/going-deeper/tasks/
99
10-
pub use task_impl::{Spawn, spawn, Unpark, Executor, Run, UnparkHandle, MAX_UNPARK_BYTES};
10+
pub use task_impl::{Spawn, spawn, Unpark, Executor, Run, MAX_UNPARK_BYTES};

src/future/shared.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
//! ```
1515
1616
use {Future, Poll, Async};
17-
use executor::{self, Spawn, Unpark, UnparkHandle};
17+
use executor::{self, Spawn, Unpark};
1818
use task::{self, Task};
1919

2020
use std::{fmt, mem, ops};
@@ -146,9 +146,6 @@ impl<F> Future for Shared<F>
146146
_ => unreachable!(),
147147
}
148148

149-
// Get a handle to the unparker
150-
let unpark = UnparkHandle::new(self.inner.unparker.clone());
151-
152149
loop {
153150
struct Reset<'a>(&'a AtomicUsize);
154151

@@ -165,7 +162,7 @@ impl<F> Future for Shared<F>
165162
let _reset = Reset(&self.inner.unparker.state);
166163

167164
// Poll the future
168-
match unsafe { (*self.inner.future.get()).as_mut().unwrap().poll_future(&unpark) } {
165+
match unsafe { (*self.inner.future.get()).as_mut().unwrap().poll_future(&self.inner.unparker) } {
169166
Ok(Async::NotReady) => {
170167
// Not ready, try to release the handle
171168
match self.inner.unparker.state.compare_and_swap(POLLING, IDLE, SeqCst) {

src/task_impl/mod.rs

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,18 @@ use future::BoxFuture;
1313
mod unpark_mutex;
1414
use self::unpark_mutex::UnparkMutex;
1515
mod unpark_handle;
16+
use self::unpark_handle::{UnparkHandle, UnparkObj};
1617
mod task_rc;
1718
mod data;
1819
#[allow(deprecated)]
1920
#[cfg(feature = "with-deprecated")]
2021
pub use self::task_rc::TaskRc;
2122
pub use self::data::LocalKey;
22-
pub use self::unpark_handle::UnparkHandle;
2323
pub use self::unpark_handle::MAX_UNPARK_BYTES;
2424

2525
struct BorrowedTask<'a> {
2626
id: usize,
27-
unpark: &'a UnparkHandle,
27+
unpark: UnparkHandle<'a>,
2828
map: &'a data::LocalMap,
2929
events: Events,
3030
}
@@ -83,7 +83,7 @@ fn with<F: FnOnce(&BorrowedTask) -> R, R>(f: F) -> R {
8383
#[derive(Clone)]
8484
pub struct Task {
8585
id: usize,
86-
unpark: UnparkHandle,
86+
unpark: UnparkObj,
8787
events: Events,
8888
}
8989

@@ -123,7 +123,7 @@ pub fn park() -> Task {
123123
Task {
124124
id: task.id,
125125
events: task.events.clone(),
126-
unpark: task.unpark.clone(),
126+
unpark: UnparkObj::from(task.unpark),
127127
}
128128
})
129129
}
@@ -281,6 +281,33 @@ impl Events {
281281
/// can be blocked indefinitely until a notification arrives. This can be used
282282
/// with either futures or streams, with different methods being available on
283283
/// `Spawn` depending which is used.
284+
///
285+
/// # Notes on `unpark` parameters
286+
///
287+
/// A `Task` handle returned by `task::park` will contain a fresh
288+
/// clone of the `unpark` argument provided. Depending on your use case,
289+
/// you may want to wrap your `unpark` value in an `Arc`.
290+
///
291+
/// If you use inner mutability in your `unpark`, then you should carefully
292+
/// consider what happens when it is cloned and what is the behaviour you want.
293+
/// Inner mutability aside, the only difference between using or not an `Arc`
294+
/// should be performance. An `Arc` will cost an allocation upfront and updates
295+
/// an atomic ref count on park, while no `Arc` has no upfront cost but will
296+
/// cost a clone on park. The best strategy depends on how often your futures
297+
/// park and how costly it is to clone `unpark`.
298+
///
299+
/// ## Panics
300+
///
301+
/// Many `Spawn` methods take an `unpark` parameter, notably the polling
302+
/// methods. If the size of `unpark` is too large, these methods will panic.
303+
/// If you get a panic either wrap `unpark` in an `Arc` or use the
304+
/// MaxUnparkBytes features to increase the maximum size.
305+
/// The panic depends only on the type and not the value, so you
306+
/// should not fear unexpected panics, simply testing the function
307+
/// for each type of `unpark` used will suffice. If you need to check
308+
/// whether your `unpark` is too large, use the `MAX_UNPARK_BYTES` constant.
309+
/// if `mem::size_of<U>() <= MAX_UNPARK_BYTES` where `U` is the type of
310+
/// your unpark, then you are good.
284311
pub struct Spawn<T> {
285312
obj: T,
286313
id: usize,
@@ -328,15 +355,16 @@ impl<F: Future> Spawn<F> {
328355
/// This method will poll the internal future, testing if it's completed
329356
/// yet. The `unpark` argument is used as a sink for notifications sent to
330357
/// this future. That is, while the future is being polled, any call to
331-
/// `task::park()` will return a handle that contains the `unpark`
332-
/// specified.
358+
/// `task::park()` will return a handle that contains a fresh clone of
359+
/// the `unpark` specified. You may want to wrap `unpark` in an `Arc`.
360+
/// See `Spawn` documentation for more.
333361
///
334362
/// If this function returns `NotReady`, then the `unpark` should have been
335363
/// scheduled to receive a notification when poll can be called again.
336364
/// Otherwise if `Ready` or `Err` is returned, the `Spawn` task can be
337365
/// safely destroyed.
338-
pub fn poll_future(&mut self, unpark: &UnparkHandle) -> Poll<F::Item, F::Error> {
339-
self.enter(unpark, |f| f.poll())
366+
pub fn poll_future<U : Unpark + Clone>(&mut self, unpark: &U) -> Poll<F::Item, F::Error> {
367+
self.enter(UnparkHandle::from(unpark), |f| f.poll())
340368
}
341369

342370
/// Waits for the internal future to complete, blocking this thread's
@@ -347,9 +375,8 @@ impl<F: Future> Spawn<F> {
347375
/// `thread::park` to block the current thread.
348376
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
349377
let unpark = Arc::new(ThreadUnpark::new());
350-
let handle = UnparkHandle::new(unpark.clone());
351378
loop {
352-
match try!(self.poll_future(&handle)) {
379+
match try!(self.poll_future(&unpark)) {
353380
Async::NotReady => unpark.park(),
354381
Async::Ready(e) => return Ok(e),
355382
}
@@ -394,18 +421,17 @@ impl<F: Future> Spawn<F> {
394421

395422
impl<S: Stream> Spawn<S> {
396423
/// Like `poll_future`, except polls the underlying stream.
397-
pub fn poll_stream(&mut self, unpark: &UnparkHandle)
424+
pub fn poll_stream<U : Unpark + Clone>(&mut self, unpark: &U)
398425
-> Poll<Option<S::Item>, S::Error> {
399-
self.enter(unpark, |stream| stream.poll())
426+
self.enter(UnparkHandle::from(unpark), |stream| stream.poll())
400427
}
401428

402429
/// Like `wait_future`, except only waits for the next element to arrive on
403430
/// the underlying stream.
404431
pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
405432
let unpark = Arc::new(ThreadUnpark::new());
406-
let handle = UnparkHandle::new(unpark.clone());
407433
loop {
408-
match self.poll_stream(&handle) {
434+
match self.poll_stream(&unpark) {
409435
Ok(Async::NotReady) => unpark.park(),
410436
Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
411437
Ok(Async::Ready(None)) => return None,
@@ -421,19 +447,19 @@ impl<S: Sink> Spawn<S> {
421447
/// If the underlying operation returns `NotReady` then the `unpark` value
422448
/// passed in will receive a notification when the operation is ready to be
423449
/// attempted again.
424-
pub fn start_send(&mut self, value: S::SinkItem, unpark: &UnparkHandle)
450+
pub fn start_send<U : Unpark + Clone>(&mut self, value: S::SinkItem, unpark: &U)
425451
-> StartSend<S::SinkItem, S::SinkError> {
426-
self.enter(unpark, |sink| sink.start_send(value))
452+
self.enter(UnparkHandle::from(unpark), |sink| sink.start_send(value))
427453
}
428454

429455
/// Invokes the underlying `poll_complete` method with this task in place.
430456
///
431-
/// If the underlying operation returns `NotReady` then the `unpark` value
432-
/// passed in will receive a notification when the operation is ready to be
433-
/// attempted again.
434-
pub fn poll_flush(&mut self, unpark: &UnparkHandle)
457+
/// If the underlying operation returns `NotReady` then a clone of
458+
/// the `unpark` value passed in will receive a notification
459+
/// when the operation is ready to be attempted again.
460+
pub fn poll_flush<U : Unpark + Clone>(&mut self, unpark: &U)
435461
-> Poll<(), S::SinkError> {
436-
self.enter(unpark, |sink| sink.poll_complete())
462+
self.enter(UnparkHandle::from(unpark), |sink| sink.poll_complete())
437463
}
438464

439465
/// Blocks the current thread until it's able to send `value` on this sink.
@@ -444,9 +470,8 @@ impl<S: Sink> Spawn<S> {
444470
pub fn wait_send(&mut self, mut value: S::SinkItem)
445471
-> Result<(), S::SinkError> {
446472
let unpark = Arc::new(ThreadUnpark::new());
447-
let handle = UnparkHandle::new(unpark.clone());
448473
loop {
449-
value = match try!(self.start_send(value, &handle)) {
474+
value = match try!(self.start_send(value, &unpark)) {
450475
AsyncSink::NotReady(v) => v,
451476
AsyncSink::Ready => return Ok(()),
452477
};
@@ -464,9 +489,8 @@ impl<S: Sink> Spawn<S> {
464489
/// ready.
465490
pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
466491
let unpark = Arc::new(ThreadUnpark::new());
467-
let handle = UnparkHandle::new(unpark.clone());
468492
loop {
469-
if try!(self.poll_flush(&handle)).is_ready() {
493+
if try!(self.poll_flush(&unpark)).is_ready() {
470494
return Ok(())
471495
}
472496
unpark.park();
@@ -475,7 +499,7 @@ impl<S: Sink> Spawn<S> {
475499
}
476500

477501
impl<T> Spawn<T> {
478-
fn enter<F, R>(&mut self, unpark: &UnparkHandle, f: F) -> R
502+
fn enter<F, R>(&mut self, unpark: UnparkHandle, f: F) -> R
479503
where F: FnOnce(&mut T) -> R
480504
{
481505
let task = BorrowedTask {
@@ -505,7 +529,7 @@ impl<T: fmt::Debug> fmt::Debug for Spawn<T> {
505529
/// `Spawn::poll_stream` functions. It's transitively used as part of the
506530
/// `Task::unpark` method to internally deliver notifications of readiness of a
507531
/// future to move forward.
508-
pub trait Unpark: Send {
532+
pub trait Unpark: Send + 'static {
509533
/// Indicates that an associated future and/or task are ready to make
510534
/// progress.
511535
///
@@ -574,14 +598,14 @@ impl Run {
574598
/// thread.
575599
pub fn run(self) {
576600
let Run { mut spawn, inner } = self;
577-
let handle = UnparkHandle::new(inner.clone());
601+
578602
// SAFETY: the ownership of this `Run` object is evidence that
579603
// we are in the `POLLING`/`REPOLL` state for the mutex.
580604
unsafe {
581605
inner.mutex.start_poll();
582606

583607
loop {
584-
match spawn.poll_future(&handle) {
608+
match spawn.poll_future(&inner) {
585609
Ok(Async::NotReady) => {}
586610
Ok(Async::Ready(())) |
587611
Err(()) => return inner.mutex.complete(),

0 commit comments

Comments
 (0)