Consider candidates that don't hold any dependencies in decide_worker #4925

Status: Open. gjoseph92 wants to merge 25 commits into base branch main.

Changes shown from 20 of 25 commits.

Commits:
5b17f55  Test workers without deps are considered (gjoseph92, Jun 17, 2021)
4a81ca8  Consider random subset of workers in decide_worker (gjoseph92, Jun 17, 2021)
c57fd72  no-sleep test (gjoseph92, Jun 17, 2021)
d810d2d  Comment fastpath. Maybe this is still unnecessary? (gjoseph92, Jun 17, 2021)
768d660  Pick from idle workers first (gjoseph92, Jun 17, 2021)
346ab17  Update `many_independent_leaves` test (gjoseph92, Jun 18, 2021)
420c99e  Uppercase Mb (gjoseph92, Jun 18, 2021)
0a004b2  move N_RANDOM_WORKERS within conditional (gjoseph92, Jun 18, 2021)
b050d14  Pass in sortedcontainers values, not pydict values (gjoseph92, Jun 18, 2021)
9e99b7f  Use sleep test again (gjoseph92, Jun 18, 2021)
f6acdc4  Simpler logic (gjoseph92, Jun 18, 2021)
524da73  20 -> 10 (gjoseph92, Jun 18, 2021)
a5d37ae  Over-optimized (gjoseph92, Jun 18, 2021)
5842ca8  Revert "Over-optimized" (gjoseph92, Jun 18, 2021)
a159245  `random_choices_iter`. over-optimized for now. (gjoseph92, Jun 18, 2021)
bb991d1  use `random.choices` (gjoseph92, Jun 18, 2021)
58b4bf8  REBASEME Actor: don't hold key references on workers (gjoseph92, Jun 19, 2021)
13975cb  Remove flaky data-length check (gjoseph92, Jun 21, 2021)
fcb165e  No randomness if < 10 workers to choose from (gjoseph92, Jun 21, 2021)
cd382c6  Ensure `decide_worker` args are plain dict_values (gjoseph92, Jun 21, 2021)
cc57a8b  1 worker for `test_statistical_profiling` (gjoseph92, Jun 22, 2021)
13911bc  no conditional on compiled (gjoseph92, Jun 22, 2021)
f2445fe  rerun tests (gjoseph92, Jun 22, 2021)
38e6b57  Merge remote-tracking branch 'upstream/main' into decide_worker/add-r… (gjoseph92, Jul 20, 2021)
5794540  fix errant actor test (gjoseph92, Jul 20, 2021)
8 changes: 4 additions & 4 deletions distributed/actor.py
@@ -3,11 +3,11 @@
 import threading
 from queue import Queue
 
-from .client import Future, default_client
+from .client import Future
 from .protocol import to_serialize
 from .utils import iscoroutinefunction, sync, thread_state
 from .utils_comm import WrappedKey
-from .worker import get_worker
+from .worker import get_client, get_worker
 
 
 class Actor(WrappedKey):
@@ -63,8 +63,8 @@ def __init__(self, cls, address, key, worker=None):
         except ValueError:
             self._worker = None
         try:
-            self._client = default_client()
-            self._future = Future(key)
+            self._client = get_client()
+            self._future = Future(key, inform=self._worker is None)
         except ValueError:
             self._client = None

32 changes: 26 additions & 6 deletions distributed/scheduler.py
@@ -18,7 +18,7 @@
 from datetime import timedelta
 from functools import partial
 from numbers import Number
-from typing import Optional
+from typing import Optional, ValuesView
 
 import psutil
 import sortedcontainers
@@ -2324,7 +2324,10 @@ def decide_worker(self, ts: TaskState) -> WorkerState:
         if ts._dependencies or valid_workers is not None:
             ws = decide_worker(
                 ts,
-                self._workers_dv.values(),
+                self._workers_dv.values() if compiled else dict.values(self._workers),
+                self._idle_dv.values() if compiled else dict.values(self._idle),
+                # ^ NOTE: For performance, these must be actual `dict_values`, not `SortedDictValues`.
+                # In Cython, `_workers_dv` is a plain dict, but in plain Python, it's still a `SortedDict`.
                 valid_workers,
                 partial(self.worker_objective, ts),
             )
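
As an aside, the performance note above is easy to check with a quick, hypothetical micro-benchmark (not part of the PR): iterate the same mapping once through its SortedValuesView and once through plain dict_values. SortedDict subclasses dict, which is what makes dict.values(...) work here.

    import timeit
    from sortedcontainers import SortedDict

    d = SortedDict({f"worker-{i}": i for i in range(10_000)})
    sorted_view = d.values()     # SortedValuesView: fetches each value by key
    plain_view = dict.values(d)  # plain dict_values over the same mapping

    print(timeit.timeit(lambda: sum(1 for _ in sorted_view), number=100))
    print(timeit.timeit(lambda: sum(1 for _ in plain_view), number=100))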
@@ -7459,14 +7462,19 @@ def _reevaluate_occupancy_worker(state: SchedulerState, ws: WorkerState):
 @cfunc
 @exceptval(check=False)
 def decide_worker(
-    ts: TaskState, all_workers, valid_workers: set, objective
+    ts: TaskState,
+    all_workers: ValuesView,
+    idle_workers: ValuesView,
+    valid_workers: set,
+    objective,
 ) -> WorkerState:
     """
     Decide which worker should take task *ts*.
 
-    We choose the worker that has the data on which *ts* depends.
+    We consider all workers which hold dependencies of *ts*,
+    plus a sample of up to 10 random workers (with preference for idle ones).
 
-    If several workers have dependencies then we choose the less-busy worker.
+    From those, we choose the worker where the *objective* function is minimized.
 
     Optionally provide *valid_workers* of where jobs are allowed to occur
     (if all workers are allowed to take the task, pass None instead).
@@ -7476,6 +7484,8 @@ def decide_worker(
     of bytes sent between workers. This is determined by calling the
     *objective* function.
     """
+    # NOTE: `all_workers` and `idle_workers` must be plain `dict_values` objects,
+    # not a `SortedValuesView`, which is much slower to iterate over.
     ws: WorkerState = None
     wws: WorkerState
     dts: TaskState
@@ -7485,7 +7495,17 @@ def decide_worker(
     if ts._actor:
         candidates = set(all_workers)
     else:
+        # Select all workers holding deps of this task
         candidates = {wws for dts in deps for wws in dts._who_has}
+        # Add up to 10 random workers into `candidates`, preferring idle ones.
+        worker_pool = valid_workers if valid_workers is not None else all_workers
+        if len(candidates) < len(worker_pool):
+            sample_from = idle_workers or worker_pool
Member: What if valid_workers and idle_workers?

Member: It seems like maybe we want to use idle_workers or all_workers above in line 7501.

Collaborator (author), replying to "What if valid_workers and idle_workers?":

We'd then intersect candidates with valid_workers, so at least we wouldn't ever pick an invalid worker:

    else:
        candidates &= valid_workers
        if not candidates:
            candidates = valid_workers
            if not candidates:
                if ts._loose_restrictions:
                    ws = decide_worker(ts, all_workers, idle_workers, None, objective)
                return ws

But it's true that if idle_workers and valid_workers are disjoint, then we haven't gained anything here.

How about (valid_workers.intersection(idle_workers) or valid_workers) if valid_workers is not None else (idle_workers or valid_workers)?

Or for simplicity we could ignore idle_workers when there are valid_workers given, and just use valid_workers.
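
A minimal sketch of the pool-selection rule proposed above, written as a hypothetical helper (not part of the diff); the final fallback is written as all_workers, matching the worker_pool fallback already in the diff:

    def choose_sample_pool(valid_workers, idle_workers, all_workers):
        # Hypothetical helper: pick the pool to draw random candidates from,
        # preferring idle workers but never stepping outside the valid set.
        if valid_workers is not None:
            # Idle workers that are also valid; if the two sets are disjoint,
            # fall back to the whole valid set.
            return valid_workers.intersection(idle_workers) or valid_workers
        # No restrictions: prefer idle workers, else sample from all workers.
        return idle_workers or all_workers

    print(choose_sample_pool({"w1", "w2"}, ["w2", "w9"], [f"w{i}" for i in range(10)]))
    # {'w2'}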

+            candidates.update(
+                random.choices(list(sample_from), k=min(10, len(sample_from)))
Member: Thoughts on reducing 10 to 2 or 5 or something lower? This might be premature, but I'm curious how much adding more workers helps here in aggregate. Part of me thinks that adding just one random worker into the mix probably does 80% of the good in aggregate.

Collaborator (author): I'd kind of like to do that later, once we've gotten more of a sense of the impact of this and hopefully have some more examples to measure that change against. When a cluster is mostly quiet, picking 2 random workers has a high chance of finding a good one, but in a large cluster with mostly busy workers, it's a big haystack to find the few that are underutilized (but not idle).
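
A back-of-the-envelope illustration of that trade-off (not from the PR; it assumes sampling with replacement, which is what random.choices does): the chance that a sample of size k contains at least one of m under-utilized workers out of N is 1 - (1 - m/N)**k.

    def p_hit(n_workers: int, n_good: int, k: int) -> float:
        # Probability that a random sample of size k (with replacement)
        # contains at least one "good" (under-utilized) worker.
        return 1 - (1 - n_good / n_workers) ** k

    print(p_hit(10, 8, 2))      # mostly-quiet 10-worker cluster: ~0.96
    print(p_hit(1000, 20, 2))   # large, mostly-busy cluster, k=2:  ~0.04
    print(p_hit(1000, 20, 10))  # large, mostly-busy cluster, k=10: ~0.18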

+                if len(sample_from) > 10
+                else sample_from
+            )
     if valid_workers is None:
         if not candidates:
             candidates = set(all_workers)
Member: Now that we have idle_workers in this function, should this be idle_workers or all_workers?

Collaborator (author): Interesting; yes, it probably should be.

@@ -7495,7 +7515,7 @@ def decide_worker(
             candidates = valid_workers
             if not candidates:
                 if ts._loose_restrictions:
-                    ws = decide_worker(ts, all_workers, None, objective)
+                    ws = decide_worker(ts, all_workers, idle_workers, None, objective)
                 return ws
 
     ncandidates: Py_ssize_t = len(candidates)
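
Read outside the diff, the new candidate-building step amounts to the following condensed sketch (simplified: workers are plain strings rather than WorkerState objects, and the objective ranking and loose-restrictions recursion that follow in the real function are omitted):

    import random

    def select_candidates(dep_holders, all_workers, idle_workers, valid_workers):
        # Workers already holding dependencies of the task ...
        candidates = set(dep_holders)
        # ... plus up to 10 random workers, preferring idle ones.
        worker_pool = valid_workers if valid_workers is not None else all_workers
        if len(candidates) < len(worker_pool):
            sample_from = idle_workers or worker_pool
            candidates.update(
                random.choices(list(sample_from), k=min(10, len(sample_from)))
                if len(sample_from) > 10
                else sample_from
            )
        if valid_workers is None:
            if not candidates:
                candidates = set(all_workers)
        else:
            candidates &= valid_workers
            if not candidates:
                candidates = valid_workers
        return candidates

    # Two workers hold dependencies; a random sample from the 50-worker pool joins them.
    print(select_candidates({"w1", "w2"}, [f"w{i}" for i in range(50)], [], None))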
57 changes: 56 additions & 1 deletion distributed/tests/test_actor.py
@@ -515,7 +515,7 @@ def check(dask_worker):
     start = time()
     while any(client.run(check).values()):
         sleep(0.01)
-        assert time() < start + 30
+        assert time() < start + 10
 
 
 @gen_cluster(
@@ -566,6 +566,61 @@ async def wait(self):
     await c.gather(futures)
 
 
+@gen_cluster(client=True, client_kwargs=dict(set_as_default=False))
+# ^ NOTE: without `set_as_default=False`, `get_client()` within worker would return
+# the same client instance the test is using (because it's all one process).
+# Even with this, both workers will share the same client instance.
+async def test_worker_actor_handle_is_weakref(c, s, a, b):
+    counter = c.submit(Counter, actor=True, workers=[a.address])
+
+    await c.submit(lambda _: None, counter, workers=[b.address])
+
+    del counter
+
+    start = time()
+    while a.actors or b.data:
+        await asyncio.sleep(0.1)
+        assert time() < start + 10
+
+
+def test_worker_actor_handle_is_weakref_sync(client):
+    workers = list(client.run(lambda: None))
+    counter = client.submit(Counter, actor=True, workers=[workers[0]])
+
+    client.submit(lambda _: None, counter, workers=[workers[1]]).result()
+
+    del counter
+
+    def check(dask_worker):
+        return len(dask_worker.data) + len(dask_worker.actors)
+
+    start = time()
+    while any(client.run(check).values()):
+        sleep(0.01)
+        assert time() < start + 10
+
+
+def test_worker_actor_handle_is_weakref_from_compute_sync(client):
+    workers = list(client.run(lambda: None))
+
+    with dask.annotate(workers=workers[0]):
+        counter = dask.delayed(Counter)()
+    with dask.annotate(workers=workers[1]):
+        intermediate = dask.delayed(lambda c: None)(counter)
+    with dask.annotate(workers=workers[0]):
+        final = dask.delayed(lambda x, c: x)(intermediate, counter)
+
+    final.compute(actors=counter, optimize_graph=False)
+
+    def worker_tasks_running(dask_worker):
+        return len(dask_worker.data) + len(dask_worker.actors)
+
+    start = time()
+    while any(client.run(worker_tasks_running).values()):
+        sleep(0.01)
+        assert time() < start + 10
+
+
 def test_one_thread_deadlock():
     with cluster(nworkers=2) as (cl, w):
         client = Client(cl["address"])
31 changes: 27 additions & 4 deletions distributed/tests/test_scheduler.py
@@ -100,14 +100,16 @@ async def test_recompute_released_results(c, s, a, b):
     assert result == 1
 
 
-@gen_cluster(client=True)
+@gen_cluster(client=True, config={"distributed.scheduler.bandwidth": "1Mb"})
 async def test_decide_worker_with_many_independent_leaves(c, s, a, b):
+    # Make data large to penalize scheduling dependent tasks on other workers
+    ballast = b"\0" * int(s.bandwidth)
     xs = await asyncio.gather(
-        c.scatter(list(range(0, 100, 2)), workers=a.address),
-        c.scatter(list(range(1, 100, 2)), workers=b.address),
+        c.scatter([bytes(i) + ballast for i in range(0, 100, 2)], workers=a.address),
+        c.scatter([bytes(i) + ballast for i in range(1, 100, 2)], workers=b.address),
     )
     xs = list(concat(zip(*xs)))
-    ys = [delayed(inc)(x) for x in xs]
+    ys = [delayed(lambda s: s[0])(x) for x in xs]
 
     y2s = c.persist(ys)
     await wait(y2s)
@@ -126,6 +128,27 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c):
     assert x.key in a.data or x.key in b.data
 
 
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 1)] * 3,
+    config={"distributed.scheduler.work-stealing": False},
+)
+async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c):
+    await client.submit(slowinc, 10, delay=0.1)  # learn that slowinc is slow
+    root = await client.scatter(1)
+    assert sum(root.key in worker.data for worker in [a, b, c]) == 1
+
+    start = time()
+    tasks = client.map(slowinc, [root] * 6, delay=0.1, pure=False)
+    await wait(tasks)
+    elapsed = time() - start
+
+    assert elapsed <= 4
+    assert all(root.key in worker.data for worker in [a, b, c]), [
+        list(worker.data.keys()) for worker in [a, b, c]
+    ]
+
+
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
 async def test_move_data_over_break_restrictions(client, s, a, b, c):
     [x] = await client.scatter([1], workers=b.address)
2 changes: 1 addition & 1 deletion distributed/worker.py
@@ -1417,7 +1417,7 @@ async def get_data(
                 if k in self.actors:
                     from .actor import Actor
 
-                    data[k] = Actor(type(self.actors[k]), self.address, k)
+                    data[k] = Actor(type(self.actors[k]), self.address, k, worker=self)
 
         msg = {"status": "OK", "data": {k: to_serialize(v) for k, v in data.items()}}
         nbytes = {k: self.tasks[k].nbytes for k in data if k in self.tasks}