Skip to content

Commit 7a25cf3

Browse files
committed
auto merge of rust-lang#15944 : alexcrichton/rust/task-dont-die, r=brson
Previously both spawning mechanisms were not resilient to task failures which were initiated from the task spawning infrastructure. Closes rust-lang#15895
2 parents f681420 + 8643a0d commit 7a25cf3

File tree

7 files changed

+111
-16
lines changed

7 files changed

+111
-16
lines changed

src/libgreen/sched.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ impl Scheduler {
219219
let message = stask.sched.get_mut_ref().message_queue.pop();
220220
rtassert!(match message { msgq::Empty => true, _ => false });
221221

222-
stask.task.get_mut_ref().destroyed = true;
222+
stask.task.take().unwrap().drop();
223223
}
224224

225225
// This does not return a scheduler, as the scheduler is placed

src/libgreen/task.rs

+18-3
Original file line numberDiff line numberDiff line change
@@ -442,15 +442,30 @@ impl Runtime for GreenTask {
442442
f: proc():Send) {
443443
self.put_task(cur_task);
444444

445+
// First, set up a bomb which when it goes off will restore the local
446+
// task unless its disarmed. This will allow us to gracefully fail from
447+
// inside of `configure` which allocates a new task.
448+
struct Bomb { inner: Option<Box<GreenTask>> }
449+
impl Drop for Bomb {
450+
fn drop(&mut self) {
451+
let _ = self.inner.take().map(|task| task.put());
452+
}
453+
}
454+
let mut bomb = Bomb { inner: Some(self) };
455+
445456
// Spawns a task into the current scheduler. We allocate the new task's
446457
// stack from the scheduler's stack pool, and then configure it
447458
// accordingly to `opts`. Afterwards we bootstrap it immediately by
448459
// switching to it.
449460
//
450461
// Upon returning, our task is back in TLS and we're good to return.
451-
let mut sched = self.sched.take_unwrap();
452-
let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f);
453-
sched.run_task(self, sibling)
462+
let sibling = {
463+
let sched = bomb.inner.get_mut_ref().sched.get_mut_ref();
464+
GreenTask::configure(&mut sched.stack_pool, opts, f)
465+
};
466+
let mut me = bomb.inner.take().unwrap();
467+
let sched = me.sched.take().unwrap();
468+
sched.run_task(me, sibling)
454469
}
455470

456471
// Local I/O is provided by the scheduler's event loop

src/libnative/task.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
7171
// Note that this increment must happen *before* the spawn in order to
7272
// guarantee that if this task exits it will always end up waiting for the
7373
// spawned task to exit.
74-
bookkeeping::increment();
74+
let token = bookkeeping::increment();
7575

7676
// Spawning a new OS thread guarantees that __morestack will never get
7777
// triggered, but we must manually set up the actual stack bounds once this
@@ -93,7 +93,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
9393
let mut task = task;
9494
task.put_runtime(ops);
9595
drop(task.run(|| { f.take_unwrap()() }).destroy());
96-
bookkeeping::decrement();
96+
drop(token);
9797
})
9898
}
9999

src/librustrt/bookkeeping.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,24 @@
1919
//! decrement() manually.
2020
2121
use core::atomics;
22+
use core::ops::Drop;
2223

2324
use mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
2425

2526
static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
2627
static mut TASK_LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT;
2728

28-
pub fn increment() {
29+
pub struct Token { _private: () }
30+
31+
impl Drop for Token {
32+
fn drop(&mut self) { decrement() }
33+
}
34+
35+
/// Increment the number of live tasks, returning a token which will decrement
36+
/// the count when dropped.
37+
pub fn increment() -> Token {
2938
let _ = unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst) };
39+
Token { _private: () }
3040
}
3141

3242
pub fn decrement() {

src/librustrt/local.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ mod test {
125125
}).join();
126126
}
127127

128-
fn cleanup_task(mut t: Box<Task>) {
129-
t.destroyed = true;
128+
fn cleanup_task(t: Box<Task>) {
129+
t.drop();
130130
}
131131

132132
}

src/librustrt/task.rs

+30-7
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,21 @@ pub struct Task {
100100
pub storage: LocalStorage,
101101
pub unwinder: Unwinder,
102102
pub death: Death,
103-
pub destroyed: bool,
104103
pub name: Option<SendStr>,
105104

105+
state: TaskState,
106106
imp: Option<Box<Runtime + Send>>,
107107
}
108108

