diff --git a/README.rst b/README.rst index 152287f2e9..79f65ae28e 100644 --- a/README.rst +++ b/README.rst @@ -199,25 +199,6 @@ Apache 2. See `LICENSE bytecode level, since exiting a with block seems to expand into 3 separate bytecodes? - - start_* convention -- if you want to run it synchronously, do - async with make_nursery() as nursery: - info, task = await start_foo(nursery) - return task.result.unwrap() - we might even want to wrap this idiom up in a convenience function - - for our server helper, it's a start_ function - maybe it takes listener_nursery, connection_nursery arguments, to let you - set up the graceful shutdown thing? though draining is still a - problem. I guess just a matter of setting a deadline? - - - should we provide a start_nursery? - - problem: an empty nursery would close itself before start_nursery - even returns! - - maybe as minimal extension to the existing thing, - open_nursery(autoclose=False), only closes when cancelled? - - possible improved robustness ("quality of implementation") ideas: - if an abort callback fails, discard that task but clean up the others (instead of discarding all) diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index f23997be9b..d39f52c6be 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -132,14 +132,18 @@ Abstract base classes -Generic stream implementations -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Generic stream tools +~~~~~~~~~~~~~~~~~~~~ -Trio currently provides one generic utility class for working with -streams. And if you want to test code that's written against the +Trio currently provides a generic helper for writing servers that +listen for connections using one or more +:class:`~trio.abc.Listener`\s, and a generic utility class for working +with streams. And if you want to test code that's written against the streams interface, you should also check out :ref:`testing-streams` in :mod:`trio.testing`. +.. autofunction:: serve_listeners + .. autoclass:: StapledStream :members: :show-inheritance: @@ -151,15 +155,19 @@ Sockets and networking The high-level network interface is built on top of our stream abstraction. +.. autofunction:: open_tcp_stream + +.. autofunction:: serve_tcp + +.. autofunction:: open_ssl_over_tcp_stream + +.. autofunction:: serve_ssl_over_tcp + .. autoclass:: SocketStream :members: :undoc-members: :show-inheritance: -.. autofunction:: open_tcp_stream - -.. autofunction:: open_ssl_over_tcp_stream - .. autoclass:: SocketListener :members: :show-inheritance: diff --git a/test-requirements.txt b/test-requirements.txt index e82d0ced22..42a0b5b623 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,3 +3,4 @@ pytest-cov ipython # for the IPython traceback integration tests pyOpenSSL # for the ssl tests trustme # for the ssl tests +pytest-catchlog diff --git a/trio/__init__.py b/trio/__init__.py index 03e88bbaa5..c2c4ef46b8 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -55,6 +55,9 @@ from ._path import * __all__ += _path.__all__ +from ._highlevel_serve_listeners import * +__all__ += _highlevel_serve_listeners.__all__ + from ._highlevel_open_tcp_stream import * __all__ += _highlevel_open_tcp_stream.__all__ diff --git a/trio/_highlevel_open_tcp_listeners.py b/trio/_highlevel_open_tcp_listeners.py index e9eb1bf731..4ac87686b4 100644 --- a/trio/_highlevel_open_tcp_listeners.py +++ b/trio/_highlevel_open_tcp_listeners.py @@ -1,10 +1,10 @@ import sys from math import inf -from ._highlevel_socket import SocketListener +import trio from . import socket as tsocket -__all__ = ["open_tcp_listeners"] +__all__ = ["open_tcp_listeners", "serve_tcp"] # Default backlog size: @@ -76,7 +76,8 @@ async def open_tcp_listeners(port, *, host=None, backlog=None): ``"0.0.0.0"`` for IPv4-only and ``"::"`` for IPv6-only. backlog (int or None): The listen backlog to use. If you leave this as - ``None`` then Trio will pick a good default. + ``None`` then Trio will pick a good default. (Currently: whatever + your system has configured as the maximum backlog.) Returns: list of :class:`SocketListener` @@ -121,7 +122,7 @@ async def open_tcp_listeners(port, *, host=None, backlog=None): sock.bind(sockaddr) sock.listen(backlog) - listeners.append(SocketListener(sock)) + listeners.append(trio.SocketListener(sock)) except: sock.close() raise @@ -131,3 +132,84 @@ async def open_tcp_listeners(port, *, host=None, backlog=None): raise return listeners + + +async def serve_tcp( + handler, + port, + *, + host=None, + backlog=None, + handler_nursery=None, + task_status=trio.STATUS_IGNORED +): + """Listen for incoming TCP connections, and for each one start a task + running ``handler(stream)``. + + This is a thin convenience wrapper around :func:`open_tcp_listeners` and + :func:`serve_listeners` – see them for full details. + + .. warning:: + + If ``handler`` raises an exception, then this function doesn't do + anything special to catch it – so by default the exception will + propagate out and crash your server. If you don't want this, then catch + exceptions inside your ``handler``, or use a ``handler_nursery`` object + that responds to exceptions in some other way. + + When used with ``nursery.start`` you get back the newly opened listeners. + So, for example, if you want to start a server in your test suite and then + connect to it to check that it's working properly, you can use something + like:: + + from trio.testing import open_stream_to_socket_listener + + async with trio.open_nursery() as nursery: + listeners = await nursery.start(serve_tcp, handler, 0) + client_stream = await open_stream_to_socket_listener(listeners[0]) + + # Then send and receive data on 'client', for example: + await client.send_all(b"GET / HTTP/1.0\r\n\r\n") + + This avoids several common pitfalls: + + 1. It lets the kernel pick a random open port, so your test suite doesn't + depend on any particular port being open. + + 2. It waits for the server to be accepting connections on that port before + ``start`` returns, so there's no race condition where the incoming + connection arrives before the server is ready. + + 3. It uses the Listener object to find out which port was picked, so it + can connect to the right place. + + Args: + handler: The handler to start for each incoming connection. Passed to + :func:`serve_listeners`. + + port: The port to listen on. Use 0 to let the kernel pick an open port. + Passed to :func:`open_tcp_listeners`. + + host (str, bytes, or None): The host interface to listen on; use + ``None`` to bind to the wildcard address. Passed to + :func:`open_tcp_listeners`. + + backlog: The listen backlog, or None to have a good default picked. + Passed to :func:`open_tcp_listeners`. + + handler_nursery: The nursery to start handlers in, or None to use an + internal nursery. Passed to :func:`serve_listeners`. + + task_status: This function can be used with ``nursery.start``. + + Returns: + This function only returns when cancelled. + + """ + listeners = await trio.open_tcp_listeners(port, host=host, backlog=backlog) + await trio.serve_listeners( + handler, + listeners, + handler_nursery=handler_nursery, + task_status=task_status + ) diff --git a/trio/_highlevel_serve_listeners.py b/trio/_highlevel_serve_listeners.py new file mode 100644 index 0000000000..0be6467199 --- /dev/null +++ b/trio/_highlevel_serve_listeners.py @@ -0,0 +1,129 @@ +import errno +import logging +import os + +import trio + +__all__ = ["serve_listeners"] + +# Errors that accept(2) can return, and which indicate that the system is +# overloaded +ACCEPT_CAPACITY_ERRNOS = { + errno.EMFILE, + errno.ENFILE, + errno.ENOMEM, + errno.ENOBUFS, +} + +# How long to sleep when we get one of those errors +SLEEP_TIME = 0.100 + +# The logger we use to complain when this happens +LOGGER = logging.getLogger("trio.serve_listeners") + + +async def _run_handler(stream, handler): + try: + await handler(stream) + finally: + await trio.aclose_forcefully(stream) + + +async def _serve_one_listener(listener, handler_nursery, handler): + async with listener: + while True: + try: + stream = await listener.accept() + except OSError as exc: + if exc.errno in ACCEPT_CAPACITY_ERRNOS: + LOGGER.error( + "accept returned %s (%s); retrying in %s seconds", + errno.errorcode[exc.errno], + os.strerror(exc.errno), + SLEEP_TIME, + exc_info=True + ) + await trio.sleep(SLEEP_TIME) + else: + raise + else: + handler_nursery.start_soon(_run_handler, stream, handler) + + +async def serve_listeners( + handler, + listeners, + *, + handler_nursery=None, + task_status=trio.STATUS_IGNORED +): + """Listen for incoming connections on ``listeners``, and for each one + start a task running ``handler(stream)``. + + .. warning:: + + If ``handler`` raises an exception, then this function doesn't do + anything special to catch it – so by default the exception will + propagate out and crash your server. If you don't want this, then catch + exceptions inside your ``handler``, or use a ``handler_nursery`` object + that responds to exceptions in some other way. + + Args: + + handler: An async callable, that will be invoked like + ``handler_nursery.start_soon(handler, stream)`` for each incoming + connection. + + listeners: A list of :class:`~trio.abc.Listener` objects. + :func:`serve_listeners` takes responsibility for closing them. + + handler_nursery: The nursery used to start handlers, or any object with + a ``start_soon`` method. If ``None`` (the default), then + :func:`serve_listeners` will create a new nursery internally and use + that. + + task_status: This function can be used with ``nursery.start``, which + will return ``listeners``. + + Returns: + + This function never returns unless cancelled. + + Resource handling: + + If ``handler`` neglects to close the ``stream``, then it will be closed + using :func:`trio.aclose_forcefully`. + + Error handling: + + Most errors coming from :meth:`~trio.abc.Listener.accept` are allowed to + propagate out (crashing the server in the process). However, some errors – + those which indicate that the server is temporarily overloaded – are + handled specially. These are :class:`OSError`\s with one of the following + errnos: + + * ``EMFILE``: process is out of file descriptors + * ``ENFILE``: system is out of file descriptors + * ``ENOBUFS``, ``ENOMEM``: the kernel hit some sort of memory limitation + when trying to create a socket object + + When :func:`serve_listeners` gets one of these errors, then it: + + * Logs the error to the standard library logger ``trio.serve_listeners`` + (level = ERROR, with exception information included). By default this + causes it to be printed to stderr. + * Waits 100 ms before calling ``accept`` again, in hopes that the + system will recover. + + """ + async with trio.open_nursery() as nursery: + if handler_nursery is None: + handler_nursery = nursery + for listener in listeners: + nursery.start_soon( + _serve_one_listener, listener, handler_nursery, handler + ) + # The listeners are already queueing connections when we're called, + # but we wait until the end to call started() just in case we get an + # error or whatever. + task_status.started(listeners) diff --git a/trio/_highlevel_ssl_helpers.py b/trio/_highlevel_ssl_helpers.py index fda8c72f42..c2c5033701 100644 --- a/trio/_highlevel_ssl_helpers.py +++ b/trio/_highlevel_ssl_helpers.py @@ -2,7 +2,10 @@ from ._highlevel_open_tcp_stream import DEFAULT_DELAY -__all__ = ["open_ssl_over_tcp_stream", "open_ssl_over_tcp_listeners"] +__all__ = [ + "open_ssl_over_tcp_stream", "open_ssl_over_tcp_listeners", + "serve_ssl_over_tcp" +] # It might have been nice to take a ssl_protocols= argument here to set up @@ -74,8 +77,7 @@ async def open_ssl_over_tcp_listeners( """Start listening for SSL/TLS-encrypted TCP connections to the given port. Args: - port (int or str): The port to listen on. See - :func:`open_tcp_listeners`. + port (int): The port to listen on. See :func:`open_tcp_listeners`. ssl_context (~ssl.SSLContext): The SSL context to use for all incoming connections. host (str, bytes, or None): The address to bind to; use ``None`` to bind @@ -95,3 +97,76 @@ async def open_ssl_over_tcp_listeners( ) for tcp_listener in tcp_listeners ] return ssl_listeners + + +async def serve_ssl_over_tcp( + handler, + port, + ssl_context, + *, + host=None, + https_compatible=False, + backlog=None, + handler_nursery=None, + task_status=trio.STATUS_IGNORED +): + """Listen for incoming TCP connections, and for each one start a task + running ``handler(stream)``. + + This is a thin convenience wrapper around + :func:`open_ssl_over_tcp_listeners` and :func:`serve_listeners` – see them + for full details. + + .. warning:: + + If ``handler`` raises an exception, then this function doesn't do + anything special to catch it – so by default the exception will + propagate out and crash your server. If you don't want this, then catch + exceptions inside your ``handler``, or use a ``handler_nursery`` object + that responds to exceptions in some other way. + + When used with ``nursery.start`` you get back the newly opened listeners. + See the documentation for :func:`serve_tcp` for an example where this is + useful. + + Args: + handler: The handler to start for each incoming connection. Passed to + :func:`serve_listeners`. + + port (int): The port to listen on. Use 0 to let the kernel pick + an open port. Ultimately passed to :func:`open_tcp_listeners`. + + ssl_context (~ssl.SSLContext): The SSL context to use for all incoming + connections. Passed to :func:`open_ssl_over_tcp_listeners`. + + host (str, bytes, or None): The address to bind to; use ``None`` to bind + to the wildcard address. Ultimately passed to + :func:`open_tcp_listeners`. + + https_compatible (bool): Set this to True if you want to use + "HTTPS-style" TLS. See :class:`~trio.ssl.SSLStream` for details. + + backlog (int or None): See :class:`~trio.ssl.SSLStream` for details. + + handler_nursery: The nursery to start handlers in, or None to use an + internal nursery. Passed to :func:`serve_listeners`. + + task_status: This function can be used with ``nursery.start``. + + Returns: + This function only returns when cancelled. + + """ + listeners = await trio.open_ssl_over_tcp_listeners( + port, + ssl_context, + host=host, + https_compatible=https_compatible, + backlog=backlog + ) + await trio.serve_listeners( + handler, + listeners, + handler_nursery=handler_nursery, + task_status=task_status + ) diff --git a/trio/tests/test_highlevel_open_tcp_listeners.py b/trio/tests/test_highlevel_open_tcp_listeners.py index 3f57ff049b..001cf30948 100644 --- a/trio/tests/test_highlevel_open_tcp_listeners.py +++ b/trio/tests/test_highlevel_open_tcp_listeners.py @@ -5,7 +5,9 @@ import attr import trio -from trio import open_tcp_listeners, SocketListener, open_tcp_stream +from trio import ( + open_tcp_listeners, serve_tcp, SocketListener, open_tcp_stream +) from trio.testing import open_stream_to_socket_listener from .. import socket as tsocket from .._core.tests.tutil import slow @@ -220,3 +222,15 @@ async def test_open_tcp_listeners_port_checking(): await open_tcp_listeners(b"80", host=host) with pytest.raises(TypeError): await open_tcp_listeners("http", host=host) + + +async def test_serve_tcp(): + async def handler(stream): + await stream.send_all(b"x") + + async with trio.open_nursery() as nursery: + listeners = await nursery.start(serve_tcp, handler, 0) + stream = await open_stream_to_socket_listener(listeners[0]) + async with stream: + await stream.receive_some(1) == b"x" + nursery.cancel_scope.cancel() diff --git a/trio/tests/test_highlevel_serve_listeners.py b/trio/tests/test_highlevel_serve_listeners.py new file mode 100644 index 0000000000..173893f54a --- /dev/null +++ b/trio/tests/test_highlevel_serve_listeners.py @@ -0,0 +1,145 @@ +import pytest + +from functools import partial +import errno +from collections import deque + +import attr + +import trio +from trio.testing import memory_stream_pair, wait_all_tasks_blocked + + +@attr.s(hash=False, cmp=False) +class MemoryListener(trio.abc.Listener): + closed = attr.ib(default=False) + accepted_streams = attr.ib(default=attr.Factory(list)) + queued_streams = attr.ib(default=attr.Factory(lambda: trio.Queue(1))) + accept_hook = attr.ib(default=None) + + async def connect(self): + assert not self.closed + client, server = memory_stream_pair() + await self.queued_streams.put(server) + return client + + async def accept(self): + await trio.hazmat.yield_briefly() + assert not self.closed + if self.accept_hook is not None: + await self.accept_hook() + stream = await self.queued_streams.get() + self.accepted_streams.append(stream) + return stream + + async def aclose(self): + self.closed = True + await trio.hazmat.yield_briefly() + + +async def test_serve_listeners_basic(): + listeners = [MemoryListener(), MemoryListener()] + + record = [] + + def close_hook(): + # Make sure this is a forceful close + assert trio.current_effective_deadline() == float("-inf") + record.append(("closed")) + + async def handler(stream): + await stream.send_all(b"123") + assert await stream.receive_some(10) == b"456" + stream.send_stream.close_hook = close_hook + stream.receive_stream.close_hook = close_hook + + async def client(listener): + s = await listener.connect() + assert await s.receive_some(10) == b"123" + await s.send_all(b"456") + + async def do_tests(parent_nursery): + async with trio.open_nursery() as nursery: + for listener in listeners: + for _ in range(3): + nursery.start_soon(client, listener) + + await wait_all_tasks_blocked() + + # verifies that all 6 streams x 2 directions each were closed ok + assert len(record) == 12 + + parent_nursery.cancel_scope.cancel() + + async with trio.open_nursery() as nursery: + l2 = await nursery.start(trio.serve_listeners, handler, listeners) + assert l2 == listeners + # This is just split into another function because gh-136 isn't + # implemented yet + nursery.start_soon(do_tests, nursery) + + for listener in listeners: + assert listener.closed + + +async def test_serve_listeners_accept_unrecognized_error(): + for error in [KeyError(), OSError(errno.ECONNABORTED, "ECONNABORTED")]: + listener = MemoryListener() + + async def raise_error(): + raise error + + listener.accept_hook = raise_error + + with pytest.raises(type(error)) as excinfo: + await trio.serve_listeners(None, [listener]) + assert excinfo.value is error + + +async def test_serve_listeners_accept_capacity_error(autojump_clock, caplog): + listener = MemoryListener() + + async def raise_EMFILE(): + raise OSError(errno.EMFILE, "out of file descriptors") + + listener.accept_hook = raise_EMFILE + + # It retries every 100 ms, so in 950 ms it will retry at 0, 100, ..., 900 + # = 10 times total + with trio.move_on_after(0.950): + await trio.serve_listeners(None, [listener]) + + assert len(caplog.records) == 10 + for record in caplog.records: + assert "retrying" in record.msg + assert record.exc_info[1].errno == errno.EMFILE + + +async def test_serve_listeners_connection_nursery(autojump_clock): + listener = MemoryListener() + + async def handler(stream): + await trio.sleep(1) + + class Done(Exception): + pass + + async def connection_watcher(*, task_status=trio.STATUS_IGNORED): + async with trio.open_nursery() as nursery: + task_status.started(nursery) + await wait_all_tasks_blocked() + assert len(nursery.children) == 10 + raise Done + + with pytest.raises(Done): + async with trio.open_nursery() as nursery: + handler_nursery = await nursery.start(connection_watcher) + await nursery.start( + partial( + trio.serve_listeners, + handler, [listener], + handler_nursery=handler_nursery + ) + ) + for _ in range(10): + nursery.start_soon(listener.connect) diff --git a/trio/tests/test_highlevel_ssl_helpers.py b/trio/tests/test_highlevel_ssl_helpers.py index 9142f1de9a..213a16c2f9 100644 --- a/trio/tests/test_highlevel_ssl_helpers.py +++ b/trio/tests/test_highlevel_ssl_helpers.py @@ -1,5 +1,7 @@ import pytest +from functools import partial + import attr import trio @@ -9,36 +11,20 @@ from .test_ssl import CLIENT_CTX, SERVER_CTX from .._highlevel_ssl_helpers import ( - open_ssl_over_tcp_stream, open_ssl_over_tcp_listeners + open_ssl_over_tcp_stream, open_ssl_over_tcp_listeners, serve_ssl_over_tcp ) -# This is used by test_open_ssl_over_tcp_stream to test it, and it also uses -# open_ssl_over_tcp_listeners so it tests that too. -async def start_echo_server(nursery): - - async def accept_loop(listener): - async with listener: +async def echo_handler(stream): + async with stream: + try: while True: - server_stream = await listener.accept() - nursery.spawn(echo_server, server_stream) - - async def echo_server(server_stream): - async with server_stream: - try: - while True: - data = await server_stream.receive_some(10000) - if not data: - break - await server_stream.send_all(data) - except trio.BrokenStreamError: - pass - - (listener,) = await trio.open_ssl_over_tcp_listeners( - 0, SERVER_CTX, host="127.0.0.1" - ) - nursery.spawn(accept_loop, listener) - return listener.transport_listener.socket.getsockname() + data = await stream.receive_some(10000) + if not data: + break + await stream.send_all(data) + except trio.BrokenStreamError: + pass # Resolver that always returns the given sockaddr, no matter what host/port @@ -54,9 +40,19 @@ async def getnameinfo(self, *args): # pragma: no cover raise NotImplementedError -async def test_open_ssl_over_tcp_stream(): +# This uses serve_ssl_over_tcp, which uses open_ssl_over_tcp_listeners... +async def test_open_ssl_over_tcp_stream_and_everything_else(): async with trio.open_nursery() as nursery: - sockaddr = await start_echo_server(nursery) + (listener,) = await nursery.start( + partial( + serve_ssl_over_tcp, + echo_handler, + 0, + SERVER_CTX, + host="127.0.0.1" + ) + ) + sockaddr = listener.transport_listener.socket.getsockname() hostname_resolver = FakeHostnameResolver(sockaddr) trio.socket.set_custom_hostname_resolver(hostname_resolver)