-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathtest_open_in_process.py
348 lines (275 loc) · 10.8 KB
/
test_open_in_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
import asyncio
import concurrent
import pickle
import signal
import pytest
import trio
from async_exit_stack import (
AsyncExitStack,
)
from asyncio_run_in_process import (
ProcessKilled,
constants,
main,
open_in_process,
open_in_process_with_trio,
)
from asyncio_run_in_process.exceptions import (
ChildCancelled,
)
from asyncio_run_in_process.process import (
Process,
)
from asyncio_run_in_process.state import (
State,
)
from asyncio_run_in_process.tools.sleep import (
sleep,
)
@pytest.fixture(params=('use_trio', 'use_asyncio'))
def open_in_proc(request):
    """Parametrized fixture returning either the trio- or asyncio-based opener."""
    openers = {
        'use_trio': open_in_process_with_trio,
        'use_asyncio': open_in_process,
    }
    try:
        return openers[request.param]
    except KeyError:
        raise Exception("Invariant")
@pytest.mark.asyncio
async def test_SIGINT_on_method_using_run_in_executor():
    # This test exists only to show that one needs to be careful when using run_in_executor() as
    # asyncio does not cancel the thread/process it starts, so we need to make sure they return or
    # else open_in_process() hangs forever. In the code below, this is achieved by setting the
    # stop_loop event before the method passed to open_in_process() returns. If we don't set that
    # event, the test hangs forever.
    async def loop_forever_in_executor():
        import threading
        # Shared flag: setting it is the only way to make the worker thread
        # (and hence run_in_executor()) return.
        stop_loop = threading.Event()

        def thread_loop():
            import time
            while not stop_loop.is_set():
                time.sleep(0.01)
        loop = asyncio.get_event_loop()
        try:
            await loop.run_in_executor(None, thread_loop)
        finally:
            # Runs when the SIGINT interrupts the coroutine; without this the
            # executor thread would spin forever and the child never exits.
            stop_loop.set()

    async with open_in_process(loop_forever_in_executor) as proc:
        proc.send_signal(signal.SIGINT)
    # Return code 2 is how the child reports termination via SIGINT.
    assert proc.returncode == 2
@pytest.mark.asyncio
async def test_open_in_proc_SIGTERM_while_running(open_in_proc):
    """Terminating a busy child must surface the SIGTERM return code (15)."""
    async def spin_forever():
        while True:
            await sleep(0)

    async with open_in_proc(spin_forever) as proc:
        proc.terminate()
    assert proc.returncode == 15
@pytest.mark.asyncio
async def test_open_in_proc_SIGKILL_while_running(open_in_proc):
    """Killing a busy child yields returncode -9 and a ProcessKilled error."""
    async def spin_forever():
        while True:
            await sleep(0)

    async with open_in_proc(spin_forever) as proc:
        await proc.kill()
    assert proc.returncode == -9
    assert isinstance(proc.error, ProcessKilled)
@pytest.mark.asyncio
async def test_open_proc_SIGINT_while_running(open_in_proc):
    """An unhandled SIGINT in the child results in return code 2."""
    async def spin_forever():
        while True:
            await sleep(0)

    async with open_in_proc(spin_forever) as proc:
        proc.send_signal(signal.SIGINT)
    assert proc.returncode == 2
@pytest.mark.asyncio
async def test_open_proc_SIGINT_can_be_handled(open_in_proc):
    """A child that catches KeyboardInterrupt exits cleanly with its own result."""
    async def catch_interrupt():
        try:
            while True:
                await sleep(0)
        except KeyboardInterrupt:
            return 9999

    async with open_in_proc(catch_interrupt) as proc:
        proc.send_signal(signal.SIGINT)
    assert proc.returncode == 0
    assert proc.get_result_or_raise() == 9999
@pytest.mark.asyncio
async def test_open_proc_SIGINT_can_be_ignored(open_in_proc):
    """The child may swallow the first SIGINT and only exit on a second one."""
    async def ignore_first_interrupt():
        try:
            while True:
                await sleep(0)
        except KeyboardInterrupt:
            # silence the first SIGINT
            pass
        try:
            while True:
                await sleep(0)
        except KeyboardInterrupt:
            return 9999

    async with open_in_proc(ignore_first_interrupt) as proc:
        proc.send_signal(signal.SIGINT)
        # Give the child a moment to absorb the first signal before sending the second.
        await asyncio.sleep(0.01)
        proc.send_signal(signal.SIGINT)
    assert proc.returncode == 0
    assert proc.get_result_or_raise() == 9999
