Skip to content

Commit

Permalink
Merge pull request #307 from njsmith/serve
Browse files Browse the repository at this point in the history
Add high-level server helpers
  • Loading branch information
njsmith authored Aug 22, 2017
2 parents 52c2349 + ed09967 commit dce4f65
Show file tree
Hide file tree
Showing 10 changed files with 497 additions and 63 deletions.
19 changes: 0 additions & 19 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -199,25 +199,6 @@ Apache 2. See `LICENSE
bytecode level, since exiting a with block seems to expand into 3
separate bytecodes?

- start_* convention -- if you want to run it synchronously, do
async with make_nursery() as nursery:
info, task = await start_foo(nursery)
return task.result.unwrap()
we might even want to wrap this idiom up in a convenience function

for our server helper, it's a start_ function
maybe it takes listener_nursery, connection_nursery arguments, to let you
set up the graceful shutdown thing? though draining is still a
problem. I guess just a matter of setting a deadline?

- should we provide a start_nursery?

problem: an empty nursery would close itself before start_nursery
even returns!

maybe as minimal extension to the existing thing,
open_nursery(autoclose=False), only closes when cancelled?

- possible improved robustness ("quality of implementation") ideas:
- if an abort callback fails, discard that task but clean up the
others (instead of discarding all)
Expand Down
24 changes: 16 additions & 8 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,18 @@ Abstract base classes



Generic stream implementations
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Generic stream tools
~~~~~~~~~~~~~~~~~~~~

Trio currently provides one generic utility class for working with
streams. And if you want to test code that's written against the
Trio currently provides a generic helper for writing servers that
listen for connections using one or more
:class:`~trio.abc.Listener`\s, and a generic utility class for working
with streams. And if you want to test code that's written against the
streams interface, you should also check out :ref:`testing-streams` in
:mod:`trio.testing`.

.. autofunction:: serve_listeners

.. autoclass:: StapledStream
:members:
:show-inheritance:
Expand All @@ -151,15 +155,19 @@ Sockets and networking
The high-level network interface is built on top of our stream
abstraction.

.. autofunction:: open_tcp_stream

.. autofunction:: serve_tcp

.. autofunction:: open_ssl_over_tcp_stream

.. autofunction:: serve_ssl_over_tcp

.. autoclass:: SocketStream
:members:
:undoc-members:
:show-inheritance:

.. autofunction:: open_tcp_stream

.. autofunction:: open_ssl_over_tcp_stream

.. autoclass:: SocketListener
:members:
:show-inheritance:
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ pytest-cov
ipython # for the IPython traceback integration tests
pyOpenSSL # for the ssl tests
trustme # for the ssl tests
pytest-catchlog
3 changes: 3 additions & 0 deletions trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
from ._path import *
__all__ += _path.__all__

from ._highlevel_serve_listeners import *
__all__ += _highlevel_serve_listeners.__all__

from ._highlevel_open_tcp_stream import *
__all__ += _highlevel_open_tcp_stream.__all__

Expand Down
90 changes: 86 additions & 4 deletions trio/_highlevel_open_tcp_listeners.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import sys
from math import inf

from ._highlevel_socket import SocketListener
import trio
from . import socket as tsocket

__all__ = ["open_tcp_listeners"]
__all__ = ["open_tcp_listeners", "serve_tcp"]


# Default backlog size:
Expand Down Expand Up @@ -76,7 +76,8 @@ async def open_tcp_listeners(port, *, host=None, backlog=None):
``"0.0.0.0"`` for IPv4-only and ``"::"`` for IPv6-only.
backlog (int or None): The listen backlog to use. If you leave this as
``None`` then Trio will pick a good default.
``None`` then Trio will pick a good default. (Currently: whatever
your system has configured as the maximum backlog.)
Returns:
list of :class:`SocketListener`
Expand Down Expand Up @@ -121,7 +122,7 @@ async def open_tcp_listeners(port, *, host=None, backlog=None):
sock.bind(sockaddr)
sock.listen(backlog)

listeners.append(SocketListener(sock))
listeners.append(trio.SocketListener(sock))
except:
sock.close()
raise
Expand All @@ -131,3 +132,84 @@ async def open_tcp_listeners(port, *, host=None, backlog=None):
raise

return listeners


async def serve_tcp(
handler,
port,
*,
host=None,
backlog=None,
handler_nursery=None,
task_status=trio.STATUS_IGNORED
):
"""Listen for incoming TCP connections, and for each one start a task
running ``handler(stream)``.
This is a thin convenience wrapper around :func:`open_tcp_listeners` and
:func:`serve_listeners` – see them for full details.
.. warning::
If ``handler`` raises an exception, then this function doesn't do
anything special to catch it – so by default the exception will
propagate out and crash your server. If you don't want this, then catch
exceptions inside your ``handler``, or use a ``handler_nursery`` object
that responds to exceptions in some other way.
When used with ``nursery.start`` you get back the newly opened listeners.
So, for example, if you want to start a server in your test suite and then
connect to it to check that it's working properly, you can use something
like::
from trio.testing import open_stream_to_socket_listener
async with trio.open_nursery() as nursery:
listeners = await nursery.start(serve_tcp, handler, 0)
client_stream = await open_stream_to_socket_listener(listeners[0])
# Then send and receive data on 'client', for example:
await client.send_all(b"GET / HTTP/1.0\r\n\r\n")
This avoids several common pitfalls:
1. It lets the kernel pick a random open port, so your test suite doesn't
depend on any particular port being open.
2. It waits for the server to be accepting connections on that port before
``start`` returns, so there's no race condition where the incoming
connection arrives before the server is ready.
3. It uses the Listener object to find out which port was picked, so it
can connect to the right place.
Args:
handler: The handler to start for each incoming connection. Passed to
:func:`serve_listeners`.
port: The port to listen on. Use 0 to let the kernel pick an open port.
Passed to :func:`open_tcp_listeners`.
host (str, bytes, or None): The host interface to listen on; use
``None`` to bind to the wildcard address. Passed to
:func:`open_tcp_listeners`.
backlog: The listen backlog, or None to have a good default picked.
Passed to :func:`open_tcp_listeners`.
handler_nursery: The nursery to start handlers in, or None to use an
internal nursery. Passed to :func:`serve_listeners`.
task_status: This function can be used with ``nursery.start``.
Returns:
This function only returns when cancelled.
"""
listeners = await trio.open_tcp_listeners(port, host=host, backlog=backlog)
await trio.serve_listeners(
handler,
listeners,
handler_nursery=handler_nursery,
task_status=task_status
)
129 changes: 129 additions & 0 deletions trio/_highlevel_serve_listeners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import errno
import logging
import os

import trio

__all__ = ["serve_listeners"]

# Errors that accept(2) can return, and which indicate that the system is
# overloaded
ACCEPT_CAPACITY_ERRNOS = {
errno.EMFILE,
errno.ENFILE,
errno.ENOMEM,
errno.ENOBUFS,
}

# How long to sleep when we get one of those errors
SLEEP_TIME = 0.100

# The logger we use to complain when this happens
LOGGER = logging.getLogger("trio.serve_listeners")


async def _run_handler(stream, handler):
try:
await handler(stream)
finally:
await trio.aclose_forcefully(stream)


async def _serve_one_listener(listener, handler_nursery, handler):
async with listener:
while True:
try:
stream = await listener.accept()
except OSError as exc:
if exc.errno in ACCEPT_CAPACITY_ERRNOS:
LOGGER.error(
"accept returned %s (%s); retrying in %s seconds",
errno.errorcode[exc.errno],
os.strerror(exc.errno),
SLEEP_TIME,
exc_info=True
)
await trio.sleep(SLEEP_TIME)
else:
raise
else:
handler_nursery.start_soon(_run_handler, stream, handler)


async def serve_listeners(
handler,
listeners,
*,
handler_nursery=None,
task_status=trio.STATUS_IGNORED
):
"""Listen for incoming connections on ``listeners``, and for each one
start a task running ``handler(stream)``.
.. warning::
If ``handler`` raises an exception, then this function doesn't do
anything special to catch it – so by default the exception will
propagate out and crash your server. If you don't want this, then catch
exceptions inside your ``handler``, or use a ``handler_nursery`` object
that responds to exceptions in some other way.
Args:
handler: An async callable, that will be invoked like
``handler_nursery.start_soon(handler, stream)`` for each incoming
connection.
listeners: A list of :class:`~trio.abc.Listener` objects.
:func:`serve_listeners` takes responsibility for closing them.
handler_nursery: The nursery used to start handlers, or any object with
a ``start_soon`` method. If ``None`` (the default), then
:func:`serve_listeners` will create a new nursery internally and use
that.
task_status: This function can be used with ``nursery.start``, which
will return ``listeners``.
Returns:
This function never returns unless cancelled.
Resource handling:
If ``handler`` neglects to close the ``stream``, then it will be closed
using :func:`trio.aclose_forcefully`.
Error handling:
Most errors coming from :meth:`~trio.abc.Listener.accept` are allowed to
propagate out (crashing the server in the process). However, some errors –
those which indicate that the server is temporarily overloaded – are
handled specially. These are :class:`OSError`\s with one of the following
errnos:
* ``EMFILE``: process is out of file descriptors
* ``ENFILE``: system is out of file descriptors
* ``ENOBUFS``, ``ENOMEM``: the kernel hit some sort of memory limitation
when trying to create a socket object
When :func:`serve_listeners` gets one of these errors, then it:
* Logs the error to the standard library logger ``trio.serve_listeners``
(level = ERROR, with exception information included). By default this
causes it to be printed to stderr.
* Waits 100 ms before calling ``accept`` again, in hopes that the
system will recover.
"""
async with trio.open_nursery() as nursery:
if handler_nursery is None:
handler_nursery = nursery
for listener in listeners:
nursery.start_soon(
_serve_one_listener, listener, handler_nursery, handler
)
# The listeners are already queueing connections when we're called,
# but we wait until the end to call started() just in case we get an
# error or whatever.
task_status.started(listeners)
Loading

0 comments on commit dce4f65

Please sign in to comment.