Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move task count bookkeeping out of libstd #11212

Merged
merged 1 commit into from
Jan 1, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 77 additions & 15 deletions src/libgreen/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,20 @@
// NB this does *not* include globs, please keep it that way.
#[feature(macro_rules)];

// Allow check-stage0-green for now
#[cfg(test, stage0)] extern mod green;

use std::os;
use std::rt::crate_map;
use std::rt::local::Local;
use std::rt::rtio;
use std::rt::task::Task;
use std::rt::thread::Thread;
use std::rt;
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
use std::sync::deque;
use std::task::TaskOpts;
use std::util;
use std::vec;
use std::sync::arc::UnsafeArc;

use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
use sleeper_list::SleeperList;
Expand Down Expand Up @@ -118,14 +120,6 @@ pub fn run(main: proc()) -> int {
os::set_exit_status(rt::DEFAULT_ERROR_CODE);
}

// Once the main task has exited and we've set our exit code, wait for all
// spawned sub-tasks to finish running. This is done to allow all schedulers
// to remain active while there are still tasks possibly running.
unsafe {
let mut task = Local::borrow(None::<Task>);
task.get().wait_for_other_tasks();
}

// Now that we're sure all tasks are dead, shut down the pool of schedulers,
// waiting for them all to return.
pool.shutdown();
Expand Down Expand Up @@ -164,6 +158,17 @@ pub struct SchedPool {
priv deque_pool: deque::BufferPool<~task::GreenTask>,
priv sleepers: SleeperList,
priv factory: fn() -> ~rtio::EventLoop,
priv task_state: TaskState,
priv tasks_done: Port<()>,
}

/// This is an internal state shared among a pool of schedulers. This is used to
/// keep track of how many tasks are currently running in the pool and then to
/// send on a channel once the entire pool has been drained of all tasks.
#[deriving(Clone)]
struct TaskState {
// Shared atomic count of tasks currently running in the pool. Incremented
// when a task starts and decremented when it exits (see the increment /
// decrement methods on the impl below).
cnt: UnsafeArc<AtomicUint>,
// Signalling channel: decrement() sends a () each time the count drops to
// zero, and SchedPool::shutdown() drains the corresponding Port in a loop
// until active() reports no tasks remain.
done: SharedChan<()>,
}

impl SchedPool {
Expand All @@ -182,6 +187,7 @@ impl SchedPool {
assert!(nscheds > 0);

// The pool of schedulers that will be returned from this function
let (p, state) = TaskState::new();
let mut pool = SchedPool {
threads: ~[],
handles: ~[],
Expand All @@ -192,6 +198,8 @@ impl SchedPool {
deque_pool: deque::BufferPool::new(),
next_friend: 0,
factory: factory,
task_state: state,
tasks_done: p,
};

// Create a work queue for each scheduler, ntimes. Create an extra
Expand All @@ -210,21 +218,30 @@ impl SchedPool {
(pool.factory)(),
worker,
pool.stealers.clone(),
pool.sleepers.clone());
pool.sleepers.clone(),
pool.task_state.clone());
pool.handles.push(sched.make_handle());
let sched = sched;
pool.threads.push(do Thread::start {
sched.bootstrap();
});
pool.threads.push(do Thread::start { sched.bootstrap(); });
}

return pool;
}

/// Creates a new task configured to run inside of this pool of schedulers.
/// This is useful to create a task which can then be sent to a specific
/// scheduler created by `spawn_sched` (and possibly pin it to that
/// scheduler).
pub fn task(&mut self, opts: TaskOpts, f: proc()) -> ~GreenTask {
// Hands this pool's stack_pool to the new task so its stack is drawn
// from (and can later be recycled back into) the pool's stack cache.
GreenTask::configure(&mut self.stack_pool, opts, f)
}

/// Spawns a new task into this pool of schedulers, using the specified
/// options to configure the new task which is spawned.
///
/// New tasks are spawned in a round-robin fashion to the schedulers in this
/// pool, but tasks can certainly migrate among schedulers once they're in
/// the pool.
pub fn spawn(&mut self, opts: TaskOpts, f: proc()) {
let task = self.task(opts, f);

Expand Down Expand Up @@ -262,7 +279,8 @@ impl SchedPool {
(self.factory)(),
worker,
self.stealers.clone(),
self.sleepers.clone());
self.sleepers.clone(),
self.task_state.clone());
let ret = sched.make_handle();
self.handles.push(sched.make_handle());
let sched = sched;
Expand All @@ -271,9 +289,28 @@ impl SchedPool {
return ret;
}

