Skip to content

Commit

Permalink
Dump cluster state on all test failures (dask#5674)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jan 21, 2022
1 parent 5054c19 commit af84e40
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 131 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ jobs:
with:
name: ${{ env.TEST_ID }}
path: reports
- name: Upload timeout reports
- name: Upload gen_cluster dumps for failed tests
# ensure this runs even if pytest fails
if: >
always() &&
(steps.run_tests.outcome == 'success' || steps.run_tests.outcome == 'failure')
uses: actions/upload-artifact@v2
with:
name: ${{ env.TEST_ID }}-timeouts
path: test_timeout_dump
name: ${{ env.TEST_ID }}_cluster_dumps
path: test_cluster_dump
if-no-files-found: ignore
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dask-worker-space/
tags
.ipynb_checkpoints
.venv/
.mypy_cache/

# Test timeouts will dump the cluster state in here
test_timeout_dump/
# Test failures will dump the cluster state in here
test_cluster_dump/
29 changes: 12 additions & 17 deletions distributed/diagnostics/tests/test_worker_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ class MyCustomPlugin(WorkerPlugin):
assert next(iter(w.plugins)).startswith("MyCustomPlugin-")


def test_release_key_deprecated():
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_release_key_deprecated(c, s, a):
class ReleaseKeyDeprecated(WorkerPlugin):
def __init__(self):
self._called = False
Expand All @@ -222,36 +223,30 @@ def teardown(self, worker):
assert self._called
return super().teardown(worker)

@gen_cluster(client=True, nthreads=[("", 1)])
async def test(c, s, a):

await c.register_worker_plugin(ReleaseKeyDeprecated())
fut = await c.submit(inc, 1, key="task")
assert fut == 2
await c.register_worker_plugin(ReleaseKeyDeprecated())

with pytest.deprecated_call(
match="The `WorkerPlugin.release_key` hook is depreacted"
):
test()
assert await c.submit(inc, 1, key="x") == 2
while "x" in a.tasks:
await asyncio.sleep(0.01)


def test_assert_no_warning_no_overload():
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_assert_no_warning_no_overload(c, s, a):
"""Assert we do not receive a deprecation warning if we do not overload any
methods
"""

class Dummy(WorkerPlugin):
pass

@gen_cluster(client=True, nthreads=[("", 1)])
async def test(c, s, a):

await c.register_worker_plugin(Dummy())
fut = await c.submit(inc, 1, key="task")
assert fut == 2

with pytest.warns(None):
test()
await c.register_worker_plugin(Dummy())
assert await c.submit(inc, 1, key="x") == 2
while "x" in a.tasks:
await asyncio.sleep(0.01)


@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
Expand Down
60 changes: 24 additions & 36 deletions distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,11 @@ def get(self, key):


@pytest.mark.parametrize("direct_to_workers", [True, False])
def test_client_actions(direct_to_workers):
@gen_cluster(client=True)
async def test(c, s, a, b):
c = await Client(
s.address, asynchronous=True, direct_to_workers=direct_to_workers
)

@gen_cluster()
async def test_client_actions(s, a, b, direct_to_workers):
async with Client(
s.address, asynchronous=True, direct_to_workers=direct_to_workers
) as c:
counter = c.submit(Counter, workers=[a.address], actor=True)
assert isinstance(counter, Future)
counter = await counter
Expand All @@ -86,8 +84,7 @@ async def test(c, s, a, b):
assert hasattr(counter, "add")
assert hasattr(counter, "n")

n = await counter.n
assert n == 0
assert await counter.n == 0

assert counter._address == a.address

Expand All @@ -96,45 +93,36 @@ async def test(c, s, a, b):

await asyncio.gather(counter.increment(), counter.increment())

n = await counter.n
assert n == 2
assert await counter.n == 2

counter.add(10)
while (await counter.n) != 10 + 2:
n = await counter.n
await asyncio.sleep(0.01)

await c.close()

test()


@pytest.mark.parametrize("separate_thread", [False, True])
def test_worker_actions(separate_thread):
@gen_cluster(client=True)
async def test(c, s, a, b):
counter = c.submit(Counter, workers=[a.address], actor=True)
a_address = a.address

def f(counter):
start = counter.n
@gen_cluster(client=True)
async def test_worker_actions(c, s, a, b, separate_thread):
counter = c.submit(Counter, workers=[a.address], actor=True)
a_address = a.address

assert type(counter) is Actor
assert counter._address == a_address
def f(counter):
start = counter.n

future = counter.increment(separate_thread=separate_thread)
assert isinstance(future, ActorFuture)
assert "Future" in type(future).__name__
end = future.result(timeout=1)
assert end > start
assert type(counter) is Actor
assert counter._address == a_address

futures = [c.submit(f, counter, pure=False) for _ in range(10)]
await c.gather(futures)
future = counter.increment(separate_thread=separate_thread)
assert isinstance(future, ActorFuture)
assert "Future" in type(future).__name__
end = future.result(timeout=1)
assert end > start

counter = await counter
assert await counter.n == 10
futures = [c.submit(f, counter, pure=False) for _ in range(10)]
await c.gather(futures)

test()
counter = await counter
assert await counter.n == 10


@gen_cluster(client=True)
Expand Down
78 changes: 35 additions & 43 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5747,43 +5747,37 @@ async def test_client_active_bad_port():


@pytest.mark.parametrize("direct", [True, False])
def test_turn_off_pickle(direct):
@gen_cluster()
async def test(s, a, b):
np = pytest.importorskip("numpy")

async with Client(
s.address, asynchronous=True, serializers=["dask", "msgpack"]
) as c:
assert (await c.submit(inc, 1)) == 2
await c.submit(np.ones, 5)
await c.scatter(1)
@gen_cluster(client=True, client_kwargs={"serializers": ["dask", "msgpack"]})
async def test_turn_off_pickle(c, s, a, b, direct):
np = pytest.importorskip("numpy")

# Can't send complex data
with pytest.raises(TypeError):
future = await c.scatter(inc)
assert (await c.submit(inc, 1)) == 2
await c.submit(np.ones, 5)
await c.scatter(1)

# can send complex tasks (this uses pickle regardless)
future = c.submit(lambda x: x, inc)
await wait(future)
# Can't send complex data
with pytest.raises(TypeError):
await c.scatter(inc)

# but can't receive complex results
with pytest.raises(TypeError):
await c.gather(future, direct=direct)
# can send complex tasks (this uses pickle regardless)
future = c.submit(lambda x: x, inc)
await wait(future)

# Run works
result = await c.run(lambda: 1)
assert list(result.values()) == [1, 1]
result = await c.run_on_scheduler(lambda: 1)
assert result == 1
# but can't receive complex results
with pytest.raises(TypeError):
await c.gather(future, direct=direct)

# But not with complex return values
with pytest.raises(TypeError):
await c.run(lambda: inc)
with pytest.raises(TypeError):
await c.run_on_scheduler(lambda: inc)
# Run works
result = await c.run(lambda: 1)
assert list(result.values()) == [1, 1]
result = await c.run_on_scheduler(lambda: 1)
assert result == 1

test()
# But not with complex return values
with pytest.raises(TypeError):
await c.run(lambda: inc)
with pytest.raises(TypeError):
await c.run_on_scheduler(lambda: inc)


@gen_cluster()
Expand Down Expand Up @@ -6620,21 +6614,19 @@ async def test_annotations_task_state(c, s, a, b):


@pytest.mark.parametrize("fn", ["compute", "persist"])
def test_annotations_compute_time(fn):
@gen_cluster(client=True)
async def test_annotations_compute_time(c, s, a, b, fn):
da = pytest.importorskip("dask.array")
x = da.ones(10, chunks=(5,))

@gen_cluster(client=True)
async def test(c, s, a, b):
x = da.ones(10, chunks=(5,))

with dask.annotate(foo="bar"):
# Turn off optimization to avoid rewriting layers and picking up annotations
# that way. Instead, we want `compute`/`persist` to be able to pick them up.
x = await getattr(c, fn)(x, optimize_graph=False)

assert all({"foo": "bar"} == ts.annotations for ts in s.tasks.values())
with dask.annotate(foo="bar"):
# Turn off optimization to avoid rewriting layers and picking up annotations
# that way. Instead, we want `compute`/`persist` to be able to pick them up.
fut = getattr(c, fn)(x, optimize_graph=False)

test()
await wait(fut)
assert s.tasks
assert all(ts.annotations == {"foo": "bar"} for ts in s.tasks.values())


@pytest.mark.xfail(reason="https://github.com/dask/dask/issues/7036")
Expand Down
30 changes: 17 additions & 13 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,16 @@ def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
nthreads=nthreads,
config={"distributed.scheduler.work-stealing": False},
)
async def test(c, s, *workers):
async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers):
r"""
Ensure that sibling root tasks are scheduled to the same node, reducing future data transfer.
Ensure that sibling root tasks are scheduled to the same node, reducing future
data transfer.
We generate a wide layer of "root" tasks (random NumPy arrays). All of those tasks share 0-5
trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are most common in real-world use
(``ndeps=1`` is basically ``da.from_array(..., inline_array=False)`` or ``da.from_zarr``).
The graph is structured like this (though the number of tasks and workers is different):
We generate a wide layer of "root" tasks (random NumPy arrays). All of those
tasks share 0-5 trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are
most common in real-world use (``ndeps=1`` is basically ``da.from_array(...,
inline_array=False)`` or ``da.from_zarr``). The graph is structured like this
(though the number of tasks and workers is different):
|-W1-| |-W2-| |-W3-| |-W4-| < ---- ideal task scheduling
Expand All @@ -159,9 +161,9 @@ async def test(c, s, *workers):
\ \ \ | | / / /
TRIVIAL * 0..5
Neighboring `random-` tasks should be scheduled on the same worker. We test that generally,
only one worker holds each row of the array, that the `random-` tasks are never transferred,
and that there are few transfers overall.
Neighboring `random-` tasks should be scheduled on the same worker. We test that
generally, only one worker holds each row of the array, that the `random-` tasks
are never transferred, and that there are few transfers overall.
"""
da = pytest.importorskip("dask.array")
np = pytest.importorskip("numpy")
Expand Down Expand Up @@ -222,16 +224,18 @@ def random(**kwargs):
keys = log["keys"]
# The root-ish tasks should never be transferred
assert not any(k.startswith("random") for k in keys), keys
# `object-` keys (the trivial deps of the root random tasks) should be transferred
# `object-` keys (the trivial deps of the root random tasks) should be
# transferred
if any(not k.startswith("object") for k in keys):
# But not many other things should be
unexpected_transfers.append(list(keys))

