
[DISCUSS] Improve client/scheduler performance during shuffling #6163

Open
rjzamora opened this issue May 1, 2020 · 2 comments
Labels: dataframe, discussion, scheduler

Comments

rjzamora (Member) commented May 1, 2020

Let's use this issue to coordinate some ongoing efforts to improve client/scheduler graph performance for large-scale shuffle operations.

In order to rearrange data between partitions in dask.dataframe (for parallel merge/sort/shuffle routines), the rearrange_by_column_tasks routine is used to build a task graph for staged shuffling. Since the number of tasks scales as O(n log n) in the number of partitions, the time required both for graph creation and for execution of the graph itself can be quite significant.
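For reference, here is a minimal sketch that makes the scaling concrete by counting the tasks a task-based shuffle generates (assuming a recent dask; __dask_graph__ is the standard collection protocol for retrieving a collection's graph, and len() of the returned graph counts its tasks):

from dask.datasets import timeseries
from dask.dataframe.shuffle import shuffle

for end in ("2000-02-01", "2001-01-01", "2005-01-01"):
    ddf = timeseries(start="2000-01-01", end=end, partition_freq="1d")
    shuffled = shuffle(ddf, "id", shuffle="tasks")
    # The task count is dominated by the repeated shuffle_group/getitem
    # stages and should grow roughly as n * log(n) in the partition count n.
    print(f"{ddf.npartitions:5d} partitions -> {len(shuffled.__dask_graph__()):7d} tasks")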

Note that a detailed explanation of a nearly identical "staged shuffle" is given in this discussion. One component of the algorithm that clearly dominates the size of the graph is the repetition of shuffle_group tasks (which output dictionaries of pd/cudf DataFrame objects) and getitem tasks (which select elements of the shuffle-group output). It is my understanding that some people may have promising ideas to improve performance here.
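To make the shuffle_group/getitem pattern concrete, here is a simplified pandas-only sketch of what one such task pair computes. shuffle_group_sketch is a hypothetical stand-in for illustration, not the actual dask.dataframe.shuffle.shuffle_group implementation, which handles staging, hashing, and dtype details differently:

import pandas as pd

def shuffle_group_sketch(df, column, k):
    # Assign each row to one of k output splits by hashing the shuffle
    # column, then return a dict mapping split id -> sub-frame. Real
    # shuffle_group tasks emit a dict like this per input partition.
    part = pd.util.hash_pandas_object(df[column], index=False) % k
    return {i: df[part == i] for i in range(k)}

df = pd.DataFrame({"id": [3, 1, 4, 1, 5, 9, 2, 6], "x": range(8)})
groups = shuffle_group_sketch(df, "id", k=4)

# In the task graph, each downstream getitem task selects one entry of
# this dict -- one getitem per output split, per input partition, per
# stage -- which is where much of the graph-size blow-up comes from.
part0 = groups[0]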

cc @kkraus14 @quasiben @mrocklin (Please do cc others as well..)

kkraus14 (Member) commented May 1, 2020

cc @madsbk who had some ideas in this area

kkraus14 (Member) commented May 1, 2020

Also, taken from #6137 (comment), here's a standalone example where client/scheduler graph performance is problematic:

from distributed import Client
from dask.datasets import timeseries
from dask.dataframe.shuffle import shuffle

client = Client()

# 1827 daily partitions over five years
ddf_d = timeseries(start='2000-01-01', end='2005-01-01', partition_freq='1d')
ddf_d_2 = shuffle(ddf_d, "id", shuffle="tasks")

%time ddf_d_2 = ddf_d_2.persist()  # ~8s on my machine
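When profiling this, it can also help to separate client-side graph materialization from scheduling and execution. A rough way to do that in the same session (dict() works here because the graph returned by __dask_graph__ is a Mapping; exact timings will of course vary):

# Time client-side graph materialization alone, without submitting work
%time graph = dict(ddf_d_2.__dask_graph__())
print(len(graph), "tasks")  # the task count driving both build and schedule time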
