Skip to content

Commit

Permalink
Merge pull request #1551 from njsmith/guest-loop
Browse files Browse the repository at this point in the history
"Guest mode", for cohabitation with Qt etc.
  • Loading branch information
oremanj authored Jun 2, 2020
2 parents dd4dbe0 + 3083de5 commit e0af102
Show file tree
Hide file tree
Showing 13 changed files with 1,530 additions and 241 deletions.
404 changes: 402 additions & 2 deletions docs/source/reference-lowlevel.rst

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions docs/source/reference-testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ Test harness integration
.. decorator:: trio_test


.. _testing-time:

Time and timeouts
-----------------

Expand Down
3 changes: 3 additions & 0 deletions newsfragments/399.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
If you want to use Trio, but are stuck with some other event loop like
Qt or PyGame, then good news: now you can have both. For details, see:
:ref:`guest-mode`.
48 changes: 48 additions & 0 deletions notes-to-self/aio-guest-test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import asyncio
import trio

async def aio_main():
loop = asyncio.get_running_loop()

trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
print(f"trio_main finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)

trio.lowlevel.start_guest_run(
trio_main,
run_sync_soon_threadsafe=loop.call_soon_threadsafe,
done_callback=trio_done_callback,
)

(await trio_done_fut).unwrap()


async def trio_main():
print("trio_main!")

to_trio, from_aio = trio.open_memory_channel(float("inf"))
from_trio = asyncio.Queue()

asyncio.create_task(aio_pingpong(from_trio, to_trio))

from_trio.put_nowait(0)

async for n in from_aio:
print(f"trio got: {n}")
await trio.sleep(1)
from_trio.put_nowait(n + 1)
if n >= 10:
return

async def aio_pingpong(from_trio, to_trio):
print("aio_pingpong!")

while True:
n = await from_trio.get()
print(f"aio got: {n}")
await asyncio.sleep(1)
to_trio.send_nowait(n + 1)


asyncio.run(aio_main())
1 change: 1 addition & 0 deletions trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
wait_writable,
notify_closing,
Nursery,
start_guest_run,
)

# Has to come after _run to resolve a circular import
Expand Down
26 changes: 23 additions & 3 deletions trio/_core/_io_epoll.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .. import _core
from ._run import _public
from ._io_common import wake_all
from ._wakeup_socketpair import WakeupSocketpair


@attr.s(slots=True, eq=False, frozen=True)
Expand Down Expand Up @@ -184,6 +185,12 @@ class EpollIOManager:
_epoll = attr.ib(factory=select.epoll)
# {fd: EpollWaiters}
_registered = attr.ib(factory=lambda: defaultdict(EpollWaiters))
_force_wakeup = attr.ib(factory=WakeupSocketpair)
_force_wakeup_fd = attr.ib(default=None)

def __attrs_post_init__(self):
self._epoll.register(self._force_wakeup.wakeup_sock, select.EPOLLIN)
self._force_wakeup_fd = self._force_wakeup.wakeup_sock.fileno()

def statistics(self):
tasks_waiting_read = 0
Expand All @@ -200,13 +207,26 @@ def statistics(self):

def close(self):
self._epoll.close()
self._force_wakeup.close()

def force_wakeup(self):
self._force_wakeup.wakeup_thread_and_signal_safe()

# Called internally by the task runner:
def handle_io(self, timeout):
# Return value must be False-y IFF the timeout expired, NOT if any I/O
# happened or force_wakeup was called. Otherwise it can be anything; gets
# passed straight through to process_events.
def get_events(self, timeout):
# max_events must be > 0 or epoll gets cranky
# accessing self._registered from a thread looks dangerous, but it's
# OK because it doesn't matter if our value is a little bit off.
max_events = max(1, len(self._registered))
events = self._epoll.poll(timeout, max_events)
return self._epoll.poll(timeout, max_events)

def process_events(self, events):
for fd, flags in events:
if fd == self._force_wakeup_fd:
self._force_wakeup.drain()
continue
waiters = self._registered[fd]
# EPOLLONESHOT always clears the flags when an event is delivered
waiters.current_flags = 0
Expand Down
21 changes: 20 additions & 1 deletion trio/_core/_io_kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from .. import _core
from ._run import _public
from ._wakeup_socketpair import WakeupSocketpair


@attr.s(slots=True, eq=False, frozen=True)
Expand All @@ -21,6 +22,15 @@ class KqueueIOManager:
_kqueue = attr.ib(factory=select.kqueue)
# {(ident, filter): Task or UnboundedQueue}
_registered = attr.ib(factory=dict)
_force_wakeup = attr.ib(factory=WakeupSocketpair)
_force_wakeup_fd = attr.ib(default=None)

