Ensure shuffle split default durations uses proper prefix #4991

Merged (1 commit) on Jun 30, 2021
2 changes: 1 addition & 1 deletion distributed/distributed.yaml
@@ -28,7 +28,7 @@ distributed:
     unknown-task-duration: 500ms  # Default duration for all tasks with unknown durations ("15m", "2h")
     default-task-durations:  # How long we expect function names to run ("1h", "1s") (helps for long tasks)
       rechunk-split: 1us
-      shuffle-split: 1us
+      split-shuffle: 1us
     validate: False  # Check scheduler state at every step for debugging
   dashboard:
     status:
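For context, a minimal sketch of why the exact spelling matters: the scheduler resolves per-prefix defaults from this config mapping, and dask's task-based shuffle generates keys whose prefix is `split-shuffle`. The helper `default_duration_for` below is hypothetical, and the `key_split` import path is an assumption (recent dask versions expose it from `dask.utils`); `dask.config.get` and `parse_timedelta` are the same APIs used elsewhere in this PR.

```python
import dask
from dask.utils import key_split, parse_timedelta  # key_split location: assumption

# A task-based shuffle key looks roughly like "split-shuffle-<stage>-<group>-<token>",
# and its prefix is the leading alphabetic part:
assert key_split("split-shuffle-1-2-3") == "split-shuffle"

# Hypothetical helper mirroring the lookup the scheduler performs:
def default_duration_for(prefix: str) -> float:
    durations = dask.config.get("distributed.scheduler.default-task-durations")
    if prefix in durations:
        return parse_timedelta(durations[prefix])  # e.g. "1us" -> 1e-6 seconds
    # No per-prefix default configured: fall back to the generic default.
    return parse_timedelta(dask.config.get("distributed.scheduler.unknown-task-duration"))

# With the misspelled "shuffle-split" entry, split-shuffle tasks fell through
# to the 500ms fallback instead of the intended 1us.
```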
30 changes: 29 additions & 1 deletion distributed/tests/test_scheduler.py
@@ -17,7 +17,7 @@

 import dask
 from dask import delayed
-from dask.utils import apply
+from dask.utils import apply, parse_timedelta

 from distributed import Client, Nanny, Worker, fire_and_forget, wait
 from distributed.comm import Comm
@@ -1845,6 +1845,34 @@ async def test_get_task_duration(c, s, a, b):
     assert len(s.unknown_durations["slowinc"]) == 1


+@gen_cluster(client=True)
+async def test_default_task_duration_splits(c, s, a, b):
[Review comment — Member]
Hmm, this test seems to raise TimeoutErrors in a few CI builds.

[Reply — Member Author]
I'm pretty sure this is connected to the improper cluster shutdowns I've been observing recently. I'm using almost identical code to the stealing test, with the exception that here I'm not awaiting the computation. I'll add a wait and hope the timeouts are gone afterwards.
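The fix described here is the `await wait(fut)` visible in the final diff below; as a minimal sketch of the pattern (assuming a `gen_cluster`-style test body with client `c` and a dask collection `graph`):

```python
fut = c.compute(graph)
# Block until the computation actually finishes, so the test cannot return
# (and tear the cluster down) while shuffle tasks are still running.
await wait(fut)
```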

"""This test ensures that the default task durations for shuffle split tasks are, by default, aligned with the task names of dask.dask"""

pd = pytest.importorskip("pandas")
dd = pytest.importorskip("dask.dataframe")

# We don't care about the actual computation here but we'll schedule one anyhow to verify that we're looking for the correct key
npart = 10
df = dd.from_pandas(pd.DataFrame({"A": range(100), "B": 1}), npartitions=npart)
graph = df.shuffle(
"A",
shuffle="tasks",
# If we don't have enough partitions, we'll fall back to a simple shuffle
max_branch=npart - 1,
).sum()
fut = c.compute(graph)
await wait(fut)

split_prefix = [pre for pre in s.task_prefixes.keys() if "split" in pre]
assert len(split_prefix) == 1
split_prefix = split_prefix[0]
default_time = parse_timedelta(
dask.config.get("distributed.scheduler.default-task-durations")[split_prefix]
)
assert default_time <= 1e-6
[Review comment on lines +1870 to +1873 — Member Author]
I cannot assert on the actual known runtime, since by this point it has already been measured. I want to ensure that the config value itself is correct; there are other tests ensuring that these config values are used when they exist.
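For reference, a self-contained sketch of the unit handling the assertion relies on; `parse_timedelta` is the same helper the test imports, and the exact conversions shown are my assumptions about its string parsing:

```python
from dask.utils import parse_timedelta

# parse_timedelta converts config duration strings into float seconds:
assert parse_timedelta("1us") == 1e-6   # the configured split-shuffle default
assert parse_timedelta("500ms") == 0.5  # the unknown-task-duration fallback
```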



 @pytest.mark.asyncio
 async def test_no_danglng_asyncio_tasks(cleanup):
     start = asyncio.all_tasks()