[SPARK-3376] Add experimental (non-production) in-memory shuffle option. #5403
Conversation
With this change, in-memory shuffle data is cleaned up in the same way as disk shuffle data: using the metadata cleaner. It would probably be better to clean up in-memory shuffle data more aggressively, in order to avoid running out of memory.
Yeah, so my feeling on this one is that I'm sure it's really useful for benchmarks, where you can size things so that the data fits in memory, but I'd be really hesitant to expose this to the average Spark user. For instance, the input size of your data could increase and then suddenly, for no apparent reason, your production job fails with an out-of-memory exception. That seems like it could easily cause a bad user experience, so I wondered whether this could be maintained as a third-party package instead.
Test build #29813 has finished for PR 5403 at commit
One idea here is to store the shuffle data at the MEMORY_AND_DISK storage level, so that things degrade more gracefully when memory becomes contended.
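As a rough illustration of that suggestion (not code from this patch), the graceful-degradation idea boils down to choosing the storage level for shuffle blocks from a configuration flag. The flag name below is invented; only the StorageLevel constants and SparkConf accessors are Spark's public API.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

// Hypothetical helper sketching the graceful-degradation idea: pick the storage
// level used for shuffle blocks from a config flag instead of hard-coding
// memory-only storage. "spark.shuffle.inMemoryOnly" is an invented name.
object ShuffleStorage {
  def levelFor(conf: SparkConf): StorageLevel = {
    if (conf.getBoolean("spark.shuffle.inMemoryOnly", false)) {
      StorageLevel.MEMORY_ONLY      // fastest, but fails hard when memory runs out
    } else {
      StorageLevel.MEMORY_AND_DISK  // lets shuffle blocks spill to disk under memory pressure
    }
  }
}
```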
Would it be easier to set spark.local.dir to a RAM disk and store these shuffle files there? That seems simpler and has less GC overhead, from my understanding 😄 . Besides, I think the shuffle framework could have two layers of abstraction: the shuffle algorithm (sort-based or hash-based) and the storage medium (memory or disk).
So basically these two layers could have different combinations: sort-based in-memory shuffle, sort-based on-disk shuffle, hash-based in-memory shuffle, hash-based on-disk shuffle... I think the implementation here is a hash-based in-memory shuffle, so what do you think about a sort-based in-memory shuffle? Just my thoughts, thanks a lot.
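For reference, a minimal sketch of the RAM-disk alternative described above (e.g. for spark-shell or an application's SparkConf). spark.local.dir is a real Spark setting; the tmpfs mount point is only an example and would need to exist and be sized appropriately on every node.

```scala
import org.apache.spark.SparkConf

// Point Spark's local scratch space (which holds shuffle files) at a RAM-backed
// filesystem so shuffle data lives in memory without any code changes.
// "/dev/shm/spark-local" is an example path; any tmpfs/ramfs mount of
// sufficient size would do.
val conf = new SparkConf()
  .setAppName("ram-disk-shuffle")
  .set("spark.local.dir", "/dev/shm/spark-local")
```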
@jerryshao I've found it can be tricky to configure a RAM disk to be the correct size for this, which is what makes something like this patch easier. However, if folks have generally found that to be a suitable solution, I'm happy to just close this patch. Re: the two layers of abstraction, I don't think there's any reason to do a sort-based shuffle in memory. The point of the sort-based shuffle is to improve Spark's use of disk by storing just one file for each map task, rather than opening <# reduce tasks> files for each map task (which makes some file systems like ext3 struggle, and also leads to much more seek-heavy disk use). As long as the data is stored in memory, I can't think of any reason why using the sort-based shuffle would improve performance (and there is some, likely small, performance cost of sorting all of the data). Are there other reasons you can think of that you'd want an in-memory version of the sort-based shuffle?
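To make the file-count argument concrete, here is a back-of-the-envelope sketch with invented task counts: hash-based shuffle scales with the product of map and reduce tasks, while sort-based shuffle scales only with the number of map tasks.

```scala
// Illustrative numbers only: a shuffle with 1,000 map tasks and 1,000 reduce tasks.
val mapTasks = 1000
val reduceTasks = 1000

// Hash-based shuffle opens one file per (map task, reduce task) pair.
val hashShuffleFiles = mapTasks * reduceTasks // 1,000,000 files

// Sort-based shuffle writes a single sorted data file per map task
// (plus a small index file, ignored here).
val sortShuffleFiles = mapTasks // 1,000 files

println(s"hash-based: $hashShuffleFiles files, sort-based: $sortShuffleFiles files")
```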
Also, one other consideration is that if you set spark.local.dir to a RAM disk, you can't persist other RDDs to disk, which you might want to do even if shuffle data is in memory (although I suppose this might be unlikely?).
Thanks a lot for your reply. Just my rough thought: I think that if a full sort-based shuffle (with a sort shuffle reader) were enabled, as SPARK-2926 mentions, then in cases that require sorting by key (like sort-merge join) the sort-based shuffle would still perform better than the hash-based shuffle, even in memory. But for now, as you said, the hash-based shuffle is better than the sort-based shuffle for the current implementation. Also, if this patch is focused on benchmarking, we need to tune carefully so that nothing spills to disk; in the current implementation there are still some spilled files on disk (for example from ExternalAppendOnlyMap). So it depends on how you look at it: if we target benchmarks, it would be better for all of the data to be in memory, in which case using a RAM disk is equivalent to this solution, and probably performs better (fewer GC issues). Just my instant thought, I have no concrete reason to debate this, sorry for any misunderstanding 😃 .
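For what it's worth, the spill tuning mentioned above was done in that era of Spark with flags like the ones below; both have since been deprecated or removed, so treat this as a sketch and check the documentation for your version.

```scala
import org.apache.spark.SparkConf

// Spark 1.x-era knobs for keeping aggregation data in memory during a benchmark.
// Verify that these settings still exist in the version you are running.
val conf = new SparkConf()
  .set("spark.shuffle.spill", "false")        // keep ExternalAppendOnlyMap data in memory
  .set("spark.shuffle.memoryFraction", "0.5") // give shuffle aggregation a larger share of the heap
```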
Definitely, |
We'd be very interested in trying this out. We've been pointing spark.local.dir at a RAM disk when we need better shuffle performance, but we have run into a number of issues with that model. We'd love to try out this in-memory shuffle to see if it helps.
Since it's a pretty simple implementation, I'd be fine with it being merged in. But I think we should say clearly that it can be useful for benchmarking, etc., but isn't meant to be used in a production setting since it's not robust to running out of memory. /cc @rxin for his thoughts also.
I'm not sure how realistic this is outside of benchmarking. Also, we will likely work on some code to substantially speed up shuffle, and as a result maybe this won't be as necessary? My worry with checking this in is that it will make it slightly harder to change the interfaces in the future (just more files to muck around with).
My two cents: I think the main reason to merge this is that one doesn't need to maintain out-of-tree patches that become outdated when the interface changes :) Also, I think this patch completely gets rid of all filesystem overheads (file open, close, flush, etc.), which is a pretty useful design point. We've seen the choice of filesystem be a big issue in the past, so this would be useful not just for research benchmarks but also for spark-perf, for questions like 'how much time does a small shuffle using files take compared to this?'
I agree with Shivaram's point that removing all filesystem dependencies is a valuable point in the design space. For deployments without local filesystems but with lots of RAM, having this option available (even if only as an experimental mode) would be useful. As mentioned above, using a RAM disk for this can be tricky.
I've seen a lot of experimental features go into Hadoop and then slowly wilt from lack of use and maintenance. My two cents are that we should only include an experimental feature when we have a concrete plan for how it can become production-ready, including a developer who intends to carry that plan out and maintain the feature.
My understanding was that this wouldn't be an "experimental" feature in terms of how we've defined that in the past (i.e., it's not on a path to being something we'd expect people to ever use in production). It would just be an internal flag we could set when doing measurement work. We've never had such a feature in the code base before, so it's a bit of an open question whether we want to do that in general. I'm neutral to slightly positive on the idea. I think we'd just have it be undocumented and print a large warning that it's only for benchmarking work. I don't think this would add much burden if the shuffle interface changes, because it is ultimately much simpler than either of the two existing shuffles. I don't think this can exist outside of the codebase, because it uses the block storage APIs. On the other hand, I can see this percolating around on mailing lists, etc., as a way to "speed up your Spark job". So there is an element of wanting to protect users from themselves and not have this in the codebase in a way that's easily accessible.
Actually, I lied: in the codebase we do have some flags that we use only for performance analysis. One is "spark.shuffle.sync", which forces shuffle writes to sync to disk much more aggressively. It has no use to end users other than performance analysis of the shuffle, though it is much more hidden and low-level than this proposal.
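For completeness, the flag mentioned above is just a boolean configuration property; a benchmarking run would enable it along these lines (it is intentionally undocumented, so double-check that it exists in the version being measured).

```scala
import org.apache.spark.SparkConf

// Benchmark-only setting: force shuffle file writes to be synced to disk so
// that measurements are not flattered by the OS buffer cache.
val conf = new SparkConf().set("spark.shuffle.sync", "true")
```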
By the way - if we did end up deciding to include this, I do feel that:
Maybe let's discuss this after the 1.4.0 deadline? I just don't see much benefit to users other than for running benchmarks or academic studies. Over time I'm pretty sure this code path will fall behind and not be as optimized, which will then be bad for academic studies too. So at best this will be good for benchmarks for a short amount of time (maybe one or two releases).
It seems like there are two separate issues here:
(1) Should Spark ever have an in-memory shuffle? Personally I think it should, partially because it's useful for benchmarking and partially because there are some environments (as @mikeringenburg pointed out) where it makes more sense to store shuffle data in memory (for performance reasons, cluster-provisioning reasons, etc.). However, @rxin, it sounds like you're pretty strongly against this for maintainability reasons; if you're going to block all attempts at doing this, we should just close SPARK-3376 as "Will not fix".
(2) If yes to the above question, should we add this particular in-memory shuffle? To list a few reasons why we might not want this implementation:
My opinion is that the main criteria for including this are:
In response to @sryza's second question, I'm very interested in this PR, and would be happy to volunteer to help out wherever it's needed, including maintenance if necessary.
This PR could have important performance implications for algorithms in GraphX and MLlib (e.g., ALS) that introduce relatively lightweight shuffle stages at each iteration.
This is an important request for users with high-performance systems that have large memory footprints and extreme bandwidth. Avoiding disk contention is very important, and that includes the ability to perform in-memory-only shuffles.
Hey all, I would like to close this issue pending some further discussion, maybe offline. The main reason is that people keep asking me why we aren't merging in-memory shuffle into Spark when they see this patch, even though the current patch is clearly not intended as a productionized implementation (but there is no such indication in the title, and it's targeted at SPARK-3376, which asks for an in-memory shuffle for production workloads). In terms of whether to have this at all, a key question IMO is whether disk plays a major performance role in shuffle writes when workloads fit in the disk buffer cache (these are the same workloads that an in-memory shuffle would optimize), so it would be cool to see some results on that. I think it probably makes sense to just get together offline and discuss it.
@pwendell I've seen performance gains even when the data fits in the buffer cache on the write side, because opening a file still requires writing metadata to disk, which can become a point of contention when many tasks are opening many files at the same time. I've also heard from a few folks who have configurations with a lot of memory (and not much disk space, or no local disks) who would find this useful. That being said, there are other compelling reasons to close this, which I've summarized in the PR description.
We have one of the configurations to which @kayousterhout refers: a good deal of local memory, but no local file system, only a global parallel file system (Lustre). Using Lustre for the shuffle's temporary directory performs very poorly, and using a local RAM disk is limiting due to one of the issues mentioned in the updated PR description, namely that shuffle data is cleaned up very slowly, meaning that we may run out of memory after a number of iterations. Thus, my feeling is that finding a way to clean up shuffle data more aggressively might be a bigger priority: it would make something like this PR more suitable for production, and would also make using a RAM disk for shuffle data more viable.
This commit adds a new ShuffleManager that stores all shuffle data
in-memory using the block manager.
With this change, in-memory shuffle data is cleaned up in the
same way as disk shuffle data: using the metadata cleaner.
This version of in-memory shuffle is not appropriate for use in
production because:
(1) Shuffle data is only cleaned up using the metadata cleaner, which
cleans up shuffle data slowly and in the background once an RDD goes
out of scope (under the assumption that data is on-disk, so there's no
particular urgency). For in-memory shuffle data, we'd likely want to clean
up the data more aggressively, once the shuffle has completed, which is
challenging to do with the current code structure (and is not done in this
patch).
(2) We need some clear way of separating shuffle data from other data, because
otherwise a user could hit an OOM when shuffle data fills up memory. Ideally,
we need a graceful way of spilling shuffle data to disk once memory becomes
full (one simple idea would be to just use the MEMORY_AND_DISK storage level).
For these reasons, we've decided not to merge this patch.
cc @shivaram
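As a postscript for readers skimming the description above, here is a toy, self-contained sketch of the core idea and of why cleanup is the sticking point. It is not the patch's actual code (which plugs into Spark's internal ShuffleManager and block manager APIs); all names below are invented.

```scala
import java.util.concurrent.ConcurrentHashMap

// Shuffle output is addressed by (shuffleId, mapId, reduceId), mirroring how
// Spark identifies shuffle blocks.
final case class ShuffleBlockKey(shuffleId: Int, mapId: Int, reduceId: Int)

// Toy in-memory block store: map tasks put serialized partitions in, reduce
// tasks read them back, and nothing is freed until the shuffle is unregistered.
class InMemoryShuffleStore {
  private val blocks = new ConcurrentHashMap[ShuffleBlockKey, Array[Byte]]()

  def putBlock(key: ShuffleBlockKey, bytes: Array[Byte]): Unit =
    blocks.put(key, bytes)

  def getBlock(key: ShuffleBlockKey): Option[Array[Byte]] =
    Option(blocks.get(key))

  // This is the cleanup problem from point (1): memory is only reclaimed when
  // something (e.g. a slow metadata cleaner) remembers to call this.
  def unregisterShuffle(shuffleId: Int): Unit = {
    val it = blocks.keySet().iterator()
    while (it.hasNext) {
      if (it.next().shuffleId == shuffleId) it.remove()
    }
  }
}
```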