Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add variant of LocalPool with bound lifetime #2846

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion futures-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ extern crate std;
#[cfg(feature = "std")]
mod local_pool;
#[cfg(feature = "std")]
pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner};
pub use crate::local_pool::{
block_on, block_on_stream, BlockingStream, BoundLocalPool, BoundLocalSpawner, LocalPool,
LocalSpawner,
};

#[cfg(feature = "thread-pool")]
#[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
Expand Down
69 changes: 52 additions & 17 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::enter;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_task::{waker_ref, ArcWake};
use futures_task::{waker_ref, ArcWake, BoundLocalSpawn};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_util::pin_mut;
use futures_util::stream::FuturesUnordered;
Expand All @@ -17,6 +17,36 @@ use std::sync::{
use std::thread::{self, Thread};
use std::vec::Vec;

/// A single-threaded task pool with bound lifetime for polling futures to
/// completion.
///
/// This executor allows you to multiplex any number of tasks onto a single
/// thread. It's appropriate to poll strictly I/O-bound futures that do very
/// little work in between I/O actions. The lifetime of the executor is bound by
/// a generic parameter. Futures associated with the executor need only outlive
/// this lifetime. That uncompleted futures are dropped when the lifetime of the
/// executor expires.
///
/// To get a handle to the pool that implements [`Spawn`](futures_task::Spawn),
/// use the [`spawner()`](BoundLocalPool::spawner) method. Because the executor
/// is single-threaded, it supports a special form of task spawning for
/// non-`Send` futures, via
/// [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
/// Additionally, tasks with a limited lifetime can be spawned via
/// [`spawn_bound_local_obj`](futures_task::BoundLocalSpawn::spawn_bound_local_obj).
#[derive(Debug)]
pub struct BoundLocalPool<'a> {
pool: FuturesUnordered<LocalFutureObj<'a, ()>>,
incoming: Rc<Incoming<'a>>,
}

/// A handle to a [`BoundLocalPool`] that implements
/// [`BoundLocalSpawn`](futures_task::BoundLocalSpawn).
#[derive(Clone, Debug)]
pub struct BoundLocalSpawner<'a> {
incoming: Weak<Incoming<'a>>,
}

/// A single-threaded task pool for polling futures to completion.
///
/// This executor allows you to multiplex any number of tasks onto a single
Expand All @@ -28,19 +58,13 @@ use std::vec::Vec;
/// [`spawner()`](LocalPool::spawner) method. Because the executor is
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local_obj`](futures_task::LocalSpawn::spawn_local_obj).
#[derive(Debug)]
pub struct LocalPool {
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
incoming: Rc<Incoming>,
}
pub type LocalPool = BoundLocalPool<'static>;

/// A handle to a [`LocalPool`] that implements [`Spawn`](futures_task::Spawn).
#[derive(Clone, Debug)]
pub struct LocalSpawner {
incoming: Weak<Incoming>,
}
/// A handle to a [`LocalPool`] that implements [`Spawn`](futures_task::Spawn)
/// and [`LocalSpawn`](futures_task::LocalSpawn).
pub type LocalSpawner = BoundLocalSpawner<'static>;

type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;
type Incoming<'a> = RefCell<Vec<LocalFutureObj<'a, ()>>>;

pub(crate) struct ThreadNotify {
/// The (single) executor thread.
Expand Down Expand Up @@ -107,15 +131,15 @@ fn woken() -> bool {
CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire))
}

impl LocalPool {
impl<'a> BoundLocalPool<'a> {
/// Create a new, empty pool of tasks.
pub fn new() -> Self {
Self { pool: FuturesUnordered::new(), incoming: Default::default() }
}

/// Get a clonable handle to the pool as a [`Spawn`].
pub fn spawner(&self) -> LocalSpawner {
LocalSpawner { incoming: Rc::downgrade(&self.incoming) }
pub fn spawner(&self) -> BoundLocalSpawner<'a> {
BoundLocalSpawner { incoming: Rc::downgrade(&self.incoming) }
}

/// Run all tasks in the pool to completion.
Expand Down Expand Up @@ -362,7 +386,7 @@ impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
}
}

impl Spawn for LocalSpawner {
impl Spawn for BoundLocalSpawner<'_> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future.into());
Expand All @@ -381,7 +405,7 @@ impl Spawn for LocalSpawner {
}
}

impl LocalSpawn for LocalSpawner {
impl LocalSpawn for BoundLocalSpawner<'_> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future);
Expand All @@ -399,3 +423,14 @@ impl LocalSpawn for LocalSpawner {
}
}
}

impl<'a> BoundLocalSpawn<'a> for BoundLocalSpawner<'a> {
fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> {
if let Some(incoming) = self.incoming.upgrade() {
incoming.borrow_mut().push(future);
Ok(())
} else {
Err(SpawnError::shutdown())
}
}
}
2 changes: 1 addition & 1 deletion futures-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ extern crate alloc;
extern crate std;

mod spawn;
pub use crate::spawn::{LocalSpawn, Spawn, SpawnError};
pub use crate::spawn::{BoundLocalSpawn, LocalSpawn, Spawn, SpawnError};

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
Expand Down
117 changes: 30 additions & 87 deletions futures-task/src/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ pub trait LocalSpawn {
}
}

/// The `BoundLocalSpawn` is similar to [`LocalSpawn`], but allows spawning
/// futures that don't implement `Send` and have a lifetime that only needs to
/// exceed that of the associated executor.
pub trait BoundLocalSpawn<'a> {
/// Spawns a future that will be run to completion or until the executor is
/// dropped.
///
/// # Errors
///
/// The executor may be unable to spawn tasks. Spawn errors should
/// represent relatively rare scenarios, such as the executor
/// having been shut down so that it is no longer able to accept
/// tasks.
fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError>;
}

/// An error that occurred during spawning.
pub struct SpawnError {
_priv: (),
Expand Down Expand Up @@ -83,17 +99,10 @@ impl SpawnError {
}
}

impl<Sp: ?Sized + Spawn> Spawn for &Sp {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_obj(self, future)
}

fn status(&self) -> Result<(), SpawnError> {
Sp::status(self)
}
}

impl<Sp: ?Sized + Spawn> Spawn for &mut Sp {
impl<T, Sp: ?Sized + Spawn> Spawn for T
where
T: core::ops::Deref<Target = Sp>,
{
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_obj(self, future)
}
Expand All @@ -103,17 +112,10 @@ impl<Sp: ?Sized + Spawn> Spawn for &mut Sp {
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &Sp {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_local_obj(self, future)
}

fn status_local(&self) -> Result<(), SpawnError> {
Sp::status_local(self)
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &mut Sp {
impl<T, Sp: ?Sized + LocalSpawn> LocalSpawn for T
where
T: core::ops::Deref<Target = Sp>,
{
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
Sp::spawn_local_obj(self, future)
}
Expand All @@ -123,70 +125,11 @@ impl<Sp: ?Sized + LocalSpawn> LocalSpawn for &mut Sp {
}
}

#[cfg(feature = "alloc")]
mod if_alloc {
use super::*;
use alloc::{boxed::Box, rc::Rc};

impl<Sp: ?Sized + Spawn> Spawn for Box<Sp> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_obj(future)
}

fn status(&self) -> Result<(), SpawnError> {
(**self).status()
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for Box<Sp> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_local_obj(future)
}

fn status_local(&self) -> Result<(), SpawnError> {
(**self).status_local()
}
}

impl<Sp: ?Sized + Spawn> Spawn for Rc<Sp> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_obj(future)
}

fn status(&self) -> Result<(), SpawnError> {
(**self).status()
}
}

impl<Sp: ?Sized + LocalSpawn> LocalSpawn for Rc<Sp> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_local_obj(future)
}

fn status_local(&self) -> Result<(), SpawnError> {
(**self).status_local()
}
}

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
impl<Sp: ?Sized + Spawn> Spawn for alloc::sync::Arc<Sp> {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_obj(future)
}

fn status(&self) -> Result<(), SpawnError> {
(**self).status()
}
}

#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
impl<Sp: ?Sized + LocalSpawn> LocalSpawn for alloc::sync::Arc<Sp> {
fn spawn_local_obj(&self, future: LocalFutureObj<'static, ()>) -> Result<(), SpawnError> {
(**self).spawn_local_obj(future)
}

fn status_local(&self) -> Result<(), SpawnError> {
(**self).status_local()
}
impl<'a, T, Sp: ?Sized + BoundLocalSpawn<'a>> BoundLocalSpawn<'a> for T
where
T: core::ops::Deref<Target = Sp>,
{
fn spawn_bound_local_obj(&self, future: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> {
Sp::spawn_bound_local_obj(self, future)
}
}
9 changes: 8 additions & 1 deletion futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use core::sync::atomic::{AtomicBool, AtomicPtr};
use futures_core::future::Future;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_task::{BoundLocalSpawn, FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};

mod abort;

Expand Down Expand Up @@ -78,6 +78,13 @@ impl LocalSpawn for FuturesUnordered<LocalFutureObj<'_, ()>> {
}
}

impl<'a> BoundLocalSpawn<'a> for FuturesUnordered<LocalFutureObj<'a, ()>> {
fn spawn_bound_local_obj(&self, future_obj: LocalFutureObj<'a, ()>) -> Result<(), SpawnError> {
self.push(future_obj);
Ok(())
}
}

// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list allows for thread safe
Expand Down
6 changes: 4 additions & 2 deletions futures-util/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
#[doc(no_inline)]
pub use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

pub use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj};
pub use futures_task::{
BoundLocalSpawn, FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError, UnsafeFutureObj,
};

pub use futures_task::noop_waker;
pub use futures_task::noop_waker_ref;
Expand All @@ -37,4 +39,4 @@ pub use futures_task::{waker_ref, WakerRef};
pub use futures_core::task::__internal::AtomicWaker;

mod spawn;
pub use self::spawn::{LocalSpawnExt, SpawnExt};
pub use self::spawn::{BoundLocalSpawnExt, LocalSpawnExt, SpawnExt};
Loading