Skip to content

Commit

Permalink
Tasking for Emscripten/Wasm target (#32532)
Browse files Browse the repository at this point in the history
This is an implementation of Julia's coroutine/tasking system on top of
the binaryen Asyncify transform [1] recently implemented by Alon Zakai.
The wasm target is unusual in several ways:

1. It has an implicitly managed call stack that we may not modify directly
   (i.e. is only modified through calls/returns).
2. The event loop is inverted, in that the browser runs the main event loop and
   we get callbacks for events, rather than Julia being the main event loop.

Asyncify takes care of the first problem by providing a mechanism to explicitly
unwind and rewind the implicitly managed stack (essentially copying it to an
explicitly managed stack - see the linked implementation for more information).

For the second, I am currently using the ptls root_task to represent the browser
event loop, i.e. yielding to that task will return control back to the browser
and this is the task in which functions called by javascript will run unless they
explicitly construct a task to run. As a result, julia code executed in the main
task may not perform any blocking operations (though this is currently not enforced).
I think this is a sensible setup since the main task will want to run some minor julia
code (e.g. to introspect some data structures), but the bulk of the code will run in
their own tasks (e.g. the REPL backend task).

[1] https://github.com/WebAssembly/binaryen/blob/master/src/passes/Asyncify.cpp
  • Loading branch information
Keno authored Aug 2, 2019
1 parent 86c2126 commit 1ad8178
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 9 deletions.
2 changes: 1 addition & 1 deletion base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ function wait()
W = Workqueues[Threads.threadid()]
reftask = poptaskref(W)
result = try_yieldto(ensure_rescheduled, reftask)
process_events()
Sys.isjsvm() || process_events()
# return when we come out of the queue
return result
end
Expand Down
144 changes: 144 additions & 0 deletions src/jsvm-emscripten/asyncify_setup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
Module.preRun.push(function() {
if (typeof Asyncify !== "undefined") {
Asyncify.instrumentWasmExports = function (exports) { return exports; };
Asyncify.handleSleep = function (startAsync) {
if (ABORT) return;
Module['noExitRuntime'] = true;
if (Asyncify.state === Asyncify.State.Normal) {
// Prepare to sleep. Call startAsync, and see what happens:
// if the code decided to call our callback synchronously,
// then no async operation was in fact begun, and we don't
// need to do anything.
var reachedCallback = false;
var reachedAfterCallback = false;
var task = get_current_task();
startAsync(function(returnValue) {
assert(!returnValue || typeof returnValue === 'number'); // old emterpretify API supported other stuff
if (ABORT) return;
Asyncify.returnValue = returnValue || 0;
reachedCallback = true;
if (!reachedAfterCallback) {
// We are happening synchronously, so no need for async.
return;
}
schedule_and_wait(task);
});
reachedAfterCallback = true;
if (!reachedCallback) {
Module['_jl_task_wait']();
}
} else if (Asyncify.state === Asyncify.State.Rewinding) {
// Stop a resume.
finish_schedule_task();
} else {
abort('invalid state: ' + Asyncify.state);
}
return Asyncify.returnValue;
};
}
});

function get_current_task() {
return Module['_jl_get_current_task']();
}

function get_root_task() {
return Module['_jl_get_root_task']();
}

function task_ctx_ptr(task) {
return Module["_task_ctx_ptr"](task);
}

function ctx_save(ctx) {
var stackPtr = stackSave();

// Save the bottom of the C stack in the task context. It simultaneously
// serves as the top of the asyncify stack.
HEAP32[ctx + 4 >> 2] = stackPtr;

Asyncify.state = Asyncify.State.Unwinding;
Module['_asyncify_start_unwind'](ctx);
if (Browser.mainLoop.func) {
Browser.mainLoop.pause();
}
}

function do_start_task(old_stack)
{
try {
// start_task is always the entry point for any task
Module['_start_task']();
} catch(e) {
stackRestore(old_stack)
if (e !== e+0 && e !== 'killed') throw e;
maybe_schedule_next();
return;
}
// Either unwind or normal exit. In either case, we're back at the main task
if (Asyncify.state === Asyncify.State.Unwinding) {
// We just finished unwinding for a sleep.
Asyncify.state = Asyncify.State.Normal;
Module['_asyncify_stop_unwind']();
}
stackRestore(old_stack);
maybe_schedule_next();
}

function schedule_and_wait(task) {
Module['_jl_schedule_task'](task);
Module['_jl_task_wait']();
}

function finish_schedule_task() {
Asyncify.state = Asyncify.State.Normal;
Module['_asyncify_stop_rewind']();
}

next_ctx = 0;
next_need_start = true;
function set_next_ctx(ctx, needs_start) {
next_ctx = ctx;
next_need_start = needs_start;
}

function root_ctx() {
return task_ctx_ptr(get_root_task())
}

function ctx_switch(lastt_ctx) {
if (lastt_ctx == root_ctx()) {
// If we're in the root context, switch to
// the new ctx now, else we'll get there after
// unwinding.
return schedule_next()
} else if (lastt_ctx == 0) {
throw 'killed';
} else {
return ctx_save(lastt_ctx);
}
}

function schedule_next()
{
old_stack = stackSave();
var next_task_stack = HEAP32[next_ctx + 4 >> 2];
if (!next_need_start) {
Asyncify.state = Asyncify.State.Rewinding;
Module['_asyncify_start_rewind'](next_ctx);
if (Browser.mainLoop.func) {
Browser.mainLoop.resume();
}
}
next_ctx = -1;
stackRestore(next_task_stack);
do_start_task(old_stack)
}

function maybe_schedule_next() {
assert(next_ctx != -1);
if (next_ctx == root_ctx() || next_ctx == 0) {
return;
}
schedule_next()
}
15 changes: 15 additions & 0 deletions src/jsvm-emscripten/task.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
mergeInto(LibraryManager.library, {
jl_set_fiber: function(ctx) {
set_next_ctx(ctx, false);
return ctx_switch(0)
},
jl_swap_fiber: function(lastt_ctx, ctx) {
set_next_ctx(ctx, false);
return ctx_switch(lastt_ctx)
},
jl_start_fiber: function(lastt_ctx, ctx) {
set_next_ctx(ctx, true);
return ctx_switch(lastt_ctx)
}
});

16 changes: 15 additions & 1 deletion src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

// Options for task switching algorithm (in order of preference):
// JL_HAVE_ASM -- mostly setjmp
// JL_HAVE_ASYNCIFY -- task switching based on the binaryen asyncify transform
// JL_HAVE_UNW_CONTEXT -- hybrid of libunwind for start, setjmp for resume
// JL_HAVE_UCONTEXT -- posix standard API, requires syscall for resume
// JL_HAVE_SIGALTSTACK -- requires several syscall for start, setjmp for resume
Expand All @@ -27,14 +28,17 @@ typedef win32_ucontext_t jl_ucontext_t;
#if !defined(JL_HAVE_UCONTEXT) && \
!defined(JL_HAVE_ASM) && \
!defined(JL_HAVE_UNW_CONTEXT) && \
!defined(JL_HAVE_SIGALTSTACK)
!defined(JL_HAVE_SIGALTSTACK) && \
!defined(JL_HAVE_ASYNCIFY)
#if (defined(_CPU_X86_64_) || defined(_CPU_X86_) || defined(_CPU_AARCH64_) || \
defined(_CPU_ARM_) || defined(_CPU_PPC64_))
#define JL_HAVE_ASM
#elif defined(_OS_DARWIN_)
#define JL_HAVE_UNW_CONTEXT
#elif defined(_OS_LINUX_)
#define JL_HAVE_UCONTEXT
#elif defined(_OS_EMSCRIPTEN_)
#define JL_HAVE_ASYNCIFY
#else
#define JL_HAVE_UNW_CONTEXT
#endif
Expand All @@ -45,6 +49,16 @@ typedef struct {
jl_jmp_buf uc_mcontext;
} jl_ucontext_t;
#endif
#if defined(JL_HAVE_ASYNCIFY)
typedef struct {
// This is the extent of the asyncify stack, but because the top of the
// asyncify stack (stacktop) is also the bottom of the C stack, we can
// reuse stacktop for both. N.B.: This matches the layout of the
// __asyncify_data struct.
void *stackbottom;
void *stacktop;
} jl_ucontext_t;
#endif
#if defined(JL_HAVE_UCONTEXT) || defined(JL_HAVE_UNW_CONTEXT)
#define UNW_LOCAL_ONLY
#include <libunwind.h>
Expand Down
5 changes: 5 additions & 0 deletions src/partr.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,9 +505,14 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
start_cycles = 0;
}
else {
#ifndef JL_HAVE_ASYNCIFY
// maybe check the kernel for new messages too
if (jl_atomic_load(&jl_uv_n_waiters) == 0)
jl_process_events(jl_global_event_loop());
#else
// Yield back to browser event loop
return ptls->root_task;
#endif
}
}
}
Expand Down
79 changes: 72 additions & 7 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,22 @@ volatile int jl_in_stackwalk = 0;