@pytest.mark.asyncio
async def test_open_proc_invalid_function_call(open_in_proc):
    """Passing arguments a child function does not accept surfaces a TypeError."""
    async def no_args_allowed():
        pass

    async with open_in_proc(no_args_allowed, 1, 2, 3) as proc:
        pass
    assert proc.returncode == 1
    assert isinstance(proc.error, TypeError)
@pytest.mark.asyncio
async def test_open_proc_unpickleable_params(touch_path, open_in_proc):
    """Arguments that cannot be pickled fail fast with a PickleError."""
    async def consume_file(f):
        pass

    with pytest.raises(pickle.PickleError):
        with open(touch_path, "w") as touch_file:
            async with open_in_proc(consume_file, touch_file):
                # this code block shouldn't get executed
                assert False  # noqa: B011
@pytest.mark.asyncio
async def test_open_proc_KeyboardInterrupt_while_running():
    """A KeyboardInterrupt raised in the parent context terminates the child (code 2)."""
    async def spin_forever():
        while True:
            await asyncio.sleep(0)

    with pytest.raises(KeyboardInterrupt):
        async with open_in_process(spin_forever) as proc:
            raise KeyboardInterrupt
    assert proc.returncode == 2
# XXX: For some reason this test hangs forever if we use the open_in_proc fixture, so
# we have to have duplicate versions of it for trio/asyncio.
@pytest.mark.asyncio
async def test_open_proc_with_trio_KeyboardInterrupt_while_running():
    """Trio variant: a parent-side KeyboardInterrupt terminates the child (code 2)."""
    async def spin_forever():
        while True:
            await trio.sleep(0)

    with pytest.raises(KeyboardInterrupt):
        async with open_in_process_with_trio(spin_forever) as proc:
            raise KeyboardInterrupt
    assert proc.returncode == 2
@pytest.mark.asyncio
async def test_open_proc_does_not_hang_on_exception(open_in_proc):
    """An exception raised in the child must propagate to the parent promptly."""
    class CustomException(BaseException):
        pass

    async def raise_():
        await sleep(0.01)
        raise CustomException("Just a boring exception")

    async def expect_failure():
        with pytest.raises(CustomException):
            async with open_in_proc(raise_) as proc:
                await proc.wait_result_or_raise()

    # The timeout guards against the historical hang this test covers.
    await asyncio.wait_for(expect_failure(), timeout=1)
@pytest.mark.asyncio
async def test_cancelled_error_in_child():
    # An asyncio.CancelledError from the child process will be converted into a ChildCancelled.
    async def child_cancels_itself():
        await asyncio.sleep(0.01)
        raise asyncio.CancelledError()

    async def run_child():
        async with open_in_process(child_cancels_itself) as proc:
            await proc.wait_result_or_raise()

    with pytest.raises(ChildCancelled):
        await asyncio.wait_for(run_child(), timeout=1)
@pytest.mark.asyncio
async def test_task_cancellation(monkeypatch):
    # If the task executing open_in_process() is cancelled, we will ask the child proc to
    # terminate and propagate the CancelledError.
    async def store_received_signals():
        # Return only when we receive a SIGTERM, also checking that we received a SIGINT before
        # the SIGTERM.
        received_signals = []
        loop = asyncio.get_event_loop()
        for sig in [signal.SIGINT, signal.SIGTERM]:
            loop.add_signal_handler(sig, received_signals.append, sig)
        while True:
            if signal.SIGTERM in received_signals:
                # The parent must have sent SIGINT first and escalated to
                # SIGTERM only after the (shortened) SIGINT timeout expired.
                assert [signal.SIGINT, signal.SIGTERM] == received_signals
                return
            await asyncio.sleep(0)

    child_started = asyncio.Event()

    async def runner():
        async with open_in_process(store_received_signals) as proc:
            child_started.set()
            await proc.wait_result_or_raise()

    # Shorten the escalation timeouts so the test finishes quickly.
    monkeypatch.setattr(constants, 'SIGINT_TIMEOUT_SECONDS', 0.2)
    monkeypatch.setattr(constants, 'SIGTERM_TIMEOUT_SECONDS', 0.2)
    task = asyncio.ensure_future(runner())
    # Wait for the child to actually be running before cancelling the task.
    await asyncio.wait_for(child_started.wait(), timeout=1)
    assert not task.done()
    task.cancel()
    # For some reason, using pytest.raises() here doesn't seem to prevent the
    # asyncio.CancelledError from closing the event loop, causing subsequent tests to fail.
    raised_cancelled_error = False
    try:
        await asyncio.wait_for(task, timeout=1)
    except asyncio.CancelledError:
        raised_cancelled_error = True
    assert raised_cancelled_error
