Skip to content

Commit

Permalink
deal with remaining race conditions in these APIs
Browse files Browse the repository at this point in the history
The jl_live_tasks API now reports all threads, instead of only Tasks
first started by the current thread. There is a new abstraction called
mtarraylist with adds functionality to small_arraylist (it is
layout-compatible). In particular, it makes it safe for another thread
to observe the content of the list concurrently with any mutations.
  • Loading branch information
vtjnash committed Sep 24, 2023
1 parent 1d9ebbc commit f662a2e
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 52 deletions.
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ endif
SRCS := \
jltypes gf typemap smallintset ast builtins module interpreter symbol \
dlload sys init task array staticdata toplevel jl_uv datatype \
simplevector runtime_intrinsics precompile jloptions \
simplevector runtime_intrinsics precompile jloptions mtarraylist \
threading partr stackwalk gc gc-debug gc-pages gc-stacks gc-alloc-profiler method \
jlapi signal-handling safepoint timing subtype rtutils gc-heap-snapshot \
crc32c APInt-C processor ircode opaque_closure codegen-stubs coverage runtime_ccall
Expand Down
80 changes: 54 additions & 26 deletions src/gc-stacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ static void _jl_free_stack(jl_ptls_t ptls, void *stkbuf, size_t bufsz)
if (bufsz <= pool_sizes[JL_N_STACK_POOLS - 1]) {
unsigned pool_id = select_pool(bufsz);
if (pool_sizes[pool_id] == bufsz) {
arraylist_push(&ptls->heap.free_stacks[pool_id], stkbuf);
small_arraylist_push(&ptls->heap.free_stacks[pool_id], stkbuf);
return;
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ void jl_release_task_stack(jl_ptls_t ptls, jl_task_t *task)
#ifdef _COMPILER_ASAN_ENABLED_
__asan_unpoison_stack_memory((uintptr_t)stkbuf, bufsz);
#endif
arraylist_push(&ptls->heap.free_stacks[pool_id], stkbuf);
small_arraylist_push(&ptls->heap.free_stacks[pool_id], stkbuf);
}
}
}
Expand All @@ -160,9 +160,9 @@ JL_DLLEXPORT void *jl_malloc_stack(size_t *bufsz, jl_task_t *owner) JL_NOTSAFEPO
if (ssize <= pool_sizes[JL_N_STACK_POOLS - 1]) {
unsigned pool_id = select_pool(ssize);
ssize = pool_sizes[pool_id];
arraylist_t *pool = &ptls->heap.free_stacks[pool_id];
small_arraylist_t *pool = &ptls->heap.free_stacks[pool_id];
if (pool->len > 0) {
stk = arraylist_pop(pool);
stk = small_arraylist_pop(pool);
}
}
else {
Expand All @@ -181,8 +181,8 @@ JL_DLLEXPORT void *jl_malloc_stack(size_t *bufsz, jl_task_t *owner) JL_NOTSAFEPO
}
*bufsz = ssize;
if (owner) {
arraylist_t *live_tasks = &ptls->heap.live_tasks;
arraylist_push(live_tasks, owner);
small_arraylist_t *live_tasks = &ptls->heap.live_tasks;
mtarraylist_push(live_tasks, owner);
}
return stk;
}
Expand All @@ -206,7 +206,7 @@ void sweep_stack_pools(void)

// free half of stacks that remain unused since last sweep
for (int p = 0; p < JL_N_STACK_POOLS; p++) {
arraylist_t *al = &ptls2->heap.free_stacks[p];
small_arraylist_t *al = &ptls2->heap.free_stacks[p];
size_t n_to_free;
if (al->len > MIN_STACK_MAPPINGS_PER_POOL) {
n_to_free = al->len / 2;
Expand All @@ -217,12 +217,12 @@ void sweep_stack_pools(void)
n_to_free = 0;
}
for (int n = 0; n < n_to_free; n++) {
void *stk = arraylist_pop(al);
void *stk = small_arraylist_pop(al);
free_stack(stk, pool_sizes[p]);
}
}

arraylist_t *live_tasks = &ptls2->heap.live_tasks;
small_arraylist_t *live_tasks = &ptls2->heap.live_tasks;
size_t n = 0;
size_t ndel = 0;
size_t l = live_tasks->len;
Expand Down Expand Up @@ -265,24 +265,52 @@ void sweep_stack_pools(void)

