Skip to content

core: Allocate threads on demand, not on scheduler startup #3498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions src/libcore/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,8 @@ extern mod rustrt {

fn rust_get_sched_id() -> sched_id;
fn rust_new_sched(num_threads: libc::uintptr_t) -> sched_id;
fn sched_threads() -> libc::size_t;
fn rust_sched_threads() -> libc::size_t;
fn rust_sched_current_nonlazy_threads() -> libc::size_t;
fn rust_num_threads() -> libc::uintptr_t;

fn get_task_id() -> task_id;
Expand Down Expand Up @@ -2429,10 +2430,36 @@ fn test_sched_thread_per_core() {

do spawn_sched(ThreadPerCore) {
let cores = rustrt::rust_num_threads();
let reported_threads = rustrt::sched_threads();
let reported_threads = rustrt::rust_sched_threads();
assert(cores as uint == reported_threads as uint);
chan.send(());
}

port.recv();
}

#[test]
fn test_spawn_thread_on_demand() {
let (chan, port) = pipes::stream();

do spawn_sched(ManualThreads(2)) {
let max_threads = rustrt::rust_sched_threads();
assert(max_threads as int == 2);
let running_threads = rustrt::rust_sched_current_nonlazy_threads();
assert(running_threads as int == 1);

let (chan2, port2) = pipes::stream();

do spawn() {
chan2.send(());
}

let running_threads2 = rustrt::rust_sched_current_nonlazy_threads();
assert(running_threads2 as int == 2);

port2.recv();
chan.send(());
}

port.recv();
}
4 changes: 2 additions & 2 deletions src/libstd/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export run_tests_console;

#[abi = "cdecl"]
extern mod rustrt {
fn sched_threads() -> libc::size_t;
fn rust_sched_threads() -> libc::size_t;
}

// The name of a test. By convention this follows the rules for rust
Expand Down Expand Up @@ -330,7 +330,7 @@ const sched_overcommit : uint = 1u;
const sched_overcommit : uint = 4u;

fn get_concurrency() -> uint {
let threads = rustrt::sched_threads() as uint;
let threads = rustrt::rust_sched_threads() as uint;
if threads == 1u { 1u }
else { threads * sched_overcommit }
}
Expand Down
8 changes: 7 additions & 1 deletion src/rt/rust_builtin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,11 +627,17 @@ start_task(rust_task *target, fn_env_pair *f) {
}