/// Consumes the pool of schedulers, waiting for all tasks to exit and all
/// schedulers to shut down.
///
/// This function is required to be called in order to drop a pool of
/// schedulers, it is considered an error to drop a pool without calling
/// this method.
///
/// This only waits for all tasks in *this pool* of schedulers to exit, any
/// native tasks or extern pools will not be waited on
pub fn shutdown(mut self) {
self.stealers = ~[];

// Wait for everyone to exit. We may have reached a 0-task count
// multiple times in the past, meaning there could be several buffered
// messages on the `tasks_done` port. We're guaranteed that after *some*
// message the current task count will be 0, so we just receive in a
// loop until everything is totally dead.
while self.task_state.active() {
self.tasks_done.recv();
}

// Now that everyone's gone, tell everything to shut down.
for mut handle in util::replace(&mut self.handles, ~[]).move_iter() {
handle.send(Shutdown);
}
Expand All @@ -283,6 +320,31 @@ impl SchedPool {
}
}

impl TaskState {
/// Creates a fresh task-count state with the count starting at 0, along
/// with the Port on which a () message is delivered each time the count
/// transitions back down to 0.
fn new() -> (Port<()>, TaskState) {
let (p, c) = SharedChan::new();
(p, TaskState {
cnt: UnsafeArc::new(AtomicUint::new(0)),
done: c,
})
}

/// Records that one more task is now running in the pool.
fn increment(&mut self) {
// The UnsafeArc only hands out a raw pointer; the SeqCst atomic
// operation itself is the sole synchronization on this counter.
unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); }
}

/// Returns true while at least one task in the pool is still running.
fn active(&self) -> bool {
unsafe { (*self.cnt.get()).load(SeqCst) != 0 }
}

/// Records that a task has exited. When the count reaches zero a ()
/// message is sent on the `done` channel so that waiters (e.g.
/// SchedPool::shutdown) can wake up. The count may reach zero multiple
/// times over the pool's lifetime, so receivers must re-check active()
/// after each message rather than assuming one message means "all done".
fn decrement(&mut self) {
// fetch_sub returns the value *before* the subtraction, so prev == 1
// means this call performed the 1 -> 0 transition (last task exited).
let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) };
if prev == 1 {
self.done.send(());
}
}
}

