Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(maitake): implement JoinHandles #261

Merged
merged 23 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion maitake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ optional = true
[dev-dependencies]
futures-util = "0.3"
futures = "0.3"
tokio-test = "0.4"

[target.'cfg(not(loom))'.dev-dependencies]
tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", features = ["ansi", "fmt"] }
tracing-subscriber = { git = "https://github.com/tokio-rs/tracing", default-features = false, features = ["ansi", "fmt"] }
tracing-02 = { package = "tracing", git = "https://github.com/tokio-rs/tracing", default-features = false }
console-subscriber = "0.1.6"

Expand Down
50 changes: 49 additions & 1 deletion maitake/src/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,55 @@ mod inner {
#![allow(dead_code)]

#[cfg(feature = "alloc")]
pub(crate) use loom::alloc;
pub(crate) mod alloc {
use super::sync::Arc;
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
pub(crate) use loom::alloc::*;
#[pin_project::pin_project]
pub(crate) struct TrackFuture<F> {
#[pin]
inner: F,
track: Arc<()>,
}

impl<F: Future> Future for TrackFuture<F> {
type Output = TrackFuture<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.inner.poll(cx).map(|inner| TrackFuture {
inner,
track: this.track.clone(),
})
}
}

impl<F> TrackFuture<F> {
/// Wrap a `Future` in a `TrackFuture` that participates in Loom's
/// leak checking.
#[track_caller]
pub(crate) fn new(inner: F) -> Self {
Self {
inner,
track: Arc::new(()),
}
}

/// Stop tracking this future, and return the inner value.
pub(crate) fn into_inner(self) -> F {
self.inner
}
}

#[track_caller]
pub(crate) fn track_future<F: Future>(inner: F) -> TrackFuture<F> {
TrackFuture::new(inner)
}
}

pub(crate) use loom::{cell, future, model, thread};

pub(crate) mod sync {
Expand Down
40 changes: 28 additions & 12 deletions maitake/src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::task::{self, Header, Storage, TaskRef};
use crate::task::{self, Header, JoinHandle, Storage, TaskRef};
use core::{future::Future, pin::Pin};

use cordyceps::mpsc_queue::MpscQueue;
Expand Down Expand Up @@ -87,13 +87,15 @@ impl StaticScheduler {
/// [`Storage`]: crate::task::Storage
#[inline]
#[track_caller]
pub fn spawn_allocated<F, STO>(&'static self, task: STO::StoredTask)
pub fn spawn_allocated<F, STO>(&'static self, task: STO::StoredTask) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
STO: Storage<&'static Self, F>,
{
let tr = TaskRef::new_allocated::<&'static Self, F, STO>(task);
self.schedule(tr);
let (task, join) = TaskRef::new_allocated::<&'static Self, F, STO>(task);
self.schedule(task);
join
}

/// Returns a new [task `Builder`] for configuring tasks prior to spawning
Expand Down Expand Up @@ -245,18 +247,26 @@ feature! {

#[inline]
#[track_caller]
pub fn spawn(&self, future: impl Future + 'static) {
self.schedule(TaskRef::new(self.clone(), future));
pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let (task, join) = TaskRef::new(self.clone(), future);
self.schedule(task);
join
}

#[inline]
#[track_caller]
pub fn spawn_allocated<F>(&'static self, task: Box<Task<Self, F, BoxStorage>>)
pub fn spawn_allocated<F>(&'static self, task: Box<Task<Self, F, BoxStorage>>) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let tr = TaskRef::new_allocated::<Self, F, BoxStorage>(task);
self.schedule(tr);
let (task, join) = TaskRef::new_allocated::<Self, F, BoxStorage>(task);
self.schedule(task);
join
}

pub fn tick(&self) -> Tick {
Expand All @@ -278,14 +288,20 @@ feature! {

#[inline]
#[track_caller]
pub fn spawn(&'static self, future: impl Future + 'static) {
self.schedule(TaskRef::new(self, future));
pub fn spawn<F>(&'static self, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let (task, join) = TaskRef::new(self, future);
self.schedule(task);
join
}
}

impl Core {
fn new() -> Self {
let stub_task = TaskRef::new(Stub, Stub);
let (stub_task, _) = TaskRef::new(Stub, Stub);
Self {
run_queue: MpscQueue::new_with_stub(test_dbg!(stub_task)),
}
Expand Down
39 changes: 7 additions & 32 deletions maitake/src/scheduler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,12 @@ mod custom_storage {
}
}

impl<F: Future + 'static> MyBoxTask<&'static StaticScheduler, F> {
fn spawn(scheduler: &'static StaticScheduler, future: F) {
impl<F> MyBoxTask<&'static StaticScheduler, F>
where
F: Future + 'static,
F::Output: 'static,
{
fn spawn(scheduler: &'static StaticScheduler, future: F) -> task::JoinHandle<F::Output> {
let task = MyBoxTask(Box::new(Task::new(scheduler, future)));
scheduler.spawn_allocated::<F, MyBoxStorage>(task)
}
Expand Down Expand Up @@ -155,39 +159,10 @@ mod loom {
use super::*;
use crate::loom::{
self,
alloc::track_future,
sync::{atomic::AtomicBool, Arc},
thread,
};
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

#[pin_project::pin_project]
struct TrackFuture<F> {
#[pin]
inner: F,
track: Arc<()>,
}

impl<F: Future> Future for TrackFuture<F> {
type Output = TrackFuture<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.inner.poll(cx).map(|inner| TrackFuture {
inner,
track: this.track.clone(),
})
}
}

fn track_future<F: Future>(inner: F) -> TrackFuture<F> {
TrackFuture {
inner,
track: Arc::new(()),
}
}

#[test]
fn basically_works() {
Expand Down
Loading