Skip to content

Commit

Permalink
threads: avoid deadlock from recursive lock acquire
Browse files Browse the repository at this point in the history
Finalizers can't safely acquire many essential locks (such as the
iolock, to cleanup libuv objects) if they are run inside another lock.
Therefore, inhibit all finalizers on the thread until all locks are
released (previously, this was only true for our internal locks).
  • Loading branch information
vtjnash committed Dec 3, 2020
1 parent 49b8e61 commit 59aedd1
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 33 deletions.
3 changes: 2 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ Command-line option changes
Multi-threading changes
-----------------------

* Locks now automatically inhibit finalizers from running, to avoid deadlock ([#TBD]).
* New function `Base.Threads.foreach(f, channel::Channel)` for multithreaded `Channel` consumption ([#34543]).

Build system changes
--------------------
Expand All @@ -85,7 +87,6 @@ New library functions
---------------------

* New function `Base.kron!` and corresponding overloads for various matrix types for performing Kronecker product in-place ([#31069]).
* New function `Base.Threads.foreach(f, channel::Channel)` for multithreaded `Channel` consumption ([#34543]).
* New function `Base.readeach(io, T)` for iteratively performing `read(io, T)` ([#36150]).
* `Iterators.map` is added. It provides another syntax `Iterators.map(f, iterators...)`
for writing `(f(args...) for args in zip(iterators...))`, i.e. a lazy `map` ([#34352]).
Expand Down
10 changes: 10 additions & 0 deletions base/gcutils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ Control whether garbage collection is enabled using a boolean argument (`true` f
"""
enable(on::Bool) = ccall(:jl_gc_enable, Int32, (Int32,), on) != 0

"""
GC.enable_finalizers(on::Bool)
Increment or decrement the counter that controls the running of finalizers on
the current Task. Finalizers will only run when the counter is at zero. (Set
`true` for enabling, `false` for disabling). They may still run concurrently on
another Task or thread.
"""
enable_finalizers(on::Bool) = ccall(:jl_gc_enable_finalizers, Cvoid, (Ptr{Cvoid}, Int32,), C_NULL, on)

"""
GC.@preserve x1 x2 ... xn expr
Expand Down
25 changes: 22 additions & 3 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,24 @@ const ThreadSynchronizer = GenericCondition{Threads.SpinLock}
"""
ReentrantLock()
Creates a re-entrant lock for synchronizing [`Task`](@ref)s.
The same task can acquire the lock as many times as required.
Each [`lock`](@ref) must be matched with an [`unlock`](@ref).
Creates a re-entrant lock for synchronizing [`Task`](@ref)s. The same task can
acquire the lock as many times as required. Each [`lock`](@ref) must be matched
with an [`unlock`](@ref).
Calling 'lock' will also inhibit running of finalizers on that thread until the
corresponding 'unlock'. Use of the standard lock pattern illustrated below
should naturally be supported, but beware of inverting the try/lock order or
missing the try block entirely (e.g. attempting to return with the lock still
held):
```
lock(l)
try
<atomic work>
finally
unlock(l)
end
```
"""
mutable struct ReentrantLock <: AbstractLock
locked_by::Union{Task, Nothing}
Expand Down Expand Up @@ -50,6 +65,7 @@ function trylock(rl::ReentrantLock)
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
GC.enable_finalizers(false)
got = true
else
got = false
Expand Down Expand Up @@ -77,6 +93,7 @@ function lock(rl::ReentrantLock)
if rl.reentrancy_cnt == 0
rl.locked_by = t
rl.reentrancy_cnt = 1
GC.enable_finalizers(false)
break
end
try
Expand Down Expand Up @@ -118,6 +135,7 @@ function unlock(rl::ReentrantLock)
rethrow()
end
end
GC.enable_finalizers(true)
unlock(rl.cond_wait)
end
return
Expand All @@ -139,6 +157,7 @@ function unlockall(rl::ReentrantLock)
rethrow()
end
end
GC.enable_finalizers(true)
unlock(rl.cond_wait)
return n
end
Expand Down
11 changes: 10 additions & 1 deletion base/locks-mt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ Base.assert_havelock(l::SpinLock) = islocked(l) ? nothing : Base.concurrency_vio
function lock(l::SpinLock)
while true
if _get(l) == 0
GC.enable_finalizers(false)
p = _xchg!(l, 1)
if p == 0
return
end
GC.enable_finalizers(true)
end
ccall(:jl_cpu_pause, Cvoid, ())
# Temporary solution before we have gc transition support in codegen.
Expand All @@ -74,13 +76,20 @@ end

function trylock(l::SpinLock)
if _get(l) == 0
return _xchg!(l, 1) == 0
GC.enable_finalizers(false)
p = _xchg!(l, 1)
if p == 0
return true
end
GC.enable_finalizers(true)
end
return false
end

function unlock(l::SpinLock)
_get(l) == 0 && error("unlock count must match lock count")
_set!(l, 0)
GC.enable_finalizers(true)
ccall(:jl_cpu_wake, Cvoid, ())
return
end
Expand Down
32 changes: 28 additions & 4 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ static void run_finalizer(jl_ptls_t ptls, jl_value_t *o, jl_value_t *ff)
jl_printf((JL_STREAM*)STDERR_FILENO, "error in running finalizer: ");
jl_static_show((JL_STREAM*)STDERR_FILENO, jl_current_exception());
jl_printf((JL_STREAM*)STDERR_FILENO, "\n");
jlbacktrace(); // writen to STDERR_FILENO
jlbacktrace(); // written to STDERR_FILENO
}
}

Expand Down Expand Up @@ -392,12 +392,36 @@ static void run_finalizers(jl_ptls_t ptls)
arraylist_free(&copied_list);
}

JL_DLLEXPORT int jl_gc_get_finalizers_inhibited(jl_ptls_t ptls)
{
if (ptls == NULL)
ptls = jl_get_ptls_states();
return ptls->finalizers_inhibited;
}

JL_DLLEXPORT void jl_gc_enable_finalizers(jl_ptls_t ptls, int on)
{
if (ptls == NULL)
ptls = jl_get_ptls_states();
int old_val = ptls->finalizers_inhibited;
int new_val = old_val + (on ? -1 : 1);
if (new_val < 0) {
JL_TRY {
jl_error(""); // get a backtrace
}
JL_CATCH {
jl_printf((JL_STREAM*)STDERR_FILENO, "WARNING: GC finalizers already enabled on this thread.\n");
// Only print the backtrace once, to avoid spamming the logs
static int backtrace_printed = 0;
if (backtrace_printed == 0) {
backtrace_printed = 1;
jlbacktrace(); // written to STDERR_FILENO
}
}
return;
}
ptls->finalizers_inhibited = new_val;
if (!new_val && old_val && !ptls->in_finalizer) {
if (!new_val && old_val && !ptls->in_finalizer && ptls->locks.len == 0) {
ptls->in_finalizer = 1;
run_finalizers(ptls);
ptls->in_finalizer = 0;
Expand Down Expand Up @@ -1581,7 +1605,7 @@ STATIC_INLINE uintptr_t gc_read_stack(void *_addr, uintptr_t offset,
JL_NORETURN NOINLINE void gc_assert_datatype_fail(jl_ptls_t ptls, jl_datatype_t *vt,
jl_gc_mark_sp_t sp)
{
jl_printf(JL_STDOUT, "GC error (probable corruption) :\n");
jl_safe_printf("GC error (probable corruption) :\n");
gc_debug_print_status();
jl_(vt);
gc_debug_critical_error();
Expand Down Expand Up @@ -3192,7 +3216,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
// Only disable finalizers on current thread
// Doing this on all threads is racy (it's impossible to check
// or wait for finalizers on other threads without dead lock).
if (!ptls->finalizers_inhibited) {
if (!ptls->finalizers_inhibited && ptls->locks.len == 0) {
int8_t was_in_finalizer = ptls->in_finalizer;
ptls->in_finalizer = 1;
run_finalizers(ptls);
Expand Down
3 changes: 0 additions & 3 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1730,7 +1730,6 @@ typedef struct _jl_handler_t {
int8_t gc_state;
size_t locks_len;
sig_atomic_t defer_signal;
int finalizers_inhibited;
jl_timing_block_t *timing_stack;
size_t world_age;
} jl_handler_t;
Expand All @@ -1753,8 +1752,6 @@ typedef struct _jl_task_t {
int16_t tid;
// multiqueue priority
int16_t prio;
// current world age
size_t world_age;
// saved exception stack
jl_excstack_t *excstack;
// current exception handler
Expand Down
9 changes: 4 additions & 5 deletions src/locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,9 @@ static inline void jl_lock_frame_pop(void)

static inline void jl_mutex_lock(jl_mutex_t *lock)
{
jl_ptls_t ptls = jl_get_ptls_states();
JL_SIGATOMIC_BEGIN();
jl_mutex_wait(lock, 1);
jl_lock_frame_push(lock);
jl_gc_enable_finalizers(ptls, 0);
}

static inline int jl_mutex_trylock_nogc(jl_mutex_t *lock)
Expand All @@ -111,10 +109,8 @@ static inline int jl_mutex_trylock(jl_mutex_t *lock)
{
int got = jl_mutex_trylock_nogc(lock);
if (got) {
jl_ptls_t ptls = jl_get_ptls_states();
JL_SIGATOMIC_BEGIN();
jl_lock_frame_push(lock);
jl_gc_enable_finalizers(ptls, 0);
}
return got;
}
Expand All @@ -134,9 +130,12 @@ static inline void jl_mutex_unlock(jl_mutex_t *lock)
{
jl_ptls_t ptls = jl_get_ptls_states();
jl_mutex_unlock_nogc(lock);
jl_gc_enable_finalizers(ptls, 1);
jl_lock_frame_pop();
JL_SIGATOMIC_END();
if (ptls->locks.len == 0 && ptls->finalizers_inhibited == 0) {
ptls->finalizers_inhibited = 1;
jl_gc_enable_finalizers(ptls, 1); // call run_finalizers (may GC)
}
}

static inline void jl_mutex_init(jl_mutex_t *lock) JL_NOTSAFEPOINT
Expand Down
12 changes: 8 additions & 4 deletions src/rtutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ JL_DLLEXPORT void jl_enter_handler(jl_handler_t *eh)
eh->gc_state = ptls->gc_state;
eh->locks_len = ptls->locks.len;
eh->defer_signal = ptls->defer_signal;
eh->finalizers_inhibited = ptls->finalizers_inhibited;
eh->world_age = ptls->world_age;
current_task->eh = eh;
#ifdef ENABLE_TIMINGS
Expand Down Expand Up @@ -249,14 +248,14 @@ JL_DLLEXPORT void jl_eh_restore_state(jl_handler_t *eh)
current_task->eh = eh->prev;
ptls->pgcstack = eh->gcstack;
small_arraylist_t *locks = &ptls->locks;
if (locks->len > eh->locks_len) {
for (size_t i = locks->len;i > eh->locks_len;i--)
int unlocks = locks->len > eh->locks_len;
if (unlocks) {
for (size_t i = locks->len; i > eh->locks_len; i--)
jl_mutex_unlock_nogc((jl_mutex_t*)locks->items[i - 1]);
locks->len = eh->locks_len;
}
ptls->world_age = eh->world_age;
ptls->defer_signal = eh->defer_signal;
ptls->finalizers_inhibited = eh->finalizers_inhibited;
if (old_gc_state != eh->gc_state) {
jl_atomic_store_release(&ptls->gc_state, eh->gc_state);
if (old_gc_state) {
Expand All @@ -266,6 +265,11 @@ JL_DLLEXPORT void jl_eh_restore_state(jl_handler_t *eh)
if (old_defer_signal && !eh->defer_signal) {
jl_sigint_safepoint(ptls);
}
if (unlocks && eh->locks_len == 0 && ptls->finalizers_inhibited == 0) {
// call run_finalizers
ptls->finalizers_inhibited = 1;
jl_gc_enable_finalizers(ptls, 1);
}
}

JL_DLLEXPORT void jl_pop_handler(int n)
Expand Down
17 changes: 14 additions & 3 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,8 @@ static void ctx_switch(jl_ptls_t ptls)
}

// set up global state for new task
lastt->world_age = ptls->world_age;
ptls->pgcstack = t->gcstack;
ptls->world_age = t->world_age;
ptls->world_age = 0;
t->gcstack = NULL;
#ifdef MIGRATE_TASKS
ptls->previous_task = lastt;
Expand Down Expand Up @@ -510,8 +509,14 @@ JL_DLLEXPORT void jl_switch(void)
else if (t->tid != ptls->tid) {
jl_error("cannot switch to task running on another thread");
}

// Store old values on the stack and reset
sig_atomic_t defer_signal = ptls->defer_signal;
int8_t gc_state = jl_gc_unsafe_enter(ptls);
size_t world_age = ptls->world_age;
int finalizers_inhibited = ptls->finalizers_inhibited;
ptls->world_age = 0;
ptls->finalizers_inhibited = 0;

#ifdef ENABLE_TIMINGS
jl_timing_block_t *blk = ct->timing_stack;
Expand All @@ -533,7 +538,12 @@ JL_DLLEXPORT void jl_switch(void)
assert(ptls == refetch_ptls());
#endif

ct = ptls->current_task;
// Pop old values back off the stack
assert(ct == ptls->current_task &&
0 == ptls->world_age &&
0 == ptls->finalizers_inhibited);
ptls->world_age = world_age;
ptls->finalizers_inhibited = finalizers_inhibited;

#ifdef ENABLE_TIMINGS
assert(blk == ct->timing_stack);
Expand Down Expand Up @@ -796,6 +806,7 @@ STATIC_OR_JS void NOINLINE JL_NORETURN start_task(void)
jl_ptls_t ptls = jl_get_ptls_states();
jl_task_t *t = ptls->current_task;
jl_value_t *res;
assert(ptls->finalizers_inhibited == 0);

#ifdef MIGRATE_TASKS
jl_task_t *pt = ptls->previous_task;
Expand Down
5 changes: 3 additions & 2 deletions stdlib/Distributed/src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ function flush_gc_msgs(w::Worker)
end

# del_msgs gets populated by finalizers, so be very careful here about ordering of allocations
# XXX: threading requires this to be atomic
new_array = Any[]
msgs = w.del_msgs
w.del_msgs = new_array
Expand Down Expand Up @@ -166,7 +167,7 @@ function send_msg_(w::Worker, header, msg, now::Bool)
wait(w.initialized)
end
io = w.w_stream
lock(io.lock)
lock(io)
try
reset_state(w.w_serializer)
serialize_hdr_raw(io, header)
Expand All @@ -179,7 +180,7 @@ function send_msg_(w::Worker, header, msg, now::Bool)
flush(io)
end
finally
unlock(io.lock)
unlock(io)
end
end

Expand Down
Loading

0 comments on commit 59aedd1

Please sign in to comment.