Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a convenience object for expressing once-like / per-runtime patterns #55793

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ variables. ([#53742]).
Multi-threading changes
-----------------------

* New types are defined to handle the pattern of code that must run once per process, called
a `OncePerProcess{T}` type, which allows defining a function that should be run exactly once
the first time it is called, and then always return the same result value of type `T`
every subsequent time afterwards. There are also `OncePerThread{T}` and `OncePerTask{T}` types for
similar usage with threads or tasks. ([#TBD])

Build system changes
--------------------

Expand Down
2 changes: 2 additions & 0 deletions base/docs/basedocs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ runtime initialization functions of external C libraries and initializing global
that involve pointers returned by external libraries.
See the [manual section about modules](@ref modules) for more details.

See also: [`OncePerProcess`](@ref).

# Examples
```julia
const foo_data_ptr = Ref{Ptr{Cvoid}}(0)
Expand Down
3 changes: 3 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export
OrdinalRange,
Pair,
PartialQuickSort,
OncePerProcess,
OncePerTask,
OncePerThread,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably not be exported?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*This = OncePerThread only. And still public, just not exported.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems fairly arbitrary?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's based on the idea that OncePerThread should be used very rarely (perhaps never in the absence of FFI)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though, yes, it is fairly arbitrary, and I see the appeal to putting all three at the same level.

PermutedDimsArray,
QuickSort,
Rational,
Expand Down
282 changes: 282 additions & 0 deletions base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

const ThreadSynchronizer = GenericCondition{Threads.SpinLock}

"""
current_task()

Get the currently running [`Task`](@ref).
"""
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())

# Advisory reentrant lock
"""
ReentrantLock()
Expand Down Expand Up @@ -570,3 +577,278 @@ end
import .Base: Event
export Event
end

const PerStateInitial = 0x00
const PerStateHasrun = 0x01
const PerStateErrored = 0x02
const PerStateConcurrent = 0x03

"""
OncePerProcess{T}(init::Function)() -> T

Calling a `OncePerProcess` object returns a value of type `T` by running the
function `initializer` exactly once per process. All concurrent and future
calls in the same process will return exactly the same value. This is useful in
code that will be precompiled, as it allows setting up caches or other state
which won't get serialized.

## Example

```jldoctest
julia> const global_state = Base.OncePerProcess{Vector{UInt32}}() do
println("Making lazy global value...done.")
return [Libc.rand()]
end;
LilithHafner marked this conversation as resolved.
Show resolved Hide resolved

julia> (procstate = global_state()) |> typeof
Making lazy global value...done.
Vector{UInt32} (alias for Array{UInt32, 1})

julia> procstate === global_state()
true

julia> procstate === fetch(@async global_state())
true
```
"""
mutable struct OncePerProcess{T, F}
value::Union{Nothing,T}
@atomic state::UInt8 # 0=initial, 1=hasrun, 2=error
@atomic allow_compile_time::Bool
const initializer::F
const lock::ReentrantLock

function OncePerProcess{T,F}(initializer::F) where {T, F}
once = new{T,F}(nothing, PerStateInitial, true, initializer, ReentrantLock())
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :value, nothing)
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :state, PerStateInitial)
return once
end
end
OncePerProcess{T}(initializer::F) where {T, F} = OncePerProcess{T, F}(initializer)
OncePerProcess(initializer) = OncePerProcess{Base.promote_op(initializer), typeof(initializer)}(initializer)
@inline function (once::OncePerProcess{T})() where T
state = (@atomic :acquire once.state)
if state != PerStateHasrun
(@noinline function init_perprocesss(once, state)
state == PerStateErrored && error("OncePerProcess initializer failed previously")
once.allow_compile_time || __precompile__(false)
lock(once.lock)
try
state = @atomic :monotonic once.state
if state == PerStateInitial
once.value = once.initializer()
elseif state == PerStateErrored
error("OncePerProcess initializer failed previously")
elseif state != PerStateHasrun
error("invalid state for OncePerProcess")
end
catch
state == PerStateErrored || @atomic :release once.state = PerStateErrored
unlock(once.lock)
rethrow()
end
state == PerStateHasrun || @atomic :release once.state = PerStateHasrun
unlock(once.lock)
nothing
end)(once, state)
end
return once.value::T
end

function copyto_monotonic!(dest::AtomicMemory, src)
i = 1
for j in eachindex(src)
if isassigned(src, j)
@atomic :monotonic dest[i] = src[j]
#else
# _unsetindex_atomic!(dest, i, src[j], :monotonic)
end
i += 1
end
dest
end

function fill_monotonic!(dest::AtomicMemory, x)
for i = 1:length(dest)
@atomic :monotonic dest[i] = x
end
dest
end


# share a lock/condition, since we just need it briefly, so some contention is okay
const PerThreadLock = ThreadSynchronizer()
"""
OncePerThread{T}(init::Function)() -> T

Calling a `OncePerThread` object returns a value of type `T` by running the function
`initializer` exactly once per thread. All future calls in the same thread, and
concurrent or future calls with the same thread id, will return exactly the
same value. The object can also be indexed by the threadid for any existing
thread, to get (or initialize *on this thread*) the value stored for that
thread. Incorrect usage can lead to data-races or memory corruption so use only
if that behavior is correct within your library's threading-safety design.

!!! warning
It is not necessarily true that a Task only runs on one thread, therefore the value
returned here may alias other values or change in the middle of your program. This function
may get deprecated in the future. If initializer yields, the thread running the current
task after the call might not be the same as the one at the start of the call.

See also: [`OncePerTask`](@ref).

## Example

```jldoctest
julia> const thread_state = Base.OncePerThread{Vector{UInt32}}() do
println("Making lazy thread value...done.")
return [Libc.rand()]
end;

julia> (threadvec = thread_state()) |> typeof
Making lazy thread value...done.
Vector{UInt32} (alias for Array{UInt32, 1})

julia> threadvec === fetch(@async thread_state())
true

julia> threadvec === thread_state[Threads.threadid()]
true
```
"""
mutable struct OncePerThread{T, F}
@atomic xs::AtomicMemory{T} # values
@atomic ss::AtomicMemory{UInt8} # states: 0=initial, 1=hasrun, 2=error, 3==concurrent
const initializer::F

function OncePerThread{T,F}(initializer::F) where {T, F}
xs, ss = AtomicMemory{T}(), AtomicMemory{UInt8}()
once = new{T,F}(xs, ss, initializer)
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :xs, xs)
ccall(:jl_set_precompile_field_replace, Cvoid, (Any, Any, Any),
once, :ss, ss)
return once
end
end
OncePerThread{T}(initializer::F) where {T, F} = OncePerThread{T,F}(initializer)
OncePerThread(initializer) = OncePerThread{Base.promote_op(initializer), typeof(initializer)}(initializer)
@inline (once::OncePerThread)() = once[Threads.threadid()]
@inline function getindex(once::OncePerThread, tid::Integer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling, this getindex might be a bit too cute? Could just be an optional argument to the call method, i.e.:

function (once::OncePerThread)(tid::Integer = Threads.threadid())

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that function call syntax would be better.

tid = Int(tid)
ss = @atomic :acquire once.ss
xs = @atomic :monotonic once.xs
# n.b. length(xs) >= length(ss)
if tid <= 0 || tid > length(ss) || (@atomic :acquire ss[tid]) != PerStateHasrun
(@noinline function init_perthread(once, tid)
local ss = @atomic :acquire once.ss
local xs = @atomic :monotonic once.xs
local len = length(ss)
# slow path to allocate it
nt = Threads.maxthreadid()
0 < tid <= nt || throw(ArgumentError("thread id outside of allocated range"))
if tid <= length(ss) && (@atomic :acquire ss[tid]) == PerStateErrored
error("OncePerThread initializer failed previously")
end
newxs = xs
newss = ss
if tid > len
# attempt to do all allocations outside of PerThreadLock for better scaling
@assert length(xs) >= length(ss) "logical constraint violation"
newxs = typeof(xs)(undef, len + nt)
newss = typeof(ss)(undef, len + nt)
end
# uses state and locks to ensure this runs exactly once per tid argument
lock(PerThreadLock)
try
ss = @atomic :monotonic once.ss
xs = @atomic :monotonic once.xs
if tid > length(ss)
@assert len <= length(ss) <= length(newss) "logical constraint violation"
fill_monotonic!(newss, PerStateInitial)
xs = copyto_monotonic!(newxs, xs)
ss = copyto_monotonic!(newss, ss)
@atomic :release once.xs = xs
@atomic :release once.ss = ss
end
state = @atomic :monotonic ss[tid]
while state == PerStateConcurrent
# lost race, wait for notification this is done running elsewhere
wait(PerThreadLock) # wait for initializer to finish without releasing this thread
ss = @atomic :monotonic once.ss
state = @atomic :monotonic ss[tid]
end
if state == PerStateInitial
# won the race, drop lock in exchange for state, and run user initializer
@atomic :monotonic ss[tid] = PerStateConcurrent
result = try
unlock(PerThreadLock)
once.initializer()
catch
lock(PerThreadLock)
ss = @atomic :monotonic once.ss
@atomic :release ss[tid] = PerStateErrored
notify(PerThreadLock)
rethrow()
end
# store result and notify waiters
lock(PerThreadLock)
xs = @atomic :monotonic once.xs
@atomic :release xs[tid] = result
ss = @atomic :monotonic once.ss
@atomic :release ss[tid] = PerStateHasrun
notify(PerThreadLock)
elseif state == PerStateErrored
error("OncePerThread initializer failed previously")
elseif state != PerStateHasrun
error("invalid state for OncePerThread")
end
finally
unlock(PerThreadLock)
end
nothing
end)(once, tid)
xs = @atomic :monotonic once.xs
end
return xs[tid]
end

"""
OncePerTask{T}(init::Function)() -> T

Calling a `OncePerTask` object returns a value of type `T` by running the function `initializer`
exactly once per Task. All future calls in the same Task will return exactly the same value.

See also: [`task_local_storage`](@ref).

## Example

```jldoctest
julia> const task_state = Base.OncePerTask{Vector{UInt32}}() do
println("Making lazy task value...done.")
return [Libc.rand()]
end;

julia> (taskvec = task_state()) |> typeof
Making lazy task value...done.
Vector{UInt32} (alias for Array{UInt32, 1})

julia> taskvec === task_state()
true

julia> taskvec === fetch(@async task_state())
Making lazy task value...done.
false
```
"""
mutable struct OncePerTask{T, F}
const initializer::F

OncePerTask{T}(initializer::F) where {T, F} = new{T,F}(initializer)
OncePerTask{T,F}(initializer::F) where {T, F} = new{T,F}(initializer)
OncePerTask(initializer) = new{Base.promote_op(initializer), typeof(initializer)}(initializer)
end
@inline (once::OncePerTask)() = get!(once.initializer, task_local_storage(), once)
7 changes: 0 additions & 7 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,6 @@ macro task(ex)
:(Task($thunk))
end

"""
current_task()

Get the currently running [`Task`](@ref).
"""
current_task() = ccall(:jl_get_current_task, Ref{Task}, ())

# task states

const task_state_runnable = UInt8(0)
Expand Down
3 changes: 3 additions & 0 deletions doc/src/base/base.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Main.include
Base.include_string
Base.include_dependency
__init__
Base.OncePerProcess
Base.OncePerTask
Base.OncePerThread
Base.which(::Any, ::Any)
Base.methods
Base.@show
Expand Down
2 changes: 1 addition & 1 deletion src/builtins.c
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ static inline size_t get_checked_fieldindex(const char *name, jl_datatype_t *st,
else {
jl_value_t *ts[2] = {(jl_value_t*)jl_long_type, (jl_value_t*)jl_symbol_type};
jl_value_t *t = jl_type_union(ts, 2);
jl_type_error("getfield", t, arg);
jl_type_error(name, t, arg);
}
if (mutabl && jl_field_isconst(st, idx)) {
jl_errorf("%s: const field .%s of type %s cannot be changed", name,
Expand Down
2 changes: 2 additions & 0 deletions src/gc-stock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2741,6 +2741,8 @@ static void gc_mark_roots(jl_gc_markqueue_t *mq)
gc_heap_snapshot_record_gc_roots((jl_value_t*)jl_global_roots_list, "global_roots_list");
gc_try_claim_and_push(mq, jl_global_roots_keyset, NULL);
gc_heap_snapshot_record_gc_roots((jl_value_t*)jl_global_roots_keyset, "global_roots_keyset");
gc_try_claim_and_push(mq, precompile_field_replace, NULL);
gc_heap_snapshot_record_gc_roots((jl_value_t*)precompile_field_replace, "precompile_field_replace");
}

// find unmarked objects that need to be finalized from the finalizer list "list".
Expand Down
2 changes: 2 additions & 0 deletions src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,8 @@ extern jl_genericmemory_t *jl_global_roots_keyset JL_GLOBALLY_ROOTED;
extern arraylist_t *jl_entrypoint_mis;
JL_DLLEXPORT int jl_is_globally_rooted(jl_value_t *val JL_MAYBE_UNROOTED) JL_NOTSAFEPOINT;
JL_DLLEXPORT jl_value_t *jl_as_global_root(jl_value_t *val, int insert) JL_GLOBALLY_ROOTED;
extern jl_svec_t *precompile_field_replace JL_GLOBALLY_ROOTED;
JL_DLLEXPORT void jl_set_precompile_field_replace(jl_value_t *val, jl_value_t *field, jl_value_t *newval) JL_GLOBALLY_ROOTED;

jl_opaque_closure_t *jl_new_opaque_closure(jl_tupletype_t *argt, jl_value_t *rt_lb, jl_value_t *rt_ub,
jl_value_t *source, jl_value_t **env, size_t nenv, int do_compile);
Expand Down
Loading