From 3e4ccf060121fed898d7b041c220784028e27404 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 21 Aug 2017 19:33:26 -0700 Subject: [PATCH 1/8] Add serve_listeners --- test-requirements.txt | 1 + trio/__init__.py | 3 + trio/_highlevel_serve_listeners.py | 82 ++++++++ trio/tests/test_highlevel_server_listeners.py | 183 ++++++++++++++++++ 4 files changed, 269 insertions(+) create mode 100644 trio/_highlevel_serve_listeners.py create mode 100644 trio/tests/test_highlevel_server_listeners.py diff --git a/test-requirements.txt b/test-requirements.txt index e82d0ced22..42a0b5b623 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -3,3 +3,4 @@ pytest-cov ipython # for the IPython traceback integration tests pyOpenSSL # for the ssl tests trustme # for the ssl tests +pytest-catchlog diff --git a/trio/__init__.py b/trio/__init__.py index 03e88bbaa5..c2c4ef46b8 100644 --- a/trio/__init__.py +++ b/trio/__init__.py @@ -55,6 +55,9 @@ from ._path import * __all__ += _path.__all__ +from ._highlevel_serve_listeners import * +__all__ += _highlevel_serve_listeners.__all__ + from ._highlevel_open_tcp_stream import * __all__ += _highlevel_open_tcp_stream.__all__ diff --git a/trio/_highlevel_serve_listeners.py b/trio/_highlevel_serve_listeners.py new file mode 100644 index 0000000000..89649b9150 --- /dev/null +++ b/trio/_highlevel_serve_listeners.py @@ -0,0 +1,82 @@ +import errno +import logging +import os + +import trio + +__all__ = ["serve_listeners"] + +# Errors that accept(2) can return, and which indicate that the system is +# overloaded +ACCEPT_CAPACITY_ERRNOS = { + errno.EMFILE, + errno.ENFILE, + errno.ENOMEM, + errno.ENOBUFS, +} + +# How long to sleep when we get one of those errors +SLEEP_TIME = 0.100 + +# The logger we use to complain when this happens +LOGGER = logging.getLogger("trio.serve_listeners") + + +async def _run_handler(stream, handler): + try: + await handler(stream) + finally: + await trio.aclose_forcefully(stream) + + +async def _serve_one_listener(listener, connection_nursery, handler): + async with listener: + while True: + # Using MultiError.catch here is kind of overkill -- it's pretty + # unlikely that someone accept() method is going to like, spawn + # multiple subtasks and the have some of them fail simultaneously. + # But if they do, we're ready for them :-) + capacity_errors = [] + + def filter_capacity_errors(exc): + if (isinstance(exc, OSError) + and exc.errno in ACCEPT_CAPACITY_ERRNOS): + capacity_errors.append(exc) + return None + return exc + + with trio.MultiError.catch(filter_capacity_errors): + stream = await listener.accept() + + if capacity_errors: + for exc in capacity_errors: + LOGGER.error( + "accept returned %s (%s); retrying in %s seconds", + errno.errorcode[exc.errno], + os.strerror(exc.errno), + SLEEP_TIME, + exc_info=exc + ) + await trio.sleep(SLEEP_TIME) + else: + connection_nursery.start_soon(_run_handler, stream, handler) + + +async def serve_listeners( + handler, + listeners, + *, + connection_nursery=None, + task_status=trio.STATUS_IGNORED +): + async with trio.open_nursery() as nursery: + if connection_nursery is None: + connection_nursery = nursery + for listener in listeners: + nursery.start_soon( + _serve_one_listener, listener, connection_nursery, handler + ) + # The listeners are already queueing connections when we're called, + # but we wait until the end to call started() just in case we get an + # error or whatever. + task_status.started(listeners) diff --git a/trio/tests/test_highlevel_server_listeners.py b/trio/tests/test_highlevel_server_listeners.py new file mode 100644 index 0000000000..613a00d739 --- /dev/null +++ b/trio/tests/test_highlevel_server_listeners.py @@ -0,0 +1,183 @@ +import pytest + +from functools import partial +import errno +from collections import deque + +import attr + +import trio +from trio.testing import memory_stream_pair, wait_all_tasks_blocked + + +@attr.s(hash=False, cmp=False) +class MemoryListener(trio.abc.Listener): + closed = attr.ib(default=False) + accepted_streams = attr.ib(default=attr.Factory(list)) + queued_streams = attr.ib(default=attr.Factory(lambda: trio.Queue(1))) + accept_hook = attr.ib(default=None) + + async def connect(self): + assert not self.closed + client, server = memory_stream_pair() + await self.queued_streams.put(server) + return client + + async def accept(self): + await trio.hazmat.yield_briefly() + assert not self.closed + if self.accept_hook is not None: + await self.accept_hook() + stream = await self.queued_streams.get() + self.accepted_streams.append(stream) + return stream + + async def aclose(self): + self.closed = True + await trio.hazmat.yield_briefly() + + +async def test_serve_listeners_basic(): + listeners = [MemoryListener(), MemoryListener()] + + record = [] + + def close_hook(): + # Make sure this is a forceful close + assert trio.current_effective_deadline() == float("-inf") + record.append(("closed")) + + async def handler(stream): + await stream.send_all(b"123") + assert await stream.receive_some(10) == b"456" + stream.send_stream.close_hook = close_hook + stream.receive_stream.close_hook = close_hook + + async def client(listener): + s = await listener.connect() + assert await s.receive_some(10) == b"123" + await s.send_all(b"456") + + async def do_tests(parent_nursery): + async with trio.open_nursery() as nursery: + for listener in listeners: + for _ in range(3): + nursery.start_soon(client, listener) + + await wait_all_tasks_blocked() + + # verifies that all 6 streams x 2 directions each were closed ok + assert len(record) == 12 + + parent_nursery.cancel_scope.cancel() + + async with trio.open_nursery() as nursery: + l2 = await nursery.start(trio.serve_listeners, handler, listeners) + assert l2 == listeners + # This is just split into another function because gh-136 isn't + # implemented yet + nursery.start_soon(do_tests, nursery) + + for listener in listeners: + assert listener.closed + + +async def test_serve_listeners_accept_unrecognized_error(): + for error in [KeyError(), OSError(errno.ECONNABORTED, "ECONNABORTED")]: + listener = MemoryListener() + + async def raise_error(): + raise error + + listener.accept_hook = raise_error + + with pytest.raises(type(error)) as excinfo: + await trio.serve_listeners(None, [listener]) + assert excinfo.value is error + + +async def test_serve_listeners_accept_capacity_error(autojump_clock, caplog): + listener = MemoryListener() + + async def raise_EMFILE(): + raise OSError(errno.EMFILE, "out of file descriptors") + + listener.accept_hook = raise_EMFILE + + # It retries every 100 ms, so in 950 ms it will retry at 0, 100, ..., 900 + # = 10 times total + with trio.move_on_after(0.950): + await trio.serve_listeners(None, [listener]) + + assert len(caplog.records) == 10 + for record in caplog.records: + assert "retrying" in record.msg + assert record.exc_info[1].errno == errno.EMFILE + + +async def test_serve_listeners_accept_multi_capacity_error( + autojump_clock, caplog +): + listener = MemoryListener() + + async def raise_MultiError(): + error1 = OSError(errno.EMFILE, "out of file descriptors") + error2 = OSError(errno.ENOBUFS, "out of file buffers") + raise trio.MultiError([error1, error2]) + + listener.accept_hook = raise_MultiError + + # It retries every 100 ms (even with 2 failures), so in 950 ms it will + # retry at 0, 100, ..., 900 = 10 times total + with trio.move_on_after(0.950): + await trio.serve_listeners(None, [listener]) + + # This time each failure generates 2 logging records + assert len(caplog.records) == 20 + for record in caplog.records: + assert "retrying" in record.msg + assert record.exc_info[1].errno in (errno.EMFILE, errno.ENOBUFS) + + +async def test_serve_listeners_accept_mixed_error(autojump_clock, caplog): + listener = MemoryListener() + + async def raise_MultiError(): + error1 = OSError(errno.EMFILE, "out of file descriptors") + error2 = KeyError() + raise trio.MultiError([error1, error2]) + + listener.accept_hook = raise_MultiError + + with pytest.raises(KeyError): + await trio.serve_listeners(None, [listener]) + + +async def test_serve_listeners_connection_nursery(autojump_clock): + listener = MemoryListener() + + async def handler(stream): + await trio.sleep(1) + + class Done(Exception): + pass + + async def connection_watcher(*, task_status=trio.STATUS_IGNORED): + async with trio.open_nursery() as nursery: + task_status.started(nursery) + await wait_all_tasks_blocked() + assert len(nursery.children) == 10 + raise Done + + with pytest.raises(Done): + async with trio.open_nursery() as nursery: + connection_nursery = await nursery.start(connection_watcher) + await nursery.start( + partial( + trio.serve_listeners, + handler, [listener], + connection_nursery=connection_nursery + ) + ) + for _ in range(10): + nursery.start_soon(listener.connect) From cdabe4c6c8f9bb1cefd2fab79e09b27eff734e31 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 21 Aug 2017 19:45:18 -0700 Subject: [PATCH 2/8] Remove MultiError handling from serve_listeners I guess we can add it back if anyone ever notices and complains, it's almost certainly never going to be used, and it makes the code too complicated. It's likely folks will be looking at this as example code, so let's not inflict this on them for no reason. --- trio/_highlevel_serve_listeners.py | 26 ++++--------- trio/tests/test_highlevel_server_listeners.py | 38 ------------------- 2 files changed, 7 insertions(+), 57 deletions(-) diff --git a/trio/_highlevel_serve_listeners.py b/trio/_highlevel_serve_listeners.py index 89649b9150..56ad93f190 100644 --- a/trio/_highlevel_serve_listeners.py +++ b/trio/_highlevel_serve_listeners.py @@ -32,32 +32,20 @@ async def _run_handler(stream, handler): async def _serve_one_listener(listener, connection_nursery, handler): async with listener: while True: - # Using MultiError.catch here is kind of overkill -- it's pretty - # unlikely that someone accept() method is going to like, spawn - # multiple subtasks and the have some of them fail simultaneously. - # But if they do, we're ready for them :-) - capacity_errors = [] - - def filter_capacity_errors(exc): - if (isinstance(exc, OSError) - and exc.errno in ACCEPT_CAPACITY_ERRNOS): - capacity_errors.append(exc) - return None - return exc - - with trio.MultiError.catch(filter_capacity_errors): + try: stream = await listener.accept() - - if capacity_errors: - for exc in capacity_errors: + except OSError as exc: + if exc.errno in ACCEPT_CAPACITY_ERRNOS: LOGGER.error( "accept returned %s (%s); retrying in %s seconds", errno.errorcode[exc.errno], os.strerror(exc.errno), SLEEP_TIME, - exc_info=exc + exc_info=True ) - await trio.sleep(SLEEP_TIME) + await trio.sleep(SLEEP_TIME) + else: + raise else: connection_nursery.start_soon(_run_handler, stream, handler) diff --git a/trio/tests/test_highlevel_server_listeners.py b/trio/tests/test_highlevel_server_listeners.py index 613a00d739..4225f53c2f 100644 --- a/trio/tests/test_highlevel_server_listeners.py +++ b/trio/tests/test_highlevel_server_listeners.py @@ -115,44 +115,6 @@ async def raise_EMFILE(): assert record.exc_info[1].errno == errno.EMFILE -async def test_serve_listeners_accept_multi_capacity_error( - autojump_clock, caplog -): - listener = MemoryListener() - - async def raise_MultiError(): - error1 = OSError(errno.EMFILE, "out of file descriptors") - error2 = OSError(errno.ENOBUFS, "out of file buffers") - raise trio.MultiError([error1, error2]) - - listener.accept_hook = raise_MultiError - - # It retries every 100 ms (even with 2 failures), so in 950 ms it will - # retry at 0, 100, ..., 900 = 10 times total - with trio.move_on_after(0.950): - await trio.serve_listeners(None, [listener]) - - # This time each failure generates 2 logging records - assert len(caplog.records) == 20 - for record in caplog.records: - assert "retrying" in record.msg - assert record.exc_info[1].errno in (errno.EMFILE, errno.ENOBUFS) - - -async def test_serve_listeners_accept_mixed_error(autojump_clock, caplog): - listener = MemoryListener() - - async def raise_MultiError(): - error1 = OSError(errno.EMFILE, "out of file descriptors") - error2 = KeyError() - raise trio.MultiError([error1, error2]) - - listener.accept_hook = raise_MultiError - - with pytest.raises(KeyError): - await trio.serve_listeners(None, [listener]) - - async def test_serve_listeners_connection_nursery(autojump_clock): listener = MemoryListener() From 6cdde5c2963c78ed4a62c3d50ac0e3620824d97c Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 21 Aug 2017 19:53:21 -0700 Subject: [PATCH 3/8] Update README --- README.rst | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/README.rst b/README.rst index 152287f2e9..79f65ae28e 100644 --- a/README.rst +++ b/README.rst @@ -199,25 +199,6 @@ Apache 2. See `LICENSE bytecode level, since exiting a with block seems to expand into 3 separate bytecodes? - - start_* convention -- if you want to run it synchronously, do - async with make_nursery() as nursery: - info, task = await start_foo(nursery) - return task.result.unwrap() - we might even want to wrap this idiom up in a convenience function - - for our server helper, it's a start_ function - maybe it takes listener_nursery, connection_nursery arguments, to let you - set up the graceful shutdown thing? though draining is still a - problem. I guess just a matter of setting a deadline? - - - should we provide a start_nursery? - - problem: an empty nursery would close itself before start_nursery - even returns! - - maybe as minimal extension to the existing thing, - open_nursery(autoclose=False), only closes when cancelled? - - possible improved robustness ("quality of implementation") ideas: - if an abort callback fails, discard that task but clean up the others (instead of discarding all) From 090a1d670fcbad82ff5a722b2a3a1df148926cea Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 21 Aug 2017 19:54:22 -0700 Subject: [PATCH 4/8] Rename connection_nursery to handler_nursery For brevity and consistency --- trio/_highlevel_serve_listeners.py | 12 ++++++------ trio/tests/test_highlevel_server_listeners.py | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/trio/_highlevel_serve_listeners.py b/trio/_highlevel_serve_listeners.py index 56ad93f190..8014fc06a3 100644 --- a/trio/_highlevel_serve_listeners.py +++ b/trio/_highlevel_serve_listeners.py @@ -29,7 +29,7 @@ async def _run_handler(stream, handler): await trio.aclose_forcefully(stream) -async def _serve_one_listener(listener, connection_nursery, handler): +async def _serve_one_listener(listener, handler_nursery, handler): async with listener: while True: try: @@ -47,22 +47,22 @@ async def _serve_one_listener(listener, connection_nursery, handler): else: raise else: - connection_nursery.start_soon(_run_handler, stream, handler) + handler_nursery.start_soon(_run_handler, stream, handler) async def serve_listeners( handler, listeners, *, - connection_nursery=None, + handler_nursery=None, task_status=trio.STATUS_IGNORED ): async with trio.open_nursery() as nursery: - if connection_nursery is None: - connection_nursery = nursery + if handler_nursery is None: + handler_nursery = nursery for listener in listeners: nursery.start_soon( - _serve_one_listener, listener, connection_nursery, handler + _serve_one_listener, listener, handler_nursery, handler ) # The listeners are already queueing connections when we're called, # but we wait until the end to call started() just in case we get an diff --git a/trio/tests/test_highlevel_server_listeners.py b/trio/tests/test_highlevel_server_listeners.py index 4225f53c2f..173893f54a 100644 --- a/trio/tests/test_highlevel_server_listeners.py +++ b/trio/tests/test_highlevel_server_listeners.py @@ -133,12 +133,12 @@ async def connection_watcher(*, task_status=trio.STATUS_IGNORED): with pytest.raises(Done): async with trio.open_nursery() as nursery: - connection_nursery = await nursery.start(connection_watcher) + handler_nursery = await nursery.start(connection_watcher) await nursery.start( partial( trio.serve_listeners, handler, [listener], - connection_nursery=connection_nursery + handler_nursery=handler_nursery ) ) for _ in range(10): From 73f07870968707881861f127b636f31952f75584 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 21 Aug 2017 19:55:51 -0700 Subject: [PATCH 5/8] Fix filename --- ...evel_server_listeners.py => test_highlevel_serve_listeners.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename trio/tests/{test_highlevel_server_listeners.py => test_highlevel_serve_listeners.py} (100%) diff --git a/trio/tests/test_highlevel_server_listeners.py b/trio/tests/test_highlevel_serve_listeners.py similarity index 100% rename from trio/tests/test_highlevel_server_listeners.py rename to trio/tests/test_highlevel_serve_listeners.py From 2bdaac285a539a3227df98ae87e4071a23eb99ad Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 21 Aug 2017 19:56:48 -0700 Subject: [PATCH 6/8] Add trio.serve_tcp --- trio/_highlevel_open_tcp_listeners.py | 24 ++++++++++++++++--- .../test_highlevel_open_tcp_listeners.py | 16 ++++++++++++- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/trio/_highlevel_open_tcp_listeners.py b/trio/_highlevel_open_tcp_listeners.py index e9eb1bf731..8a86ea9416 100644 --- a/trio/_highlevel_open_tcp_listeners.py +++ b/trio/_highlevel_open_tcp_listeners.py @@ -1,10 +1,10 @@ import sys from math import inf -from ._highlevel_socket import SocketListener +import trio from . import socket as tsocket -__all__ = ["open_tcp_listeners"] +__all__ = ["open_tcp_listeners", "serve_tcp"] # Default backlog size: @@ -121,7 +121,7 @@ async def open_tcp_listeners(port, *, host=None, backlog=None): sock.bind(sockaddr) sock.listen(backlog) - listeners.append(SocketListener(sock)) + listeners.append(trio.SocketListener(sock)) except: sock.close() raise @@ -131,3 +131,21 @@ async def open_tcp_listeners(port, *, host=None, backlog=None): raise return listeners + + +async def serve_tcp( + handler, + port, + *, + host=None, + backlog=None, + handler_nursery=None, + task_status=trio.STATUS_IGNORED +): + listeners = await trio.open_tcp_listeners(port, host=host, backlog=backlog) + await trio.serve_listeners( + handler, + listeners, + handler_nursery=handler_nursery, + task_status=task_status + ) diff --git a/trio/tests/test_highlevel_open_tcp_listeners.py b/trio/tests/test_highlevel_open_tcp_listeners.py index 3f57ff049b..001cf30948 100644 --- a/trio/tests/test_highlevel_open_tcp_listeners.py +++ b/trio/tests/test_highlevel_open_tcp_listeners.py @@ -5,7 +5,9 @@ import attr import trio -from trio import open_tcp_listeners, SocketListener, open_tcp_stream +from trio import ( + open_tcp_listeners, serve_tcp, SocketListener, open_tcp_stream +) from trio.testing import open_stream_to_socket_listener from .. import socket as tsocket from .._core.tests.tutil import slow @@ -220,3 +222,15 @@ async def test_open_tcp_listeners_port_checking(): await open_tcp_listeners(b"80", host=host) with pytest.raises(TypeError): await open_tcp_listeners("http", host=host) + + +async def test_serve_tcp(): + async def handler(stream): + await stream.send_all(b"x") + + async with trio.open_nursery() as nursery: + listeners = await nursery.start(serve_tcp, handler, 0) + stream = await open_stream_to_socket_listener(listeners[0]) + async with stream: + await stream.receive_some(1) == b"x" + nursery.cancel_scope.cancel() From 5782f8f2aaed457cc95dcfdd683f33b1dccbc105 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 21 Aug 2017 20:06:21 -0700 Subject: [PATCH 7/8] Add trio.serve_ssl_over_tcp --- trio/_highlevel_ssl_helpers.py | 31 +++++++++++++- trio/tests/test_highlevel_ssl_helpers.py | 52 +++++++++++------------- 2 files changed, 54 insertions(+), 29 deletions(-) diff --git a/trio/_highlevel_ssl_helpers.py b/trio/_highlevel_ssl_helpers.py index fda8c72f42..2dfc5593bf 100644 --- a/trio/_highlevel_ssl_helpers.py +++ b/trio/_highlevel_ssl_helpers.py @@ -2,7 +2,10 @@ from ._highlevel_open_tcp_stream import DEFAULT_DELAY -__all__ = ["open_ssl_over_tcp_stream", "open_ssl_over_tcp_listeners"] +__all__ = [ + "open_ssl_over_tcp_stream", "open_ssl_over_tcp_listeners", + "serve_ssl_over_tcp" +] # It might have been nice to take a ssl_protocols= argument here to set up @@ -95,3 +98,29 @@ async def open_ssl_over_tcp_listeners( ) for tcp_listener in tcp_listeners ] return ssl_listeners + + +async def serve_ssl_over_tcp( + handler, + port, + ssl_context, + *, + host=None, + https_compatible=False, + backlog=None, + handler_nursery=None, + task_status=trio.STATUS_IGNORED +): + listeners = await trio.open_ssl_over_tcp_listeners( + port, + ssl_context, + host=host, + https_compatible=https_compatible, + backlog=backlog + ) + await trio.serve_listeners( + handler, + listeners, + handler_nursery=handler_nursery, + task_status=task_status + ) diff --git a/trio/tests/test_highlevel_ssl_helpers.py b/trio/tests/test_highlevel_ssl_helpers.py index 9142f1de9a..213a16c2f9 100644 --- a/trio/tests/test_highlevel_ssl_helpers.py +++ b/trio/tests/test_highlevel_ssl_helpers.py @@ -1,5 +1,7 @@ import pytest +from functools import partial + import attr import trio @@ -9,36 +11,20 @@ from .test_ssl import CLIENT_CTX, SERVER_CTX from .._highlevel_ssl_helpers import ( - open_ssl_over_tcp_stream, open_ssl_over_tcp_listeners + open_ssl_over_tcp_stream, open_ssl_over_tcp_listeners, serve_ssl_over_tcp ) -# This is used by test_open_ssl_over_tcp_stream to test it, and it also uses -# open_ssl_over_tcp_listeners so it tests that too. -async def start_echo_server(nursery): - - async def accept_loop(listener): - async with listener: +async def echo_handler(stream): + async with stream: + try: while True: - server_stream = await listener.accept() - nursery.spawn(echo_server, server_stream) - - async def echo_server(server_stream): - async with server_stream: - try: - while True: - data = await server_stream.receive_some(10000) - if not data: - break - await server_stream.send_all(data) - except trio.BrokenStreamError: - pass - - (listener,) = await trio.open_ssl_over_tcp_listeners( - 0, SERVER_CTX, host="127.0.0.1" - ) - nursery.spawn(accept_loop, listener) - return listener.transport_listener.socket.getsockname() + data = await stream.receive_some(10000) + if not data: + break + await stream.send_all(data) + except trio.BrokenStreamError: + pass # Resolver that always returns the given sockaddr, no matter what host/port @@ -54,9 +40,19 @@ async def getnameinfo(self, *args): # pragma: no cover raise NotImplementedError -async def test_open_ssl_over_tcp_stream(): +# This uses serve_ssl_over_tcp, which uses open_ssl_over_tcp_listeners... +async def test_open_ssl_over_tcp_stream_and_everything_else(): async with trio.open_nursery() as nursery: - sockaddr = await start_echo_server(nursery) + (listener,) = await nursery.start( + partial( + serve_ssl_over_tcp, + echo_handler, + 0, + SERVER_CTX, + host="127.0.0.1" + ) + ) + sockaddr = listener.transport_listener.socket.getsockname() hostname_resolver = FakeHostnameResolver(sockaddr) trio.socket.set_custom_hostname_resolver(hostname_resolver) From ed0996730f71cfa280c3e8491633c7170e2490b7 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Mon, 21 Aug 2017 21:59:35 -0700 Subject: [PATCH 8/8] Add documentation for trio.serve_* --- docs/source/reference-io.rst | 24 ++++++---- trio/_highlevel_open_tcp_listeners.py | 66 ++++++++++++++++++++++++++- trio/_highlevel_serve_listeners.py | 59 ++++++++++++++++++++++++ trio/_highlevel_ssl_helpers.py | 50 +++++++++++++++++++- 4 files changed, 188 insertions(+), 11 deletions(-) diff --git a/docs/source/reference-io.rst b/docs/source/reference-io.rst index f23997be9b..d39f52c6be 100644 --- a/docs/source/reference-io.rst +++ b/docs/source/reference-io.rst @@ -132,14 +132,18 @@ Abstract base classes -Generic stream implementations -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Generic stream tools +~~~~~~~~~~~~~~~~~~~~ -Trio currently provides one generic utility class for working with -streams. And if you want to test code that's written against the +Trio currently provides a generic helper for writing servers that +listen for connections using one or more +:class:`~trio.abc.Listener`\s, and a generic utility class for working +with streams. And if you want to test code that's written against the streams interface, you should also check out :ref:`testing-streams` in :mod:`trio.testing`. +.. autofunction:: serve_listeners + .. autoclass:: StapledStream :members: :show-inheritance: @@ -151,15 +155,19 @@ Sockets and networking The high-level network interface is built on top of our stream abstraction. +.. autofunction:: open_tcp_stream + +.. autofunction:: serve_tcp + +.. autofunction:: open_ssl_over_tcp_stream + +.. autofunction:: serve_ssl_over_tcp + .. autoclass:: SocketStream :members: :undoc-members: :show-inheritance: -.. autofunction:: open_tcp_stream - -.. autofunction:: open_ssl_over_tcp_stream - .. autoclass:: SocketListener :members: :show-inheritance: diff --git a/trio/_highlevel_open_tcp_listeners.py b/trio/_highlevel_open_tcp_listeners.py index 8a86ea9416..4ac87686b4 100644 --- a/trio/_highlevel_open_tcp_listeners.py +++ b/trio/_highlevel_open_tcp_listeners.py @@ -76,7 +76,8 @@ async def open_tcp_listeners(port, *, host=None, backlog=None): ``"0.0.0.0"`` for IPv4-only and ``"::"`` for IPv6-only. backlog (int or None): The listen backlog to use. If you leave this as - ``None`` then Trio will pick a good default. + ``None`` then Trio will pick a good default. (Currently: whatever + your system has configured as the maximum backlog.) Returns: list of :class:`SocketListener` @@ -142,6 +143,69 @@ async def serve_tcp( handler_nursery=None, task_status=trio.STATUS_IGNORED ): + """Listen for incoming TCP connections, and for each one start a task + running ``handler(stream)``. + + This is a thin convenience wrapper around :func:`open_tcp_listeners` and + :func:`serve_listeners` – see them for full details. + + .. warning:: + + If ``handler`` raises an exception, then this function doesn't do + anything special to catch it – so by default the exception will + propagate out and crash your server. If you don't want this, then catch + exceptions inside your ``handler``, or use a ``handler_nursery`` object + that responds to exceptions in some other way. + + When used with ``nursery.start`` you get back the newly opened listeners. + So, for example, if you want to start a server in your test suite and then + connect to it to check that it's working properly, you can use something + like:: + + from trio.testing import open_stream_to_socket_listener + + async with trio.open_nursery() as nursery: + listeners = await nursery.start(serve_tcp, handler, 0) + client_stream = await open_stream_to_socket_listener(listeners[0]) + + # Then send and receive data on 'client', for example: + await client.send_all(b"GET / HTTP/1.0\r\n\r\n") + + This avoids several common pitfalls: + + 1. It lets the kernel pick a random open port, so your test suite doesn't + depend on any particular port being open. + + 2. It waits for the server to be accepting connections on that port before + ``start`` returns, so there's no race condition where the incoming + connection arrives before the server is ready. + + 3. It uses the Listener object to find out which port was picked, so it + can connect to the right place. + + Args: + handler: The handler to start for each incoming connection. Passed to + :func:`serve_listeners`. + + port: The port to listen on. Use 0 to let the kernel pick an open port. + Passed to :func:`open_tcp_listeners`. + + host (str, bytes, or None): The host interface to listen on; use + ``None`` to bind to the wildcard address. Passed to + :func:`open_tcp_listeners`. + + backlog: The listen backlog, or None to have a good default picked. + Passed to :func:`open_tcp_listeners`. + + handler_nursery: The nursery to start handlers in, or None to use an + internal nursery. Passed to :func:`serve_listeners`. + + task_status: This function can be used with ``nursery.start``. + + Returns: + This function only returns when cancelled. + + """ listeners = await trio.open_tcp_listeners(port, host=host, backlog=backlog) await trio.serve_listeners( handler, diff --git a/trio/_highlevel_serve_listeners.py b/trio/_highlevel_serve_listeners.py index 8014fc06a3..0be6467199 100644 --- a/trio/_highlevel_serve_listeners.py +++ b/trio/_highlevel_serve_listeners.py @@ -57,6 +57,65 @@ async def serve_listeners( handler_nursery=None, task_status=trio.STATUS_IGNORED ): + """Listen for incoming connections on ``listeners``, and for each one + start a task running ``handler(stream)``. + + .. warning:: + + If ``handler`` raises an exception, then this function doesn't do + anything special to catch it – so by default the exception will + propagate out and crash your server. If you don't want this, then catch + exceptions inside your ``handler``, or use a ``handler_nursery`` object + that responds to exceptions in some other way. + + Args: + + handler: An async callable, that will be invoked like + ``handler_nursery.start_soon(handler, stream)`` for each incoming + connection. + + listeners: A list of :class:`~trio.abc.Listener` objects. + :func:`serve_listeners` takes responsibility for closing them. + + handler_nursery: The nursery used to start handlers, or any object with + a ``start_soon`` method. If ``None`` (the default), then + :func:`serve_listeners` will create a new nursery internally and use + that. + + task_status: This function can be used with ``nursery.start``, which + will return ``listeners``. + + Returns: + + This function never returns unless cancelled. + + Resource handling: + + If ``handler`` neglects to close the ``stream``, then it will be closed + using :func:`trio.aclose_forcefully`. + + Error handling: + + Most errors coming from :meth:`~trio.abc.Listener.accept` are allowed to + propagate out (crashing the server in the process). However, some errors – + those which indicate that the server is temporarily overloaded – are + handled specially. These are :class:`OSError`\s with one of the following + errnos: + + * ``EMFILE``: process is out of file descriptors + * ``ENFILE``: system is out of file descriptors + * ``ENOBUFS``, ``ENOMEM``: the kernel hit some sort of memory limitation + when trying to create a socket object + + When :func:`serve_listeners` gets one of these errors, then it: + + * Logs the error to the standard library logger ``trio.serve_listeners`` + (level = ERROR, with exception information included). By default this + causes it to be printed to stderr. + * Waits 100 ms before calling ``accept`` again, in hopes that the + system will recover. + + """ async with trio.open_nursery() as nursery: if handler_nursery is None: handler_nursery = nursery diff --git a/trio/_highlevel_ssl_helpers.py b/trio/_highlevel_ssl_helpers.py index 2dfc5593bf..c2c5033701 100644 --- a/trio/_highlevel_ssl_helpers.py +++ b/trio/_highlevel_ssl_helpers.py @@ -77,8 +77,7 @@ async def open_ssl_over_tcp_listeners( """Start listening for SSL/TLS-encrypted TCP connections to the given port. Args: - port (int or str): The port to listen on. See - :func:`open_tcp_listeners`. + port (int): The port to listen on. See :func:`open_tcp_listeners`. ssl_context (~ssl.SSLContext): The SSL context to use for all incoming connections. host (str, bytes, or None): The address to bind to; use ``None`` to bind @@ -111,6 +110,53 @@ async def serve_ssl_over_tcp( handler_nursery=None, task_status=trio.STATUS_IGNORED ): + """Listen for incoming TCP connections, and for each one start a task + running ``handler(stream)``. + + This is a thin convenience wrapper around + :func:`open_ssl_over_tcp_listeners` and :func:`serve_listeners` – see them + for full details. + + .. warning:: + + If ``handler`` raises an exception, then this function doesn't do + anything special to catch it – so by default the exception will + propagate out and crash your server. If you don't want this, then catch + exceptions inside your ``handler``, or use a ``handler_nursery`` object + that responds to exceptions in some other way. + + When used with ``nursery.start`` you get back the newly opened listeners. + See the documentation for :func:`serve_tcp` for an example where this is + useful. + + Args: + handler: The handler to start for each incoming connection. Passed to + :func:`serve_listeners`. + + port (int): The port to listen on. Use 0 to let the kernel pick + an open port. Ultimately passed to :func:`open_tcp_listeners`. + + ssl_context (~ssl.SSLContext): The SSL context to use for all incoming + connections. Passed to :func:`open_ssl_over_tcp_listeners`. + + host (str, bytes, or None): The address to bind to; use ``None`` to bind + to the wildcard address. Ultimately passed to + :func:`open_tcp_listeners`. + + https_compatible (bool): Set this to True if you want to use + "HTTPS-style" TLS. See :class:`~trio.ssl.SSLStream` for details. + + backlog (int or None): See :class:`~trio.ssl.SSLStream` for details. + + handler_nursery: The nursery to start handlers in, or None to use an + internal nursery. Passed to :func:`serve_listeners`. + + task_status: This function can be used with ``nursery.start``. + + Returns: + This function only returns when cancelled. + + """ listeners = await trio.open_ssl_over_tcp_listeners( port, ssl_context,