Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When a socket/fd is closed, wake up outstanding waiters #460

Merged
merged 3 commits into from
Jul 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1546,6 +1546,8 @@ Exceptions and warnings

.. autoexception:: ResourceBusyError

.. autoexception:: ClosedResourceError

.. autoexception:: RunFinishedError

.. autoexception:: TrioInternalError
Expand Down
40 changes: 40 additions & 0 deletions docs/source/reference-hazmat.rst
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,28 @@ All environments provide the following functions:
:raises trio.ResourceBusyError:
if another task is already waiting for the given socket to
become writable.
:raises trio.ClosedResourceError:
if another task calls :func:`notify_socket_close` while this
function is still working.


.. function:: notify_socket_close(sock)

Notifies Trio's internal I/O machinery that you are about to close
a socket.

This causes any operations currently waiting for this socket to
immediately raise :exc:`~trio.ClosedResourceError`.

This does *not* actually close the socket. Generally when closing a
socket, you should first call this function, and then close the
socket.

The given object *must* be exactly of type :func:`socket.socket`,
nothing else.

:raises TypeError:
if the given object is not of type :func:`socket.socket`.


Unix-specific API
Expand All @@ -174,6 +196,9 @@ Unix-like systems provide the following functions:
:raises trio.ResourceBusyError:
if another task is already waiting for the given fd to
become readable.
:raises trio.ClosedResourceError:
if another task calls :func:`notify_fd_close` while this
function is still working.


.. function:: wait_writable(fd)
Expand All @@ -192,6 +217,21 @@ Unix-like systems provide the following functions:
:raises trio.ResourceBusyError:
if another task is already waiting for the given fd to
become writable.
:raises trio.ClosedResourceError:
if another task calls :func:`notify_fd_close` while this
function is still working.

.. function:: notify_fd_close(fd)

Notifies Trio's internal I/O machinery that you are about to close
a file descriptor.

This causes any operations currently waiting for this file
descriptor to immediately raise :exc:`~trio.ClosedResourceError`.

This does *not* actually close the file descriptor. Generally when
closing a file descriptor, you should first call this function, and
then actually close it.


Kqueue-specific API
Expand Down
5 changes: 0 additions & 5 deletions docs/source/reference-io.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,6 @@ Abstract base classes

.. autoexception:: BrokenStreamError

.. autoexception:: ClosedStreamError

.. currentmodule:: trio.abc

.. autoclass:: trio.abc.Listener
Expand All @@ -158,9 +156,6 @@ Abstract base classes

.. currentmodule:: trio

.. autoexception:: ClosedListenerError



Generic stream tools
~~~~~~~~~~~~~~~~~~~~
Expand Down
18 changes: 18 additions & 0 deletions newsfragments/36.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Suppose one task is blocked trying to use a resource – for example,
reading from a socket – and while it's doing this, another task closes
the resource. Previously, this produced undefined behavior. Now,
closing a resource causes pending operations on that resource to
terminate immediately with a :exc:`ClosedResourceError`.

``ClosedStreamError`` and ``ClosedListenerError`` are now aliases for
:exc:`ClosedResourceError`, and deprecated.

For this to work, Trio needs to know when a resource has been closed.
To facilitate this, new functions have been added:
:func:`trio.hazmat.notify_fd_close` and
:func:`trio.hazmat.notify_socket_close`. If you're using Trio's
built-in wrappers like :cls:`~trio.SocketStream` or
:mod:`trio.socket`, then you don't need to worry about this, but if
you're using the low-level functions like
:func:`trio.hazmat.wait_readable`, you should make sure to call these
functions at appropriate times.
18 changes: 18 additions & 0 deletions trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@
from . import ssl
# Not imported by default: testing

_deprecate.enable_attribute_deprecations(__name__)
__deprecated_attributes__ = {
"ClosedStreamError":
_deprecate.DeprecatedAttribute(
ClosedResourceError,
"0.5.0",
issue=36,
instead=ClosedResourceError
),
"ClosedListenerError":
_deprecate.DeprecatedAttribute(
ClosedResourceError,
"0.5.0",
issue=36,
instead=ClosedResourceError
),
}

_deprecate.enable_attribute_deprecations(hazmat.__name__)

# Temporary hack to make sure _result is loaded, just during the deprecation
Expand Down
24 changes: 19 additions & 5 deletions trio/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ async def aclose(self):
If the resource is already closed, then this method should silently
succeed.

Once this method completes, any other pending or future operations on
this resource should generally raise :exc:`~trio.ClosedResourceError`,
unless there's a good reason to do otherwise.

See also: :func:`trio.aclose_forcefully`.

"""
Expand Down Expand Up @@ -297,7 +301,9 @@ async def send_all(self, data):
:meth:`HalfCloseableStream.send_eof` on this stream.
trio.BrokenStreamError: if something has gone wrong, and the stream
is broken.
trio.ClosedStreamError: if you already closed this stream object.
trio.ClosedResourceError: if you previously closed this stream
object, or if another task closes this stream object while
:meth:`send_all` is running.

Most low-level operations in trio provide a guarantee: if they raise
:exc:`trio.Cancelled`, this means that they had no effect, so the
Expand Down Expand Up @@ -328,7 +334,9 @@ async def wait_send_all_might_not_block(self):
:meth:`HalfCloseableStream.send_eof` on this stream.
trio.BrokenStreamError: if something has gone wrong, and the stream
is broken.
trio.ClosedStreamError: if you already closed this stream object.
trio.ClosedResourceError: if you previously closed this stream
object, or if another task closes this stream object while
:meth:`wait_send_all_might_not_block` is running.

Note:

Expand Down Expand Up @@ -402,7 +410,9 @@ async def receive_some(self, max_bytes):
:meth:`receive_some` on the same stream at the same time.
trio.BrokenStreamError: if something has gone wrong, and the stream
is broken.
trio.ClosedStreamError: if you already closed this stream object.
trio.ClosedResourceError: if you previously closed this stream
object, or if another task closes this stream object while
:meth:`receive_some` is running.

"""

