-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathsynchronizer.jl
123 lines (110 loc) · 3.38 KB
/
synchronizer.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
OrderedSynchronizer(i=1)
A threadsafe synchronizer that allows ensuring concurrent work is done
in a specific order. The `OrderedSynchronizer` is initialized with an
integer `i` that represents the current "order" of the synchronizer.
Work is "scheduled" by calling `put!(f, x, i)`, where `f` is a function
that will be called like `f()` when the synchronizer is at order `i`,
and will otherwise wait until other calls to `put!` have finished
to bring the synchronizer's state to `i`. Once `f()` is called, the
synchronizer's state is incremented by 1 and any waiting `put!` calls
check to see if it's their turn to execute.
A synchronizer's state can be reset to a specific value (1 by default)
by calling `reset!(x, i)`.
"""
mutable struct OrderedSynchronizer
coordinating_task::Task
cond::Threads.Condition
i::Int
@static if VERSION < v"1.7"
closed::Threads.Atomic{Bool}
else
@atomic closed::Bool
end
end
@static if VERSION < v"1.7"
OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, Threads.Atomic{Bool}(false))
else
OrderedSynchronizer(i=1) = OrderedSynchronizer(current_task(), Threads.Condition(), i, false)
end
"""
reset!(x::OrderedSynchronizer, i=1)
Reset the state of `x` to `i`.
"""
function reset!(x::OrderedSynchronizer, i=1)
Base.@lock x.cond begin
x.i = i
@static if VERSION < v"1.7"
x.closed[] = false
else
@atomic :monotonic x.closed = false
end
end
end
function Base.close(x::OrderedSynchronizer, excp::Exception=closed_exception())
Base.@lock x.cond begin
@static if VERSION < v"1.7"
x.closed[] = true
else
@atomic :monotonic x.closed = true
end
Base.notify_error(x.cond, excp)
end
return
end
@static if VERSION < v"1.7"
Base.isopen(x::OrderedSynchronizer) = !x.closed[]
else
Base.isopen(x::OrderedSynchronizer) = !(@atomic :monotonic x.closed)
end
closed_exception() = InvalidStateException("OrderedSynchronizer is closed.", :closed)
function check_closed(x::OrderedSynchronizer)
if !isopen(x)
# if the monotonic load succeed, now do an acquire fence
@static if VERSION < v"1.7"
!x.closed[] && Base.concurrency_violation()
else
!(@atomic :acquire x.closed) && Base.concurrency_violation()
end
throw(closed_exception())
end
end
"""
put!(f::Function, x::OrderedSynchronizer, i::Int, incr::Int=1)
Schedule `f` to be called when `x` is at order `i`. Note that `put!`
will block until `f` is executed. The typical usage involves something
like:
```julia
x = OrderedSynchronizer()
@sync for i = 1:N
Threads.@spawn begin
# do some concurrent work
# once work is done, schedule synchronization
put!(x, \$i) do
# report back result of concurrent work
# won't be executed until all `i-1` calls to `put!` have already finished
end
end
end
```
The `incr` argument controls how much the synchronizer's state is
incremented after `f` is called. By default, `incr` is 1.
"""
function Base.put!(f, x::OrderedSynchronizer, i, incr=1)
check_closed(x)
Base.@lock x.cond begin
# wait until we're ready to execute f
while x.i != i
check_closed(x)
wait(x.cond)
end
check_closed(x)
try
f()
catch e
Base.throwto(x.coordinating_task, e)
end
x.i += incr
notify(x.cond)
end
end