Skip to content

Commit

Permalink
attempt to make asynchttpserver better; fixes #15925; [backport:1.0] (#…
Browse files Browse the repository at this point in the history
…15957)

* attempt to make asynchttpserver better; fixes #15925; [backport:1.0]
* better documentation
* fixes 'nim doc'
* makes test green again
* ported to FreeRTOS
* fixes the doc rendering
  • Loading branch information
Araq authored Nov 13, 2020
2 parents c39fa0d + 02f8b11 commit 562c627
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 36 deletions.
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@

- `doAssertRaises` now correctly handles foreign exceptions.

- Added `asyncdispatch.activeDescriptors` that returns the number of currently
active async event handles/file descriptors
- Added `asyncdispatch.maxDescriptors` that returns the maximum number of
active async event handles/file descriptors.


## Language changes

- `nimscript` now handles `except Exception as e`
Expand Down
24 changes: 24 additions & 0 deletions lib/pure/asyncdispatch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1934,3 +1934,27 @@ proc waitFor*[T](fut: Future[T]): T =
poll()

fut.read

proc activeDescriptors*(): int {.inline.} =
## Returns the current number of active file descriptors for the current
## event loop. This is a cheap operation that does not involve a system call.
when defined(windows):
result = getGlobalDispatcher().handles.len
elif not defined(nimdoc):
result = getGlobalDispatcher().selector.count

when defined(posix):
import posix

when defined(linux) or defined(windows) or defined(macosx) or defined(bsd):
proc maxDescriptors*(): int {.raises: OSError.} =
## Returns the maximum number of active file descriptors for the current
## process. This involves a system call. For now `maxDescriptors` is
## supported on the following OSes: Windows, Linux, OSX, BSD.
when defined(windows):
result = 16_700_000
else:
var fdLim: RLimit
if getrlimit(RLIMIT_NOFILE, fdLim) < 0:
raiseOSError(osLastError())
result = int(fdLim.rlim_cur) - 1
106 changes: 82 additions & 24 deletions lib/pure/asynchttpserver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
##
## This HTTP server has not been designed to be used in production, but
## for testing applications locally. Because of this, when deploying your
## application in production you should use a reverse proxy (for example nginx)
## application in production you should use a reverse proxy (for example nginx)
## instead of allowing users to connect directly to this server.
##
## Basic usage
Expand All @@ -21,14 +21,27 @@
## respond to all requests with a ``200 OK`` response code and "Hello World"
## as the response body.
##
## .. code-block::nim
## import asynchttpserver, asyncdispatch
## .. code-block:: Nim
##
## var server = newAsyncHttpServer()
## proc cb(req: Request) {.async.} =
## await req.respond(Http200, "Hello World")
## import asynchttpserver, asyncdispatch
##
## waitFor server.serve(Port(8080), cb)
## proc main {.async.} =
## var server = newAsyncHttpServer()
## proc cb(req: Request) {.async.} =
## let headers = {"Date": "Tue, 29 Apr 2014 23:40:08 GMT",
## "Content-type": "text/plain; charset=utf-8"}
## await req.respond(Http200, "Hello World", headers.newHttpHeaders())
##
## server.listen Port(5555)
## while true:
## if server.shouldAcceptRequest(5):
## var (address, client) = await server.socket.acceptAddr()
## asyncCheck processClient(server, client, address, cb)
## else:
## poll()
##
## asyncCheck main()
## runForever()

import asyncnet, asyncdispatch, parseutils, uri, strutils
import httpcore
Expand Down Expand Up @@ -58,14 +71,12 @@ type
reuseAddr: bool
reusePort: bool
maxBody: int ## The maximum content-length that will be read for the body.
maxFDs: int

proc newAsyncHttpServer*(reuseAddr = true, reusePort = false,
maxBody = 8388608): AsyncHttpServer =
## Creates a new ``AsyncHttpServer`` instance.
new result
result.reuseAddr = reuseAddr
result.reusePort = reusePort
result.maxBody = maxBody
result = AsyncHttpServer(reuseAddr: reuseAddr, reusePort: reusePort, maxBody: maxBody)

