Skip to content

Commit

Permalink
Handle notification channels in task.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
bblum committed Jul 25, 2012
1 parent 1e241b5 commit d9e8efc
Showing 1 changed file with 34 additions and 17 deletions.
51 changes: 34 additions & 17 deletions src/libcore/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,18 +612,23 @@ class taskgroup {
// Lists of tasks who will kill us if they fail, but whom we won't kill.
let parents: option<(taskgroup_arc,uint)>;
let is_main: bool;
let notifier: option<auto_notify>;
new(me: *rust_task, -tasks: taskgroup_arc, my_pos: uint,
-parents: option<(taskgroup_arc,uint)>, is_main: bool) {
self.me = me;
self.tasks = tasks;
self.my_pos = my_pos;
self.parents = parents;
self.is_main = is_main;
-parents: option<(taskgroup_arc,uint)>, is_main: bool,
-notifier: option<auto_notify>) {
self.me = me;
self.tasks = tasks;
self.my_pos = my_pos;
self.parents = parents;
self.is_main = is_main;
self.notifier = notifier;
self.notifier.iter(|x| { x.failed = false; });
}
// Runs on task exit.
drop {
// If we are failing, the whole taskgroup needs to die.
if rustrt::rust_task_is_unwinding(self.me) {
self.notifier.iter(|x| { x.failed = true; });
// Take everybody down with us.
kill_taskgroup(self.tasks, self.me, self.my_pos, self.is_main);
} else {
Expand All @@ -641,6 +646,19 @@ class taskgroup {
}
}

class auto_notify {
let notify_chan: comm::chan<notification>;
let mut failed: bool;
new(chan: comm::chan<notification>) {
self.notify_chan = chan;
self.failed = true; // Un-set above when taskgroup successfully made.
}
drop {
let result = if self.failed { failure } else { success };
comm::send(self.notify_chan, exit(get_task(), result));
}
}

fn enlist_in_taskgroup(group_arc: taskgroup_arc,
me: *rust_task) -> option<uint> {
do group_arc.with |_c, state| {
Expand Down Expand Up @@ -750,7 +768,7 @@ fn share_spawner_taskgroup(linked: bool)
let tasks = arc::exclusive(some((dvec::from_elem(some(me)),
dvec::dvec())));
// Main group has no parent group.
let group = @taskgroup(me, tasks.clone(), 0, none, true);
let group = @taskgroup(me, tasks.clone(), 0, none, true, none);
unsafe { local_set(me, taskgroup_key(), group); }
// Tell child task it's also in the main group.
// Whether or not it wanted our parent group, we haven't got one.
Expand Down Expand Up @@ -796,15 +814,11 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
// Getting killed after here would leak the task.

let child_wrapper =
make_child_wrapper(new_task, child_tg, parent_tg, is_main, f);
make_child_wrapper(new_task, child_tg, parent_tg, is_main,
opts.notify_chan, f);
let fptr = ptr::addr_of(child_wrapper);
let closure: *rust_closure = unsafe::reinterpret_cast(fptr);

do option::iter(opts.notify_chan) |c| {
// FIXME (#1087): Would like to do notification in Rust
rustrt::rust_task_config_notify(new_task, c);
}

// Getting killed between these two calls would free the child's
// closure. (Reordering them wouldn't help - then getting killed
// between them would leak.)
Expand All @@ -829,6 +843,7 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
// }
fn make_child_wrapper(child: *rust_task, -child_tg: taskgroup_arc,
-parent_tg: option<taskgroup_arc>, is_main: bool,
notify_chan: option<comm::chan<notification>>,
-f: fn~()) -> fn~() {
let child_tg_ptr = ~mut some((child_tg, parent_tg));
fn~() {
Expand All @@ -837,6 +852,11 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
*child_tg_ptr <-> tg_data_opt;
let (child_tg, parent_tg) = option::unwrap(tg_data_opt);
// Child task runs this code.

// Even if the below code fails to kick the child off, we must
// send something on the notify channel.
let notifier = notify_chan.map(|c| auto_notify(c));

// Set up membership in taskgroup. If this returns none, some
// task was already failing, so don't bother doing anything.
alt enlist_in_taskgroup(child_tg, child) {
Expand All @@ -862,7 +882,7 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
};
if enlist_ok {
let group = @taskgroup(child, child_tg, my_pos,
pg, is_main);
pg, is_main, notifier);
unsafe { local_set(child, taskgroup_key(), group); }
// Run the child's body.
f();
Expand Down Expand Up @@ -1129,9 +1149,6 @@ extern mod rustrt {
fn new_task() -> *rust_task;
fn rust_new_task_in_sched(id: sched_id) -> *rust_task;

fn rust_task_config_notify(
task: *rust_task, &&chan: comm::chan<notification>);

fn start_task(task: *rust_task, closure: *rust_closure);

fn rust_task_is_unwinding(task: *rust_task) -> bool;
Expand Down

0 comments on commit d9e8efc

Please sign in to comment.