
P2P shuffle questions #5939

mrocklin opened this issue Mar 11, 2022 · 3 comments

@mrocklin (Member)

Howdy folks, I took a look through the recent P2P shuffle implementation. I had a few questions and thoughts. I suspect that most of this is for @gjoseph92 and @fjetter.

Just making sure I understand things

So we've taken the structure in the POC and ...

  • added classes and things
  • added the ability for multiple simultaneous shuffles (nice thought with the shuffle ID)
  • stripped out performance considerations (buffering, the interplay between the thread pool and the event loop)
  • stripped out disk
  • generally cleaned things up

Is this correct? Am I missing other large changes?

Some random thoughts

These are just thoughts for future improvements as we evolve. I'm looking for "yup, that's maybe a good idea" or "nope, that's probably dumb because of x, y, z"

  • At some point we might consider engaging the scheduler, rather than getting the list of workers in the first create task. My guess is that we'll need the scheduler anyway when we start thinking about resilience
  • things like DataFrame.groupby should probably be pulled into the sync part of tasks, right? My guess is that we moved this onto the event loop because we were emphasizing simplicity over performance in this round. Is that correct or was there some other issue going on?
  • Any objection to adding "p2p" as a shuffle= option in Dask? (rough sketch of the usage just below)
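For context, this refers to the shuffle= keyword in dask.dataframe. A hypothetical sketch of what the option would look like to a user (this is what dask/dask#8392 proposed; the value is not available yet):

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(
    pd.DataFrame({"id": list(range(1000)), "x": [1.0] * 1000}), npartitions=10
)

# shuffle= currently accepts "disk" or "tasks"; the question is whether to
# also accept "p2p" and route it to the new extension.
shuffled = ddf.shuffle("id", shuffle="p2p")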

Future efforts

I see two major efforts:

  1. Disk and performance
  2. Resilience / worker failure

Assuming that we had magical solutions to both of those problems, what else is missing?

@fjetter (Member) commented Mar 14, 2022

At some point we might consider engaging the scheduler, rather than getting the list of workers in the first create task. My guess is that we'll need the scheduler anyway when we start thinking about resilience

There was an attempt at using a SchedulerPlugin for synchronization instead of relying on worker RPC calls. That was pretty cool and would offer an avenue to potentially fix other problems. However, at the same time it either required us to parse keys or introduce special annotations. In the end we chose the current approach for simplicity. All of the RPC stuff in the tasks could easily be moved to a plugin once we figure out how to filter/select the relevant keys efficiently.

See also #5524
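To illustrate the shape of that approach, here is a minimal sketch of a SchedulerPlugin that watches task transitions; the class, the key-prefix check, and everything inside it are hypothetical, and the key check is exactly the "parse keys or introduce special annotations" problem mentioned above:

from distributed.diagnostics.plugin import SchedulerPlugin

class ShuffleSchedulerPlugin(SchedulerPlugin):
    """Hypothetical sketch: coordinate a shuffle from the scheduler
    instead of via worker->worker RPC calls inside tasks."""

    def __init__(self):
        self.participants = {}  # shuffle id -> set of worker addresses

    def transition(self, key, start, finish, *args, **kwargs):
        # The open question: cheaply deciding whether `key` belongs to a
        # shuffle (key parsing here, vs. special annotations).
        if not str(key).startswith("shuffle-transfer-"):
            return
        if start == "waiting" and finish == "processing":
            # e.g. record participating workers so that barrier/unpack
            # tasks can be coordinated centrally
            ...

Such a plugin would be registered with client.register_scheduler_plugin(...), letting the scheduler rather than the first task decide which workers participate.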

things like DataFrame.groupby should probably be pulled into the sync part of tasks, right? My guess is that we moved this onto the event loop because we were emphasizing simplicity over performance in this round. Is that correct or was there some other issue going on?

yup

Any objection to adding "p2p" to Dask as an option there?

nope

@gjoseph92
Copy link
Collaborator

At some point we might consider engaging the scheduler

See #5524 and the extensive discussion there, including #5524 (comment) and the comment above for why we didn't end up merging that PR instead (even though in principle it worked better, and is the approach we'll need to switch to).

DataFrame.groupby should probably be pulled into the sync part of tasks, right? My guess is that we moved this onto the event loop because we were emphasizing simplicity over performance in this round

Yes (assuming by "sync" you mean "in a thread"; right now it is called synchronously within a task, but runs on the worker event loop). And it doesn't make much difference anyway, since groupby mostly holds the GIL, so it doesn't parallelize that well, and therefore will still functionally block the event loop due to the convoy effect. So it definitely seemed worth the simplicity for now.

# NOTE: `groupby` blocks the event loop, but it also holds the GIL,
# so we don't bother offloading to a thread. See bpo-7946.
for output_partition, data in data.groupby(self.metadata.column):
    ...

In [1]: import pandas as pd
In [2]: import numpy as np
In [3]: df = pd.DataFrame(np.random.randint(1, 1000, 100_000))
In [4]: %load_ext ptime

In [5]: %ptime -n 2 list(df.groupby(0))
Total serial time:   0.07 s
Total parallel time: 0.06 s
For a 1.19X speedup across 2 threads

In [6]: %ptime -n 4 list(df.groupby(0))
Total serial time:   0.14 s
Total parallel time: 0.16 s
For a 0.86X speedup across 4 threads

In [7]: %ptime -n 8 list(df.groupby(0))
Total serial time:   0.29 s
Total parallel time: 0.26 s
For a 1.14X speedup across 8 threads

Any objection to adding "p2p" to Dask as an option there?

dask/dask#8392 did that and we closed it. I'd object to adding it right now, since there's no reason why anyone should use it currently. It's slower than a task shuffle, extremely non-resilient, and is all in-memory. Once it's useful to an end user, then it should be added.

Assuming that we had magical solutions to both of those problems, what else is missing?

  1. Identifying the names of downstream tasks after fusion: https://github.com/dask/distributed/pull/5524/files#r765203083

  2. Culling: https://github.com/gjoseph92/distributed/blob/p2p-shuffle/proposal/distributed/shuffle/shuffle-design.md#graph-rewrite-on-cull and #5435 (comment) ([DNM] Peer-to-peer shuffle design)

    These two could initially be addressed by making a new Layer type with a custom cull method (and therefore preventing downstream task fusion); a rough sketch follows below. However, losing downstream fusion will expose us to root task overproduction. We thought we'd be able to add some hacks in the shuffle extension implementation to mitigate this by peeking at worker state (#5520 (comment), P2P shuffle skeleton). Though thinking about it now, I think doing this would be very invasive (the extension might pop extra tasks out of worker.ready and only add them back in once other non-unpack tasks had been received, or something similar on the scheduler). Basically, it would be a hacky version of the backpressure that really should be implemented generically in distributed to fix root task overproduction (#5555, "Distributed scheduler does not obey dask.order.order for num_workers=1, num_threads=1").
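A minimal sketch of that custom-cull Layer, assuming dask's HighLevelGraph Layer API; the class name and the choice to keep the whole layer intact are hypothetical:

from dask.highlevelgraph import MaterializedLayer

class P2PShuffleLayer(MaterializedLayer):
    """Hypothetical sketch: a Layer whose cull() keeps all of its tasks,
    so that requesting only a few output partitions does not drop the
    transfer and barrier tasks every unpack task depends on."""

    def cull(self, keys, all_hlg_keys):
        # Keep every task in this layer; other layers can still be culled
        # normally. A smarter version would rewrite the graph to shuffle
        # only the partitions that are actually needed.
        deps = {k: self.get_dependencies(k, all_hlg_keys) for k in self.mapping}
        return self, deps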

I think there are some other issues (I remember coming up with a situation in which our resilience strategy wouldn't work at some point), but I don't remember them off the top of my head.

@mrocklin (Member, Author)

Heads up, I've started looking into this problem again, hence the recent questions

Yes (assuming by "sync" you mean "in a thread"; right now it is called synchronously within a task, but runs on the worker event loop). And it doesn't make much difference anyway, since groupby mostly holds the GIL, so it doesn't parallelize that well, and therefore will still functionally block the event loop due to the convoy effect. So it definitely seemed worth the simplicity for now.

There is an in-between state where we await offload(function) to move it off of the event loop, but still have it be relatively simple. I'm doing that for now.
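For concreteness, a sketch of that in-between step using distributed.utils.offload, which runs a blocking function on a separate thread pool; the helper and column names here are made up for illustration:

from distributed.utils import offload

async def split_by_partition(df, column):
    # groupby is CPU-bound; run it on the offload thread pool so the
    # worker event loop stays responsive, without restructuring the code.
    groups = await offload(lambda: list(df.groupby(column)))
    return dict(groups)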

Also, I'm looking into using arrow sort_by/slice to do this outside of pandas
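Roughly, the arrow version could sort once by the partitioning column and then carve the table into contiguous slices, instead of calling pandas groupby. A hypothetical sketch (it assumes pyarrow >= 7 for Table.sort_by), not the actual implementation:

import pyarrow as pa
import pyarrow.compute as pc

def split_by_column(table: pa.Table, column: str) -> dict:
    # Sort so that rows with the same key are contiguous.
    table = table.sort_by(column)
    # value_counts yields one (value, count) struct per distinct key, in
    # order of first appearance, which after sorting is the sorted order.
    counts = pc.value_counts(table.column(column))
    out = {}
    offset = 0
    for row in counts:
        n = row["counts"].as_py()
        out[row["values"].as_py()] = table.slice(offset, n)
        offset += n
    return out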

I'd object to adding it [p2p] right now, since there's no reason why anyone should use it currently.

Makes sense

Identifying the names of downstream tasks after fusion
Culling

Yup, that'll be interesting 🙂

I was chatting with Florian and he brought up network backpressure. My suggestion for that is ...

  1. Start by trying hard to make the network the bottleneck; then the network hardware's own limits provide the backpressure and protect the memory hierarchy :)
  2. If/when that fails, have workers ask ahead of time before they send things, and increase the network concurrency a little to compensate (rough sketch below)
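A minimal sketch of what that ask-before-send idea could look like on the sending side, with a small in-flight window to make up for the extra round trip; the class and RPC handler names are illustrative, not existing distributed APIs:

import asyncio

class SendWindow:
    """Hypothetical sketch: ask the receiver for permission before each
    send, keeping a few sends in flight to hide the extra round trip."""

    def __init__(self, max_in_flight=4):
        self._slots = asyncio.Semaphore(max_in_flight)

    async def send(self, receiver_rpc, shards, nbytes):
        async with self._slots:
            # Hypothetical handler: the receiver replies only once it has
            # room to accept `nbytes` more data (this is the backpressure).
            await receiver_rpc.shuffle_clear_to_send(nbytes=nbytes)
            await receiver_rpc.shuffle_receive(data=shards)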
