Skip to content

Commit

Permalink
feat(server): spawn task sooner in listenloop
Browse files Browse the repository at this point in the history
  • Loading branch information
pankgeorg committed Sep 2, 2023
1 parent 8f35185 commit f804214
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 26 deletions.
6 changes: 3 additions & 3 deletions docs/examples/cors_server.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const CORS_RES_HEADERS = ["Access-Control-Allow-Origin" => "*"]
#=
JSONMiddleware minimizes code by automatically converting the request body
to JSON to pass to the other service functions automatically. JSONMiddleware
recieves the body of the response from the other service funtions and sends
receives the body of the response from the other service funtions and sends
back a success response code
=#
function JSONMiddleware(handler)
Expand All @@ -65,9 +65,9 @@ function JSONMiddleware(handler)
end

#= CorsMiddleware: handles preflight request with the OPTIONS flag
If a request was recieved with the correct headers, then a response will be
If a request was received with the correct headers, then a response will be
sent back with a 200 code, if the correct headers were not specified in the request,
then a CORS error will be recieved on the client side
then a CORS error will be received on the client side
Since each request passes throught the CORS Handler, then if the request is
not a preflight request, it will simply go to the JSONMiddleware to be passed to the
Expand Down
78 changes: 56 additions & 22 deletions src/Servers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export listen, listen!, Server, forceclose, port

using Sockets, Logging, LoggingExtras, MbedTLS, Dates
using MbedTLS: SSLContext, SSLConfig
using ConcurrentUtilities: Lockable, lock
using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions
import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str

Expand Down Expand Up @@ -83,10 +84,19 @@ accept(s::Listener{SSLConfig}) = getsslcontext(Sockets.accept(s.server), s.ssl)

function getsslcontext(tcp, sslconfig)
try
handshake_done = Ref{Bool}(false)
ssl = MbedTLS.SSLContext()
MbedTLS.setup!(ssl, sslconfig)
MbedTLS.associate!(ssl, tcp)
MbedTLS.handshake!(ssl)
handshake_task = @async begin
MbedTLS.handshake!(ssl)
handshake_done[] = true
end
timedwait(5.0) do
handshake_done[] || istaskdone(handshake_task)
end
!istaskdone(handshake_task) && wait(handshake_task)
handshake_done[] || throw(Base.IOError("SSL handshake timed out", Base.ETIMEDOUT))
return ssl
catch e
@try Base.IOError close(tcp)
Expand Down Expand Up @@ -363,31 +373,55 @@ Accepts new tcp connections and spawns async tasks to handle them."
function listenloop(f, listener, conns, tcpisvalid,
max_connections, readtimeout, access_log, ready_to_accept, verbose)
sem = Base.Semaphore(max_connections)
ssl = Lockable(listener.ssl)
connections = Lockable(conns)
verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())"
notify(ready_to_accept)
while isopen(listener)
try
Base.acquire(sem)
io = accept(listener)
if io === nothing
@warnv 1 "unable to accept new connection"
continue
elseif !tcpisvalid(io)
@warnv 1 "!tcpisvalid: $io"
close(io)
continue
end
conn = Connection(io)
conn.state = IDLE
push!(conns, conn)
conn.host, conn.port = listener.hostname, listener.hostport
@async try
handle_connection(f, conn, listener, readtimeout, access_log)
finally
# handle_connection is in charge of closing the underlying io
delete!(conns, conn)
Base.release(sem)
end
io = Sockets.accept(listener.server)
Threads.@spawn begin
local conn = nothing
isssl = !isnothing(listener.ssl)
try
if io === nothing
@warnv 1 "unable to accept new connection"
return
end
if isssl
io = lock(ssl) do ssl
return getsslcontext(io, ssl)
end
end
if !tcpisvalid(io)
close(io)
return
end
conn = Connection(io)
conn.state = IDLE
lock(connections) do conns
push!(conns, conn)
end
conn.host, conn.port = listener.hostname, listener.hostport
handle_connection(f, conn, listener, readtimeout, access_log)
catch e
if e isa Base.IOError && e.code == Base.UV_ECONNABORTED
verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing"
else
@errorv 2 "Server on $(listener.hostname):$(listener.hostport) errored" exception=(e, catch_backtrace())
# quick little sleep in case there's a temporary
# local error accepting and this might help avoid quickly re-erroring
sleep(0.05 + rand() * 0.05)
end
# handle_connection is in charge of closing the underlying io, but it may not get there
finally
!isnothing(conn) && lock(connections) do conns
delete!(conns, conn)
end
Base.release(sem)
end
end # Task.@spawn
catch e
if e isa Base.IOError && e.code == Base.UV_ECONNABORTED
verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing"
Expand Down Expand Up @@ -442,7 +476,7 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log)
request.response.status = 200

try
# invokelatest becuase the perf is negligible, but this makes live-editing handlers more Revise friendly
# invokelatest because the perf is negligible, but this makes live-editing handlers more Revise friendly
@debugv 1 "invoking handler"
Base.invokelatest(f, http)
# If `startwrite()` was never called, throw an error so we send a 500 and log this
Expand Down
2 changes: 1 addition & 1 deletion src/WebSockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ function Base.close(ws::WebSocket, body::CloseFrameBody=CloseFrameBody(1000, "")
ws.readclosed = true
end
end
# we either recieved the responding CLOSE frame and readclosed was set
# we either received the responding CLOSE frame and readclosed was set
# or there was an error/timeout reading it; in any case, readclosed should be closed now
@assert ws.readclosed
# if we're the server, it's our job to close the underlying socket
Expand Down

0 comments on commit f804214

Please sign in to comment.