Expand Down Expand Up @@ -470,7 +480,9 @@ async def send_eof(self):
:meth:`send_eof` on this stream.
trio.BrokenStreamError: if something has gone wrong, and the stream
is broken.
trio.ClosedStreamError: if you already closed this stream object.
trio.ClosedResourceError: if you previously closed this stream
object, or if another task closes this stream object while
:meth:`send_eof` is running.

"""

Expand All @@ -494,7 +506,9 @@ async def accept(self):
Raises:
trio.ResourceBusyError: if two tasks attempt to call
:meth:`accept` on the same listener at the same time.
trio.ClosedListenerError: if you already closed this listener.
trio.ClosedResourceError: if you previously closed this listener
object, or if another task closes this listener object while
:meth:`accept` is running.

Note that there is no ``BrokenListenerError``, because for listeners
there is no general condition of "the network/remote peer broke the
Expand Down
9 changes: 8 additions & 1 deletion trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,11 @@ async def wait_socket_writable(sock):
raise TypeError("need a socket")
await wait_writable(sock)

__all__ += ["wait_socket_readable", "wait_socket_writable"]
def notify_socket_close(sock):
if type(sock) != _stdlib_socket.socket:
raise TypeError("need a socket")
notify_fd_close(sock)

__all__ += [
"wait_socket_readable", "wait_socket_writable", "notify_socket_close"
]
14 changes: 14 additions & 0 deletions trio/_core/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"WouldBlock",
"Cancelled",
"ResourceBusyError",
"ClosedResourceError",
]


Expand Down Expand Up @@ -103,3 +104,16 @@ class ResourceBusyError(Exception):
the data get scrambled.

"""


class ClosedResourceError(Exception):
"""Raised when attempting to use a resource after it has been closed.

Note that "closed" here means that *your* code closed the resource,
generally by calling a method with a name like ``close`` or ``aclose``, or
by exiting a context manager. If a problem arises elsewhere – for example,
because of a network failure, or because a remote peer closed their end of
a connection – then that should be indicated by a different exception
class, like :exc:`BrokenStreamError` or an :exc:`OSError` subclass.

"""
24 changes: 24 additions & 0 deletions trio/_core/_io_epoll.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import select
import attr
import outcome

from .. import _core
from . import _public
Expand Down Expand Up @@ -122,3 +123,26 @@ async def wait_readable(self, fd):
@_public
async def wait_writable(self, fd):
await self._epoll_wait(fd, "write_task")

@_public
def notify_fd_close(self, fd):
if not isinstance(fd, int):
fd = fd.fileno()
if fd not in self._registered:
return

waiters = self._registered[fd]

def interrupt(task):
exc = _core.ClosedResourceError("another task closed this fd")
_core.reschedule(task, outcome.Error(exc))

if waiters.write_task is not None:
interrupt(waiters.write_task)
waiters.write_task = None

if waiters.read_task is not None:
interrupt(waiters.read_task)
waiters.read_task = None

self._update_registrations(fd, True)
25 changes: 25 additions & 0 deletions trio/_core/_io_kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,28 @@ async def wait_readable(self, fd):
@_public
async def wait_writable(self, fd):
await self._wait_common(fd, select.KQ_FILTER_WRITE)

@_public
def notify_fd_close(self, fd):
if not isinstance(fd, int):
fd = fd.fileno()

for filter in [select.KQ_FILTER_READ, select.KQ_FILTER_WRITE]:
key = (fd, filter)
receiver = self._registered.get(key)

if receiver is None:
continue

if type(receiver) is _core.Task:
event = select.kevent(fd, filter, select.KQ_EV_DELETE)
self._kqueue.control([event], 0)
exc = _core.ClosedResourceError("another task closed this fd")
_core.reschedule(receiver, outcome.Error(exc))
del self._registered[key]
else:
# XX this is an interesting example of a case where being able
# to close a queue would be useful...
raise NotImplementedError(
"can't close an fd that monitor_kevent is using"
)
13 changes: 13 additions & 0 deletions trio/_core/_io_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,19 @@ async def wait_socket_readable(self, sock):
async def wait_socket_writable(self, sock):
await self._wait_socket("write", sock)

@_public
def notify_socket_close(self, sock):
if type(sock) is not stdlib_socket.socket:
raise TypeError("need a stdlib socket")

for mode in ["read", "write"]:
if sock in self._socket_waiters[mode]:
task = self._socket_waiters[mode].pop(sock)
exc = _core.ClosedResourceError(
"another task closed this socket"
)
_core.reschedule(task, outcome.Error(exc))

# This has cffi-isms in it and is untested... but it demonstrates the
# logic we'll want when we start actually using overlapped I/O.
#
Expand Down
Loading