-
Notifications
You must be signed in to change notification settings - Fork 5
/
__init__.py
106 lines (93 loc) · 3.77 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""Isolated package for trio-parallel's worker behavior
This allows workers to start up without "heavy" dependencies like trio and attrs.
Users still need to make sure their CPU-bound functions also do not pull in such
packages, but at least we are doing our part."""
import signal
import sys
from inspect import iscoroutine
from pickle import HIGHEST_PROTOCOL
from time import perf_counter
try:
from cloudpickle import dumps, loads
except ImportError:
from pickle import dumps, loads
from outcome import capture, Error
from tblib.pickling_support import install as install_pickling_support
MAX_TIMEOUT = 24.0 * 60.0 * 60.0
ACK = b"\x06"
def handle_job(job):
try:
fn, args = loads(job)
ret = fn(*args)
if iscoroutine(ret):
# Manually close coroutine to avoid RuntimeWarnings
ret.close()
raise TypeError(
"trio-parallel worker expected a sync function, but {!r} appears "
"to be asynchronous".format(getattr(fn, "__qualname__", fn))
)
return ret
except BaseException as e:
install_pickling_support(e)
raise e
def safe_dumps(result):
try:
return dumps(result, protocol=HIGHEST_PROTOCOL)
except BaseException as exc: # noqa: ASYNC103
return dumps(Error(exc), protocol=HIGHEST_PROTOCOL) # noqa: ASYNC104
def safe_poll(recv_pipe, timeout):
deadline = perf_counter() + timeout
while timeout > MAX_TIMEOUT:
if recv_pipe.poll(MAX_TIMEOUT):
return True
timeout = deadline - perf_counter()
else:
return recv_pipe.poll(timeout)
def worker_behavior(recv_pipe, send_pipe, idle_timeout, init, retire):
# Intercept keyboard interrupts to avoid passing KeyboardInterrupt
# between processes. (Trio will take charge via cancellation.)
signal.signal(signal.SIGINT, signal.SIG_IGN)
try:
if sys.platform == "win32":
# Signal successful startup.
send_pipe.send_bytes(ACK)
if isinstance(init, bytes): # true except on "fork"
init = loads(init)
retire = loads(retire)
init()
while safe_poll(recv_pipe, idle_timeout):
send_pipe.send_bytes(
safe_dumps(capture(handle_job, recv_pipe.recv_bytes()))
)
if retire():
break
except (BrokenPipeError, EOFError):
# Graceful shutdown: If the main process closes the pipes, we will
# observe one of these exceptions and can simply exit quietly.
# Closing pipes manually fixed some __del__ flakiness in CI
send_pipe.close()
recv_pipe.close()
return
except BaseException:
# Ensure BrokenWorkerError raised in the main proc.
send_pipe.close()
# recv_pipe must remain open and clear until the main proc closes it.
try:
while True:
recv_pipe.recv_bytes()
except EOFError:
pass
raise
else:
# Clean idle shutdown or retirement: close recv_pipe first to minimize
# subsequent race.
recv_pipe.close()
# Race condition: it is possible to sneak a write through in the main process
# between the while loop predicate and recv_pipe.close(). Naively, this would
# make a clean shutdown look like a broken worker. By sending a sentinel
# value, we can indicate to a waiting main process that we have hit this
# race condition and need a restart. However, the send MUST be non-blocking
# to free this process's resources in a timely manner. Therefore, this message
# can be any size on Windows but must be less than 512 bytes by POSIX.1-2001.
send_pipe.send_bytes(dumps(None, protocol=HIGHEST_PROTOCOL))
send_pipe.close()