impl Drop for SchedPool {
fn drop(&mut self) {
if self.threads.len() > 0 {
Expand Down
26 changes: 18 additions & 8 deletions src/libgreen/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::unstable::mutex::Mutex;
use std::unstable::raw;
use mpsc = std::sync::mpsc_queue;

use TaskState;
use context::Context;
use coroutine::Coroutine;
use sleeper_list::SleeperList;
Expand Down Expand Up @@ -85,6 +86,9 @@ pub struct Scheduler {
/// A flag to tell the scheduler loop it needs to do some stealing
/// in order to introduce randomness as part of a yield
steal_for_yield: bool,
/// Bookkeeping for the number of tasks which are currently running around
/// inside this pool of schedulers
task_state: TaskState,

// n.b. currently destructors of an object are run in top-to-bottom in order
// of field declaration. Due to its nature, the pausable idle callback
Expand Down Expand Up @@ -120,11 +124,12 @@ impl Scheduler {
event_loop: ~EventLoop,
work_queue: deque::Worker<~GreenTask>,
work_queues: ~[deque::Stealer<~GreenTask>],
sleeper_list: SleeperList)
sleeper_list: SleeperList,
state: TaskState)
-> Scheduler {

Scheduler::new_special(pool_id, event_loop, work_queue, work_queues,
sleeper_list, true, None)
sleeper_list, true, None, state)

}

Expand All @@ -134,7 +139,8 @@ impl Scheduler {
work_queues: ~[deque::Stealer<~GreenTask>],
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
friend: Option<SchedHandle>,
state: TaskState)
-> Scheduler {

let (consumer, producer) = mpsc::queue(());
Expand All @@ -156,7 +162,8 @@ impl Scheduler {
rng: new_sched_rng(),
idle_callback: None,
yield_check_count: 0,
steal_for_yield: false
steal_for_yield: false,
task_state: state,
};

sched.yield_check_count = reset_yield_check(&mut sched.rng);
Expand Down Expand Up @@ -756,6 +763,7 @@ impl Scheduler {
let _cur = self.change_task_context(cur, stask, |sched, mut dead_task| {
let coroutine = dead_task.coroutine.take_unwrap();
coroutine.recycle(&mut sched.stack_pool);
sched.task_state.decrement();
});
fail!("should never return!");
}
Expand Down Expand Up @@ -955,11 +963,10 @@ mod test {
use std::rt::task::Task;
use std::rt::local::Local;

use {TaskState, PoolConfig, SchedPool};
use basic;
use sched::{TaskFromFriend, PinnedTask};
use task::{GreenTask, HomeSched};
use PoolConfig;
use SchedPool;

fn pool() -> SchedPool {
SchedPool::new(PoolConfig {
Expand Down Expand Up @@ -1078,14 +1085,16 @@ mod test {
let (normal_worker, normal_stealer) = pool.deque();
let (special_worker, special_stealer) = pool.deque();
let queues = ~[normal_stealer, special_stealer];
let (_p, state) = TaskState::new();

// Our normal scheduler
let mut normal_sched = ~Scheduler::new(
1,
basic::event_loop(),
normal_worker,
queues.clone(),
sleepers.clone());
sleepers.clone(),
state.clone());

let normal_handle = normal_sched.make_handle();
let friend_handle = normal_sched.make_handle();
Expand All @@ -1098,7 +1107,8 @@ mod test {
queues.clone(),
sleepers.clone(),
false,
Some(friend_handle));
Some(friend_handle),
state);

let special_handle = special_sched.make_handle();

Expand Down
39 changes: 36 additions & 3 deletions src/libgreen/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,32 @@ use stack::StackPool;
/// The necessary fields needed to keep track of a green task (as opposed to a
/// 1:1 task).
pub struct GreenTask {
/// Coroutine that this task is running on, otherwise known as the register
/// context and the stack that this task owns. This field is optional to
/// relinquish ownership back to a scheduler to recycle stacks at a later
/// date.
coroutine: Option<Coroutine>,

/// Optional handle back into the home sched pool of this task. This field
/// is lazily initialized.
handle: Option<SchedHandle>,

/// Slot for maintaining ownership of a scheduler. If a task is running,
/// this value will be Some(sched) where the task is running on "sched".
sched: Option<~Scheduler>,

/// Temporary ownership slot of a std::rt::task::Task object. This is used
/// to squirrel that libstd task away while we're performing green task
/// operations.
task: Option<~Task>,

/// Dictates whether this is a sched task or a normal green task
task_type: TaskType,

/// Home pool that this task was spawned into. This field is lazily
/// initialized until when the task is initially scheduled, and is used to
/// make sure that tasks are always woken up in the correct pool of
/// schedulers.
pool_id: uint,

// See the comments in the scheduler about why this is necessary
Expand Down Expand Up @@ -147,10 +168,15 @@ impl GreenTask {
// cleanup job after we have re-acquired ownership of the green
// task.
let mut task: ~GreenTask = unsafe { GreenTask::from_uint(ops) };
task.sched.get_mut_ref().run_cleanup_job();
task.pool_id = {
let sched = task.sched.get_mut_ref();
sched.run_cleanup_job();
sched.task_state.increment();
sched.pool_id
};

// Convert our green task to a libstd task and then execute the code
// requeted. This is the "try/catch" block for this green task and
// requested. This is the "try/catch" block for this green task and
// is the wrapper for *all* code run in the task.
let mut start = Some(start);
let task = task.swap().run(|| start.take_unwrap()());
Expand Down Expand Up @@ -350,6 +376,14 @@ impl Runtime for GreenTask {
self.put_task(to_wake);
assert!(self.sched.is_none());

// Optimistically look for a local task, but if one's not available to
// inspect (in order to see if it's in the same sched pool as we are),
// then just use our remote wakeup routine and carry on!
let mut running_task: ~Task = match Local::try_take() {
Some(task) => task,
None => return self.reawaken_remotely()
};

// Waking up a green thread is a bit of a tricky situation. We have no
// guarantee about where the current task is running. The options we
// have for where this current task is running are:
Expand All @@ -368,7 +402,6 @@ impl Runtime for GreenTask {
//
// In case 2 and 3, we need to remotely reawaken ourself in order to be
// transplanted back to the correct scheduler pool.
let mut running_task: ~Task = Local::take();
match running_task.maybe_take_runtime::<GreenTask>() {
Some(mut running_green_task) => {
running_green_task.put_task(running_task);
Expand Down
Loading