# A transfer at the very end to move aggregated results is fine (necessary with unbalanced workers in fact),
# but generally there should be very very few transfers.
# A transfer at the very end to move aggregated results is fine (necessary with
# unbalanced workers in fact), but generally there should be very very few
# transfers.
assert len(unexpected_transfers) <= 3, unexpected_transfers

test()
test_decide_worker_coschedule_order_neighbors_()


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
Expand Down
19 changes: 8 additions & 11 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,7 @@ async def assert_balanced(inp, expected, c, s, *workers):
raise Exception(f"Expected: {expected2}; got: {result2}")


@pytest.mark.slow
@pytest.mark.parametrize(
"inp,expected",
[
Expand Down Expand Up @@ -733,19 +734,15 @@ async def assert_balanced(inp, expected, c, s, *workers):
],
)
def test_balance(inp, expected):
async def test(*args, **kwargs):
async def test_balance_(*args, **kwargs):
await assert_balanced(inp, expected, *args, **kwargs)

test = gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)] * len(inp),
config={
"distributed.scheduler.default-task-durations": {
str(i): 1 for i in range(10)
}
},
)(test)
test()
config = {
"distributed.scheduler.default-task-durations": {str(i): 1 for i in range(10)}
}
gen_cluster(client=True, nthreads=[("", 1)] * len(inp), config=config)(
test_balance_
)()


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2, Worker=Nanny, timeout=60)
Expand Down
Loading

0 comments on commit af84e40

Please sign in to comment.