JL_DLLEXPORT jl_array_t *jl_live_tasks(void)
{
jl_task_t *ct = jl_current_task;
jl_ptls_t ptls = ct->ptls;
arraylist_t *live_tasks = &ptls->heap.live_tasks;
size_t i, j, l;
jl_array_t *a;
do {
l = live_tasks->len;
a = jl_alloc_vec_any(l + 1); // may gc, changing the number of tasks
} while (l + 1 < live_tasks->len);
l = live_tasks->len;
void **lst = live_tasks->items;
j = 0;
((void**)jl_array_data(a))[j++] = ptls->root_task;
for (i = 0; i < l; i++) {
if (((jl_task_t*)lst[i])->stkbuf != NULL)
((void**)jl_array_data(a))[j++] = lst[i];
size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
size_t l = 0; // l is not reset on restart, so we keep getting more aggressive at making a big enough list everything it fails
restart:
for (size_t i = 0; i < nthreads; i++) {
// skip GC threads since they don't have tasks
if (gc_first_tid <= i && i < gc_first_tid + jl_n_gcthreads) {
continue;
}
jl_ptls_t ptls2 = allstates[i];
if (ptls2 == NULL)
continue;
small_arraylist_t *live_tasks = &ptls2->heap.live_tasks;
size_t n = mtarraylist_length(live_tasks);
l += n + (ptls2->root_task->stkbuf != NULL);
}
l += l / 20; // add 5% for margin of estimation error
jl_array_t *a = jl_alloc_vec_any(l); // may gc, changing the number of tasks and forcing us to reload everything
nthreads = jl_atomic_load_acquire(&jl_n_threads);
allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
size_t j = 0;
for (size_t i = 0; i < nthreads; i++) {
// skip GC threads since they don't have tasks
if (gc_first_tid <= i && i < gc_first_tid + jl_n_gcthreads) {
continue;
}
jl_ptls_t ptls2 = allstates[i];
if (ptls2 == NULL)
continue;
jl_task_t *t = ptls2->root_task;
if (t->stkbuf != NULL) {
if (j == l)
goto restart;
((void**)jl_array_data(a))[j++] = t;
}
small_arraylist_t *live_tasks = &ptls2->heap.live_tasks;
size_t n = mtarraylist_length(live_tasks);
for (size_t i = 0; i < n; i++) {
jl_task_t *t = (jl_task_t*)mtarraylist_get(live_tasks, i);
if (t->stkbuf != NULL) {
if (j == l)
goto restart;
((void**)jl_array_data(a))[j++] = t;
}
}
}
l = jl_array_len(a);
if (j < l) {
JL_GC_PUSH1(&a);
jl_array_del_end(a, l - j);
Expand Down
8 changes: 5 additions & 3 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -959,7 +959,7 @@ JL_DLLEXPORT jl_weakref_t *jl_gc_new_weakref_th(jl_ptls_t ptls,
jl_weakref_t *wr = (jl_weakref_t*)jl_gc_alloc(ptls, sizeof(void*),
jl_weakref_type);
wr->value = value; // NOTE: wb not needed here
arraylist_push(&ptls->heap.weak_refs, wr);
small_arraylist_push(&ptls->heap.weak_refs, wr);
return wr;
}

Expand Down Expand Up @@ -3624,8 +3624,10 @@ void jl_init_thread_heap(jl_ptls_t ptls)
p[i].freelist = NULL;
p[i].newpages = NULL;
}
arraylist_new(&heap->weak_refs, 0);
arraylist_new(&heap->live_tasks, 0);
small_arraylist_new(&heap->weak_refs, 0);
small_arraylist_new(&heap->live_tasks, 0);
for (int i = 0; i < JL_N_STACK_POOLS; i++)
small_arraylist_new(&heap->free_stacks[i], 0);
heap->mallocarrays = NULL;
heap->mafreelist = NULL;
heap->big_objects = NULL;
Expand Down
5 changes: 5 additions & 0 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,11 @@ JL_DLLEXPORT void *jl_gc_managed_realloc(void *d, size_t sz, size_t oldsz,
int isaligned, jl_value_t *owner);
JL_DLLEXPORT void jl_gc_safepoint(void);

void *mtarraylist_get(small_arraylist_t *_a, size_t idx) JL_NOTSAFEPOINT;
size_t mtarraylist_length(small_arraylist_t *_a) JL_NOTSAFEPOINT;
void mtarraylist_add(small_arraylist_t *_a, void *elt, size_t idx) JL_NOTSAFEPOINT;
void mtarraylist_push(small_arraylist_t *_a, void *elt) JL_NOTSAFEPOINT;

// object accessors -----------------------------------------------------------

#define jl_svec_len(t) (((jl_svec_t*)(t))->length)
Expand Down
6 changes: 3 additions & 3 deletions src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ typedef struct {

typedef struct {
// variable for tracking weak references
arraylist_t weak_refs;
small_arraylist_t weak_refs;
// live tasks started on this thread
// that are holding onto a stack from the pool
arraylist_t live_tasks;
small_arraylist_t live_tasks;

// variables for tracking malloc'd arrays
struct _mallocarray_t *mallocarrays;
Expand All @@ -170,7 +170,7 @@ typedef struct {
jl_gc_pool_t norm_pools[JL_GC_N_POOLS];

#define JL_N_STACK_POOLS 16
arraylist_t free_stacks[JL_N_STACK_POOLS];
small_arraylist_t free_stacks[JL_N_STACK_POOLS];
} jl_thread_heap_t;

typedef struct {
Expand Down
81 changes: 81 additions & 0 deletions src/mtarraylist.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// This file is a part of Julia. License is MIT: https://julialang.org/license

#include "julia.h"
#include "julia_internal.h"
#include "julia_assert.h"

#ifdef __cplusplus
extern "C" {
#endif

// this file provides some alternate API functions for small_arraylist (push and add)
// which can be safely observed from other threads concurrently
// there is only permitted to be a single writer thread (or a mutex)
// but there can be any number of observers

typedef struct {
_Atomic(uint32_t) len;
uint32_t max;
_Atomic(_Atomic(void*)*) items;
_Atomic(void*) _space[SMALL_AL_N_INLINE];
} small_mtarraylist_t;

// change capacity to at least newlen
static void mtarraylist_resizeto(small_mtarraylist_t *a, size_t len, size_t newlen) JL_NOTSAFEPOINT
{
size_t max = a->max;
if (newlen > max) {
size_t nm = max * 2;
if (nm == 0)
nm = 1;
while (newlen > nm)
nm *= 2;
void *olditems = (void*)jl_atomic_load_relaxed(&a->items);
void *p = calloc_s(nm * sizeof(void*));
memcpy(p, olditems, len * sizeof(void*));
jl_atomic_store_release(&a->items, (_Atomic(void*)*)p);
a->max = nm;
if (olditems != (void*)&a->_space[0]) {
jl_task_t *ct = jl_current_task;
jl_gc_add_quiescent(ct->ptls, olditems, free);
}
}
}

// single-threaded
void mtarraylist_push(small_arraylist_t *_a, void *elt)
{
small_mtarraylist_t *a = (small_mtarraylist_t*)_a;
size_t len = jl_atomic_load_relaxed(&a->len);
mtarraylist_resizeto(a, len, len + 1);
jl_atomic_store_release(&jl_atomic_load_relaxed(&a->items)[len], elt);
jl_atomic_store_release(&a->len, len + 1);
}

// single-threaded
void mtarraylist_add(small_arraylist_t *_a, void *elt, size_t idx)
{
small_mtarraylist_t *a = (small_mtarraylist_t*)_a;
size_t len = jl_atomic_load_relaxed(&a->len);
mtarraylist_resizeto(a, len, idx + 1);
jl_atomic_store_release(&jl_atomic_load_relaxed(&a->items)[idx], elt);
if (jl_atomic_load_relaxed(&a->len) < idx + 1)
jl_atomic_store_release(&a->len, idx + 1);
}

// concurrent-safe
size_t mtarraylist_length(small_arraylist_t *_a)
{
small_mtarraylist_t *a = (small_mtarraylist_t*)_a;
return jl_atomic_load_relaxed(&a->len);
}

// concurrent-safe
void *mtarraylist_get(small_arraylist_t *_a, size_t idx)
{
small_mtarraylist_t *a = (small_mtarraylist_t*)_a;
size_t len = jl_atomic_load_acquire(&a->len);
if (idx >= len)
return NULL;
return jl_atomic_load_relaxed(&jl_atomic_load_relaxed(&a->items)[idx]);
}
43 changes: 25 additions & 18 deletions src/stackwalk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ JL_DLLEXPORT void jlbacktrace(void) JL_NOTSAFEPOINT
}
}

// Print backtrace for specified task
// Print backtrace for specified task to jl_safe_printf stderr
JL_DLLEXPORT void jlbacktracet(jl_task_t *t) JL_NOTSAFEPOINT
{
jl_task_t *ct = jl_current_task;
Expand All @@ -1147,9 +1147,7 @@ JL_DLLEXPORT void jl_print_backtrace(void) JL_NOTSAFEPOINT

extern int gc_first_tid;

// Print backtraces for all live tasks, for all threads.
// WARNING: this is dangerous and can crash if used outside of gdb, if
// all of Julia's threads are not stopped!
// Print backtraces for all live tasks, for all threads, to jl_safe_printf stderr
JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
{
size_t nthreads = jl_atomic_load_acquire(&jl_n_threads);
Expand All @@ -1160,24 +1158,33 @@ JL_DLLEXPORT void jl_print_task_backtraces(int show_done) JL_NOTSAFEPOINT
continue;
}
jl_ptls_t ptls2 = allstates[i];
arraylist_t *live_tasks = &ptls2->heap.live_tasks;
size_t n = live_tasks->len;
if (ptls2 == NULL)
continue;
small_arraylist_t *live_tasks = &ptls2->heap.live_tasks;
size_t n = mtarraylist_length(live_tasks);
jl_task_t *t = ptls2->root_task;
int t_state = jl_atomic_load_relaxed(&t->_state);
jl_safe_printf("==== Thread %d created %zu live tasks\n",
ptls2->tid + 1, n + 1);
jl_safe_printf(" ---- Root task (%p)\n", ptls2->root_task);
jl_safe_printf(" (sticky: %d, started: %d, state: %d, tid: %d)\n",
ptls2->root_task->sticky, ptls2->root_task->started,
jl_atomic_load_relaxed(&ptls2->root_task->_state),
jl_atomic_load_relaxed(&ptls2->root_task->tid) + 1);
jlbacktracet(ptls2->root_task);
ptls2->tid + 1, n + (t_state != JL_TASK_STATE_DONE));
if (show_done || t_state != JL_TASK_STATE_DONE) {
jl_safe_printf(" ---- Root task (%p)\n", ptls2->root_task);
jl_safe_printf(" (sticky: %d, started: %d, state: %d, tid: %d)\n",
t->sticky, t->started, t_state,
jl_atomic_load_relaxed(&t->tid) + 1);
if (t->stkbuf != NULL)
jlbacktracet(t);
else
jl_safe_printf(" no stack\n");
jl_safe_printf(" ---- End root task\n");
}

void **lst = live_tasks->items;
for (size_t j = 0; j < live_tasks->len; j++) {
jl_task_t *t = (jl_task_t *)lst[j];
for (size_t j = 0; j < n; j++) {
jl_task_t *t = (jl_task_t*)mtarraylist_get(live_tasks, j);
if (t == NULL)
continue;
int t_state = jl_atomic_load_relaxed(&t->_state);
if (!show_done && t_state == JL_TASK_STATE_DONE) {
if (!show_done && t_state == JL_TASK_STATE_DONE)
continue;
}
jl_safe_printf(" ---- Task %zu (%p)\n", j + 1, t);
// n.b. this information might not be consistent with the stack printing after it, since it could start running or change tid, etc.
jl_safe_printf(" (sticky: %d, started: %d, state: %d, tid: %d)\n",
Expand Down
4 changes: 3 additions & 1 deletion src/threading.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ static uv_mutex_t tls_lock; // controls write-access to these variables:
_Atomic(jl_ptls_t*) jl_all_tls_states JL_GLOBALLY_ROOTED;
int jl_all_tls_states_size;
static uv_cond_t cond;
// concurrent reads are permitted, using the same pattern as mtsmall_arraylist
// it is implemented separately because the API of direct jl_all_tls_states use is already widely prevalent

// return calling thread's ID
JL_DLLEXPORT int16_t jl_threadid(void)
Expand Down Expand Up @@ -382,10 +384,10 @@ jl_ptls_t jl_init_threadtls(int16_t tid)
uv_cond_init(&ptls->wake_signal);

uv_mutex_lock(&tls_lock);
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
if (tid == -1)
tid = jl_atomic_load_relaxed(&jl_n_threads);
ptls->tid = tid;
jl_ptls_t *allstates = jl_atomic_load_relaxed(&jl_all_tls_states);
if (jl_all_tls_states_size <= tid) {
int i, newsize = jl_all_tls_states_size + tid + 2;
jl_ptls_t *newpptls = (jl_ptls_t*)calloc(newsize, sizeof(jl_ptls_t));
Expand Down

0 comments on commit f662a2e

Please sign in to comment.