Skip to content

Commit

Permalink
Add metadata to tasks (#33)
Browse files Browse the repository at this point in the history
* Add metadata to tasks

* Make sure to drop the header.

* Revamp so that it uses a generator instead

* Fix heap allocation

* Slightly more elegant strategy

* Add a test for using metadata.

* Use non-driven block_on() instead.
  • Loading branch information
notgull authored Nov 6, 2022
1 parent 230b0a4 commit d5bd8be
Show file tree
Hide file tree
Showing 6 changed files with 500 additions and 129 deletions.
12 changes: 9 additions & 3 deletions src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::utils::abort_on_panic;
/// The header of a task.
///
/// This header is stored in memory at the beginning of the heap-allocated task.
pub(crate) struct Header {
pub(crate) struct Header<M> {
/// Current state of the task.
///
/// Contains flags representing the current state and the reference count.
Expand All @@ -26,9 +26,14 @@ pub(crate) struct Header {
/// In addition to the actual waker virtual table, it also contains pointers to several other
/// methods necessary for bookkeeping the heap-allocated task.
pub(crate) vtable: &'static TaskVTable,

/// Metadata associated with the task.
///
/// This metadata may be provided to the user.
pub(crate) metadata: M,
}

impl Header {
impl<M> Header<M> {
/// Notifies the awaiter blocked on this task.
///
/// If the awaiter is the same as the current waker, it will not be notified.
Expand Down Expand Up @@ -145,7 +150,7 @@ impl Header {
}
}

impl fmt::Debug for Header {
impl<M: fmt::Debug> fmt::Debug for Header<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);

Expand All @@ -157,6 +162,7 @@ impl fmt::Debug for Header {
.field("awaiter", &(state & AWAITER != 0))
.field("task", &(state & TASK != 0))
.field("ref_count", &(state / REFERENCE))
.field("metadata", &self.metadata)
.finish()
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ mod state;
mod task;
mod utils;

pub use crate::runnable::{spawn, spawn_unchecked, Runnable};
pub use crate::runnable::{spawn, spawn_unchecked, Builder, Runnable};
pub use crate::task::{FallibleTask, Task};

#[cfg(feature = "std")]
Expand Down
56 changes: 37 additions & 19 deletions src/raw.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use alloc::alloc::Layout as StdLayout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
Expand Down Expand Up @@ -64,9 +65,9 @@ pub(crate) struct TaskLayout {
}

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, T, S> {
pub(crate) struct RawTask<F, T, S, M> {
/// The task header.
pub(crate) header: *const Header,
pub(crate) header: *const Header<M>,

/// The schedule function.
pub(crate) schedule: *const S,
Expand All @@ -78,22 +79,22 @@ pub(crate) struct RawTask<F, T, S> {
pub(crate) output: *mut T,
}

impl<F, T, S> Copy for RawTask<F, T, S> {}
impl<F, T, S, M> Copy for RawTask<F, T, S, M> {}

impl<F, T, S> Clone for RawTask<F, T, S> {
impl<F, T, S, M> Clone for RawTask<F, T, S, M> {
fn clone(&self) -> Self {
*self
}
}

impl<F, T, S> RawTask<F, T, S> {
impl<F, T, S, M> RawTask<F, T, S, M> {
const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout();

/// Computes the memory layout for a task.
#[inline]
const fn eval_task_layout() -> Option<TaskLayout> {
// Compute the layouts for `Header`, `S`, `F`, and `T`.
let layout_header = Layout::new::<Header>();
let layout_header = Layout::new::<Header<M>>();
let layout_s = Layout::new::<S>();
let layout_f = Layout::new::<F>();
let layout_r = Layout::new::<T>();
Expand All @@ -119,10 +120,10 @@ impl<F, T, S> RawTask<F, T, S> {
}
}

impl<F, T, S> RawTask<F, T, S>
impl<F, T, S, M> RawTask<F, T, S, M>
where
F: Future<Output = T>,
S: Fn(Runnable),
S: Fn(Runnable<M>),
{
const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
Self::clone_waker,
Expand All @@ -134,7 +135,15 @@ where
/// Allocates a task with the given `future` and `schedule` function.
///
/// It is assumed that initially only the `Runnable` and the `Task` exist.
pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>(
future: Gen,
schedule: S,
metadata: M,
) -> NonNull<()>
where
F: 'a,
M: 'a,
{
// Compute the layout of the task for allocation. Abort if the computation fails.
//
// n.b. notgull: task_layout now automatically aborts instead of panicking
Expand All @@ -150,7 +159,7 @@ where
let raw = Self::from_ptr(ptr.as_ptr());

// Write the header as the first field of the task.
(raw.header as *mut Header).write(Header {
(raw.header as *mut Header<M>).write(Header {
state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
awaiter: UnsafeCell::new(None),
vtable: &TaskVTable {
Expand All @@ -163,11 +172,15 @@ where
clone_waker: Self::clone_waker,
layout_info: &Self::TASK_LAYOUT,
},
metadata,
});

// Write the schedule function as the third field of the task.
(raw.schedule as *mut S).write(schedule);

// Generate the future, now that the metadata has been pinned in place.
let future = abort_on_panic(|| future(&(*raw.header).metadata));

// Write the future as the fourth field of the task.
raw.future.write(future);

Expand All @@ -183,7 +196,7 @@ where

unsafe {
Self {
header: p as *const Header,
header: p as *const Header<M>,
schedule: p.add(task_layout.offset_s) as *const S,
future: p.add(task_layout.offset_f) as *mut F,
output: p.add(task_layout.offset_r) as *mut T,
Expand Down Expand Up @@ -319,6 +332,7 @@ where
// still alive.
let task = Runnable {
ptr: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
};
(*raw.schedule)(task);
}
Expand Down Expand Up @@ -410,6 +424,7 @@ where

let task = Runnable {
ptr: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
};
(*raw.schedule)(task);
}
Expand Down Expand Up @@ -442,6 +457,9 @@ where

// We need a safeguard against panics because destructors can panic.
abort_on_panic(|| {
// Drop the header along with the metadata.
(raw.header as *mut Header<M>).drop_in_place();

// Drop the schedule function.
(raw.schedule as *mut S).drop_in_place();
});
Expand Down Expand Up @@ -625,15 +643,15 @@ where
return false;

/// A guard that closes the task if polling its future panics.
struct Guard<F, T, S>(RawTask<F, T, S>)
struct Guard<F, T, S, M>(RawTask<F, T, S, M>)
where
F: Future<Output = T>,
S: Fn(Runnable);
S: Fn(Runnable<M>);

impl<F, T, S> Drop for Guard<F, T, S>
impl<F, T, S, M> Drop for Guard<F, T, S, M>
where
F: Future<Output = T>,
S: Fn(Runnable),
S: Fn(Runnable<M>),
{
fn drop(&mut self) {
let raw = self.0;
Expand All @@ -648,7 +666,7 @@ where
if state & CLOSED != 0 {
// The thread that closed the task didn't drop the future because it
// was running so now it's our responsibility to do so.
RawTask::<F, T, S>::drop_future(ptr);
RawTask::<F, T, S, M>::drop_future(ptr);

// Mark the task as not running and not scheduled.
(*raw.header)
Expand All @@ -662,7 +680,7 @@ where
}

// Drop the task reference.
RawTask::<F, T, S>::drop_ref(ptr);
RawTask::<F, T, S, M>::drop_ref(ptr);

// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
Expand All @@ -680,7 +698,7 @@ where
) {
Ok(state) => {
// Drop the future because the task is now closed.
RawTask::<F, T, S>::drop_future(ptr);
RawTask::<F, T, S, M>::drop_future(ptr);

// Take the awaiter out.
let mut awaiter = None;
Expand All @@ -689,7 +707,7 @@ where
}

// Drop the task reference.
RawTask::<F, T, S>::drop_ref(ptr);
RawTask::<F, T, S, M>::drop_ref(ptr);

// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
Expand Down
Loading

0 comments on commit d5bd8be

Please sign in to comment.