proc addHeaders(msg: var string, headers: HttpHeaders) =
for k, v in headers:
Expand Down Expand Up @@ -294,13 +305,18 @@ proc processClient(server: AsyncHttpServer, client: AsyncSocket, address: string
)
if not retry: break

proc serve*(server: AsyncHttpServer, port: Port,
callback: proc (request: Request): Future[void] {.closure, gcsafe.},
address = "") {.async.} =
## Starts the process of listening for incoming HTTP connections on the
## specified address and port.
##
## When a request is made by a client the specified callback will be called.
const
nimMaxDescriptorsFallback* {.intdefine.} = 16_000 ## fallback value for \
## when `maxDescriptors` is not available.
## This can be set on the command line during compilation
## via `-d:nimMaxDescriptorsFallback=N`

proc listen*(server: AsyncHttpServer; port: Port; address = "") =
## Listen to the given port and address.
when declared(maxDescriptors):
server.maxFDs = try: maxDescriptors() except: nimMaxDescriptorsFallback
else:
server.maxFDs = nimMaxDescriptorsFallback
server.socket = newAsyncSocket()
if server.reuseAddr:
server.socket.setSockOpt(OptReuseAddr, true)
Expand All @@ -309,9 +325,44 @@ proc serve*(server: AsyncHttpServer, port: Port,
server.socket.bindAddr(port, address)
server.socket.listen()

proc shouldAcceptRequest*(server: AsyncHttpServer;
assumedDescriptorsPerRequest = 5): bool {.inline.} =
## Returns true if the process's current number of opened file
## descriptors is still within the maximum limit and so it's reasonable to
## accept yet another request.
result = assumedDescriptorsPerRequest < 0 or
(activeDescriptors() + assumedDescriptorsPerRequest < server.maxFDs)

proc acceptRequest*(server: AsyncHttpServer, port: Port,
callback: proc (request: Request): Future[void] {.closure, gcsafe.}) {.async.} =
## Accepts a single request. Write an explicit loop around this proc so that
## errors can be handled properly.
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, callback)

proc serve*(server: AsyncHttpServer, port: Port,
callback: proc (request: Request): Future[void] {.closure, gcsafe.},
address = "";
assumedDescriptorsPerRequest = 5) {.async.} =
## Starts the process of listening for incoming HTTP connections on the
## specified address and port.
##
## When a request is made by a client the specified callback will be called.
##
## If `assumedDescriptorsPerRequest` is 0 or greater the server cares about
## the process's maximum file descriptor limit. It then ensures that the
## process still has the resources for `assumedDescriptorsPerRequest`
## file descriptors before accepting a connection.
##
## You should prefer to call `acceptRequest` instead with a custom server
## loop so that you're in control over the error handling and logging.
listen server, port, address
while true:
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, callback)
if shouldAcceptRequest(server, assumedDescriptorsPerRequest):
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, callback)
else:
poll()
#echo(f.isNil)
#echo(f.repr)

Expand All @@ -320,7 +371,7 @@ proc close*(server: AsyncHttpServer) =
server.socket.close()

when not defined(testing) and isMainModule:
proc main =
proc main {.async.} =
var server = newAsyncHttpServer()
proc cb(req: Request) {.async.} =
#echo(req.reqMethod, " ", req.url)
Expand All @@ -329,6 +380,13 @@ when not defined(testing) and isMainModule:
"Content-type": "text/plain; charset=utf-8"}
await req.respond(Http200, "Hello World", headers.newHttpHeaders())

asyncCheck server.serve(Port(5555), cb)
runForever()
main()
server.listen Port(5555)
while true:
if server.shouldAcceptRequest(5):
var (address, client) = await server.socket.acceptAddr()
asyncCheck processClient(server, client, address, cb)
else:
poll()

