Limit incoming data transfers by amount of data #6975
Conversation
Unit Test Results — see the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests. 15 files ±0, 15 suites ±0, 6h 30m 32s ⏱️ +2m 52s. Results for commit 0a5517d; comparison against base commit bfc5cfe. ♻️ This comment has been updated with latest results.
Note: The current implementation may cross the limit in some instances, i.e., any time we start gathering from a new worker. I'm working on cleaner logic that avoids crossing the limit unless we would otherwise not gather any data at all. This change may also be moved to a separate PR if we want to get this general change merged.

I adjusted the logic; now we should never exceed the limit, except for the first task to gather, which ensures that we make progress.
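The selection logic described here can be sketched as a small standalone function (a toy model for illustration only, not the actual `WorkerState` code; the name and signature are made up):

```python
def select_tasks(sizes, limit, in_flight_bytes):
    """Greedily pick tasks to gather without exceeding `limit` bytes.

    If nothing has been picked yet and nothing else is in flight, the
    first task is picked regardless of its size so that the worker
    always makes progress.
    """
    picked, total = [], 0
    for nbytes in sizes:
        # Only enforce the limit once at least one task has been picked
        # or some other transfer is already in flight.
        if (picked or in_flight_bytes) and total + nbytes > limit:
            break
        picked.append(nbytes)
        total += nbytes
    return picked, total
```

For example, with a 200-byte limit and no in-flight transfers, a single 300-byte task is still fetched; with 100 bytes already in flight, it is deferred.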
```python
assert ws.tasks["a"].state == "fetch"
assert ws.tasks["b"].state == "flight"
assert ws.tasks["c"].state == "flight"
```
This is only deterministic because there is some inherent ordering in the Python collections (e.g. dicts are insertion-ordered). From first principles, the worker doesn't prefer either of these tasks over the other. I think we should write the test agnostic to this. Put differently: would you have known which task is scheduled and which is queued before executing the test once?
how about
```python
tasks_by_state = defaultdict(list)
for ts in ws.tasks.values():
    tasks_by_state[ts.state].append(ts)
assert len(tasks_by_state["flight"]) == 2
assert len(tasks_by_state["fetch"]) == 1

# NOTE: We do not compare instructions since their sorting is random
ws.handle_stimulus(
    GatherDepSuccessEvent(
        worker=ws2,
        data={ts.key: 123 for ts in tasks_by_state["flight"]},
        total_nbytes=200,
        stimulus_id="s2",
    )
)
assert all(ts.state == "memory" for ts in tasks_by_state["flight"])
assert all(ts.state == "flight" for ts in tasks_by_state["fetch"])
```
Fair point, this makes an effort to hide unnecessary details and limit the assertions to what's important.
distributed/worker_state_machine.py
Outdated
```diff
 to_gather: list[TaskState] = []
 total_nbytes = 0

 while available:
     ts = available.peek()
-    # The top-priority task is fetched regardless of its size
-    if (
-        to_gather
-        and total_nbytes + ts.get_nbytes() > self.transfer_message_target_bytes
+    gather_at_least_one_task = self.transfer_incoming_bytes or to_gather
+    exceed_message_target = (
+        total_nbytes + ts.get_nbytes() > self.transfer_message_target_bytes
+    )
+    exceed_bytes_limit = (
+        self.transfer_incoming_bytes_limit is not None
+        and self.transfer_incoming_bytes + total_nbytes + ts.get_nbytes()
+        > self.transfer_incoming_bytes_limit
+    )
+    if gather_at_least_one_task and (
+        exceed_message_target or exceed_bytes_limit
```
How about
```python
if self.transfer_incoming_bytes_limit:
    bytes_left_to_fetch = min(
        self.transfer_incoming_bytes_limit - self.transfer_incoming_bytes,
        self.transfer_message_target_bytes,
    )
else:
    bytes_left_to_fetch = self.transfer_message_target_bytes

while available:
    ts = available.peek()
    if (
        # If there is no other traffic, the top-priority task may be
        # fetched regardless of its size
        to_gather or self.transfer_incoming_bytes
    ) and total_nbytes + ts.get_nbytes() > bytes_left_to_fetch:
        break
    for worker in ts.who_has:
        # This also effectively pops from available
        self.data_needed[worker].remove(ts)
    to_gather.append(ts)
    total_nbytes += ts.get_nbytes()
return to_gather, total_nbytes
```
At least subjectively this seems simpler.
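To see why folding both limits into one budget is equivalent, the computation can be exercised on its own (a hypothetical standalone helper, not part of distributed; the parameter names mirror the attributes above):

```python
def bytes_left_to_fetch(incoming_bytes, incoming_limit, message_target):
    # With a hard incoming-bytes limit, the per-message budget is capped
    # by whatever headroom remains below that limit; otherwise only the
    # per-message target applies.
    if incoming_limit is not None:
        return min(incoming_limit - incoming_bytes, message_target)
    return message_target
```

E.g. with a 1000-byte incoming limit, 900 bytes already in flight, and a 500-byte message target, only 100 bytes may be fetched in the next message.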
Agreed, this looks simpler.
```python
assert ws.tasks["a"].state == "flight"
assert ws.tasks["b"].state == "fetch"
```
similar concern about determinism as above
Adjusted
```python
assert instructions == [
    GatherDep.match(
        worker=ws2,
        to_gather={"a"},
        stimulus_id="s1",
    ),
]
assert ws.tasks["a"].state == "flight"
```
I think you should assert either the state or the instruction; asserting both is redundant.
Removed the instruction checks
distributed/worker.py
Outdated
```python
if self.memory_manager.memory_limit is None
else int(
    self.memory_manager.memory_limit
    * dask.config.get("distributed.worker.memory.transfer")
```
According to the distributed-schema.yaml, `distributed.worker.memory.transfer` is allowed to be `False`. This would raise.
Good catch, thanks!
Incorporated review feedback
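A guard along these lines avoids the problem (a minimal sketch, assuming the config value is either a float fraction or `False`; the helper name is made up for illustration):

```python
def transfer_bytes_limit(memory_limit, transfer_fraction):
    # Per distributed-schema.yaml, distributed.worker.memory.transfer may
    # be False (feature disabled), so check for that before multiplying.
    if memory_limit is None or transfer_fraction is False:
        return None  # no limit on incoming transfer bytes
    return int(memory_limit * transfer_fraction)
```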
Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
Closes #6208
`pre-commit run --all-files`