-
Notifications
You must be signed in to change notification settings - Fork 48
/
Copy pathtest_qthreadexec.py
50 lines (36 loc) · 1.23 KB
/
test_qthreadexec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# © 2018 Gerard Marull-Paretas <gerard@teslabs.com>
# © 2014 Mark Harviston <mark.harviston@gmail.com>
# © 2014 Arve Knudsen <arve.knudsen@gmail.com>
# BSD License
import pytest
import qasync
@pytest.fixture
def executor(request):
    """Provide a live ``QThreadExecutor`` that is shut down after the test.

    The executor is constructed with ``5`` (presumably the worker-thread
    count -- confirm against ``qasync.QThreadExecutor``) and its
    ``shutdown`` method is registered as a finalizer on ``request``.
    """
    pool = qasync.QThreadExecutor(5)
    # Registering the bound method lets pytest tear the pool down for us.
    request.addfinalizer(pool.shutdown)
    return pool
@pytest.fixture
def shutdown_executor():
    """Provide a ``QThreadExecutor`` on which ``shutdown`` was already called.

    Used by the tests that check how the executor behaves once it has
    been shut down.
    """
    stopped = qasync.QThreadExecutor(5)
    stopped.shutdown()
    return stopped
def test_shutdown_after_shutdown(shutdown_executor):
    """A second ``shutdown`` on an already shut-down executor must raise."""
    shutdown_again = shutdown_executor.shutdown
    with pytest.raises(RuntimeError):
        shutdown_again()
def test_ctx_after_shutdown(shutdown_executor):
    """Entering a shut-down executor as a context manager must raise."""
    # A single ``with`` holding both managers is equivalent to nesting them:
    # ``pytest.raises`` is entered first, then the executor.
    with pytest.raises(RuntimeError), shutdown_executor:
        pass
def test_submit_after_shutdown(shutdown_executor):
    """Submitting work to a shut-down executor must raise ``RuntimeError``."""
    # ``None`` stands in for the callable; presumably it is never invoked
    # because ``submit`` is expected to fail first.
    placeholder_callable = None
    with pytest.raises(RuntimeError):
        shutdown_executor.submit(placeholder_callable)
def test_stack_recursion_limit(executor):
    """Unbounded recursion in a worker must end in ``RecursionError``.

    Checks that worker threads have sufficient stack size for the default
    ``sys.getrecursionlimit``. If they do not, this should fail with SIGSEGV
    or SIGBUS (or even SIGILL?) rather than with a Python exception.
    """

    def rec(a, *args, **kwargs):
        rec(a, *args, **kwargs)

    # Queue every job before waiting on any of them.
    pending = [executor.submit(rec, 1) for _ in range(10)]

    for future in pending:
        with pytest.raises(RecursionError):
            future.result()