diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 441e821b81a35..d4da2a968313a 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -1655,7 +1655,8 @@ extern mod rustrt { fn rust_get_sched_id() -> sched_id; fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id; - fn sched_threads() -> libc::size_t; + fn rust_sched_threads() -> libc::size_t; + fn rust_sched_current_nonlazy_threads() -> libc::size_t; fn rust_num_threads() -> libc::uintptr_t; fn get_task_id() -> task_id; @@ -2429,10 +2430,36 @@ fn test_sched_thread_per_core() { do spawn_sched(ThreadPerCore) { let cores = rustrt::rust_num_threads(); - let reported_threads = rustrt::sched_threads(); + let reported_threads = rustrt::rust_sched_threads(); assert(cores as uint == reported_threads as uint); chan.send(()); } port.recv(); } + +#[test] +fn test_spawn_thread_on_demand() { + let (chan, port) = pipes::stream(); + + do spawn_sched(ManualThreads(2)) { + let max_threads = rustrt::rust_sched_threads(); + assert(max_threads as int == 2); + let running_threads = rustrt::rust_sched_current_nonlazy_threads(); + assert(running_threads as int == 1); + + let (chan2, port2) = pipes::stream(); + + do spawn() { + chan2.send(()); + } + + let running_threads2 = rustrt::rust_sched_current_nonlazy_threads(); + assert(running_threads2 as int == 2); + + port2.recv(); + chan.send(()); + } + + port.recv(); +} diff --git a/src/libstd/test.rs b/src/libstd/test.rs index a2e7be51de81d..e69ef521e1298 100644 --- a/src/libstd/test.rs +++ b/src/libstd/test.rs @@ -26,7 +26,7 @@ export run_tests_console; #[abi = "cdecl"] extern mod rustrt { - fn sched_threads() -> libc::size_t; + fn rust_sched_threads() -> libc::size_t; } // The name of a test. By convention this follows the rules for rust @@ -330,7 +330,7 @@ const sched_overcommit : uint = 1u; const sched_overcommit : uint = 4u; fn get_concurrency() -> uint { - let threads = rustrt::sched_threads() as uint; + let threads = rustrt::rust_sched_threads() as uint; if threads == 1u { 1u } else { threads * sched_overcommit } } diff --git a/src/rt/rust_builtin.cpp b/src/rt/rust_builtin.cpp index a601908359c8c..8829089822c72 100644 --- a/src/rt/rust_builtin.cpp +++ b/src/rt/rust_builtin.cpp @@ -627,11 +627,17 @@ start_task(rust_task *target, fn_env_pair *f) { } extern "C" CDECL size_t -sched_threads() { +rust_sched_current_nonlazy_threads() { rust_task *task = rust_get_current_task(); return task->sched->number_of_threads(); } +extern "C" CDECL size_t +rust_sched_threads() { + rust_task *task = rust_get_current_task(); + return task->sched->max_number_of_threads(); +} + extern "C" CDECL rust_port* rust_port_take(rust_port_id id) { rust_task *task = rust_get_current_task(); diff --git a/src/rt/rust_kernel.cpp b/src/rt/rust_kernel.cpp index 698ee8667281b..669ebd55a7c9c 100644 --- a/src/rt/rust_kernel.cpp +++ b/src/rt/rust_kernel.cpp @@ -31,9 +31,10 @@ rust_kernel::rust_kernel(rust_env *env) : // Create the single threaded scheduler that will run on the platform's // main thread - rust_manual_sched_launcher_factory launchfac; - osmain_scheduler = create_scheduler(&launchfac, 1, false); - osmain_driver = launchfac.get_driver(); + rust_manual_sched_launcher_factory *launchfac = + new rust_manual_sched_launcher_factory(); + osmain_scheduler = create_scheduler(launchfac, 1, false); + osmain_driver = launchfac->get_driver(); sched_reaper.start(); } @@ -79,8 +80,9 @@ void rust_kernel::free(void *mem) { rust_sched_id rust_kernel::create_scheduler(size_t num_threads) { - rust_thread_sched_launcher_factory launchfac; - return create_scheduler(&launchfac, num_threads, true); + rust_thread_sched_launcher_factory *launchfac = + new rust_thread_sched_launcher_factory(); + return create_scheduler(launchfac, num_threads, true); } rust_sched_id diff --git a/src/rt/rust_scheduler.cpp b/src/rt/rust_scheduler.cpp index 5dd1a261c0efd..9bb311a8908a9 100644 --- a/src/rt/rust_scheduler.cpp +++ b/src/rt/rust_scheduler.cpp @@ -6,34 +6,41 @@ #include "rust_sched_launcher.h" rust_scheduler::rust_scheduler(rust_kernel *kernel, - size_t num_threads, + size_t max_num_threads, rust_sched_id id, bool allow_exit, bool killed, rust_sched_launcher_factory *launchfac) : ref_count(1), kernel(kernel), - live_threads(num_threads), + live_threads(0), live_tasks(0), cur_thread(0), may_exit(allow_exit), - num_threads(num_threads), + killed(killed), + launchfac(launchfac), + max_num_threads(max_num_threads), id(id) { - create_task_threads(launchfac, killed); + // Create the first thread + scoped_lock with(lock); + threads.push(create_task_thread(0)); } void rust_scheduler::delete_this() { destroy_task_threads(); + delete launchfac; delete this; } rust_sched_launcher * -rust_scheduler::create_task_thread(rust_sched_launcher_factory *launchfac, - int id, bool killed) { +rust_scheduler::create_task_thread(int id) { + lock.must_have_lock(); + live_threads++; rust_sched_launcher *thread = launchfac->create(this, id, killed); - KLOG(kernel, kern, "created task thread: " PTR ", id: %d", - thread, id); + KLOG(kernel, kern, "created task thread: " PTR + ", id: %d, live_threads: %d", + thread, id, live_threads); return thread; } @@ -43,19 +50,10 @@ rust_scheduler::destroy_task_thread(rust_sched_launcher *thread) { delete thread; } -void -rust_scheduler::create_task_threads(rust_sched_launcher_factory *launchfac, - bool killed) { - KLOG(kernel, kern, "Using %d scheduler threads.", num_threads); - - for(size_t i = 0; i < num_threads; ++i) { - threads.push(create_task_thread(launchfac, i, killed)); - } -} - void rust_scheduler::destroy_task_threads() { - for(size_t i = 0; i < num_threads; ++i) { + scoped_lock with(lock); + for(size_t i = 0; i < threads.size(); ++i) { destroy_task_thread(threads[i]); } } @@ -63,7 +61,8 @@ rust_scheduler::destroy_task_threads() { void rust_scheduler::start_task_threads() { - for(size_t i = 0; i < num_threads; ++i) { + scoped_lock with(lock); + for(size_t i = 0; i < threads.size(); ++i) { rust_sched_launcher *thread = threads[i]; thread->start(); } @@ -72,7 +71,8 @@ rust_scheduler::start_task_threads() void rust_scheduler::join_task_threads() { - for(size_t i = 0; i < num_threads; ++i) { + scoped_lock with(lock); + for(size_t i = 0; i < threads.size(); ++i) { rust_sched_launcher *thread = threads[i]; thread->join(); } @@ -80,8 +80,16 @@ rust_scheduler::join_task_threads() void rust_scheduler::kill_all_tasks() { - for(size_t i = 0; i < num_threads; ++i) { - rust_sched_launcher *thread = threads[i]; + array_list copied_threads; + { + scoped_lock with(lock); + killed = true; + for (size_t i = 0; i < threads.size(); ++i) { + copied_threads.push(threads[i]); + } + } + for(size_t i = 0; i < copied_threads.size(); ++i) { + rust_sched_launcher *thread = copied_threads[i]; thread->get_loop()->kill_all_tasks(); } } @@ -92,10 +100,19 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) { { scoped_lock with(lock); live_tasks++; - thread_no = cur_thread++; - if (cur_thread >= num_threads) - cur_thread = 0; + + if (cur_thread < threads.size()) { + thread_no = cur_thread; + } else { + assert(threads.size() < max_num_threads); + thread_no = threads.size(); + rust_sched_launcher *thread = create_task_thread(thread_no); + thread->start(); + threads.push(thread); + } + cur_thread = (thread_no + 1) % max_num_threads; } + KLOG(kernel, kern, "Creating task %s, on thread %d.", name, thread_no); kernel->register_task(); rust_sched_launcher *thread = threads[thread_no]; return thread->get_loop()->create_task(spawner, name); @@ -119,17 +136,29 @@ rust_scheduler::release_task() { void rust_scheduler::exit() { - // Take a copy of num_threads. After the last thread exits this + // Take a copy of the number of threads. After the last thread exits this // scheduler will get destroyed, and our fields will cease to exist. - size_t current_num_threads = num_threads; + // + // This is also the reason we can't use the lock here (as in the other + // cases when accessing `threads`), after the loop the lock won't exist + // anymore. This is safe because this method is only called when all the + // task are dead, so there is no chance of a task trying to create new + // threads. + size_t current_num_threads = threads.size(); for(size_t i = 0; i < current_num_threads; ++i) { threads[i]->get_loop()->exit(); } } +size_t +rust_scheduler::max_number_of_threads() { + return max_num_threads; +} + size_t rust_scheduler::number_of_threads() { - return num_threads; + scoped_lock with(lock); + return threads.size(); } void diff --git a/src/rt/rust_scheduler.h b/src/rt/rust_scheduler.h index 767ecaf7d1e3b..019f69f7a3160 100644 --- a/src/rt/rust_scheduler.h +++ b/src/rt/rust_scheduler.h @@ -30,19 +30,17 @@ class rust_scheduler : public kernel_owned { uintptr_t live_tasks; size_t cur_thread; bool may_exit; + bool killed; + rust_sched_launcher_factory *launchfac; array_list threads; - const size_t num_threads; + const size_t max_num_threads; rust_sched_id id; - void create_task_threads(rust_sched_launcher_factory *launchfac, - bool killed); void destroy_task_threads(); - rust_sched_launcher * - create_task_thread(rust_sched_launcher_factory *launchfac, int id, - bool killed); + rust_sched_launcher *create_task_thread(int id); void destroy_task_thread(rust_sched_launcher *thread); void exit(); @@ -51,7 +49,7 @@ class rust_scheduler : public kernel_owned { void delete_this(); public: - rust_scheduler(rust_kernel *kernel, size_t num_threads, + rust_scheduler(rust_kernel *kernel, size_t max_num_threads, rust_sched_id id, bool allow_exit, bool killed, rust_sched_launcher_factory *launchfac); @@ -62,6 +60,7 @@ class rust_scheduler : public kernel_owned { void release_task(); + size_t max_number_of_threads(); size_t number_of_threads(); // Called by each thread when it terminates. When all threads // terminate the scheduler does as well. diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index cb2f36fe31ba4..14116394b3d35 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -48,6 +48,8 @@ rust_port_size rust_process_wait rust_ptr_eq rust_run_program +rust_sched_current_nonlazy_threads +rust_sched_threads rust_set_exit_status rust_start rust_getcwd @@ -58,7 +60,6 @@ rust_get_task rust_get_stack_segment rust_task_weaken rust_task_unweaken -sched_threads shape_log_str start_task vec_reserve_shared_actual diff --git a/src/test/run-pass/morestack6.rs b/src/test/run-pass/morestack6.rs index 3036d4c201fa2..da4cfd0b47144 100644 --- a/src/test/run-pass/morestack6.rs +++ b/src/test/run-pass/morestack6.rs @@ -8,7 +8,7 @@ extern mod rustrt { fn last_os_error() -> ~str; fn rust_getcwd() -> ~str; fn get_task_id() -> libc::intptr_t; - fn sched_threads(); + fn rust_sched_threads(); fn rust_get_task(); } @@ -16,7 +16,7 @@ fn calllink01() { rustrt::rust_get_sched_id(); } fn calllink02() { rustrt::last_os_error(); } fn calllink03() { rustrt::rust_getcwd(); } fn calllink08() { rustrt::get_task_id(); } -fn calllink09() { rustrt::sched_threads(); } +fn calllink09() { rustrt::rust_sched_threads(); } fn calllink10() { rustrt::rust_get_task(); } fn runtest(f: fn~(), frame_backoff: u32) {