Alternative scheduling for new tasks #2940
```diff
@@ -3596,6 +3596,84 @@ def transition_no_worker_waiting(self, key):
                 pdb.set_trace()
             raise
 
+    def decide_worker_initial(self, ts: TaskState) -> WorkerState:
+        r"""
+        Decide the worker to assign a task to.
+
+        Parameters
+        ----------
+        ts : TaskState
+            This is a ready task that has no dependencies.
+
+        Returns
+        -------
+        WorkerState
+
+        Notes
+        -----
+        This prioritizes scheduling a task on a worker that is already
+        executing one of the task's siblings (tasks that share a
+        dependent with it).
+
+        Consider the following task graph (read top to bottom)
+
+        ::
+
+            a-1  a-2   a-3  a-4
+              \  /       \  /
+              b-1         b-2
+
+        If we have
+
+        * Two workers: `A` and `B`
+        * Task `a-1` is running on worker `A`
+        * We're currently scheduling task `a-2`
+
+        we'll choose to schedule `a-2` on worker `A` as well, because that
+        will minimize communication when `b-1` is eventually scheduled.
+
+        When we don't have any sibling tasks running, we assign `ts` to an
+        idle worker, or to a worker with a relatively low occupancy /
+        number of tasks.
+
+        See Also
+        --------
+        decide_worker
+        worker_objective
+        transition_waiting_processing
+        """
+        worker = None
+        # the config option is just for ease of testing / benchmarking;
+        # it will be removed
+        if dask.config.get("distributed.scheduler.lump_tasks", default=True):
+            for dts in ts.dependents:
+                # Figure out where my siblings are running. Note that we
+                # stop as soon as we find *a* sibling running *somewhere*.
+                # If time weren't an issue, we might find the worker with the
+                # most siblings. But that's expensive.
+                for sts in dts.dependencies:
```
> **Review comment:** There are several situations where a single task has very many dependents. In these cases I think that we'll hit N^2 scaling and bring things down.

> **Review comment:** What about cases where we don't have siblings, but cousins n'th removed?

> **Review comment:** Yes, my initial version got a full count of where each of our co-dependencies was running. That blew up very quickly. The early …

> **Review comment:** This approach won't help in that case (I think …
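To put the reviewer's N^2 concern in numbers, here is a rough, self-contained sketch; the function and figures are illustrative, not measurements from this PR:

```python
def sibling_scan_steps(n_tasks: int, dependents_per_task: int,
                       deps_per_dependent: int) -> int:
    """Worst-case inner-loop iterations if every scheduled task walks all
    of its dependents' dependency lists without finding a sibling early."""
    return n_tasks * dependents_per_task * deps_per_dependent

# A wide reduction: 10_000 tasks all feed one dependent, which therefore
# has 10_000 dependencies. Scheduling each task scans O(n) siblings, so
# the whole stage is O(n^2) in the worst case.
print(sibling_scan_steps(10_000, 1, 10_000))  # 100_000_000 steps
# The early `break` in the PR bounds the common case (a sibling is found
# quickly), but the no-sibling worst case stays quadratic.
```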
```diff
+                    if sts.processing_on:
+                        # c[sts.processing_on] += 1
+                        worker = sts.processing_on
+                        break
+                    if sts.who_has:
+                        worker = random.choice(sts.who_has)
+                        break
+
+        if worker:
+            # ((worker, n_tasks),) = c.most_common(1)
+            return worker
+        elif self.idle:
+            if len(self.idle) < 20:  # smart but linear in small case
+                worker = min(self.idle, key=operator.attrgetter("occupancy"))
+            else:  # dumb but fast in large case
+                worker = self.idle[self.n_tasks % len(self.idle)]
+        elif len(self.workers) < 20:  # smart but linear in small case
+            worker = min(self.workers.values(), key=operator.attrgetter("occupancy"))
+        else:  # dumb but fast in large case
+            worker = self.workers.values()[self.n_tasks % len(self.workers)]
+
+        return worker
+
     def decide_worker(self, ts):
         """
         Decide on a worker for task *ts*. Return a WorkerState.
```
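To see the sibling walk in isolation, here is a self-contained toy model; the `Task` class and `find_sibling_worker` are simplified stand-ins for distributed's `TaskState` and the loop above, not the library's real API:

```python
import random

class Task:
    """Toy stand-in for distributed's TaskState: only the fields the
    sibling walk touches."""
    def __init__(self, key):
        self.key = key
        self.dependents = []       # tasks that consume this task's output
        self.dependencies = []     # tasks this task consumes
        self.processing_on = None  # worker currently executing this task
        self.who_has = []          # workers holding the finished result

def find_sibling_worker(ts):
    """Return a worker already running (or holding) a sibling of ts,
    i.e. another input of one of ts's dependents; None if there is none."""
    for dts in ts.dependents:
        for sts in dts.dependencies:
            if sts.processing_on:
                return sts.processing_on
            if sts.who_has:
                return random.choice(sts.who_has)
    return None

# Build the docstring's graph: b-1 = f(a-1, a-2)
a1, a2, b1 = Task("a-1"), Task("a-2"), Task("b-1")
b1.dependencies = [a1, a2]
a1.dependents = a2.dependents = [b1]

a1.processing_on = "worker-A"   # a-1 is running on worker A
print(find_sibling_worker(a2))  # -> "worker-A": co-locate a-2 with a-1
```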
```diff
@@ -3614,18 +3692,8 @@ def decide_worker(self, ts):
                 valid_workers,
                 partial(self.worker_objective, ts),
             )
-        elif self.idle:
-            if len(self.idle) < 20:  # smart but linear in small case
-                worker = min(self.idle, key=operator.attrgetter("occupancy"))
-            else:  # dumb but fast in large case
-                worker = self.idle[self.n_tasks % len(self.idle)]
         else:
-            if len(self.workers) < 20:  # smart but linear in small case
-                worker = min(
-                    self.workers.values(), key=operator.attrgetter("occupancy")
-                )
-            else:  # dumb but fast in large case
-                worker = self.workers.values()[self.n_tasks % len(self.workers)]
+            worker = self.decide_worker_initial(ts)
 
         if self.validate:
             assert worker is None or isinstance(worker, WorkerState), (
```
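The idle/occupancy fallback that moved into `decide_worker_initial` trades accuracy for speed at a fixed pool size; here is a standalone sketch of that trade-off (the cutoff of 20 is the PR's, everything else is illustrative):

```python
import operator

class Worker:
    def __init__(self, name, occupancy):
        self.name = name
        self.occupancy = occupancy  # estimated seconds of queued work

def pick_fallback(workers, n_tasks, cutoff=20):
    """Mirror of the PR's fallback: exact argmin of occupancy for small
    pools (O(n) per task), round-robin indexing for large ones (O(1))."""
    if len(workers) < cutoff:  # smart but linear in the small case
        return min(workers, key=operator.attrgetter("occupancy"))
    return workers[n_tasks % len(workers)]  # dumb but fast in the large case

pool = [Worker(f"w{i}", occupancy=i % 5) for i in range(8)]
print(pick_fallback(pool, n_tasks=3).name)    # small pool -> least busy: "w0"

pool = [Worker(f"w{i}", occupancy=0) for i in range(100)]
print(pick_fallback(pool, n_tasks=103).name)  # large pool -> round robin: "w3"
```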
```diff
@@ -4819,6 +4887,7 @@ def decide_worker(ts, all_workers, valid_workers, objective):
     of bytes sent between workers. This is determined by calling the
     *objective* function.
     """
+
     deps = ts.dependencies
     assert all(dts.who_has for dts in deps)
     if ts.actor:
```
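For context, the *objective* passed in here scores candidate workers so the scheduler can take an argmin. Below is a minimal sketch of a byte-minimizing objective, assuming `TaskState.nbytes` and `who_has` as above; it is illustrative only, and distributed's real `worker_objective` also weighs occupancy and other worker state:

```python
def transfer_bytes_objective(ts, ws):
    """Hypothetical objective: bytes that must move to worker `ws` before
    it could run task `ts` -- dependencies not already resident there.
    Lower is better; decide_worker picks the candidate minimizing this."""
    return sum(dts.nbytes for dts in ts.dependencies if ws not in dts.who_has)
```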
> **Review comment:** Note that all ascii art diagrams in the codebase so far have computation going from bottom to top. This is also the way that `visualize` works.