diff --git a/NEWS.md b/NEWS.md index a0274f403e66c9..eb9eea9ae7eea0 100644 --- a/NEWS.md +++ b/NEWS.md @@ -88,6 +88,7 @@ Multi-threading changes * `@threads` now allows an optional schedule argument. Use `@threads :static ...` to ensure that the same schedule will be used as in past versions; the default schedule is likely to change in the future. +* New function `Base.Threads.foreach(f, channel::Channel)` for multithreaded `Channel` consumption ([#34543]). Build system changes -------------------- diff --git a/base/gcutils.jl b/base/gcutils.jl index 5d6d8fc2c5805a..72882d68b8638f 100644 --- a/base/gcutils.jl +++ b/base/gcutils.jl @@ -91,6 +91,16 @@ Control whether garbage collection is enabled using a boolean argument (`true` f """ enable(on::Bool) = ccall(:jl_gc_enable, Int32, (Int32,), on) != 0 +""" + GC.enable_finalizers(on::Bool) + +Increment or decrement the counter that controls the running of finalizers on +the current Task. Finalizers will only run when the counter is at zero. (Set +`true` for enabling, `false` for disabling). They may still run concurrently on +another Task or thread. +""" +enable_finalizers(on::Bool) = ccall(:jl_gc_enable_finalizers, Cvoid, (Ptr{Cvoid}, Int32,), C_NULL, on) + """ GC.@preserve x1 x2 ... xn expr diff --git a/base/lock.jl b/base/lock.jl index b518766fe29fef..9d0a6b053f05ed 100644 --- a/base/lock.jl +++ b/base/lock.jl @@ -4,9 +4,24 @@ """ ReentrantLock() -Creates a re-entrant lock for synchronizing [`Task`](@ref)s. -The same task can acquire the lock as many times as required. -Each [`lock`](@ref) must be matched with an [`unlock`](@ref). +Creates a re-entrant lock for synchronizing [`Task`](@ref)s. The same task can +acquire the lock as many times as required. Each [`lock`](@ref) must be matched +with an [`unlock`](@ref). + +Calling 'lock' will also inhibit running of finalizers on that thread until the +corresponding 'unlock'. Use of the standard lock pattern illustrated below +should naturally be supported, but beware of inverting the try/lock order or +missing the try block entirely (e.g. attempting to return with the lock still +held): + +``` +lock(l) +try + +finally + unlock(l) +end +``` """ mutable struct ReentrantLock <: AbstractLock locked_by::Union{Task, Nothing} @@ -44,6 +59,7 @@ function trylock(rl::ReentrantLock) if rl.reentrancy_cnt == 0 rl.locked_by = t rl.reentrancy_cnt = 1 + GC.enable_finalizers(false) got = true elseif t === notnothing(rl.locked_by) rl.reentrancy_cnt += 1 @@ -71,6 +87,7 @@ function lock(rl::ReentrantLock) if rl.reentrancy_cnt == 0 rl.locked_by = t rl.reentrancy_cnt = 1 + GC.enable_finalizers(false) break elseif t === notnothing(rl.locked_by) rl.reentrancy_cnt += 1 @@ -111,6 +128,7 @@ function unlock(rl::ReentrantLock) rethrow() end end + GC.enable_finalizers(true) end unlock(rl.cond_wait) return @@ -132,6 +150,7 @@ function unlockall(rl::ReentrantLock) rethrow() end end + GC.enable_finalizers(true) unlock(rl.cond_wait) return n end diff --git a/base/locks-mt.jl b/base/locks-mt.jl index 2a492c175c6c36..c910ee6e013186 100644 --- a/base/locks-mt.jl +++ b/base/locks-mt.jl @@ -61,10 +61,12 @@ Base.assert_havelock(l::SpinLock) = islocked(l) ? nothing : Base.concurrency_vio function lock(l::SpinLock) while true if _get(l) == 0 + GC.enable_finalizers(false) p = _xchg!(l, 1) if p == 0 return end + GC.enable_finalizers(true) end ccall(:jl_cpu_pause, Cvoid, ()) # Temporary solution before we have gc transition support in codegen. @@ -74,13 +76,20 @@ end function trylock(l::SpinLock) if _get(l) == 0 - return _xchg!(l, 1) == 0 + GC.enable_finalizers(false) + p = _xchg!(l, 1) + if p == 0 + return true + end + GC.enable_finalizers(true) end return false end function unlock(l::SpinLock) + _get(l) == 0 && error("unlock count must match lock count") _set!(l, 0) + GC.enable_finalizers(true) ccall(:jl_cpu_wake, Cvoid, ()) return end diff --git a/src/gc.c b/src/gc.c index 4c26d6e467781f..5b69a4a1025a03 100644 --- a/src/gc.c +++ b/src/gc.c @@ -392,12 +392,36 @@ static void run_finalizers(jl_ptls_t ptls) arraylist_free(&copied_list); } +JL_DLLEXPORT int jl_gc_get_finalizers_inhibited(jl_ptls_t ptls) +{ + if (ptls == NULL) + ptls = jl_get_ptls_states(); + return ptls->finalizers_inhibited; +} + JL_DLLEXPORT void jl_gc_enable_finalizers(jl_ptls_t ptls, int on) { + if (ptls == NULL) + ptls = jl_get_ptls_states(); int old_val = ptls->finalizers_inhibited; int new_val = old_val + (on ? -1 : 1); + if (new_val < 0) { + JL_TRY { + jl_error(""); // get a backtrace + } + JL_CATCH { + jl_printf((JL_STREAM*)STDERR_FILENO, "WARNING: GC finalizers already enabled on this thread.\n"); + // Only print the backtrace once, to avoid spamming the logs + static int backtrace_printed = 0; + if (backtrace_printed == 0) { + backtrace_printed = 1; + jlbacktrace(); // written to STDERR_FILENO + } + } + return; + } ptls->finalizers_inhibited = new_val; - if (!new_val && old_val && !ptls->in_finalizer) { + if (!new_val && old_val && !ptls->in_finalizer && ptls->current_task->locks.len == 0) { ptls->in_finalizer = 1; run_finalizers(ptls); ptls->in_finalizer = 0; @@ -1580,7 +1604,7 @@ STATIC_INLINE uintptr_t gc_read_stack(void *_addr, uintptr_t offset, JL_NORETURN NOINLINE void gc_assert_datatype_fail(jl_ptls_t ptls, jl_datatype_t *vt, jl_gc_mark_sp_t sp) { - jl_printf(JL_STDOUT, "GC error (probable corruption) :\n"); + jl_safe_printf("GC error (probable corruption) :\n"); gc_debug_print_status(); jl_(vt); gc_debug_critical_error(); @@ -3121,7 +3145,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) // Only disable finalizers on current thread // Doing this on all threads is racy (it's impossible to check // or wait for finalizers on other threads without dead lock). - if (!ptls->finalizers_inhibited) { + if (!ptls->finalizers_inhibited && ptls->current_task->locks.len == 0) { int8_t was_in_finalizer = ptls->in_finalizer; ptls->in_finalizer = 1; run_finalizers(ptls); diff --git a/src/julia.h b/src/julia.h index a7f3052e59fb33..64482226d1e708 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1719,7 +1719,6 @@ typedef struct _jl_handler_t { int8_t gc_state; size_t locks_len; sig_atomic_t defer_signal; - int finalizers_inhibited; jl_timing_block_t *timing_stack; size_t world_age; } jl_handler_t; @@ -1751,8 +1750,6 @@ typedef struct _jl_task_t { jl_gcframe_t *gcstack; // saved exception stack jl_excstack_t *excstack; - // current world age - size_t world_age; // id of owning thread // does not need to be defined until the task runs diff --git a/src/locks.h b/src/locks.h index 6365eb681ca305..a0c7e9da7831e1 100644 --- a/src/locks.h +++ b/src/locks.h @@ -89,11 +89,9 @@ static inline void jl_lock_frame_pop(void) static inline void jl_mutex_lock(jl_mutex_t *lock) { - jl_ptls_t ptls = jl_get_ptls_states(); JL_SIGATOMIC_BEGIN(); jl_mutex_wait(lock, 1); jl_lock_frame_push(lock); - jl_gc_enable_finalizers(ptls, 0); } static inline int jl_mutex_trylock_nogc(jl_mutex_t *lock) @@ -116,10 +114,8 @@ static inline int jl_mutex_trylock(jl_mutex_t *lock) { int got = jl_mutex_trylock_nogc(lock); if (got) { - jl_ptls_t ptls = jl_get_ptls_states(); JL_SIGATOMIC_BEGIN(); jl_lock_frame_push(lock); - jl_gc_enable_finalizers(ptls, 0); } return got; } @@ -139,9 +135,12 @@ static inline void jl_mutex_unlock(jl_mutex_t *lock) { jl_ptls_t ptls = jl_get_ptls_states(); jl_mutex_unlock_nogc(lock); - jl_gc_enable_finalizers(ptls, 1); jl_lock_frame_pop(); JL_SIGATOMIC_END(); + if (ptls->current_task->locks.len == 0 && ptls->finalizers_inhibited == 0) { + ptls->finalizers_inhibited = 1; + jl_gc_enable_finalizers(ptls, 1); // call run_finalizers (may GC) + } } static inline void jl_mutex_init(jl_mutex_t *lock) JL_NOTSAFEPOINT diff --git a/src/rtutils.c b/src/rtutils.c index f9a8eb59a564ef..17cadf46574e92 100644 --- a/src/rtutils.c +++ b/src/rtutils.c @@ -215,7 +215,6 @@ JL_DLLEXPORT void jl_enter_handler(jl_handler_t *eh) eh->gc_state = ptls->gc_state; eh->locks_len = current_task->locks.len; eh->defer_signal = ptls->defer_signal; - eh->finalizers_inhibited = ptls->finalizers_inhibited; eh->world_age = ptls->world_age; current_task->eh = eh; #ifdef ENABLE_TIMINGS @@ -246,21 +245,26 @@ JL_DLLEXPORT void jl_eh_restore_state(jl_handler_t *eh) current_task->eh = eh->prev; ptls->pgcstack = eh->gcstack; arraylist_t *locks = ¤t_task->locks; - if (locks->len > eh->locks_len) { - for (size_t i = locks->len;i > eh->locks_len;i--) + int unlocks = locks->len > eh->locks_len; + if (unlocks) { + for (size_t i = locks->len; i > eh->locks_len; i--) jl_mutex_unlock_nogc((jl_mutex_t*)locks->items[i - 1]); locks->len = eh->locks_len; } ptls->world_age = eh->world_age; ptls->defer_signal = eh->defer_signal; ptls->gc_state = eh->gc_state; - ptls->finalizers_inhibited = eh->finalizers_inhibited; if (old_gc_state && !eh->gc_state) { jl_gc_safepoint_(ptls); } if (old_defer_signal && !eh->defer_signal) { jl_sigint_safepoint(ptls); } + if (unlocks && eh->locks_len == 0 && ptls->finalizers_inhibited == 0) { + // call run_finalizers + ptls->finalizers_inhibited = 1; + jl_gc_enable_finalizers(ptls, 1); + } } JL_DLLEXPORT void jl_pop_handler(int n) diff --git a/src/task.c b/src/task.c index 06e6d4895e7d76..5891b877b4b459 100644 --- a/src/task.c +++ b/src/task.c @@ -319,9 +319,8 @@ static void ctx_switch(jl_ptls_t ptls) } // set up global state for new task - lastt->world_age = ptls->world_age; ptls->pgcstack = t->gcstack; - ptls->world_age = t->world_age; + ptls->world_age = 0; t->gcstack = NULL; #ifdef MIGRATE_TASKS ptls->previous_task = lastt; @@ -404,8 +403,14 @@ JL_DLLEXPORT void jl_switch(void) else if (t->tid != ptls->tid) { jl_error("cannot switch to task running on another thread"); } + + // Store old values on the stack and reset sig_atomic_t defer_signal = ptls->defer_signal; int8_t gc_state = jl_gc_unsafe_enter(ptls); + size_t world_age = ptls->world_age; + int finalizers_inhibited = ptls->finalizers_inhibited; + ptls->world_age = 0; + ptls->finalizers_inhibited = 0; #ifdef ENABLE_TIMINGS jl_timing_block_t *blk = ct->timing_stack; @@ -427,7 +432,12 @@ JL_DLLEXPORT void jl_switch(void) assert(ptls == refetch_ptls()); #endif - ct = ptls->current_task; + // Pop old values back off the stack + assert(ct == ptls->current_task && + 0 == ptls->world_age && + 0 == ptls->finalizers_inhibited); + ptls->world_age = world_age; + ptls->finalizers_inhibited = finalizers_inhibited; #ifdef ENABLE_TIMINGS assert(blk == ct->timing_stack); @@ -680,6 +690,7 @@ STATIC_OR_JS void NOINLINE JL_NORETURN start_task(void) jl_ptls_t ptls = jl_get_ptls_states(); jl_task_t *t = ptls->current_task; jl_value_t *res; + assert(ptls->finalizers_inhibited == 0); #ifdef MIGRATE_TASKS jl_task_t *pt = ptls->previous_task; diff --git a/stdlib/Distributed/src/messages.jl b/stdlib/Distributed/src/messages.jl index bbfb13f276fa53..f8e1da156babcb 100644 --- a/stdlib/Distributed/src/messages.jl +++ b/stdlib/Distributed/src/messages.jl @@ -147,6 +147,7 @@ function flush_gc_msgs(w::Worker) end # del_msgs gets populated by finalizers, so be very careful here about ordering of allocations + # XXX: threading requires this to be atomic new_array = Any[] msgs = w.del_msgs w.del_msgs = new_array @@ -178,7 +179,7 @@ function send_msg_(w::Worker, header, msg, now::Bool) wait(w.initialized) end io = w.w_stream - lock(io.lock) + lock(io) try reset_state(w.w_serializer) serialize_hdr_raw(io, header) @@ -191,7 +192,7 @@ function send_msg_(w::Worker, header, msg, now::Bool) flush(io) end finally - unlock(io.lock) + unlock(io) end end diff --git a/test/core.jl b/test/core.jl index baf256a6eaf94b..8e12e4bc5bc979 100644 --- a/test/core.jl +++ b/test/core.jl @@ -6,7 +6,6 @@ using Random, SparseArrays, InteractiveUtils const Bottom = Union{} - # For curmod_* include("testenv.jl") @@ -2071,7 +2070,7 @@ mutable struct A6142 <: AbstractMatrix{Float64}; end +(x::A6142, y::AbstractRange) = "AbstractRange method called" #16324 ambiguity # issue #6175 -function g6175(); print(""); (); end +function g6175(); GC.safepoint(); (); end g6175(i::Real, I...) = g6175(I...) g6175(i, I...) = tuple(length(i), g6175(I...)...) @test g6175(1:5) === (5,) @@ -2211,7 +2210,7 @@ day_in(obj6387) function segfault6793(;gamma=1) A = 1 B = 1 - print() + GC.safepoint() return -gamma nothing @@ -3317,7 +3316,7 @@ function f11065() if i == 1 z = "z is defined" elseif i == 2 - print(z) + print(z) # z is undefined end end end @@ -4234,7 +4233,10 @@ end end # disable GC to make sure no collection/promotion happens # when we are constructing the objects +get_finalizers_inhibited() = ccall(:jl_gc_get_finalizers_inhibited, Int32, (Ptr{Cvoid},), C_NULL) let gc_enabled13995 = GC.enable(false) + @assert gc_enabled13995 + @assert get_finalizers_inhibited() == 0 finalized13995 = [false, false, false, false] create_dead_object13995(finalized13995) GC.enable(true) diff --git a/test/misc.jl b/test/misc.jl index d8593c8351d3b6..c4d26e3b223b95 100644 --- a/test/misc.jl +++ b/test/misc.jl @@ -87,9 +87,12 @@ let @test occursin("f()", warning_str) end +# Debugging tool: return the current state of the enable_finalizers counter. +get_finalizers_inhibited() = ccall(:jl_gc_get_finalizers_inhibited, Int32, (Ptr{Cvoid},), C_NULL) + # lock / unlock let l = ReentrantLock() - lock(l) + @test lock(l) === nothing @test islocked(l) success = Ref(false) @test trylock(l) do @@ -102,14 +105,43 @@ let l = ReentrantLock() @test success[] t = @async begin @test trylock(l) do - @test false + error("unreachable") end === false end + @test get_finalizers_inhibited() == 1 Base.wait(t) - unlock(l) + @test get_finalizers_inhibited() == 1 + @test unlock(l) === nothing + @test get_finalizers_inhibited() == 0 @test_throws ErrorException unlock(l) end +for l in (Threads.SpinLock(), ReentrantLock()) + @test get_finalizers_inhibited() == 0 + @test lock(get_finalizers_inhibited, l) == 1 + @test get_finalizers_inhibited() == 0 + try + GC.enable_finalizers(false) + GC.enable_finalizers(false) + @test get_finalizers_inhibited() == 2 + GC.enable_finalizers(true) + @test get_finalizers_inhibited() == 1 + finally + @test get_finalizers_inhibited() == 1 + GC.enable_finalizers(false) + @test get_finalizers_inhibited() == 2 + end + @test get_finalizers_inhibited() == 2 + GC.enable_finalizers(true) + @test get_finalizers_inhibited() == 1 + GC.enable_finalizers(true) + @test get_finalizers_inhibited() == 0 + @test_warn "WARNING: GC finalizers already enabled on this thread." GC.enable_finalizers(true) + + @test lock(l) === nothing + @test try unlock(l) finally end === nothing +end + # task switching @noinline function f6597(c)