diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 527adeb364..ca95d006a0 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -54,7 +54,7 @@ import dask import dask.utils -from dask._task_spec import DependenciesMapping, GraphNode, convert_legacy_graph +from dask._task_spec import DependenciesMapping, GraphNode, convert_legacy_graph, _MultiContainer from dask.base import TokenizationError, normalize_token, tokenize from dask.core import istask, validate_key from dask.typing import Key, no_default @@ -4868,6 +4868,7 @@ async def update_graph( graph=graph, global_annotations=annotations or {}, keys=keys, + existing_keys=set(self.tasks), validate=self.validate, ) @@ -9392,6 +9393,7 @@ def _materialize_graph( global_annotations: dict[str, Any], validate: bool, keys: set[Key], + existing_keys: set[Key], ) -> tuple[dict[Key, T_runspec], dict[Key, set[Key]], dict[str, dict[Key, Any]]]: dsk: dict = ensure_dict(graph) if validate: @@ -9411,7 +9413,7 @@ def _materialize_graph( {k: (value(k) if callable(value) else value) for k in layer} ) - dsk2 = convert_legacy_graph(dsk) + dsk2 = convert_legacy_graph(dsk, all_keys=_MultiContainer(dsk, existing_keys)) # FIXME: There should be no need to fully materialize and copy this but some # sections in the scheduler are mutating it. dependencies = {k: set(v) for k, v in DependenciesMapping(dsk2).items()}