Skip to content

Commit

Permalink
threads: further optimize scheduler
Browse files Browse the repository at this point in the history
and add some debugging aids, controlled by changing the JULIA_DEBUG_SLEEPWAKE() macro in julia_threads.h
  • Loading branch information
vtjnash committed Jul 17, 2019
1 parent 261e2b9 commit 998b5ae
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 64 deletions.
15 changes: 11 additions & 4 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ void JL_UV_LOCK(void);
extern "C" {
#endif

// timers
// Returns time in nanosec
JL_DLLEXPORT uint64_t jl_hrtime(void);

// Read the processor time-stamp counter with the x86 `rdtscp` instruction and
// return it as a single 64-bit value. The result is a raw counter reading, not
// a time in nanoseconds.
// NOTE(review): this is x86-specific GNU-style inline asm and no architecture
// guard is visible around it here -- confirm that other targets/compilers never
// see this definition.
STATIC_INLINE uint64_t rdtscp(void)
{
uint64_t rax, rdx;
// `rdtscp` delivers the counter split across EDX:EAX and also writes ECX,
// which is why "rcx" is listed as a clobber.
asm volatile ( "rdtscp\n" : "=a" (rax), "=d" (rdx) : : "rcx" );
// recombine the high half (rdx) and the low half (rax)
return (rdx << 32) + rax;
}

#include "timing.h"

#ifdef _COMPILER_MICROSOFT_
Expand Down Expand Up @@ -679,10 +690,6 @@ void jl_push_excstack(jl_excstack_t **stack JL_REQUIRE_ROOTED_SLOT JL_ROOTING_AR
uintptr_t *bt_data, size_t bt_size);
void jl_copy_excstack(jl_excstack_t *dest, jl_excstack_t *src) JL_NOTSAFEPOINT;

// timers
// Returns time in nanosec
JL_DLLEXPORT uint64_t jl_hrtime(void);

// congruential random number generator
// for a small amount of thread-local randomness
// we could just use libc:`rand()`, but we want to ensure this is fast
Expand Down
17 changes: 12 additions & 5 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
#include <atomics.h>
// threading ------------------------------------------------------------------

// WARNING: Threading support is incomplete and experimental
// Nonetheless, we define JL_THREAD and use it to give advanced notice to
// maintainers of what eventual threading support will change.
// JULIA_ENABLE_THREADING may be controlled by altering JULIA_THREADS in Make.user

// JULIA_ENABLE_THREADING is switched on in Make.inc if JULIA_THREADS is
// set (in Make.user)
// When running into scheduler issues, this may help provide information on the
// sequence of events that led to the issue. Normally, it is empty.
//#define JULIA_DEBUG_SLEEPWAKE(x) x
#define JULIA_DEBUG_SLEEPWAKE(x)

// Options for task switching algorithm (in order of preference):
// JL_HAVE_ASM -- mostly setjmp
Expand Down Expand Up @@ -210,6 +210,13 @@ struct _jl_tls_states_t {
// Saved exception for previous external API call or NULL if cleared.
// Access via jl_exception_occurred().
struct _jl_value_t *previous_exception;

JULIA_DEBUG_SLEEPWAKE(
uint64_t uv_run_enter;
uint64_t uv_run_leave;
uint64_t sleep_enter;
uint64_t sleep_leave;
)
};

// Update codegen version in `ccall.cpp` after changing either `pause` or `wake`
Expand Down
103 changes: 73 additions & 30 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,30 @@ static const int16_t not_sleeping = 0;
// to see if it is safe to transition to sleeping.
static const int16_t sleeping = 1;

// invariant: No thread is ever asleep unless sleep_check_state is sleeping (or we have a wakeup signal pending).
// invariant: Any particular thread is not asleep unless that thread's sleep_check_state is sleeping.
// invariant: The transition of a thread state to sleeping must be followed by a check that there wasn't work pending for it.
// information: Observing thread not-sleeping is sufficient to ensure the target thread will subsequently inspect its local queue.
// information: Observing global not-sleeping is NOT sufficient to ensure the target thread will subsequently inspect its local queue.
// information: Observing a thread as is-sleeping means it may be necessary to notify it at least once to wake it up. It may, however, already be awake for a variety of reasons.
// information: When we transition global state from sleeping to not_sleeping, we also transition the local to not_sleeping to reduce repeat wakeup attempts.

// Debug-only timestamps; these declarations exist only when
// JULIA_DEBUG_SLEEPWAKE(x) is defined to expand to x.
// wakeup_enter / wakeup_leave are set from rdtscp() at the start and end of
// jl_wakeup_thread; io_wakeup_enter / io_wakeup_leave are set from rdtscp()
// around the jl_wake_libuv() call in wake_libuv.
// NOTE(review): these are plain globals with no visible synchronization, so
// with several threads they presumably just hold the most recent write --
// confirm that is acceptable for this debugging aid.
JULIA_DEBUG_SLEEPWAKE(
uint64_t wakeup_enter;
uint64_t wakeup_leave;
uint64_t io_wakeup_enter;
uint64_t io_wakeup_leave;
);


// Best-effort attempt to bind `task` to thread `tid`.
//
// task: the task whose `tid` field should be claimed.
// tid:  the thread id to store into it.
//
// Nothing is returned: a failed claim is not reported here and is instead
// detected later (in jl_switchto).
JL_DLLEXPORT void jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT
{
    // Already bound to this thread: skip the atomic read-modify-write entirely.
    if (jl_atomic_load_acquire(&task->tid) == tid)
        return;
    // Try to acquire the lock on this task by swinging its tid from -1
    // (presumably "not bound to any thread") to `tid`. The result is
    // deliberately ignored; see the note above about jl_switchto.
    jl_atomic_compare_exchange(&task->tid, -1, tid);
}

#ifdef JULIA_ENABLE_THREADING

Expand Down Expand Up @@ -160,11 +184,10 @@ static inline jl_task_t *multiq_deletemin(void)
return NULL;

task = heaps[rn1].tasks[0];
jl_set_task_tid(task, ptls->tid);
if (jl_atomic_load_acquire(&task->tid) != ptls->tid) {
if (jl_atomic_compare_exchange(&task->tid, -1, ptls->tid) != -1) {
jl_mutex_unlock_nogc(&heaps[rn1].lock);
goto retry;
}
jl_mutex_unlock_nogc(&heaps[rn1].lock);
goto retry;
}
heaps[rn1].tasks[0] = heaps[rn1].tasks[--heaps[rn1].ntasks];
heaps[rn1].tasks[heaps[rn1].ntasks] = NULL;
Expand Down Expand Up @@ -298,6 +321,7 @@ JL_DLLEXPORT int jl_enqueue_task(jl_task_t *task)
// sleep_check_after_threshold() -- if sleep_threshold ns have passed, return 1
static int sleep_check_after_threshold(uint64_t *start_cycles)
{
JULIA_DEBUG_SLEEPWAKE( return 1 ); // hammer on the sleep/wake logic much harder
if (!(*start_cycles)) {
*start_cycles = jl_hrtime();
return 0;
Expand All @@ -314,11 +338,13 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)
static void wake_thread(int16_t tid)
{
jl_ptls_t other = jl_all_tls_states[tid];
int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping);
if (state == sleeping) {
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
if (jl_atomic_load(&other->sleep_check_state) != not_sleeping) {
int16_t state = jl_atomic_exchange(&other->sleep_check_state, not_sleeping); // prohibit it from sleeping
if (state == sleeping) { // see if it was possibly sleeping before now
uv_mutex_lock(&other->sleep_lock);
uv_cond_signal(&other->wake_signal);
uv_mutex_unlock(&other->sleep_lock);
}
}
}

Expand All @@ -338,16 +364,25 @@ static int sleep_check_after_threshold(uint64_t *start_cycles)

#endif

// Thin wrapper around jl_wake_libuv().
// When JULIA_DEBUG_SLEEPWAKE is enabled it additionally records rdtscp()
// readings into io_wakeup_enter / io_wakeup_leave immediately before and
// after the call; otherwise those two statements expand to nothing.
static void wake_libuv(void)
{
JULIA_DEBUG_SLEEPWAKE( io_wakeup_enter = rdtscp() );
jl_wake_libuv();
JULIA_DEBUG_SLEEPWAKE( io_wakeup_leave = rdtscp() );
}

/* ensure thread tid is awake if necessary */
JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
{
jl_ptls_t ptls = jl_get_ptls_states();
int16_t uvlock = jl_atomic_load(&jl_uv_mutex.owner);
int16_t self = ptls->tid;
unsigned long system_self = jl_all_tls_states[self]->system_id;
int16_t uvlock = jl_atomic_load_acquire(&jl_uv_mutex.owner);
JULIA_DEBUG_SLEEPWAKE( wakeup_enter = rdtscp() );
if (tid == self || tid == -1) {
// we're already awake, but make sure we'll exit uv_run
jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping);
if (uvlock == system_self)
uv_stop(jl_global_event_loop());
}
Expand All @@ -356,39 +391,32 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
// something added to the sticky-queue: notify that thread
wake_thread(tid);
// check if we need to notify uv_run too
if (uvlock != system_self)
jl_wake_libuv();
unsigned long system_tid = jl_all_tls_states[tid]->system_id;
if (uvlock != system_self && jl_atomic_load(&jl_uv_mutex.owner) == system_tid)
wake_libuv();
}
// check if the other threads might be sleeping
if (tid == -1) {
// check if the other threads might be sleeping
if (jl_atomic_load_acquire(&sleep_check_state) != not_sleeping) {
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
// something added to the multi-queue: notify all threads
// in the future, we might want to instead wake some fraction of threads,
// and let each of those wake additional threads if they find work
if (jl_atomic_load(&sleep_check_state) != not_sleeping) {
int16_t state = jl_atomic_exchange(&sleep_check_state, not_sleeping);
if (state == sleeping) {
for (tid = 0; tid < jl_n_threads; tid++)
if (tid != self)
wake_thread(tid);
// check if we need to notify uv_run too
if (uvlock != system_self)
jl_wake_libuv();
if (uvlock != system_self && jl_atomic_load(&jl_uv_mutex.owner) != 0)
wake_libuv();
}
}
}
#endif
JULIA_DEBUG_SLEEPWAKE( wakeup_leave = rdtscp() );
}


// Best-effort attempt to bind `task` to thread `tid`.
//
// task: the task whose `tid` field should be claimed.
// tid:  the thread id to store into it.
//
// Nothing is returned: a failed claim is not reported here and is instead
// detected later (in jl_switchto).
JL_DLLEXPORT void jl_set_task_tid(jl_task_t *task, int tid) JL_NOTSAFEPOINT
{
    // Already bound to this thread: skip the atomic read-modify-write entirely.
    if (jl_atomic_load_acquire(&task->tid) == tid)
        return;
    // Try to acquire the lock on this task by swinging its tid from -1
    // (presumably "not bound to any thread") to `tid`. The result is
    // deliberately ignored; see the note above about jl_switchto.
    jl_atomic_compare_exchange(&task->tid, -1, tid);
}

// get the next runnable task from the multiq
static jl_task_t *get_next_task(jl_value_t *getsticky)
{
Expand Down Expand Up @@ -440,8 +468,12 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
continue;
jl_atomic_store(&ptls->sleep_check_state, sleeping); // acquire sleep-check lock
task = get_next_task(getsticky);
if (task)
if (task) {
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
return task;
}

// one thread should win this race and watch the event loop
// inside a threaded region, any thread can listen for IO messages,
// although none are allowed to create new ones
Expand All @@ -468,11 +500,15 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
jl_gc_safepoint();
if (may_sleep(ptls)) {
loop->stop_flag = 0;
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_enter = rdtscp() );
active = uv_run(loop, UV_RUN_ONCE);
JULIA_DEBUG_SLEEPWAKE( ptls->uv_run_leave = rdtscp() );
}
JL_UV_UNLOCK();
// optimization: check again first if we may have work to do
if (!may_sleep(ptls)) {
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
start_cycles = 0;
continue;
}
Expand All @@ -487,19 +523,26 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
if (!_threadedregion && active && ptls->tid == 0) {
// thread 0 is the only thread permitted to run the event loop
// so it needs to stay alive
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
start_cycles = 0;
continue;
}
}