asyncCheck main()
runForever()
4 changes: 2 additions & 2 deletions lib/pure/ioselects/ioselectors_epoll.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ when hasThreadSupport:
maxFD: int
numFD: int
fds: ptr SharedArray[SelectorKey[T]]
count: int
count*: int
Selector*[T] = ptr SelectorImpl[T]
else:
type
Expand All @@ -64,7 +64,7 @@ else:
maxFD: int
numFD: int
fds: seq[SelectorKey[T]]
count: int
count*: int
Selector*[T] = ref SelectorImpl[T]
type
SelectEventImpl = object
Expand Down
8 changes: 4 additions & 4 deletions lib/pure/ioselects/ioselectors_kqueue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ when defined(macosx) or defined(freebsd) or defined(dragonfly):
proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
newp: pointer, newplen: csize_t): cint
{.importc: "sysctl",header: """#include <sys/types.h>
#include <sys/sysctl.h>"""}
#include <sys/sysctl.h>""".}
elif defined(netbsd) or defined(openbsd):
# OpenBSD and NetBSD don't have KERN_MAXFILESPERPROC, so we are using
# KERN_MAXFILES, because KERN_MAXFILES is always bigger,
Expand All @@ -39,7 +39,7 @@ elif defined(netbsd) or defined(openbsd):
proc sysctl(name: ptr cint, namelen: cuint, oldp: pointer, oldplen: ptr csize_t,
newp: pointer, newplen: csize_t): cint
{.importc: "sysctl",header: """#include <sys/param.h>
#include <sys/sysctl.h>"""}
#include <sys/sysctl.h>""".}

when hasThreadSupport:
type
Expand All @@ -48,7 +48,7 @@ when hasThreadSupport:
maxFD: int
changes: ptr SharedArray[KEvent]
fds: ptr SharedArray[SelectorKey[T]]
count: int
count*: int
changesLock: Lock
changesSize: int
changesLength: int
Expand All @@ -61,7 +61,7 @@ else:
maxFD: int
changes: seq[KEvent]
fds: seq[SelectorKey[T]]
count: int
count*: int
sock: cint
Selector*[T] = ref SelectorImpl[T]

Expand Down
4 changes: 2 additions & 2 deletions lib/pure/ioselects/ioselectors_poll.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ when hasThreadSupport:
pollcnt: int
fds: ptr SharedArray[SelectorKey[T]]
pollfds: ptr SharedArray[TPollFd]
count: int
count*: int
lock: Lock
Selector*[T] = ptr SelectorImpl[T]
else:
Expand All @@ -31,7 +31,7 @@ else:
pollcnt: int
fds: seq[SelectorKey[T]]
pollfds: seq[TPollFd]
count: int
count*: int
Selector*[T] = ref SelectorImpl[T]

type
Expand Down
4 changes: 2 additions & 2 deletions lib/pure/ioselects/ioselectors_select.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ when hasThreadSupport:
eSet: FdSet
maxFD: int
fds: ptr SharedArray[SelectorKey[T]]
count: int
count*: int
lock: Lock
Selector*[T] = ptr SelectorImpl[T]
else:
Expand All @@ -69,7 +69,7 @@ else:
eSet: FdSet
maxFD: int
fds: seq[SelectorKey[T]]
count: int
count*: int
Selector*[T] = ref SelectorImpl[T]

type
Expand Down
4 changes: 2 additions & 2 deletions tests/errmsgs/tgcsafety.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ nimout: '''
type mismatch: got <AsyncHttpServer, Port, proc (req: Request): Future[system.void]{.locks: <unknown>.}>
but expected one of:
proc serve(server: AsyncHttpServer; port: Port;
callback: proc (request: Request): Future[void] {.closure, gcsafe.};
address = ""): owned(Future[void])
callback: proc (request: Request): Future[void] {.closure, gcsafe.};
address = ""; assumedDescriptorsPerRequest = 5): owned(Future[void])
first type mismatch at position: 3
required type for callback: proc (request: Request): Future[system.void]{.closure, gcsafe.}
but expression 'cb' is of type: proc (req: Request): Future[system.void]{.locks: <unknown>.}
Expand Down

0 comments on commit 562c627

Please sign in to comment.