From dc68ffcae356554bd18cde6a147e777c7690cf12 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Mon, 6 Mar 2023 12:31:00 -0500 Subject: [PATCH 1/5] Extend `Threads.threadpoolsize` Allow specifying which thread pool's size to retrieve. --- base/threadingconstructs.jl | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 643cd95e57ebf..bb088ef8b01eb 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -66,15 +66,25 @@ Returns the number of threadpools currently configured. nthreadpools() = Int(unsafe_load(cglobal(:jl_n_threadpools, Cint))) """ - Threads.threadpoolsize() + Threads.threadpoolsize(pool::Symbol = :default) -> Int -Get the number of threads available to the Julia default worker-thread pool. +Get the number of threads available to the default thread pool (or to the +specified thread pool). See also: `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebra`](@ref man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed) standard library. """ -threadpoolsize() = Threads._nthreads_in_pool(Int8(0)) +function threadpoolsize(pool::Symbol = :default) + if pool === :default + tpid = Int8(0) + elseif pool === :interactive + tpid = Int8(1) + else + error("invalid threadpool specified") + end + return _nthreads_in_pool(tpid) +end function threading_run(fun, static) ccall(:jl_enter_threaded_region, Cvoid, ()) From 944dce960a8828a63eb9949001b0b7ae958139a0 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Mon, 6 Mar 2023 12:34:29 -0500 Subject: [PATCH 2/5] Update `nthreads(pool)` Now returns `threadpoolsize(pool)`. --- base/threadingconstructs.jl | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index bb088ef8b01eb..ccee6476f5a80 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -32,16 +32,7 @@ See also `BLAS.get_num_threads` and `BLAS.set_num_threads` in the [`LinearAlgebr man-linalg) standard library, and `nprocs()` in the [`Distributed`](@ref man-distributed) standard library and [`Threads.maxthreadid()`](@ref). """ -function nthreads(pool::Symbol) - if pool === :default - tpid = Int8(0) - elseif pool === :interactive - tpid = Int8(1) - else - error("invalid threadpool specified") - end - return _nthreads_in_pool(tpid) -end +nthreads(pool::Symbol) = threadpoolsize(pool) function _nthreads_in_pool(tpid::Int8) p = unsafe_load(cglobal(:jl_n_threads_per_pool, Ptr{Cint})) From 38727be9d7a1d1ca3a2cd407470cf6374ed3c2c6 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Mon, 6 Mar 2023 12:44:15 -0500 Subject: [PATCH 3/5] Fix task thread pool assignment If a task is spawned with `:interactive` but there are no interactive threads, set the task's thread pool to `:default` so that we don't have to keep checking it in other places. --- base/threadingconstructs.jl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index ccee6476f5a80..e7257759b15a9 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -344,7 +344,11 @@ macro spawn(args...) let $(letargs...) local task = Task($thunk) task.sticky = false - ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, $tpid) + local tpid_actual = $tpid + if _nthreads_in_pool(tpid_actual) == 0 + tpid_actual = Int8(0) + end + ccall(:jl_set_task_threadpoolid, Cint, (Any, Int8), task, tpid_actual) if $(Expr(:islocal, var)) put!($var, task) end From a3a92e8dd82146155c15cb7f5dc411fe29a6fc32 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Thu, 16 Feb 2023 20:49:31 -0500 Subject: [PATCH 4/5] Fix `enq_work` behavior when single-threaded If there's only one thread in the task's preferred thread pool, use that thread's work queue. --- base/task.jl | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/base/task.jl b/base/task.jl index ce34d2f179fc5..63d0e9b6bd757 100644 --- a/base/task.jl +++ b/base/task.jl @@ -767,22 +767,33 @@ end function enq_work(t::Task) (t._state === task_state_runnable && t.queue === nothing) || error("schedule: Task not runnable") - if t.sticky || Threads.threadpoolsize() == 1 + + # Sticky tasks go into their thread's work queue. + if t.sticky tid = Threads.threadid(t) if tid == 0 - # Issue #41324 - # t.sticky && tid == 0 is a task that needs to be co-scheduled with - # the parent task. If the parent (current_task) is not sticky we must - # set it to be sticky. - # XXX: Ideally we would be able to unset this - current_task().sticky = true + # The task is not yet stuck to a thread. Stick it to the current + # thread and do the same to the parent task (the current task) so + # that the tasks are correctly co-scheduled (issue #41324). + # XXX: Ideally we would be able to unset this. tid = Threads.threadid() ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) + current_task().sticky = true end push!(workqueue_for(tid), t) else - Partr.multiq_insert(t, t.priority) - tid = 0 + tp = Threads.threadpool(t) + if Threads.threadpoolsize(tp) == 1 + # There's only one thread in the task's assigned thread pool; + # use its work queue. + tid = (tp === :default) ? 1 : Threads.threadpoolsize(:default)+1 + ccall(:jl_set_task_tid, Cint, (Any, Cint), t, tid-1) + push!(workqueue_for(tid), t) + else + # Otherwise, put the task in the multiqueue. + Partr.multiq_insert(t, t.priority) + tid = 0 + end end ccall(:jl_wakeup_thread, Cvoid, (Int16,), (tid - 1) % Int16) return t From 55422d988c3a50ca0d272644e456c3959c62b097 Mon Sep 17 00:00:00 2001 From: K Pamnany Date: Thu, 16 Feb 2023 21:19:42 -0500 Subject: [PATCH 5/5] Fix test for threadpool use --- test/threadpool_use.jl | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/test/threadpool_use.jl b/test/threadpool_use.jl index 47c45bdd71eb8..64227c8a8110b 100644 --- a/test/threadpool_use.jl +++ b/test/threadpool_use.jl @@ -6,11 +6,6 @@ using Base.Threads @test nthreadpools() == 2 @test threadpool() === :default @test threadpool(2) === :interactive -dtask() = @test threadpool(current_task()) === :default -itask() = @test threadpool(current_task()) === :interactive -dt1 = @spawn dtask() -dt2 = @spawn :default dtask() -it = @spawn :interactive itask() -wait(dt1) -wait(dt2) -wait(it) +@test fetch(Threads.@spawn Threads.threadpool()) === :default +@test fetch(Threads.@spawn :default Threads.threadpool()) === :default +@test fetch(Threads.@spawn :interactive Threads.threadpool()) === :interactive