109+
// Once a task has entered the `Armed` state it must be destroyed via `drop`,
110+
// and no other method. This state is used to track this transition.
111+
#[deriving(PartialEq)]
112+
enum TaskState {
113+
New,
114+
Armed,
115+
Destroyed,
116+
}
117+
109118
pub struct TaskOpts {
110119
/// Invoke this procedure with the result of the task when it finishes.
111120
pub on_exit: Option<proc(Result): Send>,
@@ -159,7 +168,7 @@ impl Task {
159168
storage: LocalStorage(None),
160169
unwinder: Unwinder::new(),
161170
death: Death::new(),
162-
destroyed: false,
171+
state: New,
163172
name: None,
164173
imp: None,
165174
}
@@ -203,7 +212,7 @@ impl Task {
203212
/// }).destroy();
204213
/// # }
205214
/// ```
206-
pub fn run(self: Box<Task>, f: ||) -> Box<Task> {
215+
pub fn run(mut self: Box<Task>, f: ||) -> Box<Task> {
207216
assert!(!self.is_destroyed(), "cannot re-use a destroyed task");
208217

209218
// First, make sure that no one else is in TLS. This does not allow
@@ -212,6 +221,7 @@ impl Task {
212221
if Local::exists(None::<Task>) {
213222
fail!("cannot run a task recursively inside another");
214223
}
224+
self.state = Armed;
215225
Local::put(self);
216226

217227
// There are two primary reasons that general try/catch is unsafe. The
@@ -333,12 +343,12 @@ impl Task {
333343
// Now that we're done, we remove the task from TLS and flag it for
334344
// destruction.
335345
let mut task: Box<Task> = Local::take();
336-
task.destroyed = true;
346+
task.state = Destroyed;
337347
return task;
338348
}
339349

340350
/// Queries whether this can be destroyed or not.
341-
pub fn is_destroyed(&self) -> bool { self.destroyed }
351+
pub fn is_destroyed(&self) -> bool { self.state == Destroyed }
342352

343353
/// Inserts a runtime object into this task, transferring ownership to the
344354
/// task. It is illegal to replace a previous runtime object in this task
@@ -453,12 +463,20 @@ impl Task {
453463
pub fn can_block(&self) -> bool {
454464
self.imp.get_ref().can_block()
455465
}
466+
467+
/// Consume this task, flagging it as a candidate for destruction.
468+
///
469+
/// This function is required to be invoked to destroy a task. A task
470+
/// destroyed through a normal drop will abort.
471+
pub fn drop(mut self) {
472+
self.state = Destroyed;
473+
}
456474
}
457475

458476
impl Drop for Task {
459477
fn drop(&mut self) {
460478
rtdebug!("called drop for a task: {}", self as *mut Task as uint);
461-
rtassert!(self.destroyed);
479+
rtassert!(self.state != Armed);
462480
}
463481
}
464482

@@ -634,12 +652,17 @@ mod test {
634652
begin_unwind("cause", file!(), line!())
635653
}
636654

655+
#[test]
656+
fn drop_new_task_ok() {
657+
drop(Task::new());
658+
}
659+
637660
// Task blocking tests
638661

639662
#[test]
640663
fn block_and_wake() {
641664
let task = box Task::new();
642665
let mut task = BlockedTask::block(task).wake().unwrap();
643-
task.destroyed = true;
666+
task.drop();
644667
}
645668
}
+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2+
// file at the top-level directory of this distribution and at
3+
// http://rust-lang.org/COPYRIGHT.
4+
//
5+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8+
// option. This file may not be copied, modified, or distributed
9+
// except according to those terms.
10+
11+
// ignore-macos apparently gargantuan mmap requests are ok?
12+
13+
#![feature(phase)]
14+
15+
#[phase(plugin)]
16+
extern crate green;
17+
extern crate native;
18+
19+
use std::task::TaskBuilder;
20+
use native::NativeTaskBuilder;
21+
22+
green_start!(main)
23+
24+
fn main() {
25+
test();
26+
27+
let (tx, rx) = channel();
28+
TaskBuilder::new().native().spawn(proc() {
29+
tx.send(test());
30+
});
31+
rx.recv();
32+
}
33+
34+
#[cfg(not(target_word_size = "64"))]
35+
fn test() {}
36+
37+
#[cfg(target_word_size = "64")]
38+
fn test() {
39+
let (tx, rx) = channel();
40+
spawn(proc() {
41+
TaskBuilder::new().stack_size(1024 * 1024 * 1024 * 64).spawn(proc() {
42+
});
43+
tx.send(());
44+
});
45+
46+
assert!(rx.recv_opt().is_err());
47+
}

0 commit comments

Comments
 (0)