Skip to content

Commit cf2fd8c

Browse files
committed
Zero-allocation futures
Get rid of `Arc` in the `poll` methods, replacing it with an `UnparkHandle`. Implement a custom trait object that can clone itself by limiting the maximum size of the object. Upon `park`, instead of handing out `Arc` references, we clone the contents of the `UnparkHandle`; this generalizes the behaviour of cloning the `Arc`.
1 parent 61a592d commit cf2fd8c

File tree

8 files changed

+242
-68
lines changed

8 files changed

+242
-68
lines changed

src/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
//!
88
//! [online]: https://tokio.rs/docs/going-deeper/tasks/
99
10-
pub use task_impl::{Spawn, spawn, Unpark, Executor, Run};
10+
pub use task_impl::{Spawn, spawn, Unpark, Executor, Run, UnparkHandle};

src/task_impl/mod.rs

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@ use future::BoxFuture;
1212

1313
mod unpark_mutex;
1414
use self::unpark_mutex::UnparkMutex;
15-
15+
mod unpark_handle;
16+
use self::unpark_handle::UnparkObj;
1617
mod task_rc;
1718
mod data;
1819
#[allow(deprecated)]
1920
#[cfg(feature = "with-deprecated")]
2021
pub use self::task_rc::TaskRc;
2122
pub use self::data::LocalKey;
23+
pub use self::unpark_handle::UnparkHandle;
2224

