[SPARK-21870][SQL] Split aggregation code into small functions #20965
Since SPARK-23791 gave me a heads-up, I've resubmitted this for review.
I think this is not needed since we can use CodeGenerator.calculateParamLength
ok
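For readers following along, here is a minimal sketch of the kind of check a helper like `CodeGenerator.calculateParamLength` performs. It is not Spark's source: the class `ParamLengthSketch`, the `Param` type, and the assumption that each nullable input adds one extra `boolean` parameter are hypothetical and only illustrate the JVM rule that a method may use at most 255 parameter slots, with `long` and `double` taking two slots each and `this` taking one.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; not Spark's actual implementation.
final class ParamLengthSketch {
    // JVM limit on parameter slots per method, including `this`.
    static final int MAX_JVM_METHOD_PARAMS_LENGTH = 255;

    // A parameter described only by its Java type name and nullability (assumed model).
    static final class Param {
        final String javaType;
        final boolean nullable;

        Param(String javaType, boolean nullable) {
            this.javaType = javaType;
            this.nullable = nullable;
        }
    }

    static int calculateParamLength(List<Param> params) {
        int length = 1; // slot 0 holds `this` for instance methods
        for (Param p : params) {
            boolean wide = p.javaType.equals("long") || p.javaType.equals("double");
            length += wide ? 2 : 1;      // long/double occupy two slots
            if (p.nullable) length += 1; // assumed extra boolean isNull parameter
        }
        return length;
    }

    static boolean isValidParamLength(int length) {
        return length <= MAX_JVM_METHOD_PARAMS_LENGTH;
    }

    public static void main(String[] args) {
        // Shaped like agg_doAggregate_avg_0(double, boolean, boolean, long) in the PR description.
        List<Param> avgParams = Arrays.asList(new Param("double", true), new Param("long", true));
        int length = calculateParamLength(avgParams);
        System.out.println(length + " slots, valid = " + isValidParamLength(length));
        // prints "7 slots, valid = true"
    }
}
```

A split function whose parameter list exceeds the limit cannot be emitted, so a check like this decides whether splitting is possible at all.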
retest this please
Test build #88840 has finished for PR 20965 at commit
ae61eb9 to
696ba17
Compare
Test build #88876 has finished for PR 20965 at commit
Test build #88875 has finished for PR 20965 at commit
retest this please
Test build #88881 has finished for PR 20965 at commit
Just put those into object CodeGenerator?
ok
ee8a68f to
640fd8f
Compare
Test build #88917 has finished for PR 20965 at commit
Test build #88918 has finished for PR 20965 at commit
is there any reason why we are not using CodeGenerator.splitExpressions instead of this?
that's true and I'll try to fix
Aha, I remember: since this splitting needs to take subexpression elimination into account, I added a splitting function specific to HashAggregateExec here.
I see. Can't we instead add a splitExpressionsWithSubexprElimination (or something similar) to CodeGenerator, so that it can be reused elsewhere if needed?
ok, I'll recheck.
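To make the idea in this thread concrete, here is a rough sketch of what a reusable splitting helper could look like. It is purely illustrative and is not Spark's `CodeGenerator` API: the class `SplitSketch`, the method `split`, and the function naming scheme are all hypothetical. The assumption is that common subexpressions are evaluated once in the caller and passed to every split function as parameters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of splitting generated code into per-aggregate functions.
final class SplitSketch {
    // Function definitions to emit, plus the calls that replace the inlined code.
    static final class SplitResult {
        final List<String> functions = new ArrayList<>();
        final List<String> calls = new ArrayList<>();
    }

    // paramDecls/args carry the already-evaluated common subexpressions (assumed design).
    static SplitResult split(List<String> names, List<String> bodies,
                             String paramDecls, String args) {
        SplitResult out = new SplitResult();
        for (int i = 0; i < bodies.size(); i++) {
            String fn = "agg_doAggregate_" + names.get(i) + "_" + i;
            out.functions.add(
                "private void " + fn + "(" + paramDecls + ") {\n  " + bodies.get(i) + "\n}");
            out.calls.add(fn + "(" + args + ");");
        }
        return out;
    }

    public static void main(String[] args) {
        SplitResult r = split(
            Arrays.asList("sum", "avg"),
            Arrays.asList("/* update sum buffer */", "/* update avg buffer */"),
            "long subExpr_0", "subExpr_0");
        r.calls.forEach(System.out::println);
        r.functions.forEach(System.out::println);
    }
}
```

Keeping the helper independent of any one operator is what would allow it to live in a shared place and be reused, as suggested above.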
why is this needed now and it was not before?
If we don't have these copies, some queries return wrong answers, e.g.,
import org.apache.spark.sql.execution.debug._
sql("CREATE OR REPLACE TEMPORARY VIEW testData AS SELECT * FROM VALUES (1, 1), (1, 2), (2, 1) AS testData(a, b)")
val df = sql("SELECT SKEWNESS(a) FROM testData")
df.debugCodegen
df.show
scala> df.show (copy)
+---------------------------+
|skewness(CAST(a AS DOUBLE))|
+---------------------------+
|         0.7071067811865475|
+---------------------------+
scala> df.show (non-copy)
+---------------------------+
|skewness(CAST(a AS DOUBLE))|
+---------------------------+
|         -1.368454115659954|
+---------------------------+
In the original generated code, aggregation buffer updates happen at the end of an Aggregate consume:
https://gist.github.com/maropu/ec5a322c40e9e7e0024c3074a261197b#file-codegen-d-skew-in-the-master-L807
But if we split aggregation functions into pieces, these updates happen at the end of each split function, e.g.:
https://gist.github.com/maropu/1e9e7cb2377622549163261fc321a108#file-codegen-d-skew-in-spark-21870-2-local-copy-L822
So, if we don't copy the previous state to local buffers, a later split function wrongly references the already-updated state and returns an incorrect answer.
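The effect can be reproduced outside Spark with a toy example. The sketch below is not generated Spark code: the class `LocalCopySketch` and its running-mean buffer are hypothetical, chosen only because the mean update reads the count, so the order of buffer writes matters.

```java
// Hypothetical illustration of why split functions need local copies of the old state.
final class LocalCopySketch {
    // Aggregation buffer shared by all (split) functions.
    private double bufCount = 0.0;
    private double bufMean = 0.0;

    double mean() { return bufMean; }

    // Unsplit: evaluate everything against the old state, then update the buffer at the end.
    void consumeUnsplit(double x) {
        double newCount = bufCount + 1.0;
        double newMean = bufMean + (x - bufMean) / (bufCount + 1.0);
        bufCount = newCount;
        bufMean = newMean;
    }

    // Split, no copies: the count is written before the mean function reads it.
    void consumeSplitNoCopy(double x) {
        bufCount = updateCount(bufCount);
        bufMean = updateMean(x, bufCount, bufMean); // sees the already-updated count
    }

    // Split, with copies: both functions read a snapshot of the previous state.
    void consumeSplitWithCopy(double x) {
        double oldCount = bufCount;
        double oldMean = bufMean;
        bufCount = updateCount(oldCount);
        bufMean = updateMean(x, oldCount, oldMean);
    }

    private static double updateCount(double count) {
        return count + 1.0;
    }

    private static double updateMean(double x, double count, double mean) {
        return mean + (x - mean) / (count + 1.0);
    }

    public static void main(String[] args) {
        LocalCopySketch unsplit = new LocalCopySketch();
        LocalCopySketch noCopy = new LocalCopySketch();
        LocalCopySketch withCopy = new LocalCopySketch();
        for (double x : new double[] {1, 1, 2}) {
            unsplit.consumeUnsplit(x);
            noCopy.consumeSplitNoCopy(x);
            withCopy.consumeSplitWithCopy(x);
        }
        System.out.println("unsplit   = " + unsplit.mean());
        System.out.println("with copy = " + withCopy.mean()); // same as unsplit
        System.out.println("no copy   = " + noCopy.mean());   // differs from unsplit
    }
}
```

The snapshot costs a few extra local variable assignments per row, which is the overhead the performance question in this thread is about.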
but this may introduce a perf regression, doesn't it? in this case I am not sure if we should do it or not, WDYT @cloud-fan @gatorsmile @hvanhovell ?
hopefully this method will become easier after @viirya's PR gets merged. Anyway, this seems to me quite a generic method: shall we move it to CodeGenerator?
ok
Fixed.
63f6e90 to
f2cd5ca
Compare
Test build #89556 has finished for PR 20965 at commit
Test build #89557 has finished for PR 20965 at commit
oh, made a mistake ...
Test build #95334 has finished for PR 20965 at commit
Thanks for the check, @rednaxelafx! As @cloud-fan said, I think code length/bytecode mismatches are a well-known issue of ours... @cloud-fan Yea, sure. We must check that before merging this.
Test build #110103 has finished for PR 20965 at commit
viirya left a comment
Looks good now. Just found a style problem.
aggCodeBlocks.fold(EmptyBlock)(_ + _).code
}
} else {
aggCodeBlocks.fold(EmptyBlock)(_ + _).code
As conf.codegenSplitAggregateFunc is enabled by default, can we run jenkins test after disabling it? We can enable it before merging.
Yea, reasonable suggestion. I'll try.
val bufferOffset = bufferStartOffsets(i)
// All the update code for aggregation buffers should be placed in the end
// of each aggregation function code.
val updateRowBuffer = fastRowEvalsForOneFunc.zipWithIndex.map { case (ev, j) =>
style: indent seems wrong here.
oh...
To check if the tests passed with
I checked the TPCDS performance numbers (sf=5) and couldn't find much difference with and without this PR: https://docs.google.com/spreadsheets/d/10eCV0PHeAaWGaXwpKPDDYCSLzsogl4IIiGiitqvw8PY/edit?usp=sharing To make sure this PR is still beneficial, I ran the microbenchmarks below;
Test build #110183 has finished for PR 20965 at commit
This reverts commit 20b310f.
@viirya ok, all the tests passed with
@viirya oh, sorry. I updated the link so that everyone can see it: https://docs.google.com/spreadsheets/d/10eCV0PHeAaWGaXwpKPDDYCSLzsogl4IIiGiitqvw8PY/edit?usp=sharing
@maropu Thanks! :)
Test build #110201 has finished for PR 20965 at commit
OK, I think everything is fine now. @cloud-fan @viirya
thanks, merging to master!
Thanks to all the reviewers!
…nerated code in HashAggregateExec

### What changes were proposed in this pull request?
This pr cleans up string template formats for generated code in HashAggregateExec. This change comes from rednaxelafx's comment: #20965 (comment)

### Why are the changes needed?
To improve code readability.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
N/A

Closes #25714 from maropu/SPARK-21870-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
## What changes were proposed in this pull request?

This pr proposes to split aggregation code into small functions in `HashAggregateExec`. In apache#18810, we saw a performance regression when the JVM did not JIT-compile overly long functions. I checked and found that the codegen of `HashAggregateExec` frequently goes over the limit when a query has too many aggregate functions (e.g., q66 in TPCDS).

The current master places all the generated aggregation code in a single function. In this pr, I modified the code to assign an individual function to each aggregate function (e.g., `SUM` and `AVG`). For example, in a query `SELECT SUM(a), AVG(a) FROM VALUES(1) t(a)`, the proposed code defines two functions for `SUM(a)` and `AVG(a)` as follows:

- generated code with this pr (https://gist.github.com/maropu/812990012bc967a78364be0fa793f559):
```
private void agg_doConsume_0(InternalRow inputadapter_row_0, long agg_expr_0_0, boolean agg_exprIsNull_0_0, double agg_expr_1_0, boolean agg_exprIsNull_1_0, long agg_expr_2_0, boolean agg_exprIsNull_2_0) throws java.io.IOException {
  // do aggregate
  // common sub-expressions

  // evaluate aggregate functions and update aggregation buffers
  agg_doAggregate_sum_0(agg_exprIsNull_0_0, agg_expr_0_0);
  agg_doAggregate_avg_0(agg_expr_1_0, agg_exprIsNull_1_0, agg_exprIsNull_2_0, agg_expr_2_0);

}
...
private void agg_doAggregate_avg_0(double agg_expr_1_0, boolean agg_exprIsNull_1_0, boolean agg_exprIsNull_2_0, long agg_expr_2_0) throws java.io.IOException {
  // do aggregate for avg
  // evaluate aggregate function
  boolean agg_isNull_19 = true;
  double agg_value_19 = -1.0;
...
private void agg_doAggregate_sum_0(boolean agg_exprIsNull_0_0, long agg_expr_0_0) throws java.io.IOException {
  // do aggregate for sum
  // evaluate aggregate function
  agg_agg_isNull_11_0 = true;
  long agg_value_11 = -1L;
```

- generated code in the current master (https://gist.github.com/maropu/e9d772af2c98d8991a6a5f0af7841760)
```
private void agg_doConsume_0(InternalRow localtablescan_row_0, int agg_expr_0_0) throws java.io.IOException {
  // do aggregate
  // common sub-expressions
  boolean agg_isNull_4 = false;
  long agg_value_4 = -1L;
  if (!false) {
    agg_value_4 = (long) agg_expr_0_0;
  }
  // evaluate aggregate function
  agg_agg_isNull_7_0 = true;
  long agg_value_7 = -1L;
  do {
    if (!agg_bufIsNull_0) {
      agg_agg_isNull_7_0 = false;
      agg_value_7 = agg_bufValue_0;
      continue;
    }

    boolean agg_isNull_9 = false;
    long agg_value_9 = -1L;
    if (!false) {
      agg_value_9 = (long) 0;
    }
    if (!agg_isNull_9) {
      agg_agg_isNull_7_0 = false;
      agg_value_7 = agg_value_9;
      continue;
    }

  } while (false);

  long agg_value_6 = -1L;

  agg_value_6 = agg_value_7 + agg_value_4;
  boolean agg_isNull_11 = true;
  double agg_value_11 = -1.0;

  if (!agg_bufIsNull_1) {
    agg_agg_isNull_13_0 = true;
    double agg_value_13 = -1.0;
    do {
      boolean agg_isNull_14 = agg_isNull_4;
      double agg_value_14 = -1.0;
      if (!agg_isNull_4) {
        agg_value_14 = (double) agg_value_4;
      }
      if (!agg_isNull_14) {
        agg_agg_isNull_13_0 = false;
        agg_value_13 = agg_value_14;
        continue;
      }

      boolean agg_isNull_15 = false;
      double agg_value_15 = -1.0;
      if (!false) {
        agg_value_15 = (double) 0;
      }
      if (!agg_isNull_15) {
        agg_agg_isNull_13_0 = false;
        agg_value_13 = agg_value_15;
        continue;
      }

    } while (false);

    agg_isNull_11 = false; // resultCode could change nullability.

    agg_value_11 = agg_bufValue_1 + agg_value_13;

  }
  boolean agg_isNull_17 = false;
  long agg_value_17 = -1L;
  if (!false && agg_isNull_4) {
    agg_isNull_17 = agg_bufIsNull_2;
    agg_value_17 = agg_bufValue_2;
  } else {
    boolean agg_isNull_20 = true;
    long agg_value_20 = -1L;

    if (!agg_bufIsNull_2) {
      agg_isNull_20 = false; // resultCode could change nullability.

      agg_value_20 = agg_bufValue_2 + 1L;

    }
    agg_isNull_17 = agg_isNull_20;
    agg_value_17 = agg_value_20;
  }
  // update aggregation buffer
  agg_bufIsNull_0 = false;
  agg_bufValue_0 = agg_value_6;

  agg_bufIsNull_1 = agg_isNull_11;
  agg_bufValue_1 = agg_value_11;

  agg_bufIsNull_2 = agg_isNull_17;
  agg_bufValue_2 = agg_value_17;

}
```

You can check the previous discussion in apache#19082

## How was this patch tested?

Existing tests

Closes apache#20965 from maropu/SPARK-21870-2.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?

This pr proposes to split aggregation code into small functions in `HashAggregateExec`. In #18810, we saw a performance regression when the JVM did not JIT-compile overly long functions. I checked and found that the codegen of `HashAggregateExec` frequently goes over the limit when a query has too many aggregate functions (e.g., q66 in TPCDS).

The current master places all the generated aggregation code in a single function. In this pr, I modified the code to assign an individual function to each aggregate function (e.g., `SUM` and `AVG`). For example, in a query `SELECT SUM(a), AVG(a) FROM VALUES(1) t(a)`, the proposed code defines two functions, one for `SUM(a)` and one for `AVG(a)`.

You can check the previous discussion in #19082

## How was this patch tested?

Existing tests