Skip to content

Remove old task apis, some cleanup #8139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
2 changes: 1 addition & 1 deletion src/libextra/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ mod tests {
fn test_sem_runtime_friendly_blocking() {
// Force the runtime to schedule two threads on the same sched_loop.
// When one blocks, it should schedule the other one.
do task::spawn_sched(task::ManualThreads(1)) {
do task::spawn_sched(task::SingleThreaded) {
let s = ~Semaphore::new(1);
let s2 = ~s.clone();
let (p,c) = comm::stream();
Expand Down
21 changes: 14 additions & 7 deletions src/libstd/rt/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,10 +674,11 @@ mod test {
do run_in_newsched_task {
let (port, chan) = oneshot::<int>();
let port_cell = Cell::new(port);
let _thread = do spawntask_thread {
let thread = do spawntask_thread {
let _p = port_cell.take();
};
let _chan = chan;
thread.join();
}
}
}
Expand All @@ -689,13 +690,15 @@ mod test {
let (port, chan) = oneshot::<int>();
let chan_cell = Cell::new(chan);
let port_cell = Cell::new(port);
let _thread1 = do spawntask_thread {
let thread1 = do spawntask_thread {
let _p = port_cell.take();
};
let _thread2 = do spawntask_thread {
let thread2 = do spawntask_thread {
let c = chan_cell.take();
c.send(1);
};
thread1.join();
thread2.join();
}
}
}
Expand All @@ -707,19 +710,21 @@ mod test {
let (port, chan) = oneshot::<int>();
let chan_cell = Cell::new(chan);
let port_cell = Cell::new(port);
let _thread1 = do spawntask_thread {
let thread1 = do spawntask_thread {
let port_cell = Cell::new(port_cell.take());
let res = do spawntask_try {
port_cell.take().recv();
};
assert!(res.is_err());
};
let _thread2 = do spawntask_thread {
let thread2 = do spawntask_thread {
let chan_cell = Cell::new(chan_cell.take());
do spawntask {
chan_cell.take();
}
};
thread1.join();
thread2.join();
}
}
}
Expand All @@ -731,12 +736,14 @@ mod test {
let (port, chan) = oneshot::<~int>();
let chan_cell = Cell::new(chan);
let port_cell = Cell::new(port);
let _thread1 = do spawntask_thread {
let thread1 = do spawntask_thread {
chan_cell.take().send(~10);
};
let _thread2 = do spawntask_thread {
let thread2 = do spawntask_thread {
assert!(port_cell.take().recv() == ~10);
};
thread1.join();
thread2.join();
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/libstd/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ fn run_(main: ~fn(), use_main_sched: bool) -> int {
}

// Wait for schedulers
{ let _threads = threads; }
for threads.consume_iter().advance() |thread| {
thread.join();
}

// Return the exit code
unsafe {
Expand Down
13 changes: 7 additions & 6 deletions src/libstd/rt/sched.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,10 +901,8 @@ mod test {
sched.run();
};

// wait for the end
let _thread1 = normal_thread;
let _thread2 = special_thread;

normal_thread.join();
special_thread.join();
}
}

Expand Down Expand Up @@ -1074,16 +1072,19 @@ mod test {
sched2.enqueue_task(task2);

let sched1_cell = Cell::new(sched1);
let _thread1 = do Thread::start {
let thread1 = do Thread::start {
let sched1 = sched1_cell.take();
sched1.run();
};

let sched2_cell = Cell::new(sched2);
let _thread2 = do Thread::start {
let thread2 = do Thread::start {
let sched2 = sched2_cell.take();
sched2.run();
};

thread1.join();
thread2.join();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/libstd/rt/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl Drop for Task {
impl Coroutine {

pub fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
static MIN_STACK_SIZE: uint = 100000; // XXX: Too much stack
static MIN_STACK_SIZE: uint = 2000000; // XXX: Too much stack

let start = Coroutine::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
Expand Down
4 changes: 3 additions & 1 deletion src/libstd/rt/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ pub fn run_in_mt_newsched_task(f: ~fn()) {
}

// Wait for schedulers
let _threads = threads;
for threads.consume_iter().advance() |thread| {
thread.join();
}
}

}
Expand Down
19 changes: 15 additions & 4 deletions src/libstd/rt/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ type raw_thread = libc::c_void;

pub struct Thread {
main: ~fn(),
raw_thread: *raw_thread
raw_thread: *raw_thread,
joined: bool
}

impl Thread {
Expand All @@ -27,18 +28,28 @@ impl Thread {
let raw = substart(&main);
Thread {
main: main,
raw_thread: raw
raw_thread: raw,
joined: false
}
}

pub fn join(self) {
assert!(!self.joined);
let mut this = self;
unsafe { rust_raw_thread_join(this.raw_thread); }
this.joined = true;
}
}

impl Drop for Thread {
fn drop(&self) {
unsafe { rust_raw_thread_join_delete(self.raw_thread) }
assert!(self.joined);
unsafe { rust_raw_thread_delete(self.raw_thread) }
}
}

extern {
pub unsafe fn rust_raw_thread_start(f: &(~fn())) -> *raw_thread;
pub unsafe fn rust_raw_thread_join_delete(thread: *raw_thread);
pub unsafe fn rust_raw_thread_join(thread: *raw_thread);
pub unsafe fn rust_raw_thread_delete(thread: *raw_thread);
}
3 changes: 2 additions & 1 deletion src/libstd/rt/uv/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,13 @@ mod test {
let mut loop_ = Loop::new();
let watcher = AsyncWatcher::new(&mut loop_, |w, _| w.close(||()) );
let watcher_cell = Cell::new(watcher);
let _thread = do Thread::start {
let thread = do Thread::start {
let mut watcher = watcher_cell.take();
watcher.send();
};
loop_.run();
loop_.close();
thread.join();
}
}
}
12 changes: 8 additions & 4 deletions src/libstd/rt/uv/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ mod test {
}
}

let _client_thread = do Thread::start {
let client_thread = do Thread::start {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
Expand All @@ -739,6 +739,7 @@ mod test {
let mut loop_ = loop_;
loop_.run();
loop_.close();
client_thread.join();
}
}

Expand Down Expand Up @@ -790,7 +791,7 @@ mod test {
}
}

let _client_thread = do Thread::start {
let client_thread = do Thread::start {
rtdebug!("starting client thread");
let mut loop_ = Loop::new();
let mut tcp_watcher = { TcpWatcher::new(&mut loop_) };
Expand All @@ -814,6 +815,7 @@ mod test {
let mut loop_ = loop_;
loop_.run();
loop_.close();
client_thread.join();
}
}

Expand Down Expand Up @@ -855,7 +857,7 @@ mod test {
server.close(||{});
}

do Thread::start {
let thread = do Thread::start {
let mut loop_ = Loop::new();
let mut client = UdpWatcher::new(&loop_);
assert!(client.bind(client_addr).is_ok());
Expand All @@ -873,6 +875,7 @@ mod test {

loop_.run();
loop_.close();
thread.join();
}
}

Expand Down Expand Up @@ -914,7 +917,7 @@ mod test {
server.close(||{});
}

do Thread::start {
let thread = do Thread::start {
let mut loop_ = Loop::new();
let mut client = UdpWatcher::new(&loop_);
assert!(client.bind(client_addr).is_ok());
Expand All @@ -932,6 +935,7 @@ mod test {

loop_.run();
loop_.close();
thread.join();
}
}
}
3 changes: 2 additions & 1 deletion src/libstd/rt/uv/uvio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,12 @@ mod test_remote {
};
remote_cell.put_back(remote);
}
let _thread = do Thread::start {
let thread = do Thread::start {
remote_cell.take().fire();
};

assert!(tube.recv() == 1);
thread.join();
}
}
}
Expand Down
Loading