def __attrs_post_init__(self):
force_wakeup_event = select.kevent(
self._force_wakeup.wakeup_sock, select.KQ_FILTER_READ, select.KQ_EV_ADD
)
self._kqueue.control([force_wakeup_event], 0)
self._force_wakeup_fd = self._force_wakeup.wakeup_sock.fileno()

def statistics(self):
tasks_waiting = 0
Expand All @@ -35,7 +45,10 @@ def statistics(self):
def close(self):
self._kqueue.close()

def handle_io(self, timeout):
def force_wakeup(self):
self._force_wakeup.wakeup_thread_and_signal_safe()

def get_events(self, timeout):
# max_events must be > 0 or kqueue gets cranky
# and we generally want this to be strictly larger than the actual
# number of events we get, so that we can tell that we've gotten
Expand All @@ -50,8 +63,14 @@ def handle_io(self, timeout):
else:
timeout = 0
# and loop back to the start
return events

def process_events(self, events):
for event in events:
key = (event.ident, event.filter)
if event.ident == self._force_wakeup_fd:
self._force_wakeup.drain()
continue
receiver = self._registered[key]
if event.flags & select.KQ_EV_ONESHOT:
del self._registered[key]
Expand Down
21 changes: 17 additions & 4 deletions trio/_core/_io_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ class CKeys(enum.IntEnum):
AFD_POLL = 0
WAIT_OVERLAPPED = 1
LATE_CANCEL = 2
USER_DEFINED = 3 # and above
FORCE_WAKEUP = 3
USER_DEFINED = 4 # and above


def _check(success):
Expand Down Expand Up @@ -388,7 +389,14 @@ def statistics(self):
completion_key_monitors=len(self._completion_key_queues),
)

def handle_io(self, timeout):
def force_wakeup(self):
_check(
kernel32.PostQueuedCompletionStatus(
self._iocp, 0, CKeys.FORCE_WAKEUP, ffi.NULL
)
)

def get_events(self, timeout):
received = ffi.new("PULONG")
milliseconds = round(1000 * timeout)
if timeout > 0 and milliseconds == 0:
Expand All @@ -402,8 +410,11 @@ def handle_io(self, timeout):
except OSError as exc:
if exc.winerror != ErrorCodes.WAIT_TIMEOUT: # pragma: no cover
raise
return
for i in range(received[0]):
return 0
return received[0]

def process_events(self, received):
for i in range(received):
entry = self._events[i]
if entry.lpCompletionKey == CKeys.AFD_POLL:
lpo = entry.lpOverlapped
Expand Down Expand Up @@ -465,6 +476,8 @@ def handle_io(self, timeout):
# try changing this line to
# _core.reschedule(waiter, outcome.Error(exc))
raise exc
elif entry.lpCompletionKey == CKeys.FORCE_WAKEUP:
pass
else:
# dispatch on lpCompletionKey
queue = self._completion_key_queues[entry.lpCompletionKey]
Expand Down
52 changes: 29 additions & 23 deletions trio/_core/_ki.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
from contextlib import contextmanager
from functools import wraps
import attr

import async_generator

Expand Down Expand Up @@ -170,26 +171,31 @@ def wrapper(*args, **kwargs):
disable_ki_protection.__name__ = "disable_ki_protection"


@contextmanager
def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
if (
not is_main_thread()
or signal.getsignal(signal.SIGINT) != signal.default_int_handler
):
yield
return

def handler(signum, frame):
assert signum == signal.SIGINT
protection_enabled = ki_protection_enabled(frame)
if protection_enabled or restrict_keyboard_interrupt_to_checkpoints:
deliver_cb()
else:
raise KeyboardInterrupt

signal.signal(signal.SIGINT, handler)
try:
yield
finally:
if signal.getsignal(signal.SIGINT) is handler:
signal.signal(signal.SIGINT, signal.default_int_handler)
@attr.s
class KIManager:
handler = attr.ib(default=None)

def install(self, deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
assert self.handler is None
if (
not is_main_thread()
or signal.getsignal(signal.SIGINT) != signal.default_int_handler
):
return

def handler(signum, frame):
assert signum == signal.SIGINT
protection_enabled = ki_protection_enabled(frame)
if protection_enabled or restrict_keyboard_interrupt_to_checkpoints:
deliver_cb()
else:
raise KeyboardInterrupt

self.handler = handler
signal.signal(signal.SIGINT, handler)

def close(self):
if self.handler is not None:
if signal.getsignal(signal.SIGINT) is self.handler:
signal.signal(signal.SIGINT, signal.default_int_handler)
self.handler = None
Loading

0 comments on commit e0af102

Please sign in to comment.