diff --git a/src/header.rs b/src/header.rs index 8a3a0b9..9747c5d 100644 --- a/src/header.rs +++ b/src/header.rs @@ -10,7 +10,7 @@ use crate::utils::abort_on_panic; /// The header of a task. /// /// This header is stored in memory at the beginning of the heap-allocated task. -pub(crate) struct Header { +pub(crate) struct Header { /// Current state of the task. /// /// Contains flags representing the current state and the reference count. @@ -26,9 +26,14 @@ pub(crate) struct Header { /// In addition to the actual waker virtual table, it also contains pointers to several other /// methods necessary for bookkeeping the heap-allocated task. pub(crate) vtable: &'static TaskVTable, + + /// Metadata associated with the task. + /// + /// This metadata may be provided to the user. + pub(crate) metadata: M, } -impl Header { +impl Header { /// Notifies the awaiter blocked on this task. /// /// If the awaiter is the same as the current waker, it will not be notified. @@ -145,7 +150,7 @@ impl Header { } } -impl fmt::Debug for Header { +impl fmt::Debug for Header { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let state = self.state.load(Ordering::SeqCst); @@ -157,6 +162,7 @@ impl fmt::Debug for Header { .field("awaiter", &(state & AWAITER != 0)) .field("task", &(state & TASK != 0)) .field("ref_count", &(state / REFERENCE)) + .field("metadata", &self.metadata) .finish() } } diff --git a/src/lib.rs b/src/lib.rs index dd689ec..19eb77d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -92,7 +92,7 @@ mod state; mod task; mod utils; -pub use crate::runnable::{spawn, spawn_unchecked, Runnable}; +pub use crate::runnable::{spawn, spawn_unchecked, Builder, Runnable}; pub use crate::task::{FallibleTask, Task}; #[cfg(feature = "std")] diff --git a/src/raw.rs b/src/raw.rs index ec2a6ec..4af7b27 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -1,6 +1,7 @@ use alloc::alloc::Layout as StdLayout; use core::cell::UnsafeCell; use core::future::Future; +use core::marker::PhantomData; use core::mem::{self, ManuallyDrop}; use core::pin::Pin; use core::ptr::NonNull; @@ -64,9 +65,9 @@ pub(crate) struct TaskLayout { } /// Raw pointers to the fields inside a task. -pub(crate) struct RawTask { +pub(crate) struct RawTask { /// The task header. - pub(crate) header: *const Header, + pub(crate) header: *const Header, /// The schedule function. pub(crate) schedule: *const S, @@ -78,22 +79,22 @@ pub(crate) struct RawTask { pub(crate) output: *mut T, } -impl Copy for RawTask {} +impl Copy for RawTask {} -impl Clone for RawTask { +impl Clone for RawTask { fn clone(&self) -> Self { *self } } -impl RawTask { +impl RawTask { const TASK_LAYOUT: Option = Self::eval_task_layout(); /// Computes the memory layout for a task. #[inline] const fn eval_task_layout() -> Option { // Compute the layouts for `Header`, `S`, `F`, and `T`. - let layout_header = Layout::new::
(); + let layout_header = Layout::new::>(); let layout_s = Layout::new::(); let layout_f = Layout::new::(); let layout_r = Layout::new::(); @@ -119,10 +120,10 @@ impl RawTask { } } -impl RawTask +impl RawTask where F: Future, - S: Fn(Runnable), + S: Fn(Runnable), { const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, @@ -134,7 +135,15 @@ where /// Allocates a task with the given `future` and `schedule` function. /// /// It is assumed that initially only the `Runnable` and the `Task` exist. - pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { + pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>( + future: Gen, + schedule: S, + metadata: M, + ) -> NonNull<()> + where + F: 'a, + M: 'a, + { // Compute the layout of the task for allocation. Abort if the computation fails. // // n.b. notgull: task_layout now automatically aborts instead of panicking @@ -150,7 +159,7 @@ where let raw = Self::from_ptr(ptr.as_ptr()); // Write the header as the first field of the task. - (raw.header as *mut Header).write(Header { + (raw.header as *mut Header).write(Header { state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), awaiter: UnsafeCell::new(None), vtable: &TaskVTable { @@ -163,11 +172,15 @@ where clone_waker: Self::clone_waker, layout_info: &Self::TASK_LAYOUT, }, + metadata, }); // Write the schedule function as the third field of the task. (raw.schedule as *mut S).write(schedule); + // Generate the future, now that the metadata has been pinned in place. + let future = abort_on_panic(|| future(&(*raw.header).metadata)); + // Write the future as the fourth field of the task. raw.future.write(future); @@ -183,7 +196,7 @@ where unsafe { Self { - header: p as *const Header, + header: p as *const Header, schedule: p.add(task_layout.offset_s) as *const S, future: p.add(task_layout.offset_f) as *mut F, output: p.add(task_layout.offset_r) as *mut T, @@ -319,6 +332,7 @@ where // still alive. let task = Runnable { ptr: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, }; (*raw.schedule)(task); } @@ -410,6 +424,7 @@ where let task = Runnable { ptr: NonNull::new_unchecked(ptr as *mut ()), + _marker: PhantomData, }; (*raw.schedule)(task); } @@ -442,6 +457,9 @@ where // We need a safeguard against panics because destructors can panic. abort_on_panic(|| { + // Drop the header along with the metadata. + (raw.header as *mut Header).drop_in_place(); + // Drop the schedule function. (raw.schedule as *mut S).drop_in_place(); }); @@ -625,15 +643,15 @@ where return false; /// A guard that closes the task if polling its future panics. - struct Guard(RawTask) + struct Guard(RawTask) where F: Future, - S: Fn(Runnable); + S: Fn(Runnable); - impl Drop for Guard + impl Drop for Guard where F: Future, - S: Fn(Runnable), + S: Fn(Runnable), { fn drop(&mut self) { let raw = self.0; @@ -648,7 +666,7 @@ where if state & CLOSED != 0 { // The thread that closed the task didn't drop the future because it // was running so now it's our responsibility to do so. - RawTask::::drop_future(ptr); + RawTask::::drop_future(ptr); // Mark the task as not running and not scheduled. (*raw.header) @@ -662,7 +680,7 @@ where } // Drop the task reference. - RawTask::::drop_ref(ptr); + RawTask::::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { @@ -680,7 +698,7 @@ where ) { Ok(state) => { // Drop the future because the task is now closed. - RawTask::::drop_future(ptr); + RawTask::::drop_future(ptr); // Take the awaiter out. let mut awaiter = None; @@ -689,7 +707,7 @@ where } // Drop the task reference. - RawTask::::drop_ref(ptr); + RawTask::::drop_ref(ptr); // Notify the awaiter that the future has been dropped. if let Some(w) = awaiter { diff --git a/src/runnable.rs b/src/runnable.rs index cb70ef3..c10177d 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -6,11 +6,339 @@ use core::ptr::NonNull; use core::sync::atomic::Ordering; use core::task::Waker; +use alloc::boxed::Box; + use crate::header::Header; use crate::raw::RawTask; use crate::state::*; use crate::Task; +/// A builder that creates a new task. +#[derive(Debug)] +pub struct Builder { + /// The metadata associated with the task. + metadata: M, +} + +impl Default for Builder { + fn default() -> Self { + Builder::new().metadata(M::default()) + } +} + +impl Builder<()> { + /// Creates a new task builder. + /// + /// By default, this task builder has no metadata. Use the [`metadata`] method to + /// set the metadata. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// + /// let (runnable, task) = Builder::new().spawn(|()| async {}, |_| {}); + /// ``` + pub fn new() -> Builder<()> { + Builder { metadata: () } + } + + /// Adds metadata to the task. + /// + /// In certain cases, it may be useful to associate some metadata with a task. For instance, + /// you may want to associate a name with a task, or a priority for a priority queue. This + /// method allows the user to attach arbitrary metadata to a task that is available through + /// the [`Runnable`] or the [`Task`]. + /// + /// # Examples + /// + /// This example creates an executor that associates a "priority" number with each task, and + /// then runs the tasks in order of priority. + /// + /// ``` + /// use async_task::{Builder, Runnable}; + /// use once_cell::sync::Lazy; + /// use std::cmp; + /// use std::collections::BinaryHeap; + /// use std::sync::Mutex; + /// + /// # smol::future::block_on(async { + /// /// A wrapper around a `Runnable` that implements `Ord` so that it can be used in a + /// /// priority queue. + /// struct TaskWrapper(Runnable); + /// + /// impl PartialEq for TaskWrapper { + /// fn eq(&self, other: &Self) -> bool { + /// self.0.metadata() == other.0.metadata() + /// } + /// } + /// + /// impl Eq for TaskWrapper {} + /// + /// impl PartialOrd for TaskWrapper { + /// fn partial_cmp(&self, other: &Self) -> Option { + /// Some(self.cmp(other)) + /// } + /// } + /// + /// impl Ord for TaskWrapper { + /// fn cmp(&self, other: &Self) -> cmp::Ordering { + /// self.0.metadata().cmp(other.0.metadata()) + /// } + /// } + /// + /// static EXECUTOR: Lazy>> = Lazy::new(|| { + /// Mutex::new(BinaryHeap::new()) + /// }); + /// + /// let schedule = |runnable| { + /// EXECUTOR.lock().unwrap().push(TaskWrapper(runnable)); + /// }; + /// + /// // Spawn a few tasks with different priorities. + /// let spawn_task = move |priority| { + /// let (runnable, task) = Builder::new().metadata(priority).spawn( + /// move |_| async move { priority }, + /// schedule, + /// ); + /// runnable.schedule(); + /// task + /// }; + /// + /// let t1 = spawn_task(1); + /// let t2 = spawn_task(2); + /// let t3 = spawn_task(3); + /// + /// // Run the tasks in order of priority. + /// let mut metadata_seen = vec![]; + /// while let Some(TaskWrapper(runnable)) = EXECUTOR.lock().unwrap().pop() { + /// metadata_seen.push(*runnable.metadata()); + /// runnable.run(); + /// } + /// + /// assert_eq!(metadata_seen, vec![3, 2, 1]); + /// assert_eq!(t1.await, 1); + /// assert_eq!(t2.await, 2); + /// assert_eq!(t3.await, 3); + /// # }); + /// ``` + pub fn metadata(self, metadata: M) -> Builder { + Builder { metadata } + } +} + +impl Builder { + /// Creates a new task. + /// + /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its + /// output. + /// + /// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] + /// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run + /// again. + /// + /// When the task is woken, its [`Runnable`] is passed to the `schedule` function. + /// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it + /// should push it into a task queue so that it can be processed later. + /// + /// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider + /// using [`spawn_local()`] or [`spawn_unchecked()`] instead. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// + /// // The future inside the task. + /// let future = async { + /// println!("Hello, world!"); + /// }; + /// + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = Builder::new().spawn(|()| future, schedule); + /// ``` + pub fn spawn(self, future: F, schedule: S) -> (Runnable, Task) + where + F: FnOnce(&M) -> Fut, + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + S: Fn(Runnable) + Send + Sync + 'static, + { + unsafe { self.spawn_unchecked(future, schedule) } + } + + /// Creates a new thread-local task. + /// + /// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the + /// [`Runnable`] is used or dropped on another thread, a panic will occur. + /// + /// This function is only available when the `std` feature for this crate is enabled. + /// + /// # Examples + /// + /// ``` + /// use async_task::{Builder, Runnable}; + /// use flume::{Receiver, Sender}; + /// use std::rc::Rc; + /// + /// thread_local! { + /// // A queue that holds scheduled tasks. + /// static QUEUE: (Sender, Receiver) = flume::unbounded(); + /// } + /// + /// // Make a non-Send future. + /// let msg: Rc = "Hello, world!".into(); + /// let future = async move { + /// println!("{}", msg); + /// }; + /// + /// // A function that schedules the task when it gets woken up. + /// let s = QUEUE.with(|(s, _)| s.clone()); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = Builder::new().spawn_local(move |()| future, schedule); + /// ``` + #[cfg(feature = "std")] + pub fn spawn_local( + self, + future: F, + schedule: S, + ) -> (Runnable, Task) + where + F: FnOnce(&M) -> Fut, + Fut: Future + 'static, + Fut::Output: 'static, + S: Fn(Runnable) + Send + Sync + 'static, + { + use std::mem::ManuallyDrop; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::thread::{self, ThreadId}; + + #[inline] + fn thread_id() -> ThreadId { + thread_local! { + static ID: ThreadId = thread::current().id(); + } + ID.try_with(|id| *id) + .unwrap_or_else(|_| thread::current().id()) + } + + struct Checked { + id: ThreadId, + inner: ManuallyDrop, + } + + impl Drop for Checked { + fn drop(&mut self) { + assert!( + self.id == thread_id(), + "local task dropped by a thread that didn't spawn it" + ); + unsafe { + ManuallyDrop::drop(&mut self.inner); + } + } + } + + impl Future for Checked { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + assert!( + self.id == thread_id(), + "local task polled by a thread that didn't spawn it" + ); + unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } + } + } + + // Wrap the future into one that checks which thread it's on. + let future = move |meta| { + let future = future(meta); + + Checked { + id: thread_id(), + inner: ManuallyDrop::new(future), + } + }; + + unsafe { self.spawn_unchecked(future, schedule) } + } + + /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. + /// + /// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and + /// `'static` on `future` and `schedule`. + /// + /// # Safety + /// + /// - If `future`'s output is not [`Send`], its [`Runnable`] must be used and dropped on the original + /// thread. + /// - If `future`'s output is not `'static`, borrowed variables must outlive its [`Runnable`]. + /// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on + /// the original thread. + /// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. + /// + /// # Examples + /// + /// ``` + /// use async_task::Builder; + /// + /// // The future inside the task. + /// let future = async { + /// println!("Hello, world!"); + /// }; + /// + /// // If the task gets woken up, it will be sent into this channel. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = unsafe { Builder::new().spawn_unchecked(move |()| future, schedule) }; + /// ``` + pub unsafe fn spawn_unchecked<'a, F, Fut, S>( + self, + future: F, + schedule: S, + ) -> (Runnable, Task) + where + F: FnOnce(&'a M) -> Fut, + Fut: Future + 'a, + S: Fn(Runnable), + M: 'a, + { + let Self { metadata } = self; + + // Allocate large futures on the heap. + let ptr = if mem::size_of::() >= 2048 { + let future = |meta| { + let future = future(meta); + Box::pin(future) + }; + + RawTask::<_, Fut::Output, S, M>::allocate(future, schedule, metadata) + } else { + RawTask::::allocate(future, schedule, metadata) + }; + + let runnable = Runnable { + ptr, + _marker: PhantomData, + }; + let task = Task { + ptr, + _marker: PhantomData, + }; + (runnable, task) + } +} + /// Creates a new task. /// /// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its @@ -90,56 +418,7 @@ where F::Output: 'static, S: Fn(Runnable) + Send + Sync + 'static, { - use std::mem::ManuallyDrop; - use std::pin::Pin; - use std::task::{Context, Poll}; - use std::thread::{self, ThreadId}; - - #[inline] - fn thread_id() -> ThreadId { - thread_local! { - static ID: ThreadId = thread::current().id(); - } - ID.try_with(|id| *id) - .unwrap_or_else(|_| thread::current().id()) - } - - struct Checked { - id: ThreadId, - inner: ManuallyDrop, - } - - impl Drop for Checked { - fn drop(&mut self) { - assert!( - self.id == thread_id(), - "local task dropped by a thread that didn't spawn it" - ); - unsafe { - ManuallyDrop::drop(&mut self.inner); - } - } - } - - impl Future for Checked { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - assert!( - self.id == thread_id(), - "local task polled by a thread that didn't spawn it" - ); - unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } - } - } - - // Wrap the future into one that checks which thread it's on. - let future = Checked { - id: thread_id(), - inner: ManuallyDrop::new(future), - }; - - unsafe { spawn_unchecked(future, schedule) } + Builder::new().spawn_local(move |()| future, schedule) } /// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. @@ -176,20 +455,7 @@ where F: Future, S: Fn(Runnable), { - // Allocate large futures on the heap. - let ptr = if mem::size_of::() >= 2048 { - let future = alloc::boxed::Box::pin(future); - RawTask::<_, F::Output, S>::allocate(future, schedule) - } else { - RawTask::::allocate(future, schedule) - }; - - let runnable = Runnable { ptr }; - let task = Task { - ptr, - _marker: PhantomData, - }; - (runnable, task) + Builder::new().spawn_unchecked(move |()| future, schedule) } /// A handle to a runnable task. @@ -230,20 +496,31 @@ where /// runnable.schedule(); /// assert_eq!(smol::future::block_on(task), 3); /// ``` -pub struct Runnable { +pub struct Runnable { /// A pointer to the heap-allocated task. pub(crate) ptr: NonNull<()>, + + /// A marker capturing generic type `M`. + pub(crate) _marker: PhantomData, } -unsafe impl Send for Runnable {} -unsafe impl Sync for Runnable {} +unsafe impl Send for Runnable {} +unsafe impl Sync for Runnable {} #[cfg(feature = "std")] -impl std::panic::UnwindSafe for Runnable {} +impl std::panic::UnwindSafe for Runnable {} #[cfg(feature = "std")] -impl std::panic::RefUnwindSafe for Runnable {} +impl std::panic::RefUnwindSafe for Runnable {} + +impl Runnable { + /// Get the metadata associated with this task. + /// + /// Tasks can be created with a metadata object associated with them; by default, this + /// is a `()` value. See the [`Builder::metadata()`] method for more information. + pub fn metadata(&self) -> &M { + &self.header().metadata + } -impl Runnable { /// Schedules the task. /// /// This is a convenience method that passes the [`Runnable`] to the schedule function. @@ -265,7 +542,7 @@ impl Runnable { /// ``` pub fn schedule(self) { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; mem::forget(self); unsafe { @@ -303,7 +580,7 @@ impl Runnable { /// ``` pub fn run(self) -> bool { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; mem::forget(self); unsafe { ((*header).vtable.run)(ptr) } @@ -334,22 +611,26 @@ impl Runnable { /// ``` pub fn waker(&self) -> Waker { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { let raw_waker = ((*header).vtable.clone_waker)(ptr); Waker::from_raw(raw_waker) } } + + fn header(&self) -> &Header { + unsafe { &*(self.ptr.as_ptr() as *const Header) } + } } -impl Drop for Runnable { +impl Drop for Runnable { fn drop(&mut self) { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = self.header(); unsafe { - let mut state = (*header).state.load(Ordering::Acquire); + let mut state = header.state.load(Ordering::Acquire); loop { // If the task has been completed or closed, it can't be canceled. @@ -358,7 +639,7 @@ impl Drop for Runnable { } // Mark the task as closed. - match (*header).state.compare_exchange_weak( + match header.state.compare_exchange_weak( state, state | CLOSED, Ordering::AcqRel, @@ -370,10 +651,10 @@ impl Drop for Runnable { } // Drop the future. - ((*header).vtable.drop_future)(ptr); + (header.vtable.drop_future)(ptr); // Mark the task as unscheduled. - let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + let state = header.state.fetch_and(!SCHEDULED, Ordering::AcqRel); // Notify the awaiter that the future has been dropped. if state & AWAITER != 0 { @@ -381,15 +662,15 @@ impl Drop for Runnable { } // Drop the task reference. - ((*header).vtable.drop_ref)(ptr); + (header.vtable.drop_ref)(ptr); } } } -impl fmt::Debug for Runnable { +impl fmt::Debug for Runnable { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; f.debug_struct("Runnable") .field("header", unsafe { &(*header) }) diff --git a/src/task.rs b/src/task.rs index 8ecd746..38def1e 100644 --- a/src/task.rs +++ b/src/task.rs @@ -44,25 +44,25 @@ use crate::state::*; /// assert_eq!(future::block_on(task), 3); /// ``` #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -pub struct Task { +pub struct Task { /// A raw task pointer. pub(crate) ptr: NonNull<()>, - /// A marker capturing generic type `T`. - pub(crate) _marker: PhantomData, + /// A marker capturing generic types `T` and `M`. + pub(crate) _marker: PhantomData<(T, M)>, } -unsafe impl Send for Task {} -unsafe impl Sync for Task {} +unsafe impl Send for Task {} +unsafe impl Sync for Task {} -impl Unpin for Task {} +impl Unpin for Task {} #[cfg(feature = "std")] -impl std::panic::UnwindSafe for Task {} +impl std::panic::UnwindSafe for Task {} #[cfg(feature = "std")] -impl std::panic::RefUnwindSafe for Task {} +impl std::panic::RefUnwindSafe for Task {} -impl Task { +impl Task { /// Detaches the task to let it keep running in the background. /// /// # Examples @@ -173,14 +173,14 @@ impl Task { /// // Wait for the task's output. /// assert_eq!(future::block_on(task.fallible()), None); /// ``` - pub fn fallible(self) -> FallibleTask { + pub fn fallible(self) -> FallibleTask { FallibleTask { task: self } } /// Puts the task in canceled state. fn set_canceled(&mut self) { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { let mut state = (*header).state.load(Ordering::Acquire); @@ -228,7 +228,7 @@ impl Task { /// Puts the task in detached state. fn set_detached(&mut self) -> Option { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { // A place where the output will be stored in case it needs to be dropped. @@ -316,7 +316,7 @@ impl Task { /// 4. It is completed and the `Task` gets dropped. fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll> { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { let mut state = (*header).state.load(Ordering::Acquire); @@ -391,9 +391,9 @@ impl Task { } } - fn header(&self) -> &Header { + fn header(&self) -> &Header { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { &*header } } @@ -402,23 +402,31 @@ impl Task { /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. pub fn is_finished(&self) -> bool { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header; unsafe { let state = (*header).state.load(Ordering::Acquire); state & (CLOSED | COMPLETED) != 0 } } + + /// Get the metadata associated with this task. + /// + /// Tasks can be created with a metadata object associated with them; by default, this + /// is a `()` value. See the [`Builder::metadata()`] method for more information. + pub fn metadata(&self) -> &M { + &self.header().metadata + } } -impl Drop for Task { +impl Drop for Task { fn drop(&mut self) { self.set_canceled(); self.set_detached(); } } -impl Future for Task { +impl Future for Task { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -429,7 +437,7 @@ impl Future for Task { } } -impl fmt::Debug for Task { +impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Task") .field("header", self.header()) @@ -446,11 +454,11 @@ impl fmt::Debug for Task { /// This can be useful to avoid the panic produced when polling the `Task` /// future if the executor dropped its `Runnable`. #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -pub struct FallibleTask { - task: Task, +pub struct FallibleTask { + task: Task, } -impl FallibleTask { +impl FallibleTask { /// Detaches the task to let it keep running in the background. /// /// # Examples @@ -515,7 +523,7 @@ impl FallibleTask { } } -impl Future for FallibleTask { +impl Future for FallibleTask { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -523,7 +531,7 @@ impl Future for FallibleTask { } } -impl fmt::Debug for FallibleTask { +impl fmt::Debug for FallibleTask { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FallibleTask") .field("header", self.task.header()) diff --git a/tests/metadata.rs b/tests/metadata.rs new file mode 100644 index 0000000..d3d8d53 --- /dev/null +++ b/tests/metadata.rs @@ -0,0 +1,58 @@ +use async_task::{Builder, Runnable}; +use flume::unbounded; +use smol::future; + +use std::sync::atomic::{AtomicUsize, Ordering}; + +#[test] +fn metadata_use_case() { + // Each future has a counter that is incremented every time it is scheduled. + let (sender, receiver) = unbounded::>(); + let schedule = move |runnable: Runnable| { + runnable.metadata().fetch_add(1, Ordering::SeqCst); + sender.send(runnable).ok(); + }; + + async fn my_future(counter: &AtomicUsize) { + loop { + // Loop until we've been scheduled five times. + let count = counter.load(Ordering::SeqCst); + if count < 5 { + // Make sure that we are immediately scheduled again. + future::yield_now().await; + continue; + } + + // We've been scheduled five times, so we're done. + break; + } + } + + let make_task = || { + // SAFETY: We are spawning a non-'static future, so we need to use the unsafe API. + // The borrowed variables, in this case the metadata, are guaranteed to outlive the runnable. + let (runnable, task) = unsafe { + Builder::new() + .metadata(AtomicUsize::new(0)) + .spawn_unchecked(my_future, schedule.clone()) + }; + + runnable.schedule(); + task + }; + + // Make tasks. + let t1 = make_task(); + let t2 = make_task(); + + // Run the tasks. + while let Ok(runnable) = receiver.try_recv() { + runnable.run(); + } + + // Unwrap the tasks. + smol::future::block_on(async move { + t1.await; + t2.await; + }); +}