#define ROOT_TASK_STACK_ADJUSTMENT 3000000

#ifdef JL_HAVE_ASYNCIFY
// Switching logic is implemented in JavaScript
#define STATIC_OR_JS JL_DLLEXPORT
#else
#define STATIC_OR_JS static
#endif

jl_sym_t *done_sym;
jl_sym_t *failed_sym;
jl_sym_t *runnable_sym;

extern size_t jl_page_size;
static char *jl_alloc_fiber(jl_ucontext_t *t, size_t *ssize, jl_task_t *owner) JL_NOTSAFEPOINT;
static void jl_set_fiber(jl_ucontext_t *t);
static void jl_start_fiber(jl_ucontext_t *lastt, jl_ucontext_t *t);
static void jl_swap_fiber(jl_ucontext_t *lastt, jl_ucontext_t *t);
STATIC_OR_JS void jl_set_fiber(jl_ucontext_t *t);
STATIC_OR_JS void jl_start_fiber(jl_ucontext_t *lastt, jl_ucontext_t *t);
STATIC_OR_JS void jl_swap_fiber(jl_ucontext_t *lastt, jl_ucontext_t *t);

#ifdef JL_HAVE_UNW_CONTEXT
static JL_THREAD_LOCAL unw_cursor_t jl_basecursor;
Expand Down Expand Up @@ -319,9 +326,11 @@ static void ctx_switch(jl_ptls_t ptls, jl_task_t **pt)
jl_swap_fiber(lastt_ctx, &t->ctx);
}
else {
#ifdef COPY_STACKS
if (always_copy_stacks)
jl_longjmp(ptls->base_ctx.uc_mcontext, 1);
else
#endif
jl_start_fiber(lastt_ctx, &t->ctx);
}
}
Expand Down Expand Up @@ -557,6 +566,44 @@ JL_DLLEXPORT jl_value_t *jl_get_current_task(void)
return (jl_value_t*)ptls->current_task;
}

