-
Notifications
You must be signed in to change notification settings - Fork 240
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
address some comments for 11792 #11816
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we set the max to a higher number and throw?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid not. Even if we change the max repartition number from 10 to 20, there will still be failed test cases because of this. Actually, given a small enough batch size, we can always create a contrived case where it "can not repartiton out"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have been thinking about this a little more and I think we do need to throw an exception, but we might need to fix some other issues first. We should be doing some debugging to understand fully why the tests are outputting this warning/failing.
Our tests are a little crazy because we can set the batch size to be very small, which can expose some bad cases. But if we do the math this should not be that big of a problem. By default we split the input data 16 ways on each pass. We set the batch size bytes to 250 bytes in some extreme tests. That means if we have a limit of 9 times that we can partition the data before we get a warning that means we have (250 * 16 ^ 9). That is 15.625 TiB of data, assuming that there are no hash collisions.
That said we should be able to know how many partitions we need to do up front because we know the size, row count, and batch count for all of the input data. We used 16 on the join because we didn't know how much data would be processed yet.
I think what is more likely happening is that because we have lots of small batches, after a single pass of the aggregate no output batches have been reduced in size. So then we start trying to partition, but one or two rows for the same key might be large enough that we cannot hit that 250 byte batch size Because of that it does not matter how many times we re-partition the data it will never be small enough.
That is a live lock situation I want to avoid. So we either need to throw an exception when we hit this case or we need to just give up trying to make the data smaller and just cross our fingers and hope that we have enough memory to make it work.
But this is just me speculating. Someone needs to debug what is happening with the failing tests and understand how we can fix them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I can look into it for a further look. But it might take a while and this PR will be pending longer, is that ok to you (considering this is a follow-up work of #11792 )? @revans2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did some debugging of my own and I understand what is happening. I reproduced the issue by running
against spark 3.4.0. Not all of the tests failed, but two of them did. I also added in a lot of debugging.
It turns out that the problem is because of how Spark handles NULL in a hash. A NULL is a noop and returns the seed unchanged. This can lead to a high number of hash collisions. So if we are hashing two columns (NaN, NULL) and (NULL, NaN) (in this case), then the two have the same hash, but different values.
So if we have a very small batch size (like in the tests) and NULLs show up in the data we can run into situations like this.
needRepartionAgain
has a check to see if all of the batches have a single row in them to avoid issues with the tests, but technically this is flawed because our hash function is flawed. Using the CUDF implementation instead of the Spark one might fix the issue, but there could be other issues hiding in the hash function. As such I am fine with how the code is now, but I would like a follow on issue where we explore the idea of doing the re-partition once, because we know the size of the input data.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Bobby, I think we don't know the size of the input data ? In previous sort based agg implementation, all input batches are fetched before doing sort, but in current repartition based agg implementation, we repartition each input batch before next input batch is fetched. We did so to avoid unnecessary spills.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is true for the first pass. But if a second pass is needed we know that size of the data because we have processed the entire input at that point and made it spillable. We know a lot at that point.
Along with this we know that all of the rows in a batch have unique keys (because it has passed the merge stage)
From this we can derive that the number of unique keys in the bucket is between
aggregateBatches.map(_.numRows).max()
andaggregatebatches.map(_numRows).total
That means if we have a good hash implementation, which we do not but can probably fix, then we should be able to split the input so for a second pass so that we have
If after re-partitioning it the second time, then we have probably done the best we could possibly have done and we should just try and make the aggregation work. This should make the worst case situation much better because we will only go through the data 3 times at the most instead of the 11 times max that we have today.
The second optimization that I would make (especially if we want to reduce spilling). Is to not re-partition everything in one go.
The order of operations should be something like the following. Note there is a lot of hand waving here.
This way we will have released as much spilled data as we can after the first repartition pass instead of holding on to it just so we can finish repartitioning everything. It should not reduce the maximum memory used, but it should reduce how long we are at that maximum memory used mark.
I'm not sure if this is going to help in practice. A good hash algorithm should spread the data fairly evenly so we should not see much skew/hash collisions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But if a second pass is needed we know that size of the data because we have processed the entire input at that point and made it spillable. -- Not sure if you noticed when reviewing, but in my understanding this is no longer true now, please check :
spark-rapids/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
Line 1007 in 32aa3e1
I'm going to merge this PR first. Please feel free to continue the discussion here.