// the other threads will just wait for a signal to resume
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_enter = rdtscp() );
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_mutex_lock(&ptls->sleep_lock);
while (may_sleep(ptls)) {
uv_cond_wait(&ptls->wake_signal, &ptls->sleep_lock);
// TODO: help with gc work here, if applicable
}
uv_mutex_unlock(&ptls->sleep_lock);
JULIA_DEBUG_SLEEPWAKE( ptls->sleep_leave = rdtscp() );
jl_gc_safe_leave(ptls, gc_state); // contains jl_gc_safepoint
if (jl_atomic_load(&ptls->sleep_check_state) != not_sleeping)
jl_atomic_store(&ptls->sleep_check_state, not_sleeping); // let other threads know they don't need to wake us
start_cycles = 0;
}
else {
Expand Down
8 changes: 1 addition & 7 deletions src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,18 +505,12 @@ JL_DLLEXPORT void jl_threading_run(jl_value_t *func)
args2[1] = (jl_value_t*)t;
jl_apply(args2, 2);
#ifdef JULIA_ENABLE_THREADING
if (i == 1) {
if (i == 2 && nthreads > 2) {
// let threads know work is coming (optimistic)
jl_wakeup_thread(-1);
}
#endif
}
#ifdef JULIA_ENABLE_THREADING
if (nthreads > 2) {
// let threads know work is ready (guaranteed)
jl_wakeup_thread(-1);
}
#endif
// join with all tasks
JL_TRY {
for (int i = 0; i < nthreads; i++) {
Expand Down
7 changes: 0 additions & 7 deletions src/timing.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ void jl_timing_block_stop(jl_timing_block_t *cur_block);
#define JL_TIMING(owner)
#else

// Read the processor time-stamp counter with the x86 `rdtscp` instruction and
// return it as a single 64-bit value (a raw counter reading, not nanoseconds).
// NOTE(review): x86-specific GNU-style inline asm -- confirm the surrounding
// preprocessor conditions keep this away from other targets/compilers.
static inline uint64_t rdtscp(void)
{
uint64_t rax,rdx;
// `rdtscp` delivers the counter split across EDX:EAX and also writes ECX,
// which is why "rcx" is listed as a clobber.
asm volatile ( "rdtscp\n" : "=a" (rax), "=d" (rdx) : : "rcx" );
// recombine the high half (rdx) and the low half (rax)
return (rdx << 32) + rax;
}

#define JL_TIMING_OWNERS \
X(ROOT), \
X(GC), \
Expand Down
14 changes: 3 additions & 11 deletions test/threads.jl
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test, Base.Threads

let p, cmd = `$(Base.julia_cmd()) --depwarn=error --startup-file=no threads_exec.jl`
# test both nthreads==1 and nthreads>1. spawn a process to test whichever
# case we are not running currently.
other_nthreads = nthreads() == 1 ? 4 : 1
p = run(pipeline(setenv(cmd, "JULIA_NUM_THREADS" => other_nthreads), stdout = stdout, stderr = stderr),
wait = false)
include("threads_exec.jl")
if !success(p)
error("threads test failed with nthreads == $other_nthreads")
let cmd = `$(Base.julia_cmd()) --depwarn=error --startup-file=no threads_exec.jl`
for test_nthreads in (1, 8, 8) # run once to try single-threaded mode, then try a couple times to trigger bad races
run(pipeline(setenv(cmd, "JULIA_NUM_THREADS" => test_nthreads), stdout = stdout, stderr = stderr))
end
end
8 changes: 8 additions & 0 deletions test/threads_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ using Test
using Base.Threads
using Base.Threads: SpinLock, Mutex

# Watchdog: 1200 seconds after this file starts running, the callback below
# sends SIGTERM to this very process (via libuv's uv_kill on getpid()).
Timer(1200) do t
# set up a watchdog alarm for 20 minutes
# so that we can attempt to get a "friendly" backtrace if something gets stuck
# (expected test duration is about 30 seconds)
ccall(:uv_kill, Cint, (Cint, Cint), getpid(), Base.SIGTERM)
end


# threading constructs

let a = zeros(Int, 2 * nthreads())
Expand Down

0 comments on commit 998b5ae

Please sign in to comment.