25% performance regression in merges #7052

Closed
wence- opened this issue Sep 21, 2022 · 6 comments
Comments

@wence- (Contributor) commented Sep 21, 2022

Our weekly multi-node benchmarking (working on making this publicly visible) shows a performance regression in simple dataframe merges, which I can pinpoint to #6975. (This was briefly reverted in #6994 and then reintroduced in #7007).

[benchmark plot: visualization-3]

More specifically, #6975 changes the decision making in _select_keys_for_gather:

```python
if (
    # When there is no other traffic, the top-priority task is fetched
    # regardless of its size to ensure progress
    self.transfer_incoming_bytes
    or to_gather
) and total_nbytes + ts.get_nbytes() > bytes_left_to_fetch:
    break
for worker in ts.who_has:
    # This also effectively pops from available
    self.data_needed[worker].remove(ts)
to_gather.append(ts)
total_nbytes += ts.get_nbytes()
```

Prior to this change, the logic was:

```python
# The top-priority task is fetched regardless of its size
if (
    to_gather
    and total_nbytes + ts.get_nbytes() > self.transfer_message_target_bytes
):
    break
for worker in ts.who_has:
    # This also effectively pops from available
    self.data_needed[worker].remove(ts)
to_gather.append(ts)
total_nbytes += ts.get_nbytes()
```

Note the difference in whether we fetch the top-priority task. If I remove the part of the decision-making logic that looks at self.transfer_incoming_bytes:

```python
if (
    to_gather
    and total_nbytes + ts.get_nbytes() > bytes_left_to_fetch
):
```

then performance goes back to where it was previously.
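
To make the difference concrete, here is a small standalone sketch (not distributed's actual code) that reduces the two break conditions above to pure functions, with a 50MB constant standing in for transfer_message_target_bytes / bytes_left_to_fetch:

```python
# Illustrative only: the break conditions from the two snippets above,
# reduced to pure functions so the behavioral difference is easy to see.
MESSAGE_TARGET = 50_000_000  # stand-in for the hard-coded transfer_message_target_bytes


def old_should_break(n_gathered: int, total_nbytes: int, task_nbytes: int) -> bool:
    # Pre-#6975: the top-priority task is always fetched, because the
    # condition can only trigger once something is already in to_gather.
    return n_gathered > 0 and total_nbytes + task_nbytes > MESSAGE_TARGET


def new_should_break(
    incoming_bytes: int,
    n_gathered: int,
    total_nbytes: int,
    task_nbytes: int,
    bytes_left_to_fetch: int,
) -> bool:
    # Post-#6975: if any transfer is already incoming, even the top-priority
    # task has to fit into the remaining budget.
    return (incoming_bytes > 0 or n_gathered > 0) and (
        total_nbytes + task_nbytes > bytes_left_to_fetch
    )


# A 60MB top-priority task while 10MB is already in flight:
print(old_should_break(0, 0, 60_000_000))                              # False -> fetch it
print(new_should_break(10_000_000, 0, 0, 60_000_000, MESSAGE_TARGET))  # True  -> defer it
```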

I'm not sure of the correct way to square this circle. I don't understand how the change in _select_keys_for_gather interacts with the PR's intention to throttle data transfer.

cc @hendrikmakait (as author of #6975)

@jrbourbeau (Member)

Thanks for reporting and even identifying the relevant code change, @wence-! @hendrikmakait do you have some time to look into this?

@hendrikmakait (Member)

Hi @wence-, thanks for bringing this to my attention! #6975 was by design going to hurt some workloads: we're essentially trading fewer out-of-memory scenarios (which might have fatal consequences on a worker) for increased runtime of some workloads. I had not noticed any performance hits in the integration tests executed within coiled/coiled-runtime, which suggested that we had hit a sweet spot for the threshold set via distributed.worker.memory.transfer, one that keeps the oom-killer away while rarely impacting runtime.

Do you have an example workload and cluster configuration (e.g. cluster size, available RAM, # of workers) that I could try and replicate?

If you have time to investigate further, would you mind exploring the effect of increasing distributed.worker.memory.transfer on the runtime of the impacted workloads?

It might be that we have to revisit the default setting or this imperfect approach to limiting memory load caused by data transfer altogether.

@wence- (Contributor, Author) commented Sep 26, 2022

> If you have time to investigate further, would you mind exploring the effect of increasing distributed.worker.memory.transfer on the runtime of the impacted workloads?

Setting export DASK_DISTRIBUTED__WORKER__MEMORY__TRANSFER=1 (which I think is the maximum value) doesn't improve things. In fact, it appears that setting this value doesn't really have an effect at all for this workload (I get effectively the same throughput with export DASK_DISTRIBUTED__WORKER__MEMORY__TRANSFER=0.00000001).
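
For completeness, the same knob expressed through dask's config API rather than the environment variable (a sketch; note the setting has to be in effect in the worker processes, not just the client):

```python
import dask

# Sketch: equivalent of DASK_DISTRIBUTED__WORKER__MEMORY__TRANSFER=1, using
# the distributed.worker.memory.transfer key introduced in #6975.
dask.config.set({"distributed.worker.memory.transfer": 1.0})
```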

Inspecting the values of self.transfer_incoming_bytes_limit, self.transfer_incoming_bytes, and self.transfer_message_target_bytes, it appears that the limit on bytes_left_to_fetch is always coming from self.transfer_message_target_bytes (which is hard-coded at 50MB).
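
As a simplified sketch of how I read those values (not the exact code in _select_keys_for_gather), the per-gather budget is effectively the smaller of the memory-based headroom and the fixed 50MB message target, and on these workers the headroom is always orders of magnitude larger:

```python
# Simplified sketch of the budget arithmetic; the worker size and fraction
# below are made-up example numbers.
transfer_message_target_bytes = 50_000_000        # hard-coded 50MB target
transfer_incoming_bytes_limit = 0.1 * 64 * 2**30  # e.g. a fraction of a 64 GiB worker
transfer_incoming_bytes = 25_000_000              # bytes currently in flight

bytes_left_to_fetch = min(
    transfer_incoming_bytes_limit - transfer_incoming_bytes,  # gigabytes of headroom
    transfer_message_target_bytes,                            # 50MB
)
print(bytes_left_to_fetch)  # 50000000 -- the 50MB target always wins here
```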

These benchmarks are running on a high-performance network (between 12 and 45 GiB/s of uni-directional bandwidth, depending on the worker pairing), so the default that limits grabbing multiple "small" messages from a single worker to 50MB total is getting in the way (I can send multiple GiBs of data in less than a second).
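
Some back-of-the-envelope arithmetic on those bandwidth figures:

```python
# Rough arithmetic only: wire time for a single 50MB message at the quoted
# bandwidths, ignoring latency and (de)serialization.
message_bytes = 50_000_000
for gib_per_s in (12, 45):
    seconds = message_bytes / (gib_per_s * 2**30)
    print(f"{gib_per_s} GiB/s: {seconds * 1e3:.1f} ms per 50MB message")
# ~3.9 ms and ~1.0 ms on the wire, i.e. a 50MB cap per gather is tiny
# relative to what this network can move per second.
```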

I think what is happening is that previously there might have been two messages in flight between any given pair of workers at any one time, whereas now the changed logic means we limit to a single message.

So I think that #6975 fixed the logic for limiting with respect to transfer_message_target_bytes, but this turns out to be bad in some settings. One way to fix this is to add configuration for transfer_message_target_bytes, I suppose.
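
Purely as an illustration of what such a knob might look like (the config key name below is hypothetical, just to show the shape of the change):

```python
import dask

# Hypothetical config key, for illustration only: raise the per-gather
# message target well above the current hard-coded 50MB on fast networks.
dask.config.set({"distributed.worker.transfer-message-target-bytes": "500MB"})
```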

@hendrikmakait (Member) commented Sep 26, 2022

> Inspecting the values of self.transfer_incoming_bytes_limit, self.transfer_incoming_bytes, and self.transfer_message_target_bytes, it appears that the limit on bytes_left_to_fetch is always coming from self.transfer_message_target_bytes (which is hard-coded at 50MB).

I think there might be a problem with the related logic; let me take a closer look at the implementation.

@hendrikmakait (Member)

> One way to fix this is to add configuration for transfer_message_target_bytes, I suppose.

This feels like a good idea regardless of the problem at hand; I'll put together a PR.

@hendrikmakait (Member)

Fixed by #7071
