
Wrong aggregation result in Spark SQL tests after enabling columnar shuffle #260

Closed

viirya opened this issue Apr 11, 2024 · 2 comments

Labels: bug (Something isn't working)

viirya commented Apr 11, 2024

Describe the bug

While trying to enable columnar shuffle by default, I found that some Spark SQL tests fail. Some produce wrong aggregation results, e.g.:

SQLQuerySuite: SPARK-8828 sum should return null if all input values are null

[info]   == Physical Plan ==                                                                                                                                                                                                           
[info]   AdaptiveSparkPlan isFinalPlan=true                                                                                                                                                                                            
[info]   +- == Final Plan ==                                                                                                                                                                                                           
[info]      *(2) ColumnarToRow                                                                                                                                                                                                         
[info]      +- CometHashAggregate [sum#3766L, sum#3767, count#3768L], Final, [sum(a#136), avg(a#136)]                                                                                                                                  
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=5228]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_sum(a#136), partial_avg(a#136)], output=[sum#3766L, sum#3767, count#3768L])
[info]                     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$NullInts, true])).a.intValue AS a#136]
[info]                        +- Scan[obj#135]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [sum#3766L, sum#3767, count#3768L], Final, [sum(a#136), avg(a#136)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=5117]
[info]         +- HashAggregate(keys=[], functions=[partial_sum(a#136), partial_avg(a#136)], output=[sum#3766L, sum#3767, count#3768L])
[info]            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$NullInts, true])).a.intValue AS a#136]
[info]               +- Scan[obj#135]
...
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<sum(a):bigint,avg(a):double>
[info]   ![null,null]                [null,NaN] (QueryTest.scala:243)

aggregation with codegen:

[info]   == Physical Plan ==
[info]   AdaptiveSparkPlan isFinalPlan=true
[info]   +- == Final Plan ==
[info]      *(2) ColumnarToRow
[info]      +- CometHashAggregate [sum#4362, sum#4363, count#4364L], Final, [sum(null), avg(null)]
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10168]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_sum(null), partial_avg(null)], output=[sum#4362, sum#4363, count#4364L])
[info]                     +- *(1) SerializeFromObject
[info]                        +- Scan[obj#12]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [sum#4362, sum#4363, count#4364L], Final, [sum(null), avg(null)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10060]
[info]         +- HashAggregate(keys=[], functions=[partial_sum(null), partial_avg(null)], output=[sum#4362, sum#4363, count#4364L])
[info]            +- SerializeFromObject
[info]               +- Scan[obj#12]
[info]   
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<sum(a):double,avg(a):double,count(NULL):bigint>
[info]   ![null,null,0]              [null,NaN,0] (QueryTest.scala:243)

SPARK-3176 Added Parser of SQL LAST():

[info]   == Physical Plan ==
[info]   AdaptiveSparkPlan isFinalPlan=true
[info]   +- == Final Plan ==
[info]      *(2) ColumnarToRow
[info]      +- CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10390]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])
[info]                     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]
[info]                        +- Scan[obj#92]
[info]   +- == Initial Plan ==
[info]      CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10279]
[info]         +- HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])
[info]            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]
[info]               +- Scan[obj#92]
[info]   
[info]   == Results ==
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<last(n):int>
[info]   ![4]                        [2] (QueryTest.scala:243)
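For context on the third failure (my speculation, not a confirmed root cause): `LAST()` without an explicit ordering is non-deterministic in Spark, so its result depends on the order in which the shuffle delivers rows to the final aggregate. A minimal Python sketch of that sensitivity (the partition layout of `LowerCaseData`'s `n` column is an assumption for illustration):

```python
# Hedged sketch (not Comet's actual code): why LAST() without an explicit
# ordering can return different values when the shuffle reorders rows.
def last_agg(rows):
    """Model of partial_last/last: keep the most recently seen value."""
    state = None
    for value in rows:
        state = value
    return state

# Two hypothetical upstream partitions of LowerCaseData's n column (1..4).
partitions = [[1, 2], [3, 4]]

# If the final aggregate consumes the partitions in one order, it sees 4 last...
assert last_agg([v for part in partitions for v in part]) == 4
# ...and in the reverse order it sees 2 last, matching the [4] vs [2] diff.
assert last_agg([v for part in reversed(partitions) for v in part]) == 2
```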

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

@viirya viirya added the bug Something isn't working label Apr 11, 2024
viirya commented Apr 11, 2024

The first two failures are due to incorrect null handling in the Comet Average expression. I will submit a fix soon.
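To illustrate what such a null-handling bug looks like (a sketch of the semantics, not Comet's actual implementation): Spark's Average keeps a partial state of `(sum, count)`, and the final projection must return SQL NULL when `count == 0`. Dividing unconditionally in a vectorized engine instead yields `0.0 / 0 = NaN`, which matches the `[null, NaN]` rows in the diffs above.

```python
import math

# Hedged model of Average finalization (an assumption, not Comet's code).
# Partial state after aggregating all-null input: total = 0.0, count = 0.

def average_final_buggy(total: float, count: int) -> float:
    # Unconditional division: vectorized kernels map 0.0 / 0 to NaN instead
    # of raising, so all-null input surfaces as NaN in the result row.
    return total / count if count != 0 else float("nan")

def average_final_fixed(total: float, count: int):
    # Correct SQL semantics: AVG over zero non-null rows is NULL (None here).
    return None if count == 0 else total / count

assert math.isnan(average_final_buggy(0.0, 0))  # the wrong [null, NaN] answer
assert average_final_fixed(0.0, 0) is None      # the expected [null, null]
assert average_final_fixed(6.0, 3) == 2.0       # normal inputs unaffected
```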

viirya commented Apr 18, 2024

The last failure was fixed by #262.

@viirya viirya closed this as completed Apr 18, 2024