2325
struct BorrowedTask<'a> {
2426
id: usize,
25-
unpark: &'a Arc<Unpark>,
27+
unpark: UnparkHandle<'a>,
2628
map: &'a data::LocalMap,
2729
events: Events,
2830
}
@@ -81,7 +83,7 @@ fn with<F: FnOnce(&BorrowedTask) -> R, R>(f: F) -> R {
8183
#[derive(Clone)]
8284
pub struct Task {
8385
id: usize,
84-
unpark: Arc<Unpark>,
86+
unpark: UnparkObj,
8587
events: Events,
8688
}
8789

@@ -121,7 +123,7 @@ pub fn park() -> Task {
121123
Task {
122124
id: task.id,
123125
events: task.events.clone(),
124-
unpark: task.unpark.clone(),
126+
unpark: UnparkObj::from(task.unpark),
125127
}
126128
})
127129
}
@@ -333,8 +335,8 @@ impl<F: Future> Spawn<F> {
333335
/// scheduled to receive a notification when poll can be called again.
334336
/// Otherwise if `Ready` or `Err` is returned, the `Spawn` task can be
335337
/// safely destroyed.
336-
pub fn poll_future(&mut self, unpark: Arc<Unpark>) -> Poll<F::Item, F::Error> {
337-
self.enter(&unpark, |f| f.poll())
338+
pub fn poll_future(&mut self, unpark: UnparkHandle) -> Poll<F::Item, F::Error> {
339+
self.enter(unpark, |f| f.poll())
338340
}
339341

340342
/// Waits for the internal future to complete, blocking this thread's
@@ -344,9 +346,11 @@ impl<F: Future> Spawn<F> {
344346
/// to complete. When a future cannot make progress it will use
345347
/// `thread::park` to block the current thread.
346348
pub fn wait_future(&mut self) -> Result<F::Item, F::Error> {
347-
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
349+
let unpark = Arc::new(ThreadUnpark::new());
350+
let mut unpark2 = unpark.clone();
351+
let handle = UnparkHandle::new(&mut unpark2);
348352
loop {
349-
match try!(self.poll_future(unpark.clone())) {
353+
match try!(self.poll_future(handle)) {
350354
Async::NotReady => unpark.park(),
351355
Async::Ready(e) => return Ok(e),
352356
}
@@ -391,17 +395,19 @@ impl<F: Future> Spawn<F> {
391395

392396
impl<S: Stream> Spawn<S> {
393397
/// Like `poll_future`, except polls the underlying stream.
394-
pub fn poll_stream(&mut self, unpark: Arc<Unpark>)
398+
pub fn poll_stream(&mut self, unpark: UnparkHandle)
395399
-> Poll<Option<S::Item>, S::Error> {
396-
self.enter(&unpark, |stream| stream.poll())
400+
self.enter(unpark, |stream| stream.poll())
397401
}
398402

399403
/// Like `wait_future`, except only waits for the next element to arrive on
400404
/// the underlying stream.
401405
pub fn wait_stream(&mut self) -> Option<Result<S::Item, S::Error>> {
402-
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
406+
let unpark = Arc::new(ThreadUnpark::new());
407+
let mut unpark2 = unpark.clone();
408+
let handle = UnparkHandle::new(&mut unpark2);
403409
loop {
404-
match self.poll_stream(unpark.clone()) {
410+
match self.poll_stream(handle) {
405411
Ok(Async::NotReady) => unpark.park(),
406412
Ok(Async::Ready(Some(e))) => return Some(Ok(e)),
407413
Ok(Async::Ready(None)) => return None,
@@ -417,7 +423,7 @@ impl<S: Sink> Spawn<S> {
417423
/// If the underlying operation returns `NotReady` then the `unpark` value
418424
/// passed in will receive a notification when the operation is ready to be
419425
/// attempted again.
420-
pub fn start_send(&mut self, value: S::SinkItem, unpark: &Arc<Unpark>)
426+
pub fn start_send(&mut self, value: S::SinkItem, unpark: UnparkHandle)
421427
-> StartSend<S::SinkItem, S::SinkError> {
422428
self.enter(unpark, |sink| sink.start_send(value))
423429
}
@@ -427,7 +433,7 @@ impl<S: Sink> Spawn<S> {
427433
/// If the underlying operation returns `NotReady` then the `unpark` value
428434
/// passed in will receive a notification when the operation is ready to be
429435
/// attempted again.
430-
pub fn poll_flush(&mut self, unpark: &Arc<Unpark>)
436+
pub fn poll_flush(&mut self, unpark: UnparkHandle)
431437
-> Poll<(), S::SinkError> {
432438
self.enter(unpark, |sink| sink.poll_complete())
433439
}
@@ -439,10 +445,11 @@ impl<S: Sink> Spawn<S> {
439445
/// be blocked until it's able to send the value.
440446
pub fn wait_send(&mut self, mut value: S::SinkItem)
441447
-> Result<(), S::SinkError> {
442-
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
443-
let unpark2 = unpark.clone() as Arc<Unpark>;
448+
let unpark = Arc::new(ThreadUnpark::new());
449+
let mut unpark2 = unpark.clone();
450+
let handle = UnparkHandle::new(&mut unpark2);
444451
loop {
445-
value = match try!(self.start_send(value, &unpark2)) {
452+
value = match try!(self.start_send(value, handle)) {
446453
AsyncSink::NotReady(v) => v,
447454
AsyncSink::Ready => return Ok(()),
448455
};
@@ -459,10 +466,11 @@ impl<S: Sink> Spawn<S> {
459466
/// The thread will be blocked until `poll_complete` returns that it's
460467
/// ready.
461468
pub fn wait_flush(&mut self) -> Result<(), S::SinkError> {
462-
let unpark = Arc::new(ThreadUnpark::new(thread::current()));
463-
let unpark2 = unpark.clone() as Arc<Unpark>;
469+
let unpark = Arc::new(ThreadUnpark::new());
470+
let mut unpark2 = unpark.clone();
471+
let handle = UnparkHandle::new(&mut unpark2);
464472
loop {
465-
if try!(self.poll_flush(&unpark2)).is_ready() {
473+
if try!(self.poll_flush(handle)).is_ready() {
466474
return Ok(())
467475
}
468476
unpark.park();
@@ -471,7 +479,7 @@ impl<S: Sink> Spawn<S> {
471479
}
472480

473481
impl<T> Spawn<T> {
474-
fn enter<F, R>(&mut self, unpark: &Arc<Unpark>, f: F) -> R
482+
fn enter<F, R>(&mut self, unpark: UnparkHandle, f: F) -> R
475483
where F: FnOnce(&mut T) -> R
476484
{
477485
let task = BorrowedTask {
@@ -501,7 +509,7 @@ impl<T: fmt::Debug> fmt::Debug for Spawn<T> {
501509
/// `Spawn::poll_stream` functions. It's transitively used as part of the
502510
/// `Task::unpark` method to internally deliver notifications of readiness of a
503511
/// future to move forward.
504-
pub trait Unpark: Send + Sync {
512+
pub trait Unpark: Send {
505513
/// Indicates that an associated future and/or task are ready to make
506514
/// progress.
507515
///
@@ -510,6 +518,12 @@ pub trait Unpark: Send + Sync {
510518
fn unpark(&self);
511519
}
512520

521+
/// Allows an `Arc`-wrapped unparker to be used wherever an `Unpark` is expected:
/// the notification is simply forwarded to the value stored inside the `Arc`.
impl<T: Unpark + Sync> Unpark for Arc<T> {
    fn unpark(&self) {
        // Deref through `&Arc<T>` to reach the inner `T` and delegate to it.
        T::unpark(&**self)
    }
}
526+
513527
/// A trait representing requests to poll futures.
514528
///
515529
/// This trait is an argument to the `Spawn::execute` which is used to run a
@@ -526,9 +540,9 @@ struct ThreadUnpark {
526540
}
527541

528542
impl ThreadUnpark {
529-
fn new(thread: thread::Thread) -> ThreadUnpark {
543+
fn new() -> ThreadUnpark {
530544
ThreadUnpark {
531-
thread: thread,
545+
thread: thread::current(),
532546
ready: AtomicBool::new(false),
533547
}
534548
}
@@ -563,15 +577,14 @@ impl Run {
563577
/// Actually run the task (invoking `poll` on its future) on the current
564578
/// thread.
565579
pub fn run(self) {
566-
let Run { mut spawn, inner } = self;
567-
580+
let Run { mut spawn, mut inner } = self;
568581
// SAFETY: the ownership of this `Run` object is evidence that
569582
// we are in the `POLLING`/`REPOLL` state for the mutex.
570583
unsafe {
571584
inner.mutex.start_poll();
572585

573586
loop {
574-
match spawn.poll_future(inner.clone()) {
587+
match spawn.poll_future(UnparkHandle::new(&mut inner)) {
575588
Ok(Async::NotReady) => {}
576589
Ok(Async::Ready(())) |
577590
Err(()) => return inner.mutex.complete(),

src/task_impl/unpark_handle.rs

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
use core::ptr;
2+
use core::mem::{forget, size_of};
3+
use core::marker::PhantomData;
4+
use super::Unpark;
5+
6+
/// Maximum size in bytes that will fit in an `UnparkObj`.
7+
/// TODO: What should this value be?
8+
/// Should we expose this?
9+
/// We probably want to say that this value may increase but never decrease in a 1.x release.
10+
const MAX_OBJ_BYTES: usize = 64;
11+
12+
/// A VTable that knows how to clone because the data has a maximum size.
13+
#[derive(Copy)]
14+
struct UnparkVtable {
15+
unpark: fn(*const ()),
16+
clone_to_byte_buffer: fn(*const ()) -> [u8; MAX_OBJ_BYTES],
17+
drop_in_place: unsafe fn(*mut ()),
18+
}
19+
20+
impl Clone for UnparkVtable {
21+
fn clone(&self) -> Self {
22+
UnparkVtable { ..*self }
23+
}
24+
}
25+
26+
impl UnparkVtable {
27+
fn new<T: Unpark + Clone>() -> UnparkVtable {
28+
assert!(size_of::<T>() <= MAX_OBJ_BYTES);
29+
UnparkVtable {
30+
unpark: Self::call_unpark::<T>,
31+
clone_to_byte_buffer: Self::clone_to_byte_buffer::<T>,
32+
drop_in_place: Self::drop_in_place::<T>,
33+
}
34+
}
35+
36+
fn call_unpark<T: Unpark>(data: *const ()) {
37+
let downcasted = unsafe { &*(data as *const _ as *const T) };
38+
downcasted.unpark()
39+
}
40+
41+
/// Returns array with bytes of the cloned data. Make sure data is shorter than MAX_OBJ_BYTES.
42+
/// The caller owns the new data and is responsible for dropping it with `drop_in_place<T>`.
43+
fn clone_to_byte_buffer<T: Clone>(data: *const ()) -> [u8; MAX_OBJ_BYTES] {
44+
let downcasted = unsafe { &*(data as *const _ as *const T) };
45+
let cloned = downcasted.clone();
46+
let mut buffer = [0; MAX_OBJ_BYTES];
47+
// View cloned and buffer as raw bytes.
48+
let cloned_ptr = &cloned as *const _ as *const u8;
49+
let buffer_ptr = &mut buffer as *mut _ as *mut u8;
50+
// Copy from cloned to the buffer and forget cloned.
51+
// Semantically, the buffer now owns cloned.
52+
unsafe {
53+
ptr::copy_nonoverlapping(cloned_ptr, buffer_ptr, size_of::<T>());
54+
}
55+
forget(cloned);
56+
buffer
57+
}
58+
59+
/// Make sure the original value is forgotten to avoid double free.
60+
unsafe fn drop_in_place<T>(data: *mut ()) {
61+
ptr::drop_in_place(&mut *(data as *mut _ as *mut T));
62+
}
63+
}
64+
65+
/// `UnparkHandle` is used as an argument to methods like `poll_future`.
66+
/// Each `Task` handle generated by `task::park` will contain a fresh clone
67+
/// of the `unpark` argument provided to `UnparkHandle::new`.
68+
/// Depending on your use case, you may want to wrap your `unpark` value in
69+
/// an `Arc` before passing it to the `UnparkHandle`.
70+
///
71+
/// # Deciding whether to use an `Arc`
72+
/// If your `unpark` is not `Clone` or has lifetime parameters then you must use an `Arc`.
73+
/// If you use inner mutability in your `unpark`, then you should carefully
74+
/// consider what happens when it is cloned and what is the behaviour you want.
75+
/// Inner mutability aside, the only difference between using or not an `Arc` should be performance.
76+
/// An `Arc` will cost an allocation upfront and updates an atomic ref count on park,
77+
/// while no `Arc` has no upfront cost but will cost a clone on park.
78+
/// The best strategy depends on how often your futures park and how costly it is to clone `unpark`.
79+
#[derive(Copy, Clone)]
80+
// Safe to copy because `UnparkHandle` never mutates itself.
81+
#[allow(missing_debug_implementations)]
82+
pub struct UnparkHandle<'a> {
83+
// Trait object that can clone `data` into an 'UnparkObj` to be put in a `Task`.
84+
data: *const (),
85+
data_lifetime: PhantomData<&'a ()>,
86+
vtable: UnparkVtable,
87+
}
88+
89+
impl<'a> UnparkHandle<'a> {
90+
/// Constructs a `UnparkHandle`.
91+
/// A `Task` handle returned by `park` will contain a clone of the `unpark`
92+
/// argument provided here. You may want to wrap `unpark` in an `Arc`.
93+
///
94+
/// # Panic
95+
/// Panics if the size of 'T' is too large. If you get a panic try wrapping the argument
96+
/// in an `Arc`.
97+
98+
// Take unpark as &mut data to make sure it dosen't change under our feet due to inner mutability.
99+
// T must be static so we don't clone around references to a stack or something.
100+
pub fn new<T: Unpark + Clone + 'static>(unpark: &mut T) -> UnparkHandle<'a> {
101+
if size_of::<T>() > MAX_OBJ_BYTES {
102+
// TODO: Panicking here seems reasonable and could be a compile time error when we
103+
// get a const sytem in Rust. But what about libraries that pass a user supplied type as T?
104+
// Should we expose MAX_OBJ_BYTES? Offer a version that return an error?
105+
panic!("The size of T is {} bytes which is too large. Try wrapping the unpark argument in an Arc.");
106+
}
107+
108+
UnparkHandle {
109+
data: unpark as *const _ as *const (),
110+
data_lifetime: PhantomData,
111+
vtable: UnparkVtable::new::<T>(),
112+
}
113+
}
114+
}
115+
116+
/// A custom trait object that takes ownership of the data as a slice of bytes.
117+
pub struct UnparkObj {
118+
data: [u8; MAX_OBJ_BYTES],
119+
vtable: UnparkVtable,
120+
}
121+
122+
impl Drop for UnparkObj {
123+
fn drop(&mut self) {
124+
unsafe {
125+
(self.vtable.drop_in_place)(&mut self.data as *mut _ as *mut ());
126+
}
127+
}
128+
}
129+
130+
impl UnparkObj {
131+
fn new(data: *const (), vtable: UnparkVtable) -> Self {
132+
UnparkObj {
133+
data: (vtable.clone_to_byte_buffer)(data),
134+
vtable: vtable,
135+
}
136+
}
137+
}
138+
139+
impl Clone for UnparkObj {
140+
fn clone(&self) -> Self {
141+
UnparkObj::new(&self.data as *const _ as *const (), self.vtable)
142+
}
143+
}
144+
145+
impl<'a> From<UnparkHandle<'a>> for UnparkObj {
146+
fn from(handle: UnparkHandle) -> UnparkObj {
147+
UnparkObj::new(handle.data, handle.vtable)
148+
}
149+
}
150+
151+
/// Forwards `unpark` to the erased value stored in the byte buffer.
impl Unpark for UnparkObj {
    fn unpark(&self) {
        let erased = self.data.as_ptr() as *const ();
        (self.vtable.unpark)(erased)
    }
}

tests/all.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ fn select2() {
323323
let b = b.map(move |v| { btx.send(v).unwrap(); v });
324324
let d = d.map(move |v| { dtx.send(v).unwrap(); v });
325325
let f = b.select(d);
326-
drop(executor::spawn(f).poll_future(support::unpark_noop()));
326+
drop(executor::spawn(f).poll_future(unpark_noop()));
327327
assert!(drx.recv().is_err());
328328
assert!(brx.recv().is_err());
329329
}

0 commit comments

Comments
 (0)