#ifdef JL_HAVE_ASYNCIFY
JL_DLLEXPORT jl_ucontext_t *task_ctx_ptr(jl_task_t *t)
{
return &t->ctx;
}

JL_DLLEXPORT jl_value_t *jl_get_root_task(void)
{
jl_ptls_t ptls = jl_get_ptls_states();
return (jl_value_t*)ptls->root_task;
}

void JL_DLLEXPORT jl_task_wait()
{
static jl_function_t *wait_func = NULL;
if (!wait_func) {
wait_func = (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("wait"));
}
size_t last_age = jl_get_ptls_states()->world_age;
jl_get_ptls_states()->world_age = jl_get_world_counter();
jl_apply(&wait_func, 1);
jl_get_ptls_states()->world_age = last_age;
}

void JL_DLLEXPORT jl_schedule_task(jl_task_t *task)
{
static jl_function_t *sched_func = NULL;
if (!sched_func) {
sched_func = (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("schedule"));
}
size_t last_age = jl_get_ptls_states()->world_age;
jl_get_ptls_states()->world_age = jl_get_world_counter();
jl_value_t *args[] = {(jl_value_t*)sched_func, (jl_value_t*)task};
jl_apply(args, 2);
jl_get_ptls_states()->world_age = last_age;
}
#endif

// Do one-time initializations for task system
void jl_init_tasks(void) JL_GC_DISABLED
{
Expand All @@ -565,7 +612,7 @@ void jl_init_tasks(void) JL_GC_DISABLED
runnable_sym = jl_symbol("runnable");
}

static void NOINLINE JL_NORETURN start_task(void)
STATIC_OR_JS void NOINLINE JL_NORETURN start_task(void)
{
#ifdef _OS_WINDOWS_
#if defined(_CPU_X86_64_)
Expand Down Expand Up @@ -939,6 +986,22 @@ static void jl_init_basefiber(size_t ssize)
}
#endif

#if defined(JL_HAVE_ASYNCIFY)
static void jl_init_basefiber(size_t ssize)
{
}
static char *jl_alloc_fiber(jl_ucontext_t *t, size_t *ssize, jl_task_t *owner) JL_NOTSAFEPOINT
{
void *stk = jl_malloc_stack(ssize, owner);
if (stk == NULL)
return NULL;
t->stackbottom = stk;
t->stacktop = ((char*)stk) + *ssize;
return (char*)stk;
}
// jl_*_fiber implemented in js
#endif

// Initialize a root task using the given stack.
void jl_init_root_task(void *stack_lo, void *stack_hi)
{
Expand Down Expand Up @@ -986,15 +1049,17 @@ void jl_init_root_task(void *stack_lo, void *stack_hi)
arraylist_new(&ptls->current_task->locks, 0);
#endif

#ifdef COPY_STACKS
if (always_copy_stacks) {
ptls->stackbase = stack_hi;
ptls->stacksize = ssize;
if (jl_setjmp(ptls->base_ctx.uc_mcontext, 0))
start_task();
return;
}
else {
jl_init_basefiber(JL_STACK_SIZE);
}
#endif

jl_init_basefiber(JL_STACK_SIZE);
}

JL_DLLEXPORT int jl_is_task_started(jl_task_t *t)
Expand Down

0 comments on commit 1ad8178

Please sign in to comment.