Skip to content

Commit

Permalink
Linked failure in task.rs instead of rust_task.cpp (#1868, #1189)
Browse files Browse the repository at this point in the history
  • Loading branch information
bblum committed Jul 11, 2012
1 parent 152f2ea commit 5d6d3d0
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 67 deletions.
218 changes: 183 additions & 35 deletions src/libcore/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import result::result;
import dvec::extensions;
import dvec_iter::extensions;
import arc::methods;

export task;
export task_result;
Expand Down Expand Up @@ -563,7 +564,11 @@ unsafe fn unkillable(f: fn()) {
}


/* Internal */
/****************************************************************************
* Internal
****************************************************************************/

/* spawning */

type sched_id = int;
type task_id = int;
Expand All @@ -573,42 +578,185 @@ type task_id = int;
type rust_task = libc::c_void;
type rust_closure = libc::c_void;

fn spawn_raw(opts: task_opts, +f: fn~()) {
/* linked failure */

type taskgroup_arc = arc::exclusive<option<dvec::dvec<option<*rust_task>>>>;

class taskgroup {
// FIXME (#2816): Change dvec to an O(1) data structure (and change 'me'
// to a node-handle or somesuch when so done (or remove the field entirely
// if keyed by *rust_task)).
let tasks: taskgroup_arc; // 'none' means the group already failed.
let me: *rust_task;
let my_pos: uint;
// let parent_group: taskgroup_arc; // TODO(bblum)
// TODO XXX bblum: add a list of empty slots to get runtime back
let mut failed: bool;
new(-tasks: taskgroup_arc, me: *rust_task, my_pos: uint) {
self.tasks = tasks; self.me = me; self.my_pos = my_pos;
self.failed = true; // This will get un-set on successful exit.
}
// Runs on task exit.
drop {
if self.failed {
// Take everybody down with us.
kill_taskgroup(self.tasks, self.me, self.my_pos);
} else {
// Remove ourselves from the group.
leave_taskgroup(self.tasks, self.me, self.my_pos);
}
}
}

let mut f = if opts.supervise {
f
} else {
// FIXME (#1868, #1789): The runtime supervision API is weird here
// because it was designed to let the child unsupervise itself,
// when what we actually want is for parents to unsupervise new
// children.
fn~() {
rustrt::unsupervise();
f();
fn taskgroup_key(+_group: @taskgroup) { } // For TLS

fn enlist_in_taskgroup(group_arc: taskgroup_arc,
me: *rust_task) -> option<uint> {
do group_arc.with |_c, state| {
// If 'none', the group was failing. Can't enlist.
do state.map |tasks| {
// Try to find an empty slot.
alt tasks.position(|x| x == none) {
some(empty_index) {
tasks.set_elt(empty_index, some(me));
empty_index
}
none {
tasks.push(some(me));
tasks.len() - 1
}
}
}
};
}
}

unsafe {
let fptr = ptr::addr_of(f);
let closure: *rust_closure = unsafe::reinterpret_cast(fptr);
// NB: Runs in destructor/post-exit context. Can't 'fail'.
fn leave_taskgroup(group_arc: taskgroup_arc, me: *rust_task, index: uint) {
do group_arc.with |_c, state| {
// If 'none', already failing and we've already gotten a kill signal.
do state.map |tasks| {
assert tasks[index] == some(me);
tasks.set_elt(index, none);
};
};
}

let new_task = alt opts.sched {
none {
rustrt::new_task()
}
some(sched_opts) {
new_task_in_new_sched(sched_opts)
}
// NB: Runs in destructor/post-exit context. Can't 'fail'.
fn kill_taskgroup(group_arc: taskgroup_arc, me: *rust_task, index: uint) {
// NB: We could do the killing iteration outside of the group arc, by
// having "let mut newstate" here, swapping inside, and iterating after.
// But that would let other exiting tasks fall-through and exit while we
// were trying to kill them, causing potential use-after-free. A task's
// presence in the arc guarantees it's alive only while we hold the lock,
// so if we're failing, all concurrently exiting tasks must wait for us.
// To do it differently, we'd have to use the runtime's task refcounting.
do group_arc.with |_c, state| {
let mut newstate = none;
*state <-> newstate;
// Might already be none, if somebody is failing simultaneously.
// That's ok; only one task needs to do the dirty work. (Might also
// see 'none' if somebody already failed and we got a kill signal.)
do newstate.map |tasks| {
// First remove ourself (killing ourself won't do much good). This
// is duplicated here to avoid having to lock twice.
assert tasks[index] == some(me);
tasks.set_elt(index, none);
// Now send takedown signal.
for tasks.each |entry| {
do entry.map |task| {
rustrt::rust_task_kill_other(task);
};
}
};
assert !new_task.is_null();
};
}

fn share_parent_taskgroup() -> taskgroup_arc {
let me = rustrt::rust_get_task();
alt unsafe { local_get(me, taskgroup_key) } {
some(group) {
group.tasks.clone()
}
none {
/* Main task, doing first spawn ever. */
let tasks = arc::exclusive(some(dvec::from_elem(some(me))));
let group = @taskgroup(tasks.clone(), me, 0);
unsafe { local_set(me, taskgroup_key, group); }
tasks
}
}
}

fn spawn_raw(opts: task_opts, +f: fn~()) {
// Decide whether the child needs to be in a new linked failure group.
let child_tg: taskgroup_arc = if opts.supervise {
share_parent_taskgroup()
} else {
arc::exclusive(some(dvec::from_elem(none)))
};

do option::iter(opts.notify_chan) |c| {
// FIXME (#1087): Would like to do notification in Rust
rustrt::rust_task_config_notify(new_task, c);
unsafe {
let child_data_ptr = ~mut some((child_tg, f));
// Being killed with the unsafe task/closure pointers would leak them.
do unkillable {
// Agh. Get move-mode items into the closure. FIXME (#2829)
let mut child_data = none;
*child_data_ptr <-> child_data;
let (child_tg, f) = option::unwrap(child_data);
// Create child task.
let new_task = alt opts.sched {
none { rustrt::new_task() }
some(sched_opts) { new_task_in_new_sched(sched_opts) }
};
assert !new_task.is_null();
// Getting killed after here would leak the task.

let child_wrapper =
make_child_wrapper(new_task, child_tg, opts.supervise, f);
let fptr = ptr::addr_of(child_wrapper);
let closure: *rust_closure = unsafe::reinterpret_cast(fptr);

do option::iter(opts.notify_chan) |c| {
// FIXME (#1087): Would like to do notification in Rust
rustrt::rust_task_config_notify(new_task, c);
}

// Getting killed between these two calls would free the child's
// closure. (Reordering them wouldn't help - then getting killed
// between them would leak.)
rustrt::start_task(new_task, closure);
unsafe::forget(child_wrapper);
}
}

rustrt::start_task(new_task, closure);
unsafe::forget(f);
fn make_child_wrapper(child_task: *rust_task, -child_tg: taskgroup_arc,
supervise: bool, -f: fn~()) -> fn~() {
let child_tg_ptr = ~mut some(child_tg);
fn~() {
// Agh. Get move-mode items into the closure. FIXME (#2829)
let mut child_tg_opt = none;
*child_tg_ptr <-> child_tg_opt;
let child_tg = option::unwrap(child_tg_opt);
// Child task runs this code.
if !supervise {
// FIXME (#1868, #1789) take this out later
rustrt::unsupervise();
}
// Set up membership in taskgroup. If this returns none, the
// parent was already failing, so don't bother doing anything.
alt enlist_in_taskgroup(child_tg, child_task) {
some(my_index) {
let group = @taskgroup(child_tg, child_task, my_index);
unsafe { local_set(child_task, taskgroup_key, group); }
// Run the child's body.
f();
// Report successful exit. (TLS cleanup code will tear
// down the group.)
group.failed = false;
}
none { }
}
}
}

fn new_task_in_new_sched(opts: sched_opts) -> *rust_task {
Expand Down Expand Up @@ -640,7 +788,6 @@ fn spawn_raw(opts: task_opts, +f: fn~()) {
};
rustrt::rust_new_task_in_sched(sched_id)
}

}

/****************************************************************************
Expand Down Expand Up @@ -760,7 +907,7 @@ unsafe fn local_get<T>(task: *rust_task,
local_get_helper(task, key, false)
}

unsafe fn local_set<T>(task: *rust_task, key: local_data_key<T>, -data: @T) {
unsafe fn local_set<T>(task: *rust_task, key: local_data_key<T>, +data: @T) {
let map = get_task_local_map(task);
// Store key+data as *voids. Data is invisibly referenced once; key isn't.
let keyval = key_to_key_value(key);
Expand Down Expand Up @@ -822,7 +969,7 @@ unsafe fn local_data_get<T>(key: local_data_key<T>) -> option<@T> {
* Store a value in task-local data. If this key already has a value,
* that value is overwritten (and its destructor is run).
*/
unsafe fn local_data_set<T>(key: local_data_key<T>, -data: @T) {
unsafe fn local_data_set<T>(key: local_data_key<T>, +data: @T) {
local_set(rustrt::rust_get_task(), key, data)
}
/**
Expand Down Expand Up @@ -853,11 +1000,12 @@ extern mod rustrt {

fn start_task(task: *rust_task, closure: *rust_closure);

fn rust_task_is_unwinding(rt: *rust_task) -> bool;
fn rust_task_is_unwinding(task: *rust_task) -> bool;
fn unsupervise();
fn rust_osmain_sched_id() -> sched_id;
fn rust_task_inhibit_kill();
fn rust_task_allow_kill();
fn rust_task_kill_other(task: *rust_task);

#[rust_stack]
fn rust_get_task_local_data(task: *rust_task) -> *libc::c_void;
Expand Down Expand Up @@ -1232,7 +1380,7 @@ fn test_unkillable() {
let ch = po.chan();

// We want to do this after failing
do spawn {
do spawn_raw({ supervise: false with default_task_opts() }) {
for iter::repeat(10u) { yield() }
ch.send(());
}
Expand Down Expand Up @@ -1269,7 +1417,7 @@ fn test_unkillable_nested() {
let ch = po.chan();

// We want to do this after failing
do spawn {
do spawn_raw({ supervise: false with default_task_opts() }) {
for iter::repeat(10u) { yield() }
ch.send(());
}
Expand Down
35 changes: 5 additions & 30 deletions src/rt/rust_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "rust_env.h"
#include "rust_port.h"

// TODO(bblum): get rid of supervisors

// Tasks
rust_task::rust_task(rust_sched_loop *sched_loop, rust_task_state state,
rust_task *spawner, const char *name,
Expand Down Expand Up @@ -146,13 +148,9 @@ cleanup_task(cleanup_args *args) {

task->notify(!threw_exception);

if (threw_exception) {
#ifndef __WIN32__
task->conclude_failure();
#else
assert(false && "Shouldn't happen");
#ifdef __WIN32__
assert(!threw_exception && "No exception-handling yet on windows builds");
#endif
}
}

extern "C" CDECL void upcall_exchange_free(void *ptr);
Expand Down Expand Up @@ -262,10 +260,7 @@ void
rust_task::kill() {
scoped_lock with(kill_lock);

if (dead()) {
// Task is already dead, can't kill what's already dead.
fail_parent();
}
// XXX: bblum: kill/kill race

// Note the distinction here: kill() is when you're in an upcall
// from task A and want to force-fail task B, you do B->kill().
Expand Down Expand Up @@ -314,31 +309,11 @@ rust_task::begin_failure(char const *expr, char const *file, size_t line) {
throw this;
#else
die();
conclude_failure();
// FIXME (#908): Need unwinding on windows. This will end up aborting
sched_loop->fail();
#endif
}

void
rust_task::conclude_failure() {
fail_parent();
}

void
rust_task::fail_parent() {
scoped_lock with(supervisor_lock);
if (supervisor) {
DLOG(sched_loop, task,
"task %s @0x%" PRIxPTR
" propagating failure to supervisor %s @0x%" PRIxPTR,
name, this, supervisor->name, supervisor);
supervisor->kill();
}
if (NULL == supervisor && propagate_failure)
sched_loop->fail();
}

void
rust_task::unsupervise()
{
Expand Down
2 changes: 0 additions & 2 deletions src/rt/rust_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,6 @@ rust_task : public kernel_owned<rust_task>
// Fail self, assuming caller-on-stack is this task.
void fail();
void fail(char const *expr, char const *file, size_t line);
void conclude_failure();
void fail_parent();

// Disconnect from our supervisor.
void unsupervise();
Expand Down

0 comments on commit 5d6d3d0

Please sign in to comment.