Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sleep trait #665

Merged
merged 12 commits into from
Dec 6, 2017
120 changes: 68 additions & 52 deletions src/executor/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
//! [`CurrentThread::execute_daemon`]: struct.CurrentThread.html#method.execute_daemon

use Async;
use executor::{self, Spawn};
use executor::{self, Spawn, Sleep, Wakeup};
use future::{Future, Executor, ExecuteError, ExecuteErrorKind};
use scheduler;
use task_impl::ThreadNotify;
Expand All @@ -72,7 +72,6 @@ use std::prelude::v1::*;
use std::{fmt, thread};
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use std::sync::Arc;

/// Executes futures on the current thread.
///
Expand Down Expand Up @@ -114,9 +113,9 @@ pub struct Context<'a> {
/// `TaskRunner` will be created during `block_with_init` and will sit on the
/// stack until execution is complete.
#[derive(Debug)]
struct TaskRunner {
struct TaskRunner<T> {
/// Executes futures.
scheduler: Scheduler,
scheduler: Scheduler<T>,
}

struct CurrentRunner {
Expand All @@ -135,7 +134,7 @@ struct CurrentRunner {
push: RefCell<Push>,
}

type Scheduler = scheduler::Scheduler<SpawnedFuture, Arc<ThreadNotify>>;
type Scheduler<T> = scheduler::Scheduler<SpawnedFuture, T>;
type Push = scheduler::Push<SpawnedFuture>;

#[derive(Debug)]
Expand Down Expand Up @@ -202,7 +201,20 @@ impl CurrentThread {
pub fn block_with_init<F, R>(f: F) -> R
where F: FnOnce(&mut Context) -> R
{
TaskRunner::enter(f)
ThreadNotify::with_current(|mut thread_notify| {
TaskRunner::enter(&mut thread_notify, f)
})
}

/// Calls the given closure with a custom sleep strategy.
///
/// This function is the same as `block_with_init` except that it allows
/// customizing the sleep strategy.
pub fn block_with_sleep<S, F, R>(sleep: &mut S, f: F) -> R
where F: FnOnce(&mut Context) -> R,
S: Sleep,
{
TaskRunner::enter(sleep, f)
}

/// Executes a future on the current thread.
Expand Down Expand Up @@ -314,11 +326,15 @@ where F: Future<Item = (), Error = ()> + 'static,
})
}

impl TaskRunner {
impl<T> TaskRunner<T>
where T: Wakeup,
{
/// Return a new `TaskRunner`
fn new(thread_notify: Arc<ThreadNotify>) -> TaskRunner {
fn new(wakeup: T) -> TaskRunner<T> {
let scheduler = scheduler::Scheduler::new(wakeup);

TaskRunner {
scheduler: scheduler::Scheduler::new(thread_notify),
scheduler: scheduler,
}
}

Expand All @@ -343,58 +359,58 @@ impl TaskRunner {
/// state until all non daemon futures complete. When no scheduled futures
/// are ready to be advanced, the thread is blocked using
/// `ThreadNotify::park`.
fn enter<F, R>(f: F) -> R
fn enter<S, F, R>(sleep: &mut S, f: F) -> R
where F: FnOnce(&mut Context) -> R,
S: Sleep<Wakeup = T>,
{
// Create a new task runner that will be used for the duration of `f`.
ThreadNotify::with_current(|thread_notify| {
let mut runner = TaskRunner::new(thread_notify.clone());

CURRENT.with(|current| {
let enter = executor::enter()
.expect("cannot execute `CurrentThread` executor from within \
another executor");

// Enter an execution scope
let mut ctx = Context {
enter: enter,
_p: ::std::marker::PhantomData,
};

// Set the scheduler to the TLS and perform setup work,
// returning a future to execute.
//
// This could possibly suubmit other futures for execution.
let ret = current.enter(|| {
let ret = f(&mut ctx);
let mut runner = TaskRunner::new(sleep.wakeup());

println!("~~~ DONE ENTER ~~~");

// Transfer all pushed futures to the scheduler
current.push_all(&mut runner.scheduler);
CURRENT.with(|current| {
let enter = executor::enter()
.expect("cannot execute `CurrentThread` executor from within \
another executor");

// Enter an execution scope
let mut ctx = Context {
enter: enter,
_p: ::std::marker::PhantomData,
};

ret
});
// Set the scheduler to the TLS and perform setup work,
// returning a future to execute.
//
// This could possibly suubmit other futures for execution.
let ret = current.enter(|| {
let ret = f(&mut ctx);

// Execute the runner.
//
// This function will not return until either
//
// a) All non daemon futures have completed execution
// b) `cancel_all_executing` is called, forcing the executor to
// return.
runner.run(thread_notify, current);
println!("~~~ DONE ENTER ~~~");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to be cleaned out.


// Not technically required, but this makes the fact that `ctx`
// needs to live until this point explicit.
drop(ctx);
// Transfer all pushed futures to the scheduler
current.push_all(&mut runner.scheduler);

ret
})
});

// Execute the runner.
//
// This function will not return until either
//
// a) All non daemon futures have completed execution
// b) `cancel_all_executing` is called, forcing the executor to
// return.
runner.run(sleep, current);

// Not technically required, but this makes the fact that `ctx`
// needs to live until this point explicit.
drop(ctx);

ret
})
}

fn run(&mut self, thread_notify: &Arc<ThreadNotify>, current: &CurrentRunner) {
fn run<S>(&mut self, sleep: &mut S, current: &CurrentRunner)
where S: Sleep<Wakeup = T>,
{
use scheduler::Tick;

while current.is_running() {
Expand Down Expand Up @@ -448,7 +464,7 @@ impl TaskRunner {

// Block the current thread until a future managed by the scheduler
// receives a readiness notification.
thread_notify.park();
sleep.sleep();
}
Tick::Inconsistent => {
// Yield the thread and loop
Expand Down Expand Up @@ -495,7 +511,7 @@ impl CurrentRunner {
f()
}

fn push_all(&self, dst: &mut Scheduler) {
fn push_all<T: Wakeup>(&self, dst: &mut Scheduler<T>) {
dst.push_all(&mut *self.push.borrow_mut());
}

Expand Down
6 changes: 3 additions & 3 deletions src/task_impl/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,11 +586,11 @@ impl Notify for ThreadNotify {
}
}

impl ::executor::Sleep for Arc<ThreadNotify> {
type Wakeup = Self;
impl<'a> ::executor::Sleep for &'a Arc<ThreadNotify> {
type Wakeup = Arc<ThreadNotify>;

fn wakeup(&self) -> Self::Wakeup {
self.clone()
(*self).clone()
}

fn sleep(&mut self) {
Expand Down