@pytest.mark.asyncio
async def test_timeout_waiting_for_executing_state(open_in_proc, monkeypatch):
    """If the child never reaches EXECUTING within the startup timeout, TimeoutError is raised."""
    async def stalled_wait_for_state(self, state):
        # Outlast the (patched) startup timeout only for the EXECUTING state.
        if state is State.EXECUTING:
            await asyncio.sleep(constants.STARTUP_TIMEOUT_SECONDS + 0.1)

    monkeypatch.setattr(Process, 'wait_for_state', stalled_wait_for_state)
    monkeypatch.setattr(constants, 'STARTUP_TIMEOUT_SECONDS', 1)

    async def spin_forever():
        while True:
            await sleep(0.1)

    with pytest.raises(asyncio.TimeoutError):
        async with open_in_proc(spin_forever):
            pass
@pytest.mark.asyncio
async def test_timeout_waiting_for_pid(open_in_proc, monkeypatch):
    """If the child's pid never arrives within the startup timeout, TimeoutError is raised."""
    async def stalled_wait_pid(self):
        # Outlast the (patched) startup timeout.
        await asyncio.sleep(constants.STARTUP_TIMEOUT_SECONDS + 0.1)

    monkeypatch.setattr(Process, 'wait_pid', stalled_wait_pid)
    monkeypatch.setattr(constants, 'STARTUP_TIMEOUT_SECONDS', 1)

    async def spin_forever():
        while True:
            await sleep(0.1)

    with pytest.raises(asyncio.TimeoutError):
        async with open_in_proc(spin_forever):
            pass
@pytest.mark.asyncio
async def test_max_processes(monkeypatch, open_in_proc):
    # Ensure that once the executor's worker pool is saturated, attempting to
    # start one more child process times out instead of silently queueing.
    async def do_sleep_forever():
        while True:
            await sleep(0.2)

    max_procs = 4
    # NOTE(review): this relies on `import concurrent` having made
    # `concurrent.futures` available transitively — confirm, or import
    # `concurrent.futures` explicitly at the top of the file.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_procs)
    # We need to monkeypatch _get_executor() instead of setting the
    # ASYNCIO_RUN_IN_PROCESS_MAX_PROCS environment variable because if another test runs before us
    # it will cause _get_executor() to store a global executor created using the default number of
    # max procs and then when it got called again here it'd reuse that executor.
    monkeypatch.setattr(main, '_get_executor', lambda: executor)
    # Shorten the startup timeout so the over-limit attempt fails fast.
    monkeypatch.setenv('ASYNCIO_RUN_IN_PROCESS_STARTUP_TIMEOUT', str(1))
    above_limit_proc_created = False
    procs = []
    async with AsyncExitStack() as stack:
        # Fill the pool to capacity and confirm every child is running.
        for _ in range(max_procs):
            proc = await stack.enter_async_context(open_in_proc(do_sleep_forever))
            procs.append(proc)
        for proc in procs:
            assert proc.state is State.EXECUTING
        try:
            async with open_in_proc(do_sleep_forever) as proc:
                # This should not execute as the above should raise a TimeoutError, but in case it
                # doesn't happen we need to ensure the proc is terminated so we can leave the
                # context and fail the test below.
                proc.send_signal(signal.SIGINT)
        except asyncio.TimeoutError:
            pass
        else:
            above_limit_proc_created = True
        finally:
            # Always stop the pool-filling children so the exit stack can close.
            for proc in procs:
                proc.send_signal(signal.SIGINT)
    # We want to fail the test only after we leave the AsyncExitStack(), or else it will pass the
    # exception along when returning control to open_in_proc(), which will interpret it as a
    # failure of the process to exit and send a SIGKILL (together with a warning).
    if above_limit_proc_created:
        assert False, "This process must not have been created successfully"