extern "C" CDECL size_t
sched_threads() {
rust_sched_current_nonlazy_threads() {
rust_task *task = rust_get_current_task();
return task->sched->number_of_threads();
}

extern "C" CDECL size_t
rust_sched_threads() {
rust_task *task = rust_get_current_task();
return task->sched->max_number_of_threads();
}

extern "C" CDECL rust_port*
rust_port_take(rust_port_id id) {
rust_task *task = rust_get_current_task();
Expand Down
12 changes: 7 additions & 5 deletions src/rt/rust_kernel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ rust_kernel::rust_kernel(rust_env *env) :

// Create the single threaded scheduler that will run on the platform's
// main thread
rust_manual_sched_launcher_factory launchfac;
osmain_scheduler = create_scheduler(&launchfac, 1, false);
osmain_driver = launchfac.get_driver();
rust_manual_sched_launcher_factory *launchfac =
new rust_manual_sched_launcher_factory();
osmain_scheduler = create_scheduler(launchfac, 1, false);
osmain_driver = launchfac->get_driver();
sched_reaper.start();
}

Expand Down Expand Up @@ -79,8 +80,9 @@ void rust_kernel::free(void *mem) {

rust_sched_id
rust_kernel::create_scheduler(size_t num_threads) {
rust_thread_sched_launcher_factory launchfac;
return create_scheduler(&launchfac, num_threads, true);
rust_thread_sched_launcher_factory *launchfac =
new rust_thread_sched_launcher_factory();
return create_scheduler(launchfac, num_threads, true);
}

rust_sched_id
Expand Down
87 changes: 58 additions & 29 deletions src/rt/rust_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,41 @@
#include "rust_sched_launcher.h"

rust_scheduler::rust_scheduler(rust_kernel *kernel,
size_t num_threads,
size_t max_num_threads,
rust_sched_id id,
bool allow_exit,
bool killed,
rust_sched_launcher_factory *launchfac) :
ref_count(1),
kernel(kernel),
live_threads(num_threads),
live_threads(0),
live_tasks(0),
cur_thread(0),
may_exit(allow_exit),
num_threads(num_threads),
killed(killed),
launchfac(launchfac),
max_num_threads(max_num_threads),
id(id)
{
create_task_threads(launchfac, killed);
// Create the first thread
scoped_lock with(lock);
threads.push(create_task_thread(0));
}

void rust_scheduler::delete_this() {
destroy_task_threads();
delete launchfac;
delete this;
}

rust_sched_launcher *
rust_scheduler::create_task_thread(rust_sched_launcher_factory *launchfac,
int id, bool killed) {
rust_scheduler::create_task_thread(int id) {
lock.must_have_lock();
live_threads++;
rust_sched_launcher *thread = launchfac->create(this, id, killed);
KLOG(kernel, kern, "created task thread: " PTR ", id: %d",
thread, id);
KLOG(kernel, kern, "created task thread: " PTR
", id: %d, live_threads: %d",
thread, id, live_threads);
return thread;
}

Expand All @@ -43,27 +50,19 @@ rust_scheduler::destroy_task_thread(rust_sched_launcher *thread) {
delete thread;
}

void
rust_scheduler::create_task_threads(rust_sched_launcher_factory *launchfac,
bool killed) {
KLOG(kernel, kern, "Using %d scheduler threads.", num_threads);

for(size_t i = 0; i < num_threads; ++i) {
threads.push(create_task_thread(launchfac, i, killed));
}
}

void
rust_scheduler::destroy_task_threads() {
for(size_t i = 0; i < num_threads; ++i) {
scoped_lock with(lock);
for(size_t i = 0; i < threads.size(); ++i) {
destroy_task_thread(threads[i]);
}
}

void
rust_scheduler::start_task_threads()
{
for(size_t i = 0; i < num_threads; ++i) {
scoped_lock with(lock);
for(size_t i = 0; i < threads.size(); ++i) {
rust_sched_launcher *thread = threads[i];
thread->start();
}
Expand All @@ -72,16 +71,25 @@ rust_scheduler::start_task_threads()
void
rust_scheduler::join_task_threads()
{
for(size_t i = 0; i < num_threads; ++i) {
scoped_lock with(lock);
for(size_t i = 0; i < threads.size(); ++i) {
rust_sched_launcher *thread = threads[i];
thread->join();
}
}

void
rust_scheduler::kill_all_tasks() {
for(size_t i = 0; i < num_threads; ++i) {
rust_sched_launcher *thread = threads[i];
array_list<rust_sched_launcher *> copied_threads;
{
scoped_lock with(lock);
killed = true;
for (size_t i = 0; i < threads.size(); ++i) {
copied_threads.push(threads[i]);
}
}
for(size_t i = 0; i < copied_threads.size(); ++i) {
rust_sched_launcher *thread = copied_threads[i];
thread->get_loop()->kill_all_tasks();
}
}
Expand All @@ -92,10 +100,19 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) {
{
scoped_lock with(lock);
live_tasks++;
thread_no = cur_thread++;
if (cur_thread >= num_threads)
cur_thread = 0;

if (cur_thread < threads.size()) {
thread_no = cur_thread;
} else {
assert(threads.size() < max_num_threads);
thread_no = threads.size();
rust_sched_launcher *thread = create_task_thread(thread_no);
thread->start();
threads.push(thread);
}
cur_thread = (thread_no + 1) % max_num_threads;
}
KLOG(kernel, kern, "Creating task %s, on thread %d.", name, thread_no);
kernel->register_task();
rust_sched_launcher *thread = threads[thread_no];
return thread->get_loop()->create_task(spawner, name);
Expand All @@ -119,17 +136,29 @@ rust_scheduler::release_task() {

void
rust_scheduler::exit() {
// Take a copy of num_threads. After the last thread exits this
// Take a copy of the number of threads. After the last thread exits this
// scheduler will get destroyed, and our fields will cease to exist.
size_t current_num_threads = num_threads;
//
// This is also the reason we can't use the lock here (as in the other
// cases when accessing `threads`), after the loop the lock won't exist
// anymore. This is safe because this method is only called when all the
// task are dead, so there is no chance of a task trying to create new
// threads.
size_t current_num_threads = threads.size();
for(size_t i = 0; i < current_num_threads; ++i) {
threads[i]->get_loop()->exit();
}
}

size_t
rust_scheduler::max_number_of_threads() {
return max_num_threads;
}

size_t
rust_scheduler::number_of_threads() {
return num_threads;
scoped_lock with(lock);
return threads.size();
}

void
Expand Down
13 changes: 6 additions & 7 deletions src/rt/rust_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ class rust_scheduler : public kernel_owned<rust_scheduler> {
uintptr_t live_tasks;
size_t cur_thread;
bool may_exit;
bool killed;

rust_sched_launcher_factory *launchfac;
array_list<rust_sched_launcher *> threads;
const size_t num_threads;
const size_t max_num_threads;

rust_sched_id id;

void create_task_threads(rust_sched_launcher_factory *launchfac,
bool killed);
void destroy_task_threads();

rust_sched_launcher *
create_task_thread(rust_sched_launcher_factory *launchfac, int id,
bool killed);
rust_sched_launcher *create_task_thread(int id);
void destroy_task_thread(rust_sched_launcher *thread);

void exit();
Expand All @@ -51,7 +49,7 @@ class rust_scheduler : public kernel_owned<rust_scheduler> {
void delete_this();

public:
rust_scheduler(rust_kernel *kernel, size_t num_threads,
rust_scheduler(rust_kernel *kernel, size_t max_num_threads,
rust_sched_id id, bool allow_exit, bool killed,
rust_sched_launcher_factory *launchfac);

Expand All @@ -62,6 +60,7 @@ class rust_scheduler : public kernel_owned<rust_scheduler> {

void release_task();

size_t max_number_of_threads();
size_t number_of_threads();
// Called by each thread when it terminates. When all threads
// terminate the scheduler does as well.
Expand Down
3 changes: 2 additions & 1 deletion src/rt/rustrt.def.in
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ rust_port_size
rust_process_wait
rust_ptr_eq
rust_run_program
rust_sched_current_nonlazy_threads
rust_sched_threads
rust_set_exit_status
rust_start
rust_getcwd
Expand All @@ -58,7 +60,6 @@ rust_get_task
rust_get_stack_segment
rust_task_weaken
rust_task_unweaken
sched_threads
shape_log_str
start_task
vec_reserve_shared_actual
Expand Down
4 changes: 2 additions & 2 deletions src/test/run-pass/morestack6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ extern mod rustrt {
fn last_os_error() -> ~str;
fn rust_getcwd() -> ~str;
fn get_task_id() -> libc::intptr_t;
fn sched_threads();
fn rust_sched_threads();
fn rust_get_task();
}

fn calllink01() { rustrt::rust_get_sched_id(); }
fn calllink02() { rustrt::last_os_error(); }
fn calllink03() { rustrt::rust_getcwd(); }
fn calllink08() { rustrt::get_task_id(); }
fn calllink09() { rustrt::sched_threads(); }
fn calllink09() { rustrt::rust_sched_threads(); }
fn calllink10() { rustrt::rust_get_task(); }

fn runtest(f: fn~(), frame_backoff: u32) {
Expand Down