diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl
index b8a522be96d97..e4f14e26ac5a9 100644
--- a/base/threadingconstructs.jl
+++ b/base/threadingconstructs.jl
@@ -246,8 +246,8 @@ For example, the above conditions imply that:
 - Communicating between iterations using blocking primitives like `Channel`s is incorrect.
 - Write only to locations not shared across iterations (unless a lock or atomic operation
   is used).
-- The value of [`threadid()`](@ref Threads.threadid) may change even within a single
-  iteration. See [`Task Migration`](@ref man-task-migration)
+- Unless the `:static` schedule is used, the value of [`threadid()`](@ref Threads.threadid)
+  may change even within a single iteration. See [`Task Migration`](@ref man-task-migration).
 
 ## Schedulers
 
diff --git a/doc/src/manual/multi-threading.md b/doc/src/manual/multi-threading.md
index afb1c749f9447..056ceb1363fd7 100644
--- a/doc/src/manual/multi-threading.md
+++ b/doc/src/manual/multi-threading.md
@@ -239,6 +239,68 @@ julia> a
 
 Note that [`Threads.@threads`](@ref) does not have an optional reduction parameter like [`@distributed`](@ref).
 
+### Using `@threads` without data races
+
+Taking the example of a naive sum
+
+```julia-repl
+julia> function sum_single(a)
+           s = 0
+           for i in a
+               s += i
+           end
+           s
+       end
+sum_single (generic function with 1 method)
+
+julia> sum_single(1:1_000_000)
+500000500000
+```
+
+Simply adding `@threads` exposes a data race with multiple threads reading and writing `s` at the same time.
+```julia-repl
+julia> function sum_multi_bad(a)
+           s = 0
+           Threads.@threads for i in a
+               s += i
+           end
+           s
+       end
+sum_multi_bad (generic function with 1 method)
+
+julia> sum_multi_bad(1:1_000_000)
+70140554652
+```
+
+Note that the result is not `500000500000` as it should be, and will most likely change each evaluation.
+
+To fix this, buffers that are specific to the task may be used to segment the sum into chunks that are race-free.
+Here `sum_single` is reused, with its own internal buffer `s`, and vector `a` is split into `nthreads()`
+chunks for parallel work via `nthreads()` `@spawn`-ed tasks.
+
+```julia-repl
+julia> function sum_multi_good(a)
+           chunks = Iterators.partition(a, length(a) ÷ Threads.nthreads())
+           tasks = map(chunks) do chunk
+               Threads.@spawn sum_single(chunk)
+           end
+           chunk_sums = fetch.(tasks)
+           return sum_single(chunk_sums)
+       end
+sum_multi_good (generic function with 1 method)
+
+julia> sum_multi_good(1:1_000_000)
+500000500000
+```
+!!! Note
+    Buffers should not be managed based on `threadid()` i.e. `buffers = zeros(Threads.nthreads())` because concurrent tasks
+    can yield, meaning multiple concurrent tasks may use the same buffer on a given thread, introducing risk of data races.
+    Further, when more than one thread is available tasks may change thread at yield points, which is known as
+    [task migration](@ref man-task-migration).
+
+Another option is the use of atomic operations on variables shared across tasks/threads, which may be more performant
+depending on the characteristics of the operations.
+
 ## Atomic Operations
 
 Julia supports accessing and modifying values *atomically*, that is, in a thread-safe way to avoid
@@ -390,11 +452,13 @@ threads in Julia:
 
 ## [Task Migration](@id man-task-migration)
 
-After a task starts running on a certain thread (e.g. via [`@spawn`](@ref Threads.@spawn) or
-[`@threads`](@ref Threads.@threads)), it may move to a different thread if the task yields.
+After a task starts running on a certain thread it may move to a different thread if the task yields.
+
+Such tasks may have been started with [`@spawn`](@ref Threads.@spawn) or [`@threads`](@ref Threads.@threads),
+although the `:static` schedule option for `@threads` does freeze the threadid.
 
-This means that [`threadid()`](@ref Threads.threadid) should not be treated as constant within a task, and therefore
-should not be used to index into a vector of buffers or stateful objects.
+This means that in most cases [`threadid()`](@ref Threads.threadid) should not be treated as constant within a task,
+and therefore should not be used to index into a vector of buffers or stateful objects.
 
 !!! compat "Julia 1.7"
     Task migration was introduced in Julia 1.7. Before this tasks always remained on the same thread that they were