-
Notifications
You must be signed in to change notification settings - Fork 14
/
reader.jl
82 lines (53 loc) · 1.37 KB
/
reader.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
module Reader
using ..Client: read_one
include("decoder.jl")
function read_msg(socket)
msg = read_one(socket)
# Assert and chop last null char
@assert length(msg) > 1 && msg[1] != pop!(msg) == 0x00
split(String(msg), '\0')
end
"""
check_msg(ib, wrap)
Process one message and dispatch the appropriate callback. **Blocking**.
"""
function check_msg(ib, w)
msg = read_msg(ib.socket)
Decoder.decode(msg, w, ib.version)
end
"""
check_all(ib, wrap, flush=false)
Process all messages waiting in the queue. **Not blocking**.
If `flush=true`, messages are read but callbacks are not dispatched.
Return number of messages processed.
"""
function check_all(ib, w, flush=false)
count = 0
while bytesavailable(ib.socket) > 0 || ib.socket.status == Base.StatusOpen # =3
msg = read_msg(ib.socket)
flush || Decoder.decode(msg, w, ib.version)
count += 1
end
count
end
"""
start_reader(ib, wrap)
Start a new [`Task`](@ref) to process messages asynchronously.
"""
function start_reader(ib, w)
@async begin
try
while true
check_msg(ib, w)
end
catch e
if e isa EOFError
@warn "connection terminated"
else
@error "exception thrown" e
end
end
@info "reader exiting"
end
end
end