Commit

minor

hendrikmakait committed Dec 18, 2023
2 parents d9258ac + 8c3eb6f commit 82fd10b
Showing 15 changed files with 318 additions and 54 deletions.
17 changes: 6 additions & 11 deletions distributed/deploy/tests/test_subprocess.py
@@ -1,6 +1,5 @@
from __future__ import annotations

import asyncio
import logging

import pytest
@@ -76,21 +75,17 @@ async def test_raise_if_scheduler_fails_to_start():


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@pytest.mark.slow
@gen_test()
async def test_subprocess_cluster_does_not_depend_on_logging():
async def _start():
async with SubprocessCluster(
asynchronous=True,
dashboard_address=":0",
scheduler_kwargs={"idle_timeout": "5s"},
worker_kwargs={"death_timeout": "5s"},
):
pass

with new_config_file(
{"distributed": {"logging": {"distributed": logging.CRITICAL + 1}}}
):
await asyncio.wait_for(_start(), timeout=2)
async with SubprocessCluster(
asynchronous=True, dashboard_address=":0"
) as cluster, Client(cluster, asynchronous=True) as client:
result = await client.submit(lambda x: x + 1, 10)
assert result == 11


@pytest.mark.skipif(
7 changes: 7 additions & 0 deletions distributed/nanny.py
@@ -233,6 +233,13 @@ def __init__( # type: ignore[no-untyped-def]
self.Worker = Worker if worker_class is None else worker_class

self.pre_spawn_env = _get_env_variables("distributed.nanny.pre-spawn-environ")
# To get consistent hashing on subprocesses, we need to set a consistent seed for
# the Python hash algorithm; xref https://github.com/dask/distributed/pull/8400
if self.pre_spawn_env.get("PYTHONHASHSEED") in (None, "0"):
# This number is arbitrary; it was chosen to commemorate
# https://github.com/dask/dask/issues/6640.
self.pre_spawn_env.update({"PYTHONHASHSEED": "6640"})

self.env = merge(
self.pre_spawn_env,
_get_env_variables("distributed.nanny.environ"),
22 changes: 20 additions & 2 deletions distributed/scheduler.py
@@ -1373,6 +1373,15 @@ class TaskState:
#: be rejected.
run_id: int | None

#: Whether to consider this task rootish in the context of task queueing
#: True
#:     Always consider this task rootish
#: False
#:     Never consider this task rootish
#: None
#:     Use a heuristic to determine whether this task should be considered rootish
_rootish: bool | None

#: Cached hash of :attr:`~TaskState.client_key`
_hash: int

@@ -1430,6 +1439,7 @@ def __init__(
self.metadata = None
self.annotations = None
self.erred_on = None
self._rootish = None
self.run_id = None
TaskState._instances.add(self)

@@ -2909,8 +2919,11 @@ def is_rootish(self, ts: TaskState) -> bool:
Whether ``ts`` is a root or root-like task.
Root-ish tasks are part of a group that's much larger than the cluster,
and have few or no dependencies.
and have few or no dependencies. Tasks may also be explicitly marked as rootish
to override this heuristic.
"""
if ts._rootish is not None:
return ts._rootish
if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
return False
tg = ts.group
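
A minimal sketch, outside the diff, of the three-way decision this introduces: an explicit marker on the task always wins, and only unmarked tasks fall through to the heuristic. The group-size check below is a simplified stand-in for the scheduler's real heuristic, not its actual formula.

from __future__ import annotations

def is_rootish_sketch(
    rootish_override: bool | None,   # corresponds to TaskState._rootish
    has_restrictions: bool,          # resource/worker/host restrictions present
    group_size: int,
    n_workers: int,
) -> bool:
    if rootish_override is not None:
        return rootish_override      # True/False: explicit override, no heuristic
    if has_restrictions:
        return False
    # Stand-in for "the task group is much larger than the cluster":
    return group_size > 2 * n_workers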
@@ -6922,7 +6935,12 @@ def workers_to_close(
if isinstance(key, bytes):
key = pickle.loads(key)

groups = groupby(key, self.workers.values())
# Long running tasks typically use a worker_client to schedule
# other tasks. We should never shut down the worker they're
# running on, as it would cause them to restart from scratch
# somewhere else.
valid_workers = [ws for ws in self.workers.values() if not ws.long_running]
groups = groupby(key, valid_workers)

limit_bytes = {k: sum(ws.memory_limit for ws in v) for k, v in groups.items()}
group_bytes = {k: sum(ws.nbytes for ws in v) for k, v in groups.items()}
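
A hedged usage sketch of the situation the comment describes: a task that opens a worker_client() secedes from the thread pool and is tracked as long-running, so with this change the worker it occupies is no longer a candidate in workers_to_close(). The task body is illustrative.

from distributed import worker_client

def orchestrating_task(n: int) -> int:
    # Runs on a worker and schedules subtasks of its own; worker_client()
    # secedes this task, which the scheduler tracks as long-running.
    with worker_client() as client:
        futures = client.map(lambda i: i * i, range(n))
        return sum(client.gather(futures))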
16 changes: 10 additions & 6 deletions distributed/shuffle/_merge.py
@@ -361,18 +361,22 @@ def cull(self, keys: Iterable[str], all_keys: Any) -> tuple[HashJoinP2PLayer, di

def _construct_graph(self) -> dict[tuple | str, tuple]:
token_left = tokenize(
"hash-join",
# Include self.name to ensure that shuffle IDs are unique for individual
# merge operations. Reusing shuffles between merges is dangerous because of
# required coordination and complexity introduced through dynamic clusters.
self.name,
self.name_input_left,
self.left_on,
self.npartitions,
self.parts_out,
self.left_index,
)
token_right = tokenize(
"hash-join",
# Include self.name to ensure that shuffle IDs are unique for individual
# merge operations. Reusing shuffles between merges is dangerous because of
# required coordination and complexity introduced through dynamic clusters.
self.name,
self.name_input_right,
self.right_on,
self.npartitions,
self.parts_out,
self.right_index,
)
dsk: dict[tuple | str, tuple] = {}
name_left = "hash-join-transfer-" + token_left
75 changes: 71 additions & 4 deletions distributed/shuffle/_scheduler_plugin.py
@@ -145,6 +145,7 @@ def get_or_create(
self._raise_if_barrier_unknown(spec.id)
self._raise_if_task_not_processing(key)
worker_for = self._calculate_worker_for(spec)
self._ensure_output_tasks_are_non_rootish(spec)
state = spec.create_new_run(worker_for)
self.active_shuffles[spec.id] = state
self._shuffles[spec.id].add(state)
@@ -170,6 +171,11 @@ def _raise_if_task_not_processing(self, key: Key) -> None:
def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]:
"""Pin the outputs of a P2P shuffle to specific workers.
The P2P implementation of a hash join combines the loading of shuffled output
partitions for the left and right side with the actual merge operation into a
single output task. As a consequence, we need to make sure that shuffles with
shared output tasks align on the output mapping.
Parameters
----------
id: ID of the shuffle to pin
@@ -180,7 +186,7 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]:
This function assumes that the barrier task and the output tasks share
the same worker restrictions.
"""
mapping = {}
existing: dict[Any, str] = {}
shuffle_id = spec.id
barrier = self.scheduler.tasks[barrier_key(shuffle_id)]

@@ -189,10 +195,71 @@ def _calculate_worker_for(self, spec: ShuffleSpec) -> dict[Any, str]:
else:
workers = list(self.scheduler.workers)

# Check if this shuffle shares output tasks with a different shuffle that has
# already been initialized and needs to be taken into account when
# mapping output partitions to workers.
# Naively, you could delete this whole paragraph and just call
# spec.pick_worker; it would return two identical sets of results on both calls
# of this method... until the set of available workers changes between the two
# calls, which would cause misaligned shuffle outputs and a deadlock.
seen = {barrier}
for dependent in barrier.dependents:
for possible_barrier in dependent.dependencies:
if possible_barrier in seen:
continue
seen.add(possible_barrier)
if not (other_barrier_key := id_from_key(possible_barrier.key)):
continue
if not (shuffle := self.active_shuffles.get(other_barrier_key)):
continue
current_worker_for = shuffle.run_spec.worker_for
# This is a fail-safe for future three-way merges. At the moment there
# should only ever be at most one other shuffle that shares output
# tasks, so existing will always be empty.
if existing: # pragma: nocover
for shared_key in existing.keys() & current_worker_for.keys():
if existing[shared_key] != current_worker_for[shared_key]:
raise RuntimeError(
f"Failed to initialize shuffle {spec.id} because "
"it cannot align output partition mappings between "
f"existing shuffles {seen}. "
f"Mismatch encountered for output partition {shared_key!r}: "
f"{existing[shared_key]} != {current_worker_for[shared_key]}."
)
existing.update(current_worker_for)

worker_for = {}
for partition in spec.output_partitions:
worker = spec.pick_worker(partition, workers)
mapping[partition] = worker
return mapping
if (worker := existing.get(partition, None)) is None:
worker = spec.pick_worker(partition, workers)
worker_for[partition] = worker
return worker_for

def _ensure_output_tasks_are_non_rootish(self, spec: ShuffleSpec) -> None:
"""Output tasks are created without worker restrictions and run once with the
only purpose of setting the worker restriction and then raising Reschedule, and
then running again properly on the correct worker. It would be non-trivial to
set the worker restriction before they're first run due to potential task
fusion.
Most times, this lack of initial restrictions would cause output tasks to be
labelled as rootish on their first (very fast) run, which in turn would break
the design assumption that the worker-side queue of rootish tasks will last long
enough to cover the round-trip to the scheduler to receive more tasks, which in
turn would cause a measurable slowdown on the overall runtime of the shuffle
operation.
This method ensures that, given M output tasks and N workers, each worker-side
queue is pre-loaded with M/N output tasks which can be flushed very fast as
they all raise Reschedule() in quick succession.
See Also
--------
ShuffleRun._ensure_output_worker
"""
barrier = self.scheduler.tasks[barrier_key(spec.id)]
for dependent in barrier.dependents:
dependent._rootish = False

@log_errors()
def _set_restriction(self, ts: TaskState, worker: str) -> None:
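
A hedged sketch, detached from the plugin's API, of the alignment rule _calculate_worker_for now follows: output partitions that an overlapping, still-active shuffle has already pinned keep that worker, and only the remaining partitions get a fresh assignment. All names and addresses below are illustrative.

def align_worker_for(output_partitions, existing, pick_worker, workers):
    # existing: partition -> worker mapping inherited from an overlapping shuffle
    worker_for = {}
    for partition in output_partitions:
        worker = existing.get(partition)
        if worker is None:  # not constrained by another shuffle
            worker = pick_worker(partition, workers)
        worker_for[partition] = worker
    return worker_for

# Partition 0 was already pinned by the other side of a hash join:
print(align_worker_for(
    [0, 1],
    existing={0: "tcp://worker-a:1234"},
    pick_worker=lambda p, ws: ws[p % len(ws)],
    workers=["tcp://worker-a:1234", "tcp://worker-b:1234"],
))
# {0: 'tcp://worker-a:1234', 1: 'tcp://worker-b:1234'}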
66 changes: 62 additions & 4 deletions distributed/shuffle/tests/test_merge.py
@@ -1,12 +1,18 @@
from __future__ import annotations

import asyncio
import contextlib
from typing import Any
from unittest import mock

import pytest

from distributed.shuffle._core import id_from_key
from dask.typing import Key

from distributed import Worker
from distributed.shuffle._core import ShuffleId, ShuffleSpec, id_from_key
from distributed.shuffle._merge import hash_join
from distributed.shuffle._worker_plugin import ShuffleRun, _ShuffleRunManager
from distributed.utils_test import gen_cluster

dd = pytest.importorskip("dask.dataframe")
@@ -112,7 +118,9 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s
# Vary the number of output partitions for the shuffles of dd2
.repartition(20).merge(ddf2, left_on="b", right_on="x", shuffle="p2p")
)
# Generate unique shuffle IDs if the input frame is the same but parameters differ
# Generate unique shuffle IDs if the input frame is the same but
# parameters differ. Reusing shuffles in merges is dangerous because of the
# required coordination and complexity introduced through dynamic clusters.
assert sum(id_from_key(k) is not None for k in out.dask) == 4
result = await c.compute(out)
expected = pdf1.merge(pdf2, left_on="a", right_on="x").merge(
@@ -147,8 +155,10 @@ async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a,
right_on="b",
shuffle="p2p",
)
# Generate the same shuffle IDs if the input frame is the same and all its parameters match
assert sum(id_from_key(k) is not None for k in out.dask) == 3
# Generate unique shuffle IDs if the input frame is the same and all its
# parameters match. Reusing shuffles in merges is dangerous because of the
# required coordination and complexity introduced through dynamic clusters.
assert sum(id_from_key(k) is not None for k in out.dask) == 4
result = await c.compute(out)
expected = pdf2.merge(
pdf1.merge(pdf2, left_on="a", right_on="x"), left_on="x", right_on="b"
@@ -474,3 +484,51 @@ async def test_index_merge_p2p(c, s, a, b, how):
),
pdf_right.merge(pdf_left, how=how, right_index=True, left_on="a"),
)


class LimitedGetOrCreateShuffleRunManager(_ShuffleRunManager):
seen: set[ShuffleId]
block_get_or_create: asyncio.Event
blocking_get_or_create: asyncio.Event

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.seen = set()
self.limit = 1
self.blocking_get_or_create = asyncio.Event()
self.block_get_or_create = asyncio.Event()

async def get_or_create(self, spec: ShuffleSpec, key: Key) -> ShuffleRun:
if len(self.seen) >= self.limit and spec.id not in self.seen:
self.blocking_get_or_create.set()
await self.block_get_or_create.wait()
self.seen.add(spec.id)
return await super().get_or_create(spec, key)


@mock.patch(
"distributed.shuffle._worker_plugin._ShuffleRunManager",
LimitedGetOrCreateShuffleRunManager,
)
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_merge_does_not_deadlock_if_worker_joins(c, s, a):
"""Regression test for https://github.com/dask/distributed/issues/8411"""
pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
df1 = dd.from_pandas(pdf1, npartitions=10)
df2 = dd.from_pandas(pdf2, npartitions=20)

run_manager_A = a.plugins["shuffle"].shuffle_runs

joined = dd.merge(df1, df2, left_on="a", right_on="x", shuffle="p2p")
result = c.compute(joined)

await run_manager_A.blocking_get_or_create.wait()

async with Worker(s.address) as b:
run_manager_A.block_get_or_create.set()
run_manager_B = b.plugins["shuffle"].shuffle_runs
run_manager_B.block_get_or_create.set()
result = await result
expected = pd.merge(pdf1, pdf2, left_on="a", right_on="x")
assert_eq(result, expected, check_index=False)
39 changes: 39 additions & 0 deletions distributed/shuffle/tests/test_shuffle.py
@@ -2478,6 +2478,45 @@ async def test_unpack_gets_rescheduled_from_non_participating_worker(c, s, a):
dd.assert_eq(result, expected)


class BlockedBarrierShuffleSchedulerPlugin(ShuffleSchedulerPlugin):
def __init__(self, scheduler: Scheduler):
super().__init__(scheduler)
self.in_barrier = asyncio.Event()
self.block_barrier = asyncio.Event()

async def barrier(self, id: ShuffleId, run_id: int, consistent: bool) -> None:
self.in_barrier.set()
await self.block_barrier.wait()
return await super().barrier(id, run_id, consistent)


@gen_cluster(client=True)
async def test_unpack_is_non_rootish(c, s, a, b):
with pytest.warns(UserWarning):
scheduler_plugin = BlockedBarrierShuffleSchedulerPlugin(s)
df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-21",
dtypes={"x": float, "y": float},
freq="10 s",
)
df = df.shuffle("x")
result = c.compute(df)

await scheduler_plugin.in_barrier.wait()

unpack_tss = [ts for key, ts in s.tasks.items() if key_split(key) == "shuffle_p2p"]
assert len(unpack_tss) == 20
assert not any(s.is_rootish(ts) for ts in unpack_tss)
del unpack_tss
scheduler_plugin.block_barrier.set()
result = await result

await check_worker_cleanup(a)
await check_worker_cleanup(b)
await check_scheduler_cleanup(s)


class FlakyConnectionPool(ConnectionPool):
def __init__(self, *args, failing_connects=0, **kwargs):
self.attempts = 0