Skip to content

Commit 7bd1949

Browse files
Eric ForgyEric Forgy
authored andcommitted
Add ReadyState, fix is_websocket_upgrade and add tests.
1 parent 4aa53a4 commit 7bd1949

File tree

2 files changed

+54
-35
lines changed

2 files changed

+54
-35
lines changed

src/WebSockets.jl

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,13 @@ using HTTP: header
1010
import ..@debug, ..DEBUG_LEVEL, ..@require, ..precondition_error
1111

1212

13-
1413
const WS_FINAL = 0x80
1514
const WS_CONTINUATION = 0x00
1615
const WS_TEXT = 0x01
1716
const WS_BINARY = 0x02
1817
const WS_CLOSE = 0x08
1918
const WS_PING = 0x09
2019
const WS_PONG = 0x0A
21-
2220
const WS_MASK = 0x80
2321

2422

@@ -37,29 +35,33 @@ struct WebSocketHeader
3735
end
3836

3937

38+
@enum ReadyState CONNECTED=0x1 CLOSING=0x2 CLOSED=0x3
39+
40+
4041
mutable struct WebSocket{T <: IO} <: IO
4142
io::T
4243
frame_type::UInt8
4344
server::Bool
4445
rxpayload::Vector{UInt8}
4546
txpayload::Vector{UInt8}
46-
txclosed::Bool
47-
rxclosed::Bool
47+
state::ReadyState
4848
end
4949

50+
5051
function WebSocket(io::T; server=false, binary=false) where T <: IO
5152
WebSocket{T}(io, binary ? WS_BINARY : WS_TEXT, server,
52-
UInt8[], UInt8[], false, false)
53+
UInt8[], UInt8[], CONNECTED)
5354
end
5455

5556

5657

5758
# Handshake
5859

60+
5961
is_websocket_upgrade(r::HTTP.Message) =
6062
(r isa HTTP.Request && r.method == "GET" || r.status == 101) &&
6163
HTTP.hasheader(r, "Connection", "upgrade") &&
62-
HTTP.hasheader(r, "Upgrade", "webscoket")
64+
HTTP.hasheader(r, "Upgrade", "websocket")
6365

6466

6567
function check_upgrade(http)
@@ -168,26 +170,18 @@ function Base.write(ws::WebSocket, x1, x2, xs...)
168170
end
169171

170172

171-
function IOExtras.closewrite(ws::WebSocket)
172-
@require !ws.txclosed
173-
opcode = WS_FINAL | WS_CLOSE
174-
@debug 1 "WebSocket ⬅️ $(WebSocketHeader(opcode, 0x00))"
175-
write(ws.io, opcode, 0x00)
176-
ws.txclosed = true
177-
end
178-
179-
180173
wslength(l) = l < 0x7E ? (UInt8(l), UInt8[]) :
181174
l <= 0xFFFF ? (0x7E, reinterpret(UInt8, [hton(UInt16(l))])) :
182175
(0x7F, reinterpret(UInt8, [hton(UInt64(l))]))
183176

184177

185178
wswrite(ws::WebSocket, x) = wswrite(ws, WS_FINAL | ws.frame_type, x)
186179

180+
187181
wswrite(ws::WebSocket, opcode::UInt8, x) = wswrite(ws, opcode, Vector{UInt8}(x))
188182

189-
function wswrite(ws::WebSocket, opcode::UInt8, bytes::Vector{UInt8})
190183

184+
function wswrite(ws::WebSocket, opcode::UInt8, bytes::Vector{UInt8})
191185
n = length(bytes)
192186
len, extended_len = wslength(n)
193187
if ws.server
@@ -218,23 +212,28 @@ end
218212

219213

220214
function Base.close(ws::WebSocket)
221-
if !ws.txclosed
222-
closewrite(ws)
223-
end
224-
while !ws.rxclosed
215+
@require ws.state == CONNECTED
216+
opcode = WS_FINAL | WS_CLOSE
217+
@debug 1 "WebSocket ⬅️ $(WebSocketHeader(opcode, 0x00))"
218+
write(ws.io, opcode, 0x00)
219+
ws.state = CLOSING
220+
println("(close) $(ws.server ? "Server" : "Client"): $(ws.state)")
221+
while !eof(ws) && ws.state == CLOSING
225222
readframe(ws)
226223
end
227224
end
228225

229226

230-
Base.isopen(ws::WebSocket) = !ws.rxclosed
227+
Base.isopen(ws::WebSocket) = (ws.state == CONNECTED) && isopen(ws.io)
231228

232229

233230

234231
# Receiving Frames
235232

233+
236234
Base.eof(ws::WebSocket) = eof(ws.io)
237235

236+
238237
Base.readavailable(ws::WebSocket) = collect(readframe(ws))
239238

240239

@@ -265,14 +264,18 @@ function readframe(ws::WebSocket)
265264
end
266265

267266
if h.opcode == WS_CLOSE
268-
ws.rxclosed = true
269267
if h.length >= 2
270268
status = UInt16(ws.rxpayload[1]) << 8 | ws.rxpayload[2]
271269
if status != 1000
272270
message = String(ws.rxpayload[3:h.length])
273271
throw(WebSocketError(status, message))
274272
end
275273
end
274+
if ws.state == CONNECTED
275+
close(ws)
276+
end
277+
ws.state = CLOSED
278+
close(ws.io)
276279
return UInt8[]
277280
elseif h.opcode == WS_PING
278281
write(ws.io, [WS_PONG, 0x00])
@@ -294,6 +297,7 @@ function WebSocketHeader(bytes...)
294297
return readheader(io)
295298
end
296299

300+
297301
function Base.show(io::IO, h::WebSocketHeader)
298302
print(io, "WebSocketHeader(",
299303
h.opcode == WS_CONTINUATION ? "CONTINUATION" :

test/WebSockets.jl

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,39 @@ using HTTP.IOExtras
55
@testset "WebSockets" begin
66

77
for s in ["ws", "wss"]
8+
info("Testing $(s)...")
9+
HTTP.WebSockets.open("$s://echo.websocket.org") do ws
10+
write(ws, HTTP.bytes("Foo"))
11+
@test !eof(ws)
12+
@test String(readavailable(ws)) == "Foo"
813

9-
HTTP.WebSockets.open("$s://echo.websocket.org") do io
10-
write(io, HTTP.bytes("Foo"))
11-
@test !eof(io)
12-
@test String(readavailable(io)) == "Foo"
13-
14-
write(io, HTTP.bytes("Hello"))
15-
write(io, " There")
16-
write(io, " World", "!")
17-
closewrite(io)
14+
close(ws)
15+
end
16+
end
1817

19-
buf = IOBuffer()
20-
write(buf, io)
21-
@test String(take!(buf)) == "Hello There World!"
2218

23-
close(io)
19+
p = 8000
20+
@async HTTP.listen(ip"127.0.0.1",p) do http
21+
if HTTP.WebSockets.is_websocket_upgrade(http.message)
22+
HTTP.WebSockets.upgrade(http) do ws
23+
data = ""
24+
while !eof(ws);
25+
data = String(readavailable(ws))
26+
write(ws,data)
27+
end
28+
end
2429
end
30+
end
31+
32+
sleep(2)
33+
34+
info("Testing local server...")
35+
HTTP.WebSockets.open("ws://127.0.0.1:$(p)") do ws
36+
write(ws, HTTP.bytes("Foo"))
37+
@test !eof(ws)
38+
@test String(readavailable(ws)) == "Foo"
2539

40+
close(ws)
2641
end
2742

2843
end # testset

0 commit comments

Comments
 (0)