From fcfa4ab885682a86c0b8aa951447cfd5f559cd25 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 25 Nov 2019 18:39:17 +0100 Subject: [PATCH 1/4] Add spawn_local function --- examples/panic-propagation.rs | 6 +- examples/panic-result.rs | 9 +- examples/spawn-local.rs | 76 ++++++++ examples/spawn-on-thread.rs | 4 +- examples/spawn.rs | 9 +- examples/task-id.rs | 9 +- src/join_handle.rs | 4 +- src/lib.rs | 5 +- src/raw.rs | 16 +- src/task.rs | 100 +++++++++- tests/pending.rs | 353 ++++++++++++++++++++++++++++++++++ 11 files changed, 562 insertions(+), 29 deletions(-) create mode 100644 examples/spawn-local.rs create mode 100644 tests/pending.rs diff --git a/examples/panic-propagation.rs b/examples/panic-propagation.rs index 8a5339f..05ec85a 100644 --- a/examples/panic-propagation.rs +++ b/examples/panic-propagation.rs @@ -11,6 +11,8 @@ use futures::executor; use futures::future::FutureExt; use lazy_static::lazy_static; +type Task = async_task::Task<()>; + /// Spawns a future on the executor. fn spawn(future: F) -> JoinHandle where @@ -19,8 +21,8 @@ where { lazy_static! { // A channel that holds scheduled tasks. - static ref QUEUE: Sender> = { - let (sender, receiver) = unbounded::>(); + static ref QUEUE: Sender = { + let (sender, receiver) = unbounded::(); // Start the executor thread. thread::spawn(|| { diff --git a/examples/panic-result.rs b/examples/panic-result.rs index 7cf5a14..6308240 100644 --- a/examples/panic-result.rs +++ b/examples/panic-result.rs @@ -9,16 +9,19 @@ use futures::executor; use futures::future::FutureExt; use lazy_static::lazy_static; +type Task = async_task::Task<()>; +type JoinHandle = async_task::JoinHandle; + /// Spawns a future on the executor. -fn spawn(future: F) -> async_task::JoinHandle, ()> +fn spawn(future: F) -> JoinHandle> where F: Future + Send + 'static, R: Send + 'static, { lazy_static! { // A channel that holds scheduled tasks. - static ref QUEUE: Sender> = { - let (sender, receiver) = unbounded::>(); + static ref QUEUE: Sender = { + let (sender, receiver) = unbounded::(); // Start the executor thread. thread::spawn(|| { diff --git a/examples/spawn-local.rs b/examples/spawn-local.rs new file mode 100644 index 0000000..4e66c32 --- /dev/null +++ b/examples/spawn-local.rs @@ -0,0 +1,76 @@ +//! A simple single-threaded executor that can spawn non-`Send` futures. + +use std::cell::Cell; +use std::future::Future; +use std::rc::Rc; + +use crossbeam::channel::{unbounded, Receiver, Sender}; + +type Task = async_task::Task<()>; +type JoinHandle = async_task::JoinHandle; + +thread_local! { + // A channel that holds scheduled tasks. + static QUEUE: (Sender, Receiver) = unbounded(); +} + +/// Spawns a future on the executor. +fn spawn(future: F) -> JoinHandle +where + F: Future + 'static, + R: 'static, +{ + // Create a task that is scheduled by sending itself into the channel. + let schedule = |t| QUEUE.with(|(s, _)| s.send(t).unwrap()); + let (task, handle) = async_task::spawn_local(future, schedule, ()); + + // Schedule the task by sending it into the queue. + task.schedule(); + + handle +} + +/// Runs a future to completion. +fn run(future: F) -> R +where + F: Future + 'static, + R: 'static, +{ + // Spawn a task that sends its result through a channel. + let (s, r) = unbounded(); + spawn(async move { s.send(future.await).unwrap() }); + + loop { + // If the original task has completed, return its result. + if let Ok(val) = r.try_recv() { + return val; + } + + // Otherwise, take a task from the queue and run it. + QUEUE.with(|(_, r)| r.recv().unwrap().run()); + } +} + +fn main() { + let val = Rc::new(Cell::new(0)); + + // Run a future that increments a non-`Send` value. + run({ + let val = val.clone(); + async move { + // Spawn a future that increments the value. + let handle = spawn({ + let val = val.clone(); + async move { + val.set(dbg!(val.get()) + 1); + } + }); + + val.set(dbg!(val.get()) + 1); + handle.await; + } + }); + + // The value should be 2 at the end of the program. + dbg!(val.get()); +} diff --git a/examples/spawn-on-thread.rs b/examples/spawn-on-thread.rs index 22da0c5..95214ed 100644 --- a/examples/spawn-on-thread.rs +++ b/examples/spawn-on-thread.rs @@ -7,10 +7,12 @@ use std::thread; use crossbeam::channel; use futures::executor; +type JoinHandle = async_task::JoinHandle; + /// Spawns a future on a new dedicated thread. /// /// The returned handle can be used to await the output of the future. -fn spawn_on_thread(future: F) -> async_task::JoinHandle +fn spawn_on_thread(future: F) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, diff --git a/examples/spawn.rs b/examples/spawn.rs index 4af5a02..9db7215 100644 --- a/examples/spawn.rs +++ b/examples/spawn.rs @@ -8,16 +8,19 @@ use crossbeam::channel::{unbounded, Sender}; use futures::executor; use lazy_static::lazy_static; +type Task = async_task::Task<()>; +type JoinHandle = async_task::JoinHandle; + /// Spawns a future on the executor. -fn spawn(future: F) -> async_task::JoinHandle +fn spawn(future: F) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, { lazy_static! { // A channel that holds scheduled tasks. - static ref QUEUE: Sender> = { - let (sender, receiver) = unbounded::>(); + static ref QUEUE: Sender = { + let (sender, receiver) = unbounded::(); // Start the executor thread. thread::spawn(|| { diff --git a/examples/task-id.rs b/examples/task-id.rs index 66b7aec..2a7bcf7 100644 --- a/examples/task-id.rs +++ b/examples/task-id.rs @@ -13,6 +13,9 @@ use lazy_static::lazy_static; #[derive(Clone, Copy, Debug)] struct TaskId(usize); +type Task = async_task::Task; +type JoinHandle = async_task::JoinHandle; + thread_local! { /// The ID of the current task. static TASK_ID: Cell> = Cell::new(None); @@ -26,15 +29,15 @@ fn task_id() -> Option { } /// Spawns a future on the executor. -fn spawn(future: F) -> async_task::JoinHandle +fn spawn(future: F) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, { lazy_static! { // A channel that holds scheduled tasks. - static ref QUEUE: Sender> = { - let (sender, receiver) = unbounded::>(); + static ref QUEUE: Sender = { + let (sender, receiver) = unbounded::(); // Start the executor thread. thread::spawn(|| { diff --git a/src/join_handle.rs b/src/join_handle.rs index bd9366f..b3c4da5 100644 --- a/src/join_handle.rs +++ b/src/join_handle.rs @@ -24,8 +24,8 @@ pub struct JoinHandle { pub(crate) _marker: PhantomData<(R, T)>, } -unsafe impl Send for JoinHandle {} -unsafe impl Sync for JoinHandle {} +unsafe impl Send for JoinHandle {} +unsafe impl Sync for JoinHandle {} impl Unpin for JoinHandle {} diff --git a/src/lib.rs b/src/lib.rs index 3f61ea4..a265679 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,7 +23,7 @@ //! # let (task, handle) = async_task::spawn(future, schedule, ()); //! ``` //! -//! A task is constructed using the [`spawn`] function: +//! A task is constructed using either [`spawn`] or [`spawn_local`]: //! //! ``` //! # let (sender, receiver) = crossbeam::channel::unbounded(); @@ -93,6 +93,7 @@ //! union of the future and its output. //! //! [`spawn`]: fn.spawn.html +//! [`spawn_local`]: fn.spawn_local.html //! [`Task`]: struct.Task.html //! [`JoinHandle`]: struct.JoinHandle.html @@ -108,4 +109,4 @@ mod task; mod utils; pub use crate::join_handle::JoinHandle; -pub use crate::task::{spawn, Task}; +pub use crate::task::{spawn, spawn_local, Task}; diff --git a/src/raw.rs b/src/raw.rs index 3b993a3..2c47f0c 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -95,15 +95,13 @@ impl Clone for RawTask { impl RawTask where - F: Future + Send + 'static, - R: Send + 'static, + F: Future + 'static, S: Fn(Task) + Send + Sync + 'static, - T: Send + 'static, { /// Allocates a task with the given `future` and `schedule` function. /// /// It is assumed that initially only the `Task` reference and the `JoinHandle` exist. - pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> { + pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> { // Compute the layout of the task for allocation. Abort if the computation fails. let task_layout = abort_on_panic(|| Self::task_layout()); @@ -592,17 +590,13 @@ where /// A guard that closes the task if polling its future panics. struct Guard(RawTask) where - F: Future + Send + 'static, - R: Send + 'static, - S: Fn(Task) + Send + Sync + 'static, - T: Send + 'static; + F: Future + 'static, + S: Fn(Task) + Send + Sync + 'static; impl Drop for Guard where - F: Future + Send + 'static, - R: Send + 'static, + F: Future + 'static, S: Fn(Task) + Send + Sync + 'static, - T: Send + 'static, { fn drop(&mut self) { let raw = self.0; diff --git a/src/task.rs b/src/task.rs index 42a4024..b12cace 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,8 +1,11 @@ use std::fmt; use std::future::Future; use std::marker::PhantomData; -use std::mem; +use std::mem::{self, ManuallyDrop}; +use std::pin::Pin; use std::ptr::NonNull; +use std::task::{Context, Poll}; +use std::thread::{self, ThreadId}; use crate::header::Header; use crate::raw::RawTask; @@ -16,8 +19,13 @@ use crate::JoinHandle; /// When run, the task polls `future`. When woken up, it gets scheduled for running by the /// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. /// +/// If you need to spawn a future that does not implement [`Send`], consider using the +/// [`spawn_local`] function instead. +/// /// [`Task`]: struct.Task.html /// [`JoinHandle`]: struct.JoinHandle.html +/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html +/// [`spawn_local`]: fn.spawn_local.html /// /// # Examples /// @@ -43,7 +51,95 @@ where S: Fn(Task) + Send + Sync + 'static, T: Send + Sync + 'static, { - let raw_task = RawTask::::allocate(tag, future, schedule); + let raw_task = RawTask::::allocate(future, schedule, tag); + let task = Task { + raw_task, + _marker: PhantomData, + }; + let handle = JoinHandle { + raw_task, + _marker: PhantomData, + }; + (task, handle) +} + +/// Creates a new local task. +/// +/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that +/// awaits its result. +/// +/// When run, the task polls `future`. When woken up, it gets scheduled for running by the +/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. +/// +/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the +/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur. +/// +/// [`Task`]: struct.Task.html +/// [`JoinHandle`]: struct.JoinHandle.html +/// [`spawn`]: fn.spawn.html +/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html +/// +/// # Examples +/// +/// ``` +/// use crossbeam::channel; +/// +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up, it will be sent into this channel. +/// let (s, r) = channel::unbounded(); +/// let schedule = move |task| s.send(task).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (task, handle) = async_task::spawn_local(future, schedule, ()); +/// ``` +pub fn spawn_local(future: F, schedule: S, tag: T) -> (Task, JoinHandle) +where + F: Future + 'static, + R: 'static, + S: Fn(Task) + Send + Sync + 'static, + T: Send + Sync + 'static, +{ + thread_local! { + static ID: ThreadId = thread::current().id(); + } + + struct Checked { + id: ThreadId, + inner: ManuallyDrop, + } + + impl Drop for Checked { + fn drop(&mut self) { + if ID.with(|id| *id) != self.id { + panic!("local task dropped by a thread that didn't spawn it"); + } + unsafe { + ManuallyDrop::drop(&mut self.inner); + } + } + } + + impl Future for Checked { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if ID.with(|id| *id) != self.id { + panic!("local task polled by a thread that didn't spawn it"); + } + unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } + } + } + + let future = Checked { + id: ID.with(|id| *id), + inner: ManuallyDrop::new(future), + }; + + let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag); let task = Task { raw_task, _marker: PhantomData, diff --git a/tests/pending.rs b/tests/pending.rs new file mode 100644 index 0000000..5840763 --- /dev/null +++ b/tests/pending.rs @@ -0,0 +1,353 @@ +#![feature(async_await)] + +use std::future::Future; +use std::pin::Pin; +use std::task::Waker; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Task; +use crossbeam::atomic::AtomicCell; +use crossbeam::channel; +use lazy_static::lazy_static; + +// Creates a future with event counters. +// +// Usage: `future!(f, waker, POLL, DROP)` +// +// The future `f` always sleeps for 200 ms and returns `Poll::Pending`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Every time the future is run, it stores the waker into a global variable. +// This waker can be extracted using the `waker` function. +macro_rules! future { + ($name:pat, $poll:ident, $drop:ident) => { + lazy_static! { + static ref $poll: AtomicCell = AtomicCell::new(0); + static ref $drop: AtomicCell = AtomicCell::new(0); + } + + let $name = { + struct Fut(Box); + + impl Future for Fut { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + $poll.fetch_add(1); + thread::sleep(ms(200)); + Poll::Pending + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + lazy_static! { + static ref $sched: AtomicCell = AtomicCell::new(0); + static ref $drop: AtomicCell = AtomicCell::new(0); + } + + let $name = { + struct Guard(Box); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + let guard = Guard(Box::new(0)); + move |_task| { + &guard; + $sched.fetch_add(1); + } + }; + }; +} + +// Creates a task with event counters. +// +// Usage: `task!(task, handle f, s, DROP)` +// +// A task with future `f` and schedule function `s` is created. +// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. +// When the tag inside the task gets dropped, `DROP` is incremented. +macro_rules! task { + ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { + lazy_static! { + static ref $drop: AtomicCell = AtomicCell::new(0); + } + + let ($task, $handle) = { + struct Tag(Box); + + impl Drop for Tag { + fn drop(&mut self) { + $drop.fetch_add(1); + } + } + + async_task::spawn($future, $schedule, Tag(Box::new(0))) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn foo() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + task!(task, handle, f, s, DROP_D); + + assert_eq!(DROP_F.load(), 0); + drop(task); + assert_eq!(DROP_F.load(), 0); + drop(handle); + + assert_eq!(DROP_F.load(), 1); +} +// +// #[test] +// fn wake_during_run() { +// future!(f, waker, POLL, DROP_F); +// schedule!(s, chan, SCHEDULE, DROP_S); +// task!(task, _handle, f, s, DROP_D); +// +// task.run(); +// let w = waker(); +// w.wake_by_ref(); +// let task = chan.recv().unwrap(); +// +// crossbeam::scope(|scope| { +// scope.spawn(|_| { +// task.run(); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 2); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 1); +// }); +// +// thread::sleep(ms(100)); +// +// w.wake_by_ref(); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// thread::sleep(ms(200)); +// +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 2); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 1); +// }) +// .unwrap(); +// +// chan.recv().unwrap(); +// drop(waker()); +// } +// +// #[test] +// fn cancel_during_run() { +// future!(f, waker, POLL, DROP_F); +// schedule!(s, chan, SCHEDULE, DROP_S); +// task!(task, handle, f, s, DROP_D); +// +// task.run(); +// let w = waker(); +// w.wake(); +// let task = chan.recv().unwrap(); +// +// crossbeam::scope(|scope| { +// scope.spawn(|_| { +// task.run(); +// drop(waker()); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 1); +// assert_eq!(DROP_S.load(), 1); +// assert_eq!(DROP_D.load(), 1); +// assert_eq!(chan.len(), 0); +// }); +// +// thread::sleep(ms(100)); +// +// handle.cancel(); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// drop(handle); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// thread::sleep(ms(200)); +// +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 1); +// assert_eq!(DROP_S.load(), 1); +// assert_eq!(DROP_D.load(), 1); +// assert_eq!(chan.len(), 0); +// }) +// .unwrap(); +// } +// +// #[test] +// fn wake_and_cancel_during_run() { +// future!(f, waker, POLL, DROP_F); +// schedule!(s, chan, SCHEDULE, DROP_S); +// task!(task, handle, f, s, DROP_D); +// +// task.run(); +// let w = waker(); +// w.wake_by_ref(); +// let task = chan.recv().unwrap(); +// +// crossbeam::scope(|scope| { +// scope.spawn(|_| { +// task.run(); +// drop(waker()); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 1); +// assert_eq!(DROP_S.load(), 1); +// assert_eq!(DROP_D.load(), 1); +// assert_eq!(chan.len(), 0); +// }); +// +// thread::sleep(ms(100)); +// +// w.wake(); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// handle.cancel(); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// drop(handle); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// thread::sleep(ms(200)); +// +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 1); +// assert_eq!(DROP_S.load(), 1); +// assert_eq!(DROP_D.load(), 1); +// assert_eq!(chan.len(), 0); +// }) +// .unwrap(); +// } +// +// #[test] +// fn cancel_and_wake_during_run() { +// future!(f, waker, POLL, DROP_F); +// schedule!(s, chan, SCHEDULE, DROP_S); +// task!(task, handle, f, s, DROP_D); +// +// task.run(); +// let w = waker(); +// w.wake_by_ref(); +// let task = chan.recv().unwrap(); +// +// crossbeam::scope(|scope| { +// scope.spawn(|_| { +// task.run(); +// drop(waker()); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 1); +// assert_eq!(DROP_S.load(), 1); +// assert_eq!(DROP_D.load(), 1); +// assert_eq!(chan.len(), 0); +// }); +// +// thread::sleep(ms(100)); +// +// handle.cancel(); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// drop(handle); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// w.wake(); +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 0); +// assert_eq!(DROP_S.load(), 0); +// assert_eq!(DROP_D.load(), 0); +// assert_eq!(chan.len(), 0); +// +// thread::sleep(ms(200)); +// +// assert_eq!(POLL.load(), 2); +// assert_eq!(SCHEDULE.load(), 1); +// assert_eq!(DROP_F.load(), 1); +// assert_eq!(DROP_S.load(), 1); +// assert_eq!(DROP_D.load(), 1); +// assert_eq!(chan.len(), 0); +// }) +// .unwrap(); +// } From 951d9711f6144797c342bb1e47490fd79b372606 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 25 Nov 2019 18:47:16 +0100 Subject: [PATCH 2/4] Add an note about the schedule function --- src/task.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/task.rs b/src/task.rs index b12cace..83cdf79 100644 --- a/src/task.rs +++ b/src/task.rs @@ -19,6 +19,9 @@ use crate::JoinHandle; /// When run, the task polls `future`. When woken up, it gets scheduled for running by the /// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. /// +/// The schedule function should not attempt to run the task nor to drop it. Instead, it should +/// push the task into some kind of queue so that it can be processed later. +/// /// If you need to spawn a future that does not implement [`Send`], consider using the /// [`spawn_local`] function instead. /// @@ -71,6 +74,9 @@ where /// When run, the task polls `future`. When woken up, it gets scheduled for running by the /// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task. /// +/// The schedule function should not attempt to run the task nor to drop it. Instead, it should +/// push the task into some kind of queue so that it can be processed later. +/// /// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the /// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur. /// From 9b69acdd25fd38a08e9c4b866cf5a0751934693f Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 25 Nov 2019 18:52:55 +0100 Subject: [PATCH 3/4] Relax the bounds on Send for JoinHandle impl --- src/join_handle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/join_handle.rs b/src/join_handle.rs index b3c4da5..b192418 100644 --- a/src/join_handle.rs +++ b/src/join_handle.rs @@ -25,7 +25,7 @@ pub struct JoinHandle { } unsafe impl Send for JoinHandle {} -unsafe impl Sync for JoinHandle {} +unsafe impl Sync for JoinHandle {} impl Unpin for JoinHandle {} From 5d80be610a66654530bb0a80f613b62e67af8d3b Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 25 Nov 2019 18:54:21 +0100 Subject: [PATCH 4/4] Remove accidentally added pending.rs --- tests/pending.rs | 353 ----------------------------------------------- 1 file changed, 353 deletions(-) delete mode 100644 tests/pending.rs diff --git a/tests/pending.rs b/tests/pending.rs deleted file mode 100644 index 5840763..0000000 --- a/tests/pending.rs +++ /dev/null @@ -1,353 +0,0 @@ -#![feature(async_await)] - -use std::future::Future; -use std::pin::Pin; -use std::task::Waker; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; - -use async_task::Task; -use crossbeam::atomic::AtomicCell; -use crossbeam::channel; -use lazy_static::lazy_static; - -// Creates a future with event counters. -// -// Usage: `future!(f, waker, POLL, DROP)` -// -// The future `f` always sleeps for 200 ms and returns `Poll::Pending`. -// When it gets polled, `POLL` is incremented. -// When it gets dropped, `DROP` is incremented. -// -// Every time the future is run, it stores the waker into a global variable. -// This waker can be extracted using the `waker` function. -macro_rules! future { - ($name:pat, $poll:ident, $drop:ident) => { - lazy_static! { - static ref $poll: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Fut(Box); - - impl Future for Fut { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - $poll.fetch_add(1); - thread::sleep(ms(200)); - Poll::Pending - } - } - - impl Drop for Fut { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - Fut(Box::new(0)) - }; - }; -} - -// Creates a schedule function with event counters. -// -// Usage: `schedule!(s, SCHED, DROP)` -// -// The schedule function `s` does nothing. -// When it gets invoked, `SCHED` is incremented. -// When it gets dropped, `DROP` is incremented. -macro_rules! schedule { - ($name:pat, $sched:ident, $drop:ident) => { - lazy_static! { - static ref $sched: AtomicCell = AtomicCell::new(0); - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let $name = { - struct Guard(Box); - - impl Drop for Guard { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - let guard = Guard(Box::new(0)); - move |_task| { - &guard; - $sched.fetch_add(1); - } - }; - }; -} - -// Creates a task with event counters. -// -// Usage: `task!(task, handle f, s, DROP)` -// -// A task with future `f` and schedule function `s` is created. -// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively. -// When the tag inside the task gets dropped, `DROP` is incremented. -macro_rules! task { - ($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => { - lazy_static! { - static ref $drop: AtomicCell = AtomicCell::new(0); - } - - let ($task, $handle) = { - struct Tag(Box); - - impl Drop for Tag { - fn drop(&mut self) { - $drop.fetch_add(1); - } - } - - async_task::spawn($future, $schedule, Tag(Box::new(0))) - }; - }; -} - -fn ms(ms: u64) -> Duration { - Duration::from_millis(ms) -} - -#[test] -fn foo() { - future!(f, POLL, DROP_F); - schedule!(s, SCHEDULE, DROP_S); - task!(task, handle, f, s, DROP_D); - - assert_eq!(DROP_F.load(), 0); - drop(task); - assert_eq!(DROP_F.load(), 0); - drop(handle); - - assert_eq!(DROP_F.load(), 1); -} -// -// #[test] -// fn wake_during_run() { -// future!(f, waker, POLL, DROP_F); -// schedule!(s, chan, SCHEDULE, DROP_S); -// task!(task, _handle, f, s, DROP_D); -// -// task.run(); -// let w = waker(); -// w.wake_by_ref(); -// let task = chan.recv().unwrap(); -// -// crossbeam::scope(|scope| { -// scope.spawn(|_| { -// task.run(); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 2); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 1); -// }); -// -// thread::sleep(ms(100)); -// -// w.wake_by_ref(); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// thread::sleep(ms(200)); -// -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 2); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 1); -// }) -// .unwrap(); -// -// chan.recv().unwrap(); -// drop(waker()); -// } -// -// #[test] -// fn cancel_during_run() { -// future!(f, waker, POLL, DROP_F); -// schedule!(s, chan, SCHEDULE, DROP_S); -// task!(task, handle, f, s, DROP_D); -// -// task.run(); -// let w = waker(); -// w.wake(); -// let task = chan.recv().unwrap(); -// -// crossbeam::scope(|scope| { -// scope.spawn(|_| { -// task.run(); -// drop(waker()); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 1); -// assert_eq!(DROP_S.load(), 1); -// assert_eq!(DROP_D.load(), 1); -// assert_eq!(chan.len(), 0); -// }); -// -// thread::sleep(ms(100)); -// -// handle.cancel(); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// drop(handle); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// thread::sleep(ms(200)); -// -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 1); -// assert_eq!(DROP_S.load(), 1); -// assert_eq!(DROP_D.load(), 1); -// assert_eq!(chan.len(), 0); -// }) -// .unwrap(); -// } -// -// #[test] -// fn wake_and_cancel_during_run() { -// future!(f, waker, POLL, DROP_F); -// schedule!(s, chan, SCHEDULE, DROP_S); -// task!(task, handle, f, s, DROP_D); -// -// task.run(); -// let w = waker(); -// w.wake_by_ref(); -// let task = chan.recv().unwrap(); -// -// crossbeam::scope(|scope| { -// scope.spawn(|_| { -// task.run(); -// drop(waker()); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 1); -// assert_eq!(DROP_S.load(), 1); -// assert_eq!(DROP_D.load(), 1); -// assert_eq!(chan.len(), 0); -// }); -// -// thread::sleep(ms(100)); -// -// w.wake(); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// handle.cancel(); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// drop(handle); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// thread::sleep(ms(200)); -// -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 1); -// assert_eq!(DROP_S.load(), 1); -// assert_eq!(DROP_D.load(), 1); -// assert_eq!(chan.len(), 0); -// }) -// .unwrap(); -// } -// -// #[test] -// fn cancel_and_wake_during_run() { -// future!(f, waker, POLL, DROP_F); -// schedule!(s, chan, SCHEDULE, DROP_S); -// task!(task, handle, f, s, DROP_D); -// -// task.run(); -// let w = waker(); -// w.wake_by_ref(); -// let task = chan.recv().unwrap(); -// -// crossbeam::scope(|scope| { -// scope.spawn(|_| { -// task.run(); -// drop(waker()); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 1); -// assert_eq!(DROP_S.load(), 1); -// assert_eq!(DROP_D.load(), 1); -// assert_eq!(chan.len(), 0); -// }); -// -// thread::sleep(ms(100)); -// -// handle.cancel(); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// drop(handle); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// w.wake(); -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 0); -// assert_eq!(DROP_S.load(), 0); -// assert_eq!(DROP_D.load(), 0); -// assert_eq!(chan.len(), 0); -// -// thread::sleep(ms(200)); -// -// assert_eq!(POLL.load(), 2); -// assert_eq!(SCHEDULE.load(), 1); -// assert_eq!(DROP_F.load(), 1); -// assert_eq!(DROP_S.load(), 1); -// assert_eq!(DROP_D.load(), 1); -// assert_eq!(chan.len(), 0); -// }) -// .unwrap(); -// }