Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/dask/distributed into 313
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbourbeau committed Nov 13, 2024
2 parents ef558f5 + 5bedd3f commit 0fd7889
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test-report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
mv test_report.html test_short_report.html deploy/
- name: Deploy 🚀
uses: JamesIves/github-pages-deploy-action@v4.6.8
uses: JamesIves/github-pages-deploy-action@v4.6.9
with:
branch: gh-pages
folder: deploy
22 changes: 4 additions & 18 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,9 @@

import dask
import dask.utils
from dask._task_spec import (
DependenciesMapping,
GraphNode,
convert_legacy_graph,
resolve_aliases,
)
from dask._task_spec import DependenciesMapping, GraphNode, convert_legacy_graph
from dask.base import TokenizationError, normalize_token, tokenize
from dask.core import istask, reverse_dict, validate_key
from dask.core import istask, validate_key
from dask.typing import Key, no_default
from dask.utils import (
_deprecated,
Expand Down Expand Up @@ -9411,19 +9406,10 @@ def _materialize_graph(
)

dsk2 = convert_legacy_graph(dsk)
dependents = reverse_dict(DependenciesMapping(dsk2))
# This is removing weird references like "x-foo": "foo" which often make up
# a substantial part of the graph
# This also performs culling!
dsk3 = resolve_aliases(dsk2, keys, dependents)

logger.debug(
"Removing aliases. Started with %i and got %i left", len(dsk2), len(dsk3)
)
# FIXME: There should be no need to fully materialize and copy this but some
# sections in the scheduler are mutating it.
dependencies = {k: set(v) for k, v in DependenciesMapping(dsk3).items()}
return dsk3, dependencies, annotations_by_type
dependencies = {k: set(v) for k, v in DependenciesMapping(dsk2).items()}
return dsk2, dependencies, annotations_by_type


def _cull(dsk: dict[Key, GraphNode], keys: set[Key]) -> dict[Key, GraphNode]:
Expand Down
14 changes: 14 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5304,3 +5304,17 @@ async def before_close(self):
async def test_rootish_taskgroup_configuration(c, s, *workers):
assert s.rootish_tg_threshold == 10
assert s.rootish_tg_dependencies_threshold == 15


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_alias_resolving_break_queuing(c, s, a):
pytest.importorskip("numpy")
import dask.array as da

arr = da.random.random((90, 100), chunks=(10, 50))
result = arr.rechunk(((10, 7, 7, 6) * 3, (50, 50)))
result = result.sum(split_every=1000)
x = result.persist()
while not s.tasks:
await asyncio.sleep(0.01)
assert sum([s.is_rootish(v) for v in s.tasks.values()]) == 18
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ requires-python = ">=3.10"
dependencies = [
"click >= 8.0",
"cloudpickle >= 3.0.0",
"dask == 2024.10.0",
"dask == 2024.11.2",
"jinja2 >= 2.10.3",
"locket >= 1.0.0",
"msgpack >= 1.0.2",
Expand Down Expand Up @@ -225,8 +225,7 @@ source = ["distributed"]
omit = [
"distributed/tests/test*",
"distributed/*/tests/test*",
"distributed/cli/utils.py",
"distributed/cli/dask_spec.py",
"distributed/cli/*",
"distributed/deploy/ssh.py",
"distributed/_version.py",
"distributed/pytest_resourceleaks.py",
Expand Down

0 comments on commit 0fd7889

Please sign in to comment.