address some comments for 11792 #11816
base: branch-25.02
Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala
// repartitioning out (considering we're changing seed each time we repartition).
// However for some test cases with really small batch size, this can happen. So
// we're just logging some warnings here.
log.warn("The bucket is still too large after " + recursiveDepth +
Can we use Scala string interpolation here?
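For illustration, here is a minimal sketch of what the suggestion amounts to. The snippet under review is truncated, so the tail of the message and the value of recursiveDepth below are placeholders, not the actual text in GpuAggregateExec.scala.

```scala
object InterpolationSketch {
  // Hypothetical value; in the real code this comes from the repartition loop.
  val recursiveDepth: Int = 10

  // Concatenation style, as in the snippet under review (message tail is made up).
  val concatenated: String =
    "The bucket is still too large after " + recursiveDepth + " repartitions."

  // The same message built with the s-interpolator.
  val interpolated: String =
    s"The bucket is still too large after $recursiveDepth repartitions."
}
```

Both forms produce the same string; the interpolated one just avoids the trailing `+` chains across lines.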
It is also not clear to me why we end up in this situation in the tests. Can you give me an example?
Hi @abellina, if you replace this warning with a throw and run DATAGEN_SEED=1732770215 SPARK_HOME=...... integration_tests/run_pyspark_from_build.sh -k 'hash_aggregate_test and spark.rapids.sql.batchSizeBytes'
(the seed is the same as in #11790 (comment)), then you'll see failed tests like:
> can we use scala string interpolation here?
Done.
// duplicated rows (the duplication only happens in different batches) to prevent
// repartitioning out (considering we're changing seed each time we repartition).
// However for some test cases with really small batch size, this can happen. So
// we're just logging some warnings here.
Can we set the max to a higher number and throw?
I'm afraid not. Even if we change the max repartition number from 10 to 20, some test cases will still fail because of this. In fact, given a small enough batch size, we can always construct a contrived case that "cannot repartition out".
I have been thinking about this a little more, and I think we do need to throw an exception, but we might need to fix some other issues first. We should do some debugging to fully understand why the tests are outputting this warning/failing.
Our tests are a little crazy because we can set the batch size to be very small, which can expose some bad cases. But if we do the math, this should not be that big of a problem. By default we split the input data 16 ways on each pass, and we set the batch size to 250 bytes in some extreme tests. With a limit of 9 partitioning passes before we get a warning, we should be able to handle 250 * 16^9 bytes. That is 15.625 TiB of data, assuming that there are no hash collisions.
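The arithmetic above can be checked with a few lines of Scala. This is only a sketch of the calculation in the comment; the object and value names are made up for illustration.

```scala
object RepartitionCapacity {
  val batchSizeBytes: Long = 250L // extreme test setting mentioned above
  val fanOut: Long = 16L          // default split per repartition pass
  val passes: Int = 9             // passes allowed before the warning

  // 16^9 = 2^36 leaf partitions after 9 passes
  val totalPartitions: Long = BigInt(fanOut).pow(passes).toLong

  // Bytes covered if every leaf partition holds one target-sized batch
  val totalBytes: Long = batchSizeBytes * totalPartitions

  // Convert to TiB (2^40 bytes)
  val totalTiB: Double = totalBytes.toDouble / (1L << 40)
}
```

Because 16^9 is 2^36, the result reduces to 250 / 16 TiB, which is where 15.625 comes from.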
That said we should be able to know how many partitions we need to do up front because we know the size, row count, and batch count for all of the input data. We used 16 on the join because we didn't know how much data would be processed yet.
I think what is more likely happening is that, because we have lots of small batches, no output batches have been reduced in size after a single pass of the aggregate. So then we start trying to partition, but one or two rows for the same key might be large enough that we cannot hit that 250 byte batch size. Because of that, it does not matter how many times we re-partition the data; it will never be small enough.
That is a livelock situation I want to avoid. So we either need to throw an exception when we hit this case, or we need to give up trying to make the data smaller, cross our fingers, and hope that we have enough memory to make it work.
But this is just me speculating. Someone needs to debug what is happening with the failing tests and understand how we can fix them.
I did some debugging of my own and I understand what is happening. I reproduced the issue by running
env TEST_PARALLEL=0 MAX_PARALLEL=20 DATAGEN_SEED=1732770215 ./run_pyspark_from_build.sh -k 'test_hash_multiple_mode_query' -s 2>&1 | tee log.txt
against Spark 3.4.0. Not all of the tests failed, but two of them did. I also added in a lot of debugging.
It turns out that the problem is caused by how Spark handles NULL in a hash. A NULL is a no-op and returns the seed unchanged. This can lead to a high number of hash collisions. So if we are hashing two columns and the rows are (NaN, NULL) and (NULL, NaN), as in this case, then the two rows have the same hash but different values.
So if we have a very small batch size (like in the tests) and NULLs show up in the data we can run into situations like this.
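A toy model of the collision, for anyone following along. It assumes the row hash is built by chaining the columns, with each column's hash used as the seed for the next one, and that a NULL returns its seed unchanged as described above. The mixing function is a made-up stand-in, not Spark's Murmur3 implementation, and all names are hypothetical.

```scala
object NullHashCollision {
  // Toy per-column hash; NOT Spark's Murmur3.
  def hashColumn(value: Option[Double], seed: Int): Int = value match {
    case None => seed // NULL is a no-op: the seed passes through unchanged
    case Some(v) =>
      // doubleToLongBits canonicalizes NaN, so every NaN hashes the same way
      val bits = java.lang.Double.doubleToLongBits(v)
      31 * seed + (bits ^ (bits >>> 32)).toInt
  }

  // Chain the columns: each column's hash becomes the next column's seed.
  def hashRow(row: Seq[Option[Double]], seed: Int): Int =
    row.foldLeft(seed)((acc, col) => hashColumn(col, acc))

  val a: Seq[Option[Double]] = Seq(Some(Double.NaN), None) // (NaN, NULL)
  val b: Seq[Option[Double]] = Seq(None, Some(Double.NaN)) // (NULL, NaN)
}
```

In this model hashRow(a, seed) equals hashRow(b, seed) for every seed, so changing the seed on each repartition pass cannot separate the two rows.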
// Deal with the over sized buckets
def needRepartitionAgain(bucket: AutoClosableArrayBuffer[SpillableColumnarBatch]) = {
  bucket.map(_.sizeInBytes).sum > targetMergeBatchSize &&
    bucket.size() != 1 &&
    !bucket.forall(_.numRows() == 1) // this is for test
}
needRepartitionAgain has a check to see if all of the batches have a single row in them to avoid issues with the tests, but technically this is flawed because our hash function is flawed. Using the CUDF implementation instead of the Spark one might fix the issue, but there could be other issues hiding in the hash function. As such, I am fine with how the code is now, but I would like a follow-on issue where we explore the idea of doing the re-partition once, because we know the size of the input data.
Hi Bobby, I think we don't know the size of the input data? In the previous sort-based agg implementation, all input batches were fetched before sorting, but in the current repartition-based agg implementation, we repartition each input batch before the next input batch is fetched. We did so to avoid unnecessary spills.
That is true for the first pass. But if a second pass is needed, we know the size of the data because we have processed the entire input at that point and made it spillable. We know a lot at that point.
- The size of the input data in bytes
- The number of rows total and per batch
- The number of input batches (although my debugging showed that we are putting a lot of empty batches in here and should probably filter them out)
Along with this, we know that all of the rows in a batch have unique keys (because the batch has passed the merge stage).
From this we can derive that the number of unique keys in the bucket is between aggregateBatches.map(_.numRows).max and aggregateBatches.map(_.numRows).sum.
That means if we have a good hash implementation, which we do not but can probably fix, then we should be able to split the input for a second pass so that we have
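// lower bound on the number of unique keys in the bucket
val minCardinality = aggregateBatches.map(_.numRows).max
// how many target-sized batches the data would fill
val targetBatchesBySize = aggregateBatches.map(_.sizeInBytes).sum / targetBatchSize
val numBuckets = math.min(minCardinality, targetBatchesBySize)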
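A self-contained version of that estimate, as a sketch only. BatchInfo is a hypothetical stand-in for the per-batch metadata (row count and size in bytes), and clamping the result to at least one bucket is an added assumption, not something proposed above.

```scala
object BucketEstimate {
  // Hypothetical summary of one already-merged, spillable batch.
  final case class BatchInfo(numRows: Long, sizeInBytes: Long)

  def numBuckets(batches: Seq[BatchInfo], targetBatchSize: Long): Long = {
    require(batches.nonEmpty && targetBatchSize > 0, "need input and a positive target")
    // Keys are unique within a batch, so the largest batch bounds cardinality from below.
    val minCardinality = batches.map(_.numRows).max
    // Number of target-sized batches the total data would fill.
    val targetBatchesBySize = batches.map(_.sizeInBytes).sum / targetBatchSize
    // Assumption: never return fewer than one bucket.
    math.max(1L, math.min(minCardinality, targetBatchesBySize))
  }
}
```

For example, three batches of 4, 10, and 6 rows totalling 2000 bytes with a 250 byte target give min(10, 8), so 8 buckets.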
If the buckets are still too large after re-partitioning a second time, then we have probably done the best we possibly could, and we should just try to make the aggregation work. This should make the worst case much better, because we will go through the data 3 times at most instead of the 11 times max that we have today.
- to read it in and repartition it once
- to read in repartitioned data and repartition it a second time
- to read in second order repartitioned data and do the merge aggregation.
The second optimization that I would make (especially if we want to reduce spilling) is to not re-partition everything in one go.
The order of operations should be something like the following. Note there is a lot of hand waving here.
val buckets = repartition_pass_1
val remaining = mergeSmallEnoughBucketsAndReturnFromIterator(buckets)
remaining.foreach { tooLargeBucket =>
  val buckets = repartition_pass_2(tooLargeBucket)
  mergeBucketsAndReturnFromIterator(buckets)
}
This way we will have released as much spilled data as we can after the first repartition pass instead of holding on to it just so we can finish repartitioning everything. It should not reduce the maximum memory used, but it should reduce how long we are at that maximum memory used mark.
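To make the order of operations concrete, here is a small in-memory sketch. Everything in it is hypothetical: rows are (key, partial sum) pairs, maxBucketRows stands in for the byte-based target size, and MurmurHash3.stringHash from the Scala standard library stands in for the real partitioning hash. It does not model spilling at all.

```scala
import scala.util.hashing.MurmurHash3

object TwoPassSketch {
  type Row = (String, Long) // (grouping key, partial aggregate)

  // Split rows into at most n buckets; the seed changes the distribution per pass.
  def repartition(rows: Seq[Row], n: Int, seed: Int): Seq[Seq[Row]] =
    rows.groupBy { case (k, _) => Math.floorMod(MurmurHash3.stringHash(k, seed), n) }
      .values.toSeq

  // Merge aggregation for one bucket: sum the partial aggregates per key.
  def merge(bucket: Seq[Row]): Seq[Row] =
    bucket.groupBy(_._1).map { case (k, rs) => k -> rs.map(_._2).sum }.toSeq

  def aggregate(rows: Seq[Row], n: Int, maxBucketRows: Int): Iterator[Seq[Row]] = {
    val (small, large) =
      repartition(rows, n, seed = 0).partition(_.size <= maxBucketRows)
    // Small buckets are merged and returned first; only the too-large
    // buckets go through a second repartition pass, one at a time.
    small.iterator.map(merge) ++
      large.iterator.flatMap { tooLarge =>
        repartition(tooLarge, n, seed = 1).iterator.map(merge)
      }
  }
}
```

Because the iterator is lazy, the second pass for a too-large bucket only runs once the consumer has drained the small buckets, which is the point of the ordering.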
I'm not sure if this is going to help in practice. A good hash algorithm should spread the data fairly evenly so we should not see much skew/hash collisions.
The base branch was changed.
I understand the problem now. The issue could happen in production (in theory), but it is so unlikely that it really does not matter. I would like a follow on to fix some things with partitioning, but it is not a big deal and I can file it myself.
@abellina could you please take another look? I debugged what was happening with the test and it is really unlikely to be a production problem, but I want to hear your opinion on it.
I'll take a look @revans2 @binmahone, it might not be today, but I'll try to at least get the gist of it from Bobby.
#11792 was merged without @revans2's approval because of #11792 (comment). A few comments were received afterwards from @abellina, @gerashegalov, and @revans2. This PR addresses their comments.