Skip to content

Commit 33d83bc

Browse files
authored
Provide stack for suspended coro in test timeout (#5446)
1 parent 918e3fb commit 33d83bc

File tree

3 files changed

+39
-26
lines changed

3 files changed

+39
-26
lines changed

distributed/tests/test_scheduler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ def func(scheduler):
420420
@gen_cluster(
421421
nthreads=[], config={"distributed.scheduler.blocked-handlers": ["test-handler"]}
422422
)
423-
def test_scheduler_init_pulls_blocked_handlers_from_config(s):
423+
async def test_scheduler_init_pulls_blocked_handlers_from_config(s):
424424
assert s.blocked_handlers == ["test-handler"]
425425

426426

distributed/tests/test_utils_test.py

+21-21
Original file line numberDiff line numberDiff line change
@@ -106,27 +106,6 @@ def assert_config():
106106
await c.run_on_scheduler(assert_config)
107107

108108

109-
@gen_cluster(client=True)
110-
def test_gen_cluster_legacy_implicit(c, s, a, b):
111-
assert isinstance(c, Client)
112-
assert isinstance(s, Scheduler)
113-
for w in [a, b]:
114-
assert isinstance(w, Worker)
115-
assert s.nthreads == {w.address: w.nthreads for w in [a, b]}
116-
assert (yield c.submit(lambda: 123)) == 123
117-
118-
119-
@gen_cluster(client=True)
120-
@gen.coroutine
121-
def test_gen_cluster_legacy_explicit(c, s, a, b):
122-
assert isinstance(c, Client)
123-
assert isinstance(s, Scheduler)
124-
for w in [a, b]:
125-
assert isinstance(w, Worker)
126-
assert s.nthreads == {w.address: w.nthreads for w in [a, b]}
127-
assert (yield c.submit(lambda: 123)) == 123
128-
129-
130109
@pytest.mark.skip(reason="This hangs on travis")
131110
def test_gen_cluster_cleans_up_client(loop):
132111
import dask.context
@@ -373,3 +352,24 @@ async def ping_pong():
373352
assert await write_queue.get() == (b.address, {"op": "ping", "reply": True})
374353
write_event.set()
375354
assert await fut == "pong"
355+
356+
357+
@pytest.mark.slow()
358+
def test_provide_stack_on_timeout():
359+
sleep_time = 30
360+
361+
async def inner_test(c, s, a, b):
362+
await asyncio.sleep(sleep_time)
363+
364+
# If this timeout is too small, the cluster setup/teardown might take too
365+
# long and the timeout error we'll receive will be different
366+
test = gen_cluster(client=True, timeout=2)(inner_test)
367+
368+
start = time()
369+
with pytest.raises(asyncio.TimeoutError) as exc:
370+
test()
371+
end = time()
372+
assert "inner_test" in str(exc)
373+
assert "await asyncio.sleep(sleep_time)" in str(exc)
374+
# ensure the task was properly
375+
assert end - start < sleep_time / 2

distributed/utils_test.py

+17-4
Original file line numberDiff line numberDiff line change
@@ -919,7 +919,7 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture
919919

920920
def _(func):
921921
if not iscoroutinefunction(func):
922-
func = gen.coroutine(func)
922+
raise RuntimeError("gen_cluster only works for coroutine functions.")
923923

924924
@functools.wraps(func)
925925
def test_func(*outer_args, **kwargs):
@@ -964,11 +964,24 @@ async def coro():
964964
)
965965
args = [c] + args
966966
try:
967-
future = func(*args, *outer_args, **kwargs)
968-
future = asyncio.wait_for(future, timeout)
969-
result = await future
967+
coro = func(*args, *outer_args, **kwargs)
968+
task = asyncio.create_task(coro)
969+
970+
coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
971+
result = await coro2
970972
if s.validate:
971973
s.validate_state()
974+
except asyncio.TimeoutError as e:
975+
assert task
976+
buffer = io.StringIO()
977+
# This stack indicates where the coro/test is suspended
978+
task.print_stack(file=buffer)
979+
task.cancel()
980+
while not task.cancelled():
981+
await asyncio.sleep(0.01)
982+
raise TimeoutError(
983+
f"Test timeout after {timeout}s.\n{buffer.getvalue()}"
984+
) from e
972985
finally:
973986
if client and c.status not in ("closing", "closed"):
974987
await c._close(fast=s.status == Status.closed)

0 commit comments

Comments
 (0)