-
-
Notifications
You must be signed in to change notification settings - Fork 180
/
Copy pathasyncio_thread.py
131 lines (109 loc) · 4.24 KB
/
asyncio_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# This file is part of Xpra.
# Copyright (C) 2022-2024 Antoine Martin <antoine@xpra.org>
# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
# later version. See the file COPYING for details.
import time
import asyncio
from typing import Any
from collections.abc import Awaitable, Callable
from queue import SimpleQueue
from collections import namedtuple
from collections.abc import Coroutine, Generator
from time import monotonic
from xpra.scripts.config import InitExit
from xpra.util.thread import start_thread
from xpra.util.str_fn import csv
from xpra.util.env import envbool
from xpra.os_util import WIN32
from xpra.log import Logger
# module logger, created through the project `Logger` with the "quic" argument
log = Logger("quic")
# whether `ThreadedAsyncioLoop.run_forever` should try to install uvloop;
# the default handed to `envbool` is True everywhere except on WIN32
# (presumably overridable via the XPRA_UVLOOP environment variable - TODO confirm)
UVLOOP = envbool("XPRA_UVLOOP", not WIN32)
# pairs an exception class with its constructor arguments:
# `ThreadedAsyncioLoop.sync` queues one from its coroutine and re-raises it in the caller
ExceptionWrapper = namedtuple("ExceptionWrapper", "exception,args")
class ThreadedAsyncioLoop:
    """
    shim for quic asyncio sockets,
    this runs the asyncio main loop in a thread
    and provides methods for:
    * calling functions as tasks
    * turning an async function into a sync function
      (for calling async functions from regular threads)
    """

    def __init__(self):
        # assigned by `run_forever` once the event loop has been created
        self.loop: asyncio.AbstractEventLoop | None = None
        start_thread(self.run_forever, "asyncio-thread", True)
        # don't return until the loop is available (raises if it never shows up)
        self.wait_for_loop()

    def run_forever(self) -> None:
        """
        Thread target: optionally install uvloop, then create the event loop,
        publish it as `self.loop` and run it until it is stopped.
        The loop is always closed on the way out, even if running it raised.
        """
        if UVLOOP:
            try:
                # noinspection PyPackageRequirements
                import uvloop  # pylint: disable=import-outside-toplevel
            except ImportError:
                log.warn("Warning: uvloop not found")
            else:
                log("installing uvloop")
                uvloop.install()
                log.info(f"uvloop {uvloop.__version__} installed")
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.loop = loop
        try:
            loop.run_forever()
        finally:
            # release the loop's resources however `run_forever` ended
            loop.close()

    def wait_for_loop(self) -> None:
        """
        Poll (every 10ms, for up to one second) until `self.loop` has been set.

        Raises `RuntimeError` if the loop is still missing after the timeout.
        """
        now = monotonic()
        while monotonic() - now < 1 and self.loop is None:
            log("waiting for asyncio event loop")
            time.sleep(0.01)
        if self.loop is None:
            raise RuntimeError("no asyncio main loop")

    def call(self, f: Callable | Coroutine | Generator) -> None:
        """
        Schedule `f` on the event loop from any thread, without waiting for it.

        Coroutine and generator objects are wrapped in a task,
        anything else is handed to `call_soon_threadsafe` as a plain callback.

        Raises `RuntimeError` if there is no event loop.
        """
        log(f"call({f})")
        loop = self.loop
        if not loop:
            raise RuntimeError("no main loop")
        if isinstance(f, (Coroutine, Generator)):
            def tsafe() -> None:
                # tasks must be created from the loop's own thread
                log(f"creating task for {f}")
                loop.create_task(f)

            loop.call_soon_threadsafe(tsafe)
        else:
            loop.call_soon_threadsafe(f)

    def sync(self, async_fn: Callable[..., Awaitable[Any]], *args) -> Any:
        """
        Run `async_fn(*args)` on the event loop and block until it completes.

        Returns the value produced by the async function.
        Failures are re-raised in the calling thread:
        `InitExit` is rebuilt from its status and message,
        any other exception, or a cancellation, becomes a `RuntimeError`.
        Raises `RuntimeError` straight away if there is no event loop.

        This blocks the calling thread on a queue, so it must not be called
        from the event loop's own thread: the coroutine could never run.
        """
        loop = self.loop
        if not loop:
            # same check and error as `call`, and unlike `assert` it survives `-O`
            raise RuntimeError("no main loop")
        response: SimpleQueue[Any] = SimpleQueue()

        async def awaitable():
            log("awaitable()")
            try:
                a = await async_fn(*args)
                response.put(a)
            except InitExit as e:
                log(f"error calling async function {async_fn} with {args}", exc_info=True)
                response.put(ExceptionWrapper(InitExit, (e.status, str(e))))
            except asyncio.CancelledError:
                # CancelledError is a BaseException, so the handler below would miss it
                # and the caller would wait on the queue forever:
                # wake the caller up first, then let the cancellation propagate
                log(f"async function {async_fn} with {args} was cancelled")
                response.put(ExceptionWrapper(RuntimeError, (f"{async_fn} was cancelled",)))
                raise
            except Exception as e:
                log(f"error calling async function {async_fn} with {args}", exc_info=True)
                response.put(ExceptionWrapper(RuntimeError, (str(e),)))

        def tsafe() -> None:
            a = awaitable()
            log(f"awaitable={a}")
            f = asyncio.run_coroutine_threadsafe(a, loop)
            log(f"run_coroutine_threadsafe({a}, {loop})={f}")

        loop.call_soon_threadsafe(tsafe)
        log("sync: waiting for response")
        r = response.get()
        if isinstance(r, ExceptionWrapper):
            log(f"sync: re-throwing {r}")
            try:
                instance = r.exception(*r.args)
            except Exception:
                # the exception class could not be rebuilt from the saved arguments
                log(f"failed to re-throw {r.exception}{r.args}", exc_info=True)
                raise RuntimeError(csv(r.args)) from None
            else:
                raise instance
        log(f"sync: response={r}")
        return r
# module-level instance, created on the first call to `get_threaded_loop`
singleton: ThreadedAsyncioLoop | None = None


def get_threaded_loop() -> ThreadedAsyncioLoop:
    """
    Return the shared `ThreadedAsyncioLoop`, instantiating it on first use.
    """
    global singleton
    instance = singleton
    if not instance:
        instance = singleton = ThreadedAsyncioLoop()
    return instance