-
Notifications
You must be signed in to change notification settings - Fork 46
/
core.jl
186 lines (155 loc) · 4.51 KB
/
core.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import Base: push!, eltype, close
export Signal, Input, Node, push!, value, close
##### Node #####
const debug_memory = false # Set this to true to debug gc of nodes
const nodes = WeakKeyDict()
const io_lock = ReentrantLock()
if !debug_memory
type Node{T}
value::T
parents::Tuple
actions::Vector
alive::Bool
end
else
type Node{T}
value::T
parents::Tuple
actions::Vector
alive::Bool
bt
function Node(v, parents, actions, alive)
n=new(v,parents,actions,alive,backtrace())
nodes[n] = nothing
finalizer(n, log_gc)
n
end
end
end
log_gc(n) =
@async begin
lock(io_lock)
print(STDERR, "Node got gc'd. Creation backtrace:")
Base.show_backtrace(STDERR, n.bt)
println(STDOUT)
unlock(io_lock)
end
immutable Action
recipient::Node
f::Function
end
isrequired(a::Action) = a.recipient.alive
Node{T}(x::T, parents=()) = Node{T}(x, parents, Action[], true)
Node{T}(::Type{T}, x, parents=()) = Node{T}(x, parents, Action[], true)
# preserve/unpreserve nodes from gc
const _nodes = ObjectIdDict()
preserve(x::Node) = (_nodes[x] = get(_nodes,x,0)+1; x)
function unpreserve(x::Node)
v = _nodes[x]
v == 1 ? pop!(_nodes,x) : (_nodes[x] = v-1)
nothing
end
typealias Signal Node
typealias Input Node
Base.show(io::IO, n::Node) =
write(io, "Node{$(eltype(n))}($(n.value), nactions=$(length(n.actions))$(n.alive ? "" : ", closed"))")
value(n::Node) = n.value
eltype{T}(::Node{T}) = T
eltype{T}(::Type{Node{T}}) = T
##### Connections #####
function add_action!(f, node, recipient)
push!(node.actions, Action(recipient, f))
end
function remove_action!(f, node, recipient)
node.actions = filter(a -> a.f != f, node.actions)
end
function close(n::Node, warn_nonleaf=true)
finalize(n) # stop timer etc.
n.alive = false
if !isempty(n.actions)
any(map(isrequired, n.actions)) && warn_nonleaf &&
warn("closing a non-leaf node is not a good idea")
empty!(n.actions)
end
end
function send_value!(node::Node, x, timestep)
# Dead node?
!node.alive && return
# Set the value and do actions
node.value = x
for action in node.actions
do_action(action, timestep)
end
end
send_value!(wr::WeakRef, x, timestep) = send_value!(wr.value, x, timestep)
do_action(a::Action, timestep) =
isrequired(a) && a.f(a.recipient, timestep)
# If any actions have been gc'd, remove them
cleanup_actions(node::Node) =
node.actions = filter(isrequired, node.actions)
##### Messaging #####
const CHANNEL_SIZE = 1024
# Global channel for signal updates
const _messages = Channel{Any}(CHANNEL_SIZE)
# queue an update. meta comes back in a ReactiveException if there is an error
Base.push!(n::Node, x, onerror=print_error) = _push!(n, x, onerror)
function _push!(n, x, onerror=print_error)
taken = Base.n_avail(_messages)
if taken >= CHANNEL_SIZE
warn("Message queue is full. Ordering may be incorrect.")
@async put!(_messages, (n, x, onerror))
else
put!(_messages, (n, x, onerror))
end
nothing
end
# remove messages from the channel and propagate them
global run
let timestep = 0
function run(steps=typemax(Int))
runner_task = current_task()::Task
local waiting, node, value, onerror, iter = 1
try
while iter <= steps
timestep += 1
iter += 1
waiting = true
(node, value, onerror) = take!(_messages)
waiting = false
send_value!(node, value, timestep)
end
catch err
if isa(err, InterruptException)
println("Reactive event loop was inturrupted.")
rethrow()
else
bt = catch_backtrace()
onerror(node, value, CapturedException(err, bt))
end
end
end
end
# Default error handler function
function print_error(node, value, ex)
lock(io_lock)
io = STDERR
println(io, "Failed to push!")
print(io, " ")
show(io, value)
println(io)
println(io, "to node")
print(io, " ")
show(io, node)
println(io)
showerror(io, ex)
println(io)
unlock(io_lock)
end
# Run everything queued up till the instant of calling this function
run_till_now() = run(Base.n_avail(_messages))
# A decent default runner task
function __init__()
global runner_task = @async begin
Reactive.run()
end
end