From fd18b80f8dc95267618876c8acbcc4ed9a8f1091 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sat, 2 Sep 2023 10:15:07 +0300 Subject: [PATCH 1/4] feat(server): spawn task sooner in listenloop --- docs/examples/cors_server.jl | 6 ++-- src/Servers.jl | 69 ++++++++++++++++++++++++------------ src/WebSockets.jl | 2 +- 3 files changed, 51 insertions(+), 26 deletions(-) diff --git a/docs/examples/cors_server.jl b/docs/examples/cors_server.jl index 2275b3619..ad6765cbc 100644 --- a/docs/examples/cors_server.jl +++ b/docs/examples/cors_server.jl @@ -39,7 +39,7 @@ const CORS_RES_HEADERS = ["Access-Control-Allow-Origin" => "*"] #= JSONMiddleware minimizes code by automatically converting the request body to JSON to pass to the other service functions automatically. JSONMiddleware -recieves the body of the response from the other service funtions and sends +receives the body of the response from the other service funtions and sends back a success response code =# function JSONMiddleware(handler) @@ -65,9 +65,9 @@ function JSONMiddleware(handler) end #= CorsMiddleware: handles preflight request with the OPTIONS flag -If a request was recieved with the correct headers, then a response will be +If a request was received with the correct headers, then a response will be sent back with a 200 code, if the correct headers were not specified in the request, -then a CORS error will be recieved on the client side +then a CORS error will be received on the client side Since each request passes throught the CORS Handler, then if the request is not a preflight request, it will simply go to the JSONMiddleware to be passed to the diff --git a/src/Servers.jl b/src/Servers.jl index 6afb16590..89fdb4b64 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -13,6 +13,7 @@ export listen, listen!, Server, forceclose, port using Sockets, Logging, LoggingExtras, MbedTLS, Dates using MbedTLS: SSLContext, SSLConfig +using ConcurrentUtilities: Lockable, lock using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str @@ -83,10 +84,19 @@ accept(s::Listener{SSLConfig}) = getsslcontext(Sockets.accept(s.server), s.ssl) function getsslcontext(tcp, sslconfig) try + handshake_done = Ref{Bool}(false) ssl = MbedTLS.SSLContext() MbedTLS.setup!(ssl, sslconfig) MbedTLS.associate!(ssl, tcp) - MbedTLS.handshake!(ssl) + handshake_task = @async begin + MbedTLS.handshake!(ssl) + handshake_done[] = true + end + timedwait(5.0) do + handshake_done[] || istaskdone(handshake_task) + end + !istaskdone(handshake_task) && wait(handshake_task) + handshake_done[] || throw(Base.IOError("SSL handshake timed out", Base.ETIMEDOUT)) return ssl catch e @try Base.IOError close(tcp) @@ -363,31 +373,46 @@ Accepts new tcp connections and spawns async tasks to handle them." function listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose) sem = Base.Semaphore(max_connections) + ssl = Lockable(listener.ssl) + connections = Lockable(conns) verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())" notify(ready_to_accept) while isopen(listener) try Base.acquire(sem) - io = accept(listener) - if io === nothing - @warnv 1 "unable to accept new connection" - continue - elseif !tcpisvalid(io) - @warnv 1 "!tcpisvalid: $io" - close(io) - continue - end - conn = Connection(io) - conn.state = IDLE - push!(conns, conn) - conn.host, conn.port = listener.hostname, listener.hostport - @async try - handle_connection(f, conn, listener, readtimeout, access_log) - finally - # handle_connection is in charge of closing the underlying io - delete!(conns, conn) - Base.release(sem) - end + io = Sockets.accept(listener.server) + Threads.@spawn begin + local conn = nothing + isssl = !isnothing(listener.ssl) + try + if io === nothing + @warnv 1 "unable to accept new connection" + return + end + if isssl + io = lock(ssl) do ssl + return getsslcontext(io, ssl) + end + end + if !tcpisvalid(io) + close(io) + return + end + conn = Connection(io) + conn.state = IDLE + lock(connections) do conns + push!(conns, conn) + end + conn.host, conn.port = listener.hostname, listener.hostport + handle_connection(f, conn, listener, readtimeout, access_log) + finally + # handle_connection is in charge of closing the underlying io, but it may not get there + !isnothing(conn) && lock(connections) do conns + delete!(conns, conn) + end + Base.release(sem) + end + end # Task.@spawn catch e if e isa Base.IOError && e.code == Base.UV_ECONNABORTED verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing" @@ -442,7 +467,7 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) request.response.status = 200 try - # invokelatest becuase the perf is negligible, but this makes live-editing handlers more Revise friendly + # invokelatest because the perf is negligible, but this makes live-editing handlers more Revise friendly @debugv 1 "invoking handler" Base.invokelatest(f, http) # If `startwrite()` was never called, throw an error so we send a 500 and log this diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 56b6933a9..2fbf6a07c 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -587,7 +587,7 @@ function Base.close(ws::WebSocket, body::CloseFrameBody=CloseFrameBody(1000, "") ws.readclosed = true end end - # we either recieved the responding CLOSE frame and readclosed was set + # we either received the responding CLOSE frame and readclosed was set # or there was an error/timeout reading it; in any case, readclosed should be closed now @assert ws.readclosed # if we're the server, it's our job to close the underlying socket From 95da70f8db1e0b9eafc272dda837cc7e98874bea Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sat, 2 Sep 2023 12:15:52 +0300 Subject: [PATCH 2/4] feat(Servers): empty backlog before creating tasks --- src/Servers.jl | 108 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 75 insertions(+), 33 deletions(-) diff --git a/src/Servers.jl b/src/Servers.jl index 89fdb4b64..41b69cae3 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -13,7 +13,7 @@ export listen, listen!, Server, forceclose, port using Sockets, Logging, LoggingExtras, MbedTLS, Dates using MbedTLS: SSLContext, SSLConfig -using ConcurrentUtilities: Lockable, lock +using ConcurrentUtilities: ConcurrentUtilities, Lockable, lock using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str @@ -366,6 +366,47 @@ function listen!(f, listener::Listener; return Server(listener, on_shutdown, conns, tsk) end +using Base: iolock_begin, iolock_end, uv_error, preserve_handle, unpreserve_handle, + StatusClosing, StatusClosed, StatusActive, UV_EAGAIN, UV_ECONNABORTED +using Sockets: accept_nonblock + +function acceptmany(server; MAXSIZE=Sockets.BACKLOG_DEFAULT) + result = Vector{TCPSocket}() + sizehint!(result, MAXSIZE) + iolock_begin() + if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed + throw(ArgumentError("server not connected, make sure \"listen\" has been called")) + end + while isopen(server) + client = TCPSocket() + err = Sockets.accept_nonblock(server, client) + while err == 0 && length(result) < MAXSIZE # Don't try to fill more than half the buffer + push!(result, client) + client = TCPSocket() + err = Sockets.accept_nonblock(server, client) + end + if length(result) > 0 + iolock_end() + return result + end + if err != UV_EAGAIN + uv_error("accept", err) + end + preserve_handle(server) + lock(server.cond) + iolock_end() + try + wait(server.cond) + finally + unlock(server.cond) + unpreserve_handle(server) + end + iolock_begin() + end + uv_error("accept", UV_ECONNABORTED) + nothing +end + """" Main server loop. Accepts new tcp connections and spawns async tasks to handle them." @@ -379,40 +420,41 @@ function listenloop(f, listener, conns, tcpisvalid, notify(ready_to_accept) while isopen(listener) try - Base.acquire(sem) - io = Sockets.accept(listener.server) - Threads.@spawn begin - local conn = nothing - isssl = !isnothing(listener.ssl) - try - if io === nothing - @warnv 1 "unable to accept new connection" - return - end - if isssl - io = lock(ssl) do ssl - return getsslcontext(io, ssl) + for io in acceptmany(listener.server) + @async begin + max_connections < typemax(Int) && Base.acquire(sem) + local conn = nothing + isssl = !isnothing(listener.ssl) + try + if io === nothing + @warnv 1 "unable to accept new connection" + return end + if isssl + io = lock(ssl) do ssl + return getsslcontext(io, ssl) + end + end + if !tcpisvalid(io) + close(io) + return + end + conn = Connection(io) + conn.state = IDLE + lock(connections) do conns + push!(conns, conn) + end + conn.host, conn.port = listener.hostname, listener.hostport + handle_connection(f, conn, listener, readtimeout, access_log) + finally + # handle_connection is in charge of closing the underlying io, but it may not get there + !isnothing(conn) && lock(connections) do conns + delete!(conns, conn) + end + max_connections < typemax(Int) && Base.release(sem) end - if !tcpisvalid(io) - close(io) - return - end - conn = Connection(io) - conn.state = IDLE - lock(connections) do conns - push!(conns, conn) - end - conn.host, conn.port = listener.hostname, listener.hostport - handle_connection(f, conn, listener, readtimeout, access_log) - finally - # handle_connection is in charge of closing the underlying io, but it may not get there - !isnothing(conn) && lock(connections) do conns - delete!(conns, conn) - end - Base.release(sem) - end - end # Task.@spawn + end # Task.@spawn + end catch e if e isa Base.IOError && e.code == Base.UV_ECONNABORTED verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing" From f719bf64c9e4eab2492e23b1c1f8fec605b3972d Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sat, 2 Sep 2023 12:19:15 +0300 Subject: [PATCH 3/4] fix comment --- src/Servers.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Servers.jl b/src/Servers.jl index 41b69cae3..cfd625e19 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -453,7 +453,7 @@ function listenloop(f, listener, conns, tcpisvalid, end max_connections < typemax(Int) && Base.release(sem) end - end # Task.@spawn + end # @async end catch e if e isa Base.IOError && e.code == Base.UV_ECONNABORTED From 811b6b347aa698ef5053bfd85b5b62dc3ec14502 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sat, 2 Sep 2023 13:59:25 +0300 Subject: [PATCH 4/4] limit connections before async --- src/HTTP.jl | 1 + src/Servers.jl | 46 ++++------------------------------------------ src/accept.jl | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 42 deletions(-) create mode 100644 src/accept.jl diff --git a/src/HTTP.jl b/src/HTTP.jl index cc3d027b8..231b1368d 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -77,6 +77,7 @@ include("clientlayers/ConnectionRequest.jl"); using .ConnectionRequest include("clientlayers/StreamRequest.jl"); using .StreamRequest include("download.jl") +include("accept.jl") include("Servers.jl") ;using .Servers; using .Servers: listen include("Handlers.jl") ;using .Handlers; using .Handlers: serve include("parsemultipart.jl") ;using .MultiPartParsing: parse_multipart_form diff --git a/src/Servers.jl b/src/Servers.jl index cfd625e19..b9e7a59a2 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -16,6 +16,7 @@ using MbedTLS: SSLContext, SSLConfig using ConcurrentUtilities: ConcurrentUtilities, Lockable, lock using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str +using ..Accept: acceptmany TRUE(x) = true getinet(host::String, port::Integer) = Sockets.InetAddr(parse(IPAddr, host), port) @@ -366,47 +367,6 @@ function listen!(f, listener::Listener; return Server(listener, on_shutdown, conns, tsk) end -using Base: iolock_begin, iolock_end, uv_error, preserve_handle, unpreserve_handle, - StatusClosing, StatusClosed, StatusActive, UV_EAGAIN, UV_ECONNABORTED -using Sockets: accept_nonblock - -function acceptmany(server; MAXSIZE=Sockets.BACKLOG_DEFAULT) - result = Vector{TCPSocket}() - sizehint!(result, MAXSIZE) - iolock_begin() - if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed - throw(ArgumentError("server not connected, make sure \"listen\" has been called")) - end - while isopen(server) - client = TCPSocket() - err = Sockets.accept_nonblock(server, client) - while err == 0 && length(result) < MAXSIZE # Don't try to fill more than half the buffer - push!(result, client) - client = TCPSocket() - err = Sockets.accept_nonblock(server, client) - end - if length(result) > 0 - iolock_end() - return result - end - if err != UV_EAGAIN - uv_error("accept", err) - end - preserve_handle(server) - lock(server.cond) - iolock_end() - try - wait(server.cond) - finally - unlock(server.cond) - unpreserve_handle(server) - end - iolock_begin() - end - uv_error("accept", UV_ECONNABORTED) - nothing -end - """" Main server loop. Accepts new tcp connections and spawns async tasks to handle them." @@ -421,8 +381,10 @@ function listenloop(f, listener, conns, tcpisvalid, while isopen(listener) try for io in acceptmany(listener.server) + # I would prefer this inside the async, so we can loop and accept again, + # but https://github.com/JuliaWeb/HTTP.jl/pull/647/files says it's bad for performance + max_connections < typemax(Int) && Base.acquire(sem) @async begin - max_connections < typemax(Int) && Base.acquire(sem) local conn = nothing isssl = !isnothing(listener.ssl) try diff --git a/src/accept.jl b/src/accept.jl new file mode 100644 index 000000000..69afd5e64 --- /dev/null +++ b/src/accept.jl @@ -0,0 +1,47 @@ + +module Accept + +export acceptmany + +using Base: iolock_begin, iolock_end, uv_error, preserve_handle, unpreserve_handle, + StatusClosing, StatusClosed, StatusActive, UV_EAGAIN, UV_ECONNABORTED +using Sockets + +function acceptmany(server; MAXSIZE=Sockets.BACKLOG_DEFAULT) + result = Vector{TCPSocket}() + sizehint!(result, MAXSIZE) + iolock_begin() + if server.status != StatusActive && server.status != StatusClosing && server.status != StatusClosed + throw(ArgumentError("server not connected, make sure \"listen\" has been called")) + end + while isopen(server) + client = TCPSocket() + err = Sockets.accept_nonblock(server, client) + while err == 0 && length(result) < MAXSIZE # Don't try to fill more than half the buffer + push!(result, client) + client = TCPSocket() + err = Sockets.accept_nonblock(server, client) + end + if length(result) > 0 + iolock_end() + return result + end + if err != UV_EAGAIN + uv_error("accept", err) + end + preserve_handle(server) + lock(server.cond) + iolock_end() + try + wait(server.cond) + finally + unlock(server.cond) + unpreserve_handle(server) + end + iolock_begin() + end + uv_error("accept", UV_ECONNABORTED) + nothing +end + +end \ No newline at end of file