Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

threading: fixup scheduler statepoints for GC #32238

Merged
merged 5 commits into from
Jun 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ struct _jl_tls_states_t {
int16_t tid;
uint64_t rngseed;
volatile size_t *safepoint;
volatile int8_t sleep_check_state;
// Whether it is safe to execute GC at the same time.
#define JL_GC_STATE_WAITING 1
// gc_state = 1 means the thread is doing GC or is waiting for the GC to
Expand Down
56 changes: 30 additions & 26 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,12 @@ static void wake_thread(int16_t self, int16_t tid)
{
if (self != tid) {
jl_ptls_t other = jl_all_tls_states[tid];
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping);
if (state == sleeping) {
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
}
}
}

Expand Down Expand Up @@ -386,19 +389,26 @@ JL_DLLEXPORT void jl_set_task_tid(jl_task_t *task, int tid)
// Get the next runnable task for the calling thread.
// The `getsticky` callback is invoked first through jl_apply; if its result is
// a task object, that task is handed to jl_set_task_tid together with the
// calling thread's tid and returned. Otherwise the function falls back to
// multiq_deletemin() when JULIA_ENABLE_THREADING is defined, and returns NULL
// when it is not.
// NOTE(review): this listing looks like a diff rendering whose +/- markers were
// lost, so the two jl_gc_safepoint() calls below may be the old and the new
// position of a single call rather than two intended calls -- confirm against
// the real file before relying on either placement.
static jl_task_t *get_next_task(jl_value_t *getsticky)
{
jl_gc_safepoint();
jl_task_t *task = (jl_task_t*)jl_apply(&getsticky, 1);
if (jl_typeis(task, jl_task_type)) {
// the callback produced a task: pass it the current thread's id and use it
int self = jl_get_ptls_states()->tid;
jl_set_task_tid(task, self);
return task;
}
jl_gc_safepoint();
#ifdef JULIA_ENABLE_THREADING
// no task from the callback: take one from the multiq instead
return multiq_deletemin();
#else
return NULL;
#endif
}

// Report whether the given thread may still go to sleep.
// Returns nonzero only when both the unqualified `sleep_check_state` (declared
// outside this view) and the thread's own `ptls->sleep_check_state` read as
// `sleeping`. The unqualified flag is read first; the per-thread flag is only
// read when the first one matches.
static int may_sleep(jl_ptls_t ptls)
{
    if (jl_atomic_load(&sleep_check_state) != sleeping)
        return 0;
    return jl_atomic_load(&ptls->sleep_check_state) == sleeping;
}

extern volatile unsigned _threadedregion;

JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
Expand All @@ -408,12 +418,12 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
jl_task_t *task;

while (1) {
jl_gc_safepoint();
task = get_next_task(getsticky);
if (task)
return task;

#ifdef JULIA_ENABLE_THREADING
// quick, race-y check to see if there seems to be any stuff in there
jl_cpu_pause();
if (!multiq_check_empty()) {
start_cycles = 0;
Expand All @@ -425,6 +435,7 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
if (sleep_check_after_threshold(&start_cycles) || (!_threadedregion && ptls->tid == 0)) {
if (!sleep_check_now(ptls->tid))
continue;
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
task = get_next_task(getsticky);
if (task)
return task;
Expand All @@ -449,29 +460,26 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
JL_UV_UNLOCK();
}
else {
// otherwise, block until someone asks us for the lock
task = get_next_task(getsticky);
if (task) {
JL_UV_UNLOCK();
return task;
}
// otherwise, we may block until someone asks us for the lock
uv_loop_t *loop = jl_global_event_loop();
loop->stop_flag = 0;
active = uv_run(loop, UV_RUN_ONCE);
jl_gc_safepoint();
if (may_sleep(ptls)) {
loop->stop_flag = 0;
active = uv_run(loop, UV_RUN_ONCE);
}
JL_UV_UNLOCK();
// optimization: check again first if we added work for ourself
task = get_next_task(getsticky);
if (task)
return task;
// or someone else might have
if (jl_atomic_load(&sleep_check_state) != sleeping) {
// optimization: check again first if we may have work to do
if (!may_sleep(ptls)) {
start_cycles = 0;
continue;
}
// otherwise, we got a spurious wakeup since some other
// thread just wanted to steal libuv from us,
// thread that just wanted to steal libuv from us,
// just go right back to sleep on the other wake signal
// to let them take it from us without conflict
// TODO: this relinquishes responsibility for all events
// to the last thread to do an explicit operation,
// which may starve other threads of critical work
}
if (!_threadedregion && active && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
Expand All @@ -480,20 +488,16 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
continue;
}
}

// the other threads will just wait for a signal to resume
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (jl_atomic_load(&sleep_check_state) == sleeping) {
task = get_next_task(getsticky);
if (task)
break;
while (may_sleep(ptls)) {
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
}
uv_mutex_unlock(&ptls->sleep_lock);
jl_gc_safe_leave(ptls, gc_state);
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
start_cycles = 0;
if (task)
return task;
}
else {
// maybe check the kernel for new messages too
Expand Down