From f1efbe35c4e5f3d00ea23b67830d4cc994b4cef4 Mon Sep 17 00:00:00 2001
From: Tom Augspurger
Date: Tue, 6 Aug 2019 11:20:09 -0500
Subject: [PATCH] Alternative scheduling for new tasks

Rather than placing new tasks with no dependencies on the first idle
worker, we try placing them on a worker executing tasks they're a
co-dependency with.
---
 distributed/scheduler.py            | 91 +++++++++++++++++++++++++----
 distributed/tests/test_scheduler.py | 20 +++++++
 2 files changed, 100 insertions(+), 11 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 65e93d3e59c..0bb7c25e06d 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3596,6 +3596,84 @@ def transition_no_worker_waiting(self, key):
                 pdb.set_trace()
             raise
 
+    def decide_worker_initial(self, ts: TaskState) -> WorkerState:
+        r"""
+        Decide the worker to assign a task to.
+
+        Parameters
+        ----------
+        ts : TaskState
+            This is a ready task that has no dependencies.
+
+        Returns
+        -------
+        WorkerState
+
+        Notes
+        -----
+        This prioritizes scheduling a task on a worker that is already
+        executing its sibling tasks, i.e. tasks that share a dependent with it.
+
+        Consider the following task graph (read top to bottom)
+
+        ::
+
+            a-1  a-2  a-3  a-4
+              \  /      \  /
+               b-1       b-2
+
+        If we have
+
+        * Two workers: `A` and `B`
+        * Task `a-1` is running on worker `A`
+        * We're currently scheduling task `a-2`
+
+        we'll choose to schedule `a-2` on worker `A` as well, because that will
+        minimize communication when `b-1` is eventually scheduled.
+
+        When no sibling tasks are running, we assign `ts` to an idle worker, or
+        to the worker with the lowest occupancy / a relatively low number of
+        tasks.
+
+        See Also
+        --------
+        decide_worker
+        worker_objective
+        transition_waiting_processing
+        """
+        worker = None
+        # The config option is only here for ease of testing / benchmarking
+        # and will be removed.
+        if dask.config.get("distributed.scheduler.lump_tasks", default=True):
+            for dts in ts.dependents:
+                # Figure out where my siblings are running. Note that we stop
+                # as soon as we find *a* sibling running *somewhere*. If time
+                # weren't an issue, we might find the worker with the most
+                # siblings, but that's expensive.
+                for sts in dts.dependencies:
+                    if sts.processing_on:
+                        worker = sts.processing_on
+                        break
+                    if sts.who_has:
+                        worker = random.choice(sts.who_has)
+                        break
+                if worker:
+                    break
+
+        if worker:
+            return worker
+        elif self.idle:
+            if len(self.idle) < 20:  # smart but linear in small case
+                worker = min(self.idle, key=operator.attrgetter("occupancy"))
+            else:  # dumb but fast in large case
+                worker = self.idle[self.n_tasks % len(self.idle)]
+        elif len(self.workers) < 20:  # smart but linear in small case
+            worker = min(self.workers.values(), key=operator.attrgetter("occupancy"))
+        else:  # dumb but fast in large case
+            worker = self.workers.values()[self.n_tasks % len(self.workers)]
+
+        return worker
+
     def decide_worker(self, ts):
         """ Decide on a worker for task *ts*.  Return a WorkerState.
@@ -3614,18 +3692,8 @@ def decide_worker(self, ts):
                 valid_workers,
                 partial(self.worker_objective, ts),
             )
-        elif self.idle:
-            if len(self.idle) < 20:  # smart but linear in small case
-                worker = min(self.idle, key=operator.attrgetter("occupancy"))
-            else:  # dumb but fast in large case
-                worker = self.idle[self.n_tasks % len(self.idle)]
         else:
-            if len(self.workers) < 20:  # smart but linear in small case
-                worker = min(
-                    self.workers.values(), key=operator.attrgetter("occupancy")
-                )
-            else:  # dumb but fast in large case
-                worker = self.workers.values()[self.n_tasks % len(self.workers)]
+            worker = self.decide_worker_initial(ts)
 
         if self.validate:
             assert worker is None or isinstance(worker, WorkerState), (
@@ -4819,6 +4887,7 @@ def decide_worker(ts, all_workers, valid_workers, objective):
     of bytes sent between workers.  This is determined by calling the
     *objective* function.
     """
+    deps = ts.dependencies
     assert all(dts.who_has for dts in deps)
 
     if ts.actor:
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 9035fbd8667..ff48f482020 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -114,6 +114,26 @@ def test_decide_worker_with_restrictions(client, s, a, b, c):
     assert x.key in a.data or x.key in b.data
 
 
+@gen_cluster(client=True)
+def test_decide_worker_groups_siblings(c, s, a, b):
+    # Ensure that a-0, a-1, and b-0 are all scheduled on
+    # the same worker so that communication is
+    # minimized.
+    dsk = {
+        "a-0": (inc, 0),
+        "a-1": (inc, 1),
+        "b-0": (operator.add, "a-0", "a-1"),
+        "a-2": (inc, 2),
+        "a-3": (inc, 3),
+        "b-1": (operator.add, "a-2", "a-3"),
+    }
+    x = yield c.get(dsk, keys=["b-0", "b-1"], sync=False)
+    yield wait(x)
+    assert x == [3, 7]
+
+    assert all(len(w.outgoing_transfer_log) == 1 for w in [a, b])
+
+
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
 def test_move_data_over_break_restrictions(client, s, a, b, c):
     [x] = yield client.scatter([1], workers=b.address)
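
For illustration only, here is a minimal, self-contained sketch of the
sibling-grouping heuristic that `decide_worker_initial` implements above.
The names `Task`, `Worker`, and `pick_initial_worker` are hypothetical
stand-ins for the scheduler's `TaskState`/`WorkerState` machinery, not part
of the distributed API, and the sketch ignores worker restrictions, the
temporary config flag, and occupancy bookkeeping::

    import operator
    import random
    from dataclasses import dataclass, field


    @dataclass
    class Worker:
        # Hypothetical stand-in for the scheduler's WorkerState.
        name: str
        occupancy: float = 0.0


    @dataclass(eq=False)
    class Task:
        # Hypothetical stand-in for the scheduler's TaskState.
        key: str
        dependencies: set = field(default_factory=set)
        dependents: set = field(default_factory=set)
        processing_on: Worker = None                  # worker running this task
        who_has: list = field(default_factory=list)   # workers holding the result


    def pick_initial_worker(ts, idle, workers):
        """Pick a worker for a ready task with no dependencies.

        Prefer a worker already running (or holding) a sibling of ``ts``,
        i.e. another input of one of ``ts``'s dependents; otherwise fall
        back to the least-occupied worker, preferring idle ones.
        """
        for dts in ts.dependents:            # tasks that will consume ts
            for sts in dts.dependencies:     # siblings: other inputs of dts
                if sts.processing_on:
                    return sts.processing_on
                if sts.who_has:
                    return random.choice(sts.who_has)
        candidates = idle or workers
        return min(candidates, key=operator.attrgetter("occupancy"))


    if __name__ == "__main__":
        A, B = Worker("A"), Worker("B", occupancy=0.5)
        a1, a2 = Task("a-1"), Task("a-2")
        b1 = Task("b-1", dependencies={a1, a2})
        a1.dependents.add(b1)
        a2.dependents.add(b1)

        a1.processing_on = A                 # a-1 is already running on A
        choice = pick_initial_worker(a2, idle=[B], workers=[A, B])
        print(choice.name)                   # "A": a-2 follows its sibling a-1

Running the sketch prints "A": even though worker B is idle, a-2 is placed
with its sibling a-1, which is the behavior the new test checks indirectly
through the workers' transfer logs.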