Efficient scalable shuffle - P2P shuffle extension #7507
Comments
Hello, this is very cool to see, and I'm excited to see it being applied to arrays! In my Xarray-centric experience, the shuffle is usually expressed as out-of-order indexing of a dask.Array (e.g. index out all elements belonging to a group in a groupby operation and present that to the user as a new array). See this comment and below for some discussion. Currently this just results in an explosion of tasks and is only workable for a limited set of problems. For built-in reductions, we can use a … Is it feasible for the indexing/slicing code to use the shuffle for such cases?
So far, we have implemented this only for the arr.rechunk API call and haven't considered more generic slicing. However, the biggest lift was not the rechunk API but rather figuring out how to chop up the array and piece it back together properly, and that's basically slicing :) We are currently generating a plan (all of this is subject to change) that calculates the "target chunk" for every slice of an input chunk (see distributed/shuffle/_rechunk.py, lines 134 to 162 at f4328bb). A simplified sketch of that idea is shown below.
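(This is not the actual implementation referenced above — just a minimal, self-contained sketch of the idea along a single dimension; the function name and output format are made up for illustration.)

```python
from itertools import accumulate

def rechunk_plan_1d(old_chunks, new_chunks):
    """Map each input chunk to the slices it must send to each output chunk.

    Returns {input_chunk_index: [(output_chunk_index, slice_within_input), ...]}.
    """
    old_bounds = [0, *accumulate(old_chunks)]
    new_bounds = [0, *accumulate(new_chunks)]
    plan = {}
    for i, (istart, istop) in enumerate(zip(old_bounds, old_bounds[1:])):
        for j, (ostart, ostop) in enumerate(zip(new_bounds, new_bounds[1:])):
            lo, hi = max(istart, ostart), min(istop, ostop)
            if lo < hi:  # input chunk i overlaps output chunk j
                plan.setdefault(i, []).append((j, slice(lo - istart, hi - istart)))
    return plan

# Rechunking (4, 4) -> (2, 2, 2, 2) along one axis yields:
# {0: [(0, slice(0, 2)), (1, slice(2, 4))], 1: [(2, slice(0, 2)), (3, slice(2, 4))]}
print(rechunk_plan_1d((4, 4), (2, 2, 2, 2)))
```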
Effectively, this is just a mapping from slices of input chunks to target chunks. The only difference between rechunk and generic slicing (IIUC) is that rechunk typically slices contiguously. The extension currently doesn't care about (or optimize for) this, so I am pretty confident we can express generic indexing using it as well. There are very likely cases where old-style indexing is still better, so a challenge will be figuring out which algorithm to choose. @dcherian, is this something that would be useful for your use cases?
That's what I was thinking ;)
Yes, it is useful, but I'll have to make up a use case. It has been such a problem for so long that we try to avoid it as much as possible, or use rechunker to create a second copy of the dataset. The other common place this bites us is …
I think the right approach here is to start generalizing the shuffle system to the point where a sharp and adventurous developer, like @dcherian, could try implementing his own operation. This will also likely come up with … My sense is that there is likely some abstraction that could be designed, similar to …
This looks very exciting! I have a test problem I would like to try it out on. What's the easiest way for me to do that? Use Coiled with distributed >= …?
@TomNicholas: Yes, using … On a related note, I'm currently improving the P2P rechunking implementation (distributed#7897). I still have to create a range of benchmarks to test these changes and a decision heuristic between … Please let me know if you have more common rechunking workloads you care about. Creating an issue on https://github.com/coiled/benchmarks/issues might be the best way to do that.
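As a rough sketch of how to opt in on a recent version (the config key `array.rechunk.method` is an assumption based on current releases and may be spelled differently in older ones — see the linked issues for the authoritative instructions):

```python
import dask
import dask.array as da
from distributed import Client

client = Client()  # P2P rechunking needs a distributed scheduler (local or Coiled)

x = da.random.random((10_000, 10_000), chunks=(10_000, 100))

# Route rechunk operations through the P2P extension via configuration.
with dask.config.set({"array.rechunk.method": "p2p"}):
    y = x.rechunk((100, 10_000)).persist()
```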
I'm not sure why P2P rechunking helps with Tom's test problem. Can someone explain that?

```python
import dask.array as da
import xarray as xr
from distributed.diagnostics import MemorySampler

# Assumes a distributed Client/cluster is already running.
ms = MemorySampler()
ds = xr.Dataset(
    dict(
        anom_u=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
        anom_v=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
    )
)
quad = ds**2
quad["uv"] = ds.anom_u * ds.anom_v
mean = quad.mean("time")
with ms.sample():
    mean.compute()
```
@dcherian: I don't think it does. I've tested the problem recently, and it appears to lack rechunking of any sort. Hence P2P rechunking won't help. @TomNicholas: Do you have more information on this? Where would you have expected it to help?
Okay, hmmm - in that case there must be some other change that caused me to see totally different performance between running an old version of distributed locally and the new version with P2P on Coiled. I'll try both on Coiled now to eliminate possible causes.
Okay, I take it back, …
If you ran this on Coiled, can you provide us with the cluster ID?
232904 without P2P and 232905 with P2P.
These two runs both look pretty much the same. Were you able to reproduce the totally different performance you mentioned here?
The performance I originally saw when I ran with a local cluster without P2P looked like this. I'm wondering now whether I somehow accidentally switched from measuring total cluster memory usage of all types (including spilled) to just measuring process memory when I moved to Coiled. That would explain the difference in the shape of the graphs. Now on Coiled I get this plot, where the red is spilling, which is consistent with my first result. EDIT: Actually, I now realise what happened - I had set …
Today I tried P2P rechunking to rechunk a 3D array (chunked at 128x128x128) into an optimized, directional (1x1024x1024) variant. Both the input and the output arrays are Zarr. With array-based P2P rechunking, I noticed the shuffle barrier is the primary bottleneck (i.e. it seems to make a copy of the entire array on disk? I saw huge disk write usage while nothing was being written to the target array.) It starts … Am I doing something incorrectly, or are my expectations wrong? I would expect (N, 1024, 1024)-sized pieces to be written out whenever they're ready and then freed from the temporary cache. The memory usage is definitely consistent, but nothing is flushed to the new Zarr array until the whole dataset has been read. Any suggestions on the best way to diagnose what is going on?
@tasansal What you are describing is indeed expected. Before we can write out any of the output chunks, we have to read the entire dataset in; otherwise an output chunk might still be incomplete when we write it out.
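To make the many-to-many structure of that particular rechunk concrete, here is a rough count (assuming the old and new chunk boundaries line up, and ignoring edge chunks):

```python
import math

in_chunks = (128, 128, 128)    # source chunking
out_chunks = (1, 1024, 1024)   # target "directional" chunking

# Input chunks contributing to one output chunk, and vice versa, per dimension.
per_output = math.prod(max(1, o // i) for i, o in zip(in_chunks, out_chunks))
per_input = math.prod(max(1, i // o) for i, o in zip(in_chunks, out_chunks))

print(per_output)  # 64: every (1, 1024, 1024) output slab needs 8 x 8 input chunks
print(per_input)   # 128: every (128, 128, 128) input chunk feeds 128 output slabs
```

With dependencies this spread out, the current implementation funnels everything through its single shuffle barrier before any output chunk is written, which matches the behaviour described above.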
That's probably a case where task-based rechunking is better. In general my guess is that we'll probably prefer task-based rechunking when the number of stages in the plan is minimal, and prefer P2P-based rechunking when there are more stages in the plan. Those cases have full copies regardless.
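In case it's useful while the heuristic is being worked out, the rechunk method can be pinned per call. The `method=` keyword here is an assumption based on recent dask releases and may be spelled differently in older ones:

```python
import dask.array as da

x = da.random.random((1024, 1024, 1024), chunks=(128, 128, 128))

y = x.rechunk((1, 1024, 1024), method="tasks")  # force the task-based algorithm
z = x.rechunk((1, 1024, 1024), method="p2p")    # force the P2P extension
```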
@fjetter I understand. However, that would reduce the utility of P2P shuffling for many tasks, no? Is it possible to have multiple shuffle barriers based on the chunking? What I mean is: if I have an array that is shaped … @mrocklin That makes sense. Is there a way to limit/adjust the memory usage of task-based rechunking? I have tried it with the new … Also, probably out of scope, but caching the full float32 array on disk doesn't work well with compressed formats like Zarr; it can potentially take 2-10x more disk space than users expect. Should this be documented? Or could the cache use compression?
The benefit is indeed situational; that's also why we haven't made it the default (as opposed to P2P shuffles for dataframes). Finding a good heuristic has also proven surprisingly hard and depends on the underlying hardware (dask/dask#10226).
We've considered the possibility of using multiple barriers, but it is not a high priority. The way P2P is currently built, it would still force you to roundtrip all data through disk even if it fits into memory. So before solving the algorithmic problem, we need to fix this one. Otherwise, you likely wouldn't benefit much from the speedups because rechunking would still be disk-bound. #7618 is a first step in that direction. All that being said, let us know if you would like to get involved in the effort of improving P2P rechunking. There's much to be done :)
Given the current strategy of appending to the files on disk, I doubt that we would gain much benefit from compression. Documenting this might be a good idea; would you be interested in creating a PR for it?
@tasansal if you're not already aware, you might be interested in cubed, which is a generalisation of rechunker (and ships a version of rechunker within it).
@tasansal can I ask you for an example problem that people can look at? Is something like the following representative of your problem?

```python
import dask.array as da

x = da.random.random((2000, 2000, 2000), chunks=(128, 128, 128))
x.rechunk(("auto", 1000, 1000)).to_zarr(...)
```
I'm also curious: what is actually stopping you from using this today? Is it slow performance? (If so, I'm curious what you're using today that performs better.) Is it running out of memory? (That seems unlikely given the P2P solution.) Is it running out of disk? What I'm hearing now is "this could be better", and I entirely agree with that. I'm also curious what specifically is causing you pain and what, if anything, makes it insufficient.
I ran this both locally and on the cloud, and I think I'm hitting hardware performance. Video here in case people are interested. Probably my version of the problem isn't representative of what you're running into, @tasansal. If you can help by providing a representative example, that would be really useful.
Interesting. I don't suppose there's a way to reproduce this, even in a non-minimal way? My sense is that for rechunking that isn't all-to-all (like (1000, 1) to (1, 1000)), the task-based mechanism will be near-optimal. If there is some ordering issue, then we should try to hunt that down.
annoyingly, my attempt at a reproducer with public data has led to the graph I expect :/
|
Ah, managed to reproduce by just adding a few more files. Also, I think I'm hijacking @fjetter's thread here, so let me know if I should just open a new issue. Code is as follows (takes ~5 minutes to download depending on zone; requires ~20 GiB of disk space). Interesting that this only shows up when I have 24 files instead of the 12 above.
|
```python
ds.air_temperature_at_2_metres.data.visualize(optimize_graph=True, color='order')
```

Can you share the result of this call? You'll need to include a filename:

```python
ds.air_temperature_at_2_metres.data.visualize("myfile.png", optimize_graph=True, color='order')
```

The …
🤔 Someone else should take a look at that, but it's not immediately clear to me that it's a fail case just yet. Looking at the numbers, it seems like maybe the graphviz/dot layout is a bit off? There are clearly lines swooping around that look wrong, but when you check the numbers and colors, it seems like Graphviz has placed things a bit oddly anyway (everyone uses heuristics for this problem). Are you able to make this look more obviously bad by adding more files?
The swooping lines in the visualize() figure aren't clear issues; I agree that they're mostly just a quirk of plotting. Some issues that I do see: node 200 (bottom, middle right) should be running before 186, which should run before 179. I think this might be whatever led to the allowed failure of this test. I'm wondering if it's in conflict with the desired ordering specified in this test?
Just checking in here, I've gotten pulled away this week. I think that @fjetter plans to think about this soon though. It's near the top (but not yet at the top) of his priority list.
Motivation
Shuffles are an integral part of many distributed data manipulation algorithms. Common DataFrame operations relying on shuffling include `sort`, `merge`, `set_index`, and various groupby operations (e.g. `groupby().apply()`, `groupby(split_out>1)`), whereas the most stereotypical array workload is `rechunk`. There are many other applications for an efficient shuffle implementation, which justifies taking a dedicated approach to this issue.

Shuffling is a poor fit for centralized graph-based scheduling, since the graph is all-to-all (naive O(N²), dask O(N log N) in size, where N is the number of partitions), yet the core logic of a shuffle is so simple that it benefits little from centralized coordination while suffering significant overhead from it. With task-based shuffles, the amount of data we can shuffle effectively (before workers run out of memory, or the scheduler crashes or becomes a bottleneck) is severely limited. Allowing workers to autonomously exchange data with their peers and manage disk and memory usage in a more granular way lets us push that limit significantly higher.
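For a back-of-the-envelope sense of those sizes (rough counts for intuition only, not dask's exact task numbers; the split factor of 32 is an assumed stage width):

```python
import math

def shuffle_graph_sizes(npartitions, split=32):
    """Rough sizes: naive all-to-all edges vs. a staged task-based shuffle."""
    stages = max(1, math.ceil(math.log(npartitions, split)))
    return {
        "naive_all_to_all_edges": npartitions**2,           # O(N^2)
        "staged_task_shuffle_tasks": npartitions * stages,  # O(N log N)
    }

print(shuffle_graph_sizes(10_000))
# {'naive_all_to_all_edges': 100000000, 'staged_task_shuffle_tasks': 30000}
```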
See https://coiled.io/blog/better-shuffling-in-dask-a-proof-of-concept/ for more background.
This issue tracks the current implementation progress and highlights various milestones. We intend to update the top-level description of this issue continuously such that this issue can serve as an always up-to-date overview of the current efforts.
Goals
Roadmap
1 - Foundations and dask.DataFrame ✅
The implementation effort so far has focused on creating a stable foundation for the things to come and derives from the early prototype. This stage mostly focused on a consistent concurrency model that supports out-of-band, direct peer-to-peer communication between workers and integrates well with the existing task-based scheduling logic.
This was developed using a `DataFrame`-based shuffle as the example, and we consider it ready to use! For detailed instructions, known issues, and feedback, please see #7509. We encourage all users of `dask.DataFrame` to try this and report back with feedback; a rough sketch of how to opt in is shown below.
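This sketch assumes the config key `dataframe.shuffle.method`, which has been renamed across versions — see #7509 for the authoritative, up-to-date instructions:

```python
import dask
from distributed import Client

client = Client()  # P2P shuffling requires a distributed scheduler

df = dask.datasets.timeseries()  # small example DataFrame shipped with dask

# Route shuffle-backed operations (set_index, merge, ...) through the P2P extension.
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    df.set_index("id").persist()
```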
2 - dask.Array rechunking
The new shuffle extension is currently built to handle pandas DataFrames and uses pyarrow behind the scenes. Its architecture is designed with generic types in mind and will be just as well suited to array workloads. One of the most popular many-to-many problems is array rechunking, which we will implement next using this extension.
Basic functionality is being set up in #7534
This approach already provides constant-time array rechunking but sometimes falls short in terms of walltime performance compared to old-style, task-based shuffling.
3 - Misc
This next stage is not as refined as the initial ones. There are many small to medium-sized issues that will either expand adoption of the P2P algorithm or make it run faster and smoother. This section will become more refined over time.
- `dask.dataframe.merge` #7496
- `string[pyarrow]` dtype does not roundtrip in P2P shuffling #7420