From 00a42264171a5a4b4a0d26dbc77f10d542f8279d Mon Sep 17 00:00:00 2001
From: fjetter
Date: Tue, 29 Jun 2021 16:30:15 +0200
Subject: [PATCH] Ensure shuffle split default durations uses proper prefix

See also https://github.com/dask/dask/issues/7844
---
 distributed/distributed.yaml        |  2 +-
 distributed/tests/test_scheduler.py | 30 ++++++++++++++++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 9ea6360b49f..870198bc306 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -28,7 +28,7 @@ distributed:
     unknown-task-duration: 500ms # Default duration for all tasks with unknown durations ("15m", "2h")
     default-task-durations: # How long we expect function names to run ("1h", "1s") (helps for long tasks)
       rechunk-split: 1us
-      shuffle-split: 1us
+      split-shuffle: 1us
     validate: False # Check scheduler state at every step for debugging
     dashboard:
       status:
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 418ab7e7f5e..36ba0ae52bb 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -17,7 +17,7 @@
 
 import dask
 from dask import delayed
-from dask.utils import apply
+from dask.utils import apply, parse_timedelta
 
 from distributed import Client, Nanny, Worker, fire_and_forget, wait
 from distributed.comm import Comm
@@ -1845,6 +1845,34 @@ async def test_get_task_duration(c, s, a, b):
     assert len(s.unknown_durations["slowinc"]) == 1
 
 
+@gen_cluster(client=True)
+async def test_default_task_duration_splits(c, s, a, b):
+    """This test ensures that the default task durations for shuffle split tasks are, by default, aligned with the task names of dask.dask"""
+
+    pd = pytest.importorskip("pandas")
+    dd = pytest.importorskip("dask.dataframe")
+
+    # We don't care about the actual computation here but we'll schedule one anyhow to verify that we're looking for the correct key
+    npart = 10
+    df = dd.from_pandas(pd.DataFrame({"A": range(100), "B": 1}), npartitions=npart)
+    graph = df.shuffle(
+        "A",
+        shuffle="tasks",
+        # If we don't have enough partitions, we'll fall back to a simple shuffle
+        max_branch=npart - 1,
+    ).sum()
+    fut = c.compute(graph)
+    await wait(fut)
+
+    split_prefix = [pre for pre in s.task_prefixes.keys() if "split" in pre]
+    assert len(split_prefix) == 1
+    split_prefix = split_prefix[0]
+    default_time = parse_timedelta(
+        dask.config.get("distributed.scheduler.default-task-durations")[split_prefix]
+    )
+    assert default_time <= 1e-6
+
+
 @pytest.mark.asyncio
 async def test_no_danglng_asyncio_tasks(cleanup):
     start = asyncio.all_tasks()