Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metadata to tasks #33

Merged
merged 7 commits into from
Nov 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::utils::abort_on_panic;
/// The header of a task.
///
/// This header is stored in memory at the beginning of the heap-allocated task.
pub(crate) struct Header {
pub(crate) struct Header<M> {
/// Current state of the task.
///
/// Contains flags representing the current state and the reference count.
Expand All @@ -26,9 +26,14 @@ pub(crate) struct Header {
/// In addition to the actual waker virtual table, it also contains pointers to several other
/// methods necessary for bookkeeping the heap-allocated task.
pub(crate) vtable: &'static TaskVTable,

/// Metadata associated with the task.
///
/// This metadata may be provided to the user.
pub(crate) metadata: M,
}

impl Header {
impl<M> Header<M> {
/// Notifies the awaiter blocked on this task.
///
/// If the awaiter is the same as the current waker, it will not be notified.
Expand Down Expand Up @@ -145,7 +150,7 @@ impl Header {
}
}

impl fmt::Debug for Header {
impl<M: fmt::Debug> fmt::Debug for Header<M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);

Expand All @@ -157,6 +162,7 @@ impl fmt::Debug for Header {
.field("awaiter", &(state & AWAITER != 0))
.field("task", &(state & TASK != 0))
.field("ref_count", &(state / REFERENCE))
.field("metadata", &self.metadata)
.finish()
}
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ mod state;
mod task;
mod utils;

pub use crate::runnable::{spawn, spawn_unchecked, Runnable};
pub use crate::runnable::{spawn, spawn_unchecked, Builder, Runnable};
pub use crate::task::{FallibleTask, Task};

#[cfg(feature = "std")]
Expand Down
56 changes: 37 additions & 19 deletions src/raw.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use alloc::alloc::Layout as StdLayout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
Expand Down Expand Up @@ -64,9 +65,9 @@ pub(crate) struct TaskLayout {
}

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, T, S> {
pub(crate) struct RawTask<F, T, S, M> {
/// The task header.
pub(crate) header: *const Header,
pub(crate) header: *const Header<M>,

/// The schedule function.
pub(crate) schedule: *const S,
Expand All @@ -78,22 +79,22 @@ pub(crate) struct RawTask<F, T, S> {
pub(crate) output: *mut T,
}

impl<F, T, S> Copy for RawTask<F, T, S> {}
impl<F, T, S, M> Copy for RawTask<F, T, S, M> {}

impl<F, T, S> Clone for RawTask<F, T, S> {
impl<F, T, S, M> Clone for RawTask<F, T, S, M> {
fn clone(&self) -> Self {
*self
}
}

impl<F, T, S> RawTask<F, T, S> {
impl<F, T, S, M> RawTask<F, T, S, M> {
const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout();

/// Computes the memory layout for a task.
#[inline]
const fn eval_task_layout() -> Option<TaskLayout> {
// Compute the layouts for `Header`, `S`, `F`, and `T`.
let layout_header = Layout::new::<Header>();
let layout_header = Layout::new::<Header<M>>();
let layout_s = Layout::new::<S>();
let layout_f = Layout::new::<F>();
let layout_r = Layout::new::<T>();
Expand All @@ -119,10 +120,10 @@ impl<F, T, S> RawTask<F, T, S> {
}
}

impl<F, T, S> RawTask<F, T, S>
impl<F, T, S, M> RawTask<F, T, S, M>
where
F: Future<Output = T>,
S: Fn(Runnable),
S: Fn(Runnable<M>),
{
const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
Self::clone_waker,
Expand All @@ -134,7 +135,15 @@ where
/// Allocates a task with the given `future` and `schedule` function.
///
/// It is assumed that initially only the `Runnable` and the `Task` exist.
pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>(
future: Gen,
schedule: S,
metadata: M,
) -> NonNull<()>
where
F: 'a,
M: 'a,
{
// Compute the layout of the task for allocation. Abort if the computation fails.
//
// n.b. notgull: task_layout now automatically aborts instead of panicking
Expand All @@ -150,7 +159,7 @@ where
let raw = Self::from_ptr(ptr.as_ptr());

// Write the header as the first field of the task.
(raw.header as *mut Header).write(Header {
(raw.header as *mut Header<M>).write(Header {
state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
awaiter: UnsafeCell::new(None),
vtable: &TaskVTable {
Expand All @@ -163,11 +172,15 @@ where
clone_waker: Self::clone_waker,
layout_info: &Self::TASK_LAYOUT,
},
metadata,
});

// Write the schedule function as the third field of the task.
(raw.schedule as *mut S).write(schedule);

// Generate the future, now that the metadata has been pinned in place.
let future = abort_on_panic(|| future(&(*raw.header).metadata));

// Write the future as the fourth field of the task.
raw.future.write(future);

Expand All @@ -183,7 +196,7 @@ where

unsafe {
Self {
header: p as *const Header,
header: p as *const Header<M>,
schedule: p.add(task_layout.offset_s) as *const S,
future: p.add(task_layout.offset_f) as *mut F,
output: p.add(task_layout.offset_r) as *mut T,
Expand Down Expand Up @@ -319,6 +332,7 @@ where
// still alive.
let task = Runnable {
ptr: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
};
(*raw.schedule)(task);
}
Expand Down Expand Up @@ -410,6 +424,7 @@ where

let task = Runnable {
ptr: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
};
(*raw.schedule)(task);
}
Expand Down Expand Up @@ -442,6 +457,9 @@ where

// We need a safeguard against panics because destructors can panic.
abort_on_panic(|| {
// Drop the header along with the metadata.
(raw.header as *mut Header<M>).drop_in_place();

// Drop the schedule function.
(raw.schedule as *mut S).drop_in_place();
});
Expand Down Expand Up @@ -625,15 +643,15 @@ where
return false;

/// A guard that closes the task if polling its future panics.
struct Guard<F, T, S>(RawTask<F, T, S>)
struct Guard<F, T, S, M>(RawTask<F, T, S, M>)
where
F: Future<Output = T>,
S: Fn(Runnable);
S: Fn(Runnable<M>);

impl<F, T, S> Drop for Guard<F, T, S>
impl<F, T, S, M> Drop for Guard<F, T, S, M>
where
F: Future<Output = T>,
S: Fn(Runnable),
S: Fn(Runnable<M>),
{
fn drop(&mut self) {
let raw = self.0;
Expand All @@ -648,7 +666,7 @@ where
if state & CLOSED != 0 {
// The thread that closed the task didn't drop the future because it
// was running so now it's our responsibility to do so.
RawTask::<F, T, S>::drop_future(ptr);
RawTask::<F, T, S, M>::drop_future(ptr);

// Mark the task as not running and not scheduled.
(*raw.header)
Expand All @@ -662,7 +680,7 @@ where
}

// Drop the task reference.
RawTask::<F, T, S>::drop_ref(ptr);
RawTask::<F, T, S, M>::drop_ref(ptr);

// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
Expand All @@ -680,7 +698,7 @@ where
) {
Ok(state) => {
// Drop the future because the task is now closed.
RawTask::<F, T, S>::drop_future(ptr);
RawTask::<F, T, S, M>::drop_future(ptr);

// Take the awaiter out.
let mut awaiter = None;
Expand All @@ -689,7 +707,7 @@ where
}

// Drop the task reference.
RawTask::<F, T, S>::drop_ref(ptr);
RawTask::<F, T, S, M>::drop_ref(ptr);

// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
Expand Down
Loading