Skip to content

Commit

Permalink
rt: Allow concurrent block_on's with basic_scheduler (#2804)
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco authored Sep 23, 2020
1 parent 0f70530 commit f25f12d
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 59 deletions.
192 changes: 154 additions & 38 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
use crate::future::poll_fn;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
use crate::sync::Notify;
use crate::util::linked_list::{Link, LinkedList};
use crate::util::{waker_ref, Wake};
use crate::util::{waker_ref, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::Poll::Ready;
use std::sync::{Arc, PoisonError};
use std::task::Poll::{Pending, Ready};
use std::time::Duration;

/// Executes tasks on the current thread
pub(crate) struct BasicScheduler<P>
where
P: Park,
{
pub(crate) struct BasicScheduler<P: Park> {
/// Inner state guarded by a mutex that is shared
/// between all `block_on` calls.
inner: Mutex<Option<Inner<P>>>,

/// Notifier for waking up other threads to steal the
/// parker.
notify: Notify,

/// Sendable task spawner
spawner: Spawner,
}

/// The inner scheduler that owns the task queue and the main parker P.
struct Inner<P: Park> {
/// Scheduler run queue
///
/// When the scheduler is executed, the queue is removed from `self` and
Expand Down Expand Up @@ -59,7 +72,7 @@ struct Shared {
unpark: Box<dyn Unpark>,
}

/// Thread-local context
/// Thread-local context.
struct Context {
/// Shared scheduler state
shared: Arc<Shared>,
Expand All @@ -68,38 +81,43 @@ struct Context {
tasks: RefCell<Tasks>,
}

/// Initial queue capacity
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often ot check the remote queue first
/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;

// Tracks the current BasicScheduler
// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);

impl<P> BasicScheduler<P>
where
P: Park,
{
impl<P: Park> BasicScheduler<P> {
pub(crate) fn new(park: P) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());

BasicScheduler {
let spawner = Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
}),
};

let inner = Mutex::new(Some(Inner {
tasks: Some(Tasks {
owned: LinkedList::new(),
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
spawner: Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
}),
},
spawner: spawner.clone(),
tick: 0,
park,
}));

BasicScheduler {
inner,
notify: Notify::new(),
spawner,
}
}

Expand All @@ -108,7 +126,6 @@ where
}

/// Spawns a future onto the thread pool
#[allow(dead_code)]
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
Expand All @@ -117,13 +134,57 @@ where
self.spawner.spawn(future)
}

pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
{
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
pin!(future);

// Attempt to steal the dedicated parker and block_on the future if we can there,
// othwerwise, lets select on a notification that the parker is available
// or the future is complete.
loop {
if let Some(inner) = &mut self.take_inner() {
return inner.block_on(future);
} else {
let mut enter = crate::runtime::enter(false);

let notified = self.notify.notified();
pin!(notified);

if let Some(out) = enter
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}

if let Ready(out) = future.as_mut().poll(cx) {
return Ready(Some(out));
}

Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
}

fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
let inner = self.inner.lock().unwrap().take()?;

Some(InnerGuard {
inner: Some(inner),
basic_scheduler: &self,
})
}
}

impl<P: Park> Inner<P> {
/// Block on the future provided and drive the runtime's driver.
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
enter(self, |scheduler, context| {
let _enter = runtime::enter(false);
let waker = waker_ref(&scheduler.spawner.shared);
let _enter = crate::runtime::enter(false);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);

pin!(future);
Expand Down Expand Up @@ -178,16 +239,16 @@ where

/// Enter the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local
fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
F: FnOnce(&mut Inner<P>, &Context) -> R,
P: Park,
{
// Ensures the run queue is placed back in the `BasicScheduler` instance
// once `block_on` returns.`
struct Guard<'a, P: Park> {
context: Option<Context>,
scheduler: &'a mut BasicScheduler<P>,
scheduler: &'a mut Inner<P>,
}

impl<P: Park> Drop for Guard<'_, P> {
Expand All @@ -214,12 +275,23 @@ where
CURRENT.set(context, || f(scheduler, context))
}

impl<P> Drop for BasicScheduler<P>
where
P: Park,
{
impl<P: Park> Drop for BasicScheduler<P> {
fn drop(&mut self) {
enter(self, |scheduler, context| {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.

let mut inner = match self
.inner
.lock()
.unwrap_or_else(PoisonError::into_inner)
.take()
{
Some(inner) => inner,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
};

enter(&mut inner, |scheduler, context| {
// Loop required here to ensure borrow is dropped between iterations
#[allow(clippy::while_let_loop)]
loop {
Expand Down Expand Up @@ -269,6 +341,10 @@ impl Spawner {
fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
self.shared.queue.lock().unwrap().pop_front()
}

fn waker_ref(&self) -> WakerRef<'_> {
waker_ref(&self.shared)
}
}

impl fmt::Debug for Spawner {
Expand Down Expand Up @@ -325,3 +401,43 @@ impl Wake for Shared {
arc_self.unpark.unpark();
}
}

// ===== InnerGuard =====

/// Used to ensure we always place the Inner value
/// back into its slot in `BasicScheduler`, even if the
/// future panics.
struct InnerGuard<'a, P: Park> {
inner: Option<Inner<P>>,
basic_scheduler: &'a BasicScheduler<P>,
}

impl<P: Park> InnerGuard<'_, P> {
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
// The only time inner gets set to `None` is if we have dropped
// already so this unwrap is safe.
self.inner.as_mut().unwrap().block_on(future)
}
}

impl<P: Park> Drop for InnerGuard<'_, P> {
fn drop(&mut self) {
if let Some(scheduler) = self.inner.take() {
// We can ignore the poison error here since we are
// just replacing the state.
let mut lock = self
.basic_scheduler
.inner
.lock()
.unwrap_or_else(PoisonError::into_inner);

// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
lock.replace(scheduler);

// Wake up other possible threads that could steal
// the dedicated parker P.
self.basic_scheduler.notify.notify_one()
}
}
}
2 changes: 1 addition & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ cfg_rt_core! {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Basic(Mutex::new(Some(scheduler))),
kind: Kind::Basic(scheduler),
handle: Handle {
spawner,
io_handle: resources.io_handle,
Expand Down
23 changes: 3 additions & 20 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ enum Kind {

/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
Basic(Mutex<Option<BasicScheduler<driver::Driver>>>),
Basic(BasicScheduler<driver::Driver>),

/// Execute tasks across multiple threads.
#[cfg(feature = "rt-threaded")]
Expand Down Expand Up @@ -401,7 +401,7 @@ impl Runtime {
Kind::Shell(_) => panic!("task execution disabled"),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.spawn(future),
Kind::Basic(_exec) => self.handle.spawner.spawn(future),
Kind::Basic(exec) => exec.spawn(future),
}
}

Expand Down Expand Up @@ -461,24 +461,7 @@ impl Runtime {
}
}
#[cfg(feature = "rt-core")]
Kind::Basic(exec) => {
// TODO(lucio): clean this up and move this impl into
// `basic_scheduler.rs`, this is hacky and bad but will work for
// now.
let exec_temp = {
let mut lock = exec.lock().unwrap();
lock.take()
};

if let Some(mut exec_temp) = exec_temp {
let res = exec_temp.block_on(future);
exec.lock().unwrap().replace(exec_temp);
res
} else {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).unwrap()
}
}
Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.block_on(future),
})
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,13 @@ cfg_sync! {
pub mod watch;
}

cfg_not_sync! {
cfg_rt_core! {
mod notify;
pub(crate) use notify::Notify;
}
}

cfg_not_sync! {
cfg_atomic_waker_impl! {
mod task;
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/sync/notify.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
// Allow `unreachable_pub` warnings when sync is not enabled
// due to the usage of `Notify` within the `rt-core` feature set.
// When this module is compiled with `sync` enabled we will warn on
// this lint. When `rt-core` is enabled we use `pub(crate)` which
// triggers this warning but it is safe to ignore in this case.
#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]

use crate::loom::sync::atomic::AtomicU8;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ mod rand;
mod wake;
pub(crate) use wake::{waker_ref, Wake};

cfg_rt_core! {
pub(crate) use wake::WakerRef;
}

cfg_rt_threaded! {
pub(crate) use rand::FastRand;

Expand Down
Loading

0 comments on commit f25f12d

Please sign in to comment.