[SPARK-35688][SQL]Subexpressions should be lazy evaluation in GeneratePredicate #32977
Can one of the admins verify this patch? |
The example is based on the order of predicates, but as I remember the order is not guaranteed. Spark can internally reorder the predicates. That said, we should not assume |
It also throws an exception if we use |
Then it's definitely a bug we need to fix, as IF should guarantee the execution order. When did we introduce this bug? |
If we change it to If:
val df = spark.read.load(dir.getCanonicalPath)
  .filter("if(size(c1) > 5, element_at(c1, 7) = 8 or element_at(c1, 7) = 7, false)")
It seems the optimizer rewrites the If expression into an AND.
== Analyzed Logical Plan ==
c1: array<int>
Filter if ((size(c1#19104, false) > 5)) ((element_at(c1#19104, 7, true) = 8) OR (element_at(c1#19104, 7, true) = 7)) else false
+- Relation [c1#19104] parquet
== Optimized Logical Plan ==
Filter ((size(c1#19104, false) > 5) AND ((element_at(c1#19104, 7, true) = 8) OR (element_at(c1#19104, 7, true) = 7)))
+- Relation [c1#19104] parquet
That's why If does not make the evaluation of element_at conditional here.
|
BTW, the optimization rule is Once I remove this optimization, the added test can pass without other change. |
|
The exception happens not only in the For instance:
CaseWhen
spark.read.load("/tmp/parquet/t1")
.filter(
"""
|case
| when element_at(c1, 1) = 1 then 1
| when element_at(c1, 7) = 7 then 7
| when element_at(c1, 7) = 8 then 8
| else 10
|end = 7
|or
|case
| when element_at(c1, 1) = 1 then 1
| when element_at(c1, 7) = 7 then 7
| when element_at(c1, 7) = 8 then 8
| else 10
|end = 8
|""".stripMargin)
Coalesce
spark.read.load("/tmp/parquet/t1")
.filter(
"""
|coalesce(element_at(c1, 1) > 0, element_at(c1, 7) > 0, element_at(c1, 7) + 1 > 0) == true
|or
|coalesce(element_at(c1, 1) > 1, element_at(c1, 7) > 1, element_at(c1, 7) + 1 > 1) == true
|""".stripMargin
)
In both cases, I'm not sure whether Spark should guarantee the evaluation order of predicates inside conditional expressions (If/CaseWhen/Coalesce). |
|
This is the generated code.
Source code
spark.read.load("/tmp/parquet/t1")
.filter(
"""
|case
| when element_at(c1, 1) = 1 then 1
| when element_at(c1, 7) = 7 then 7
| when element_at(c1, 7) = 8 then 8
| else 10
|end = 7
|or
|case
| when element_at(c1, 1) = 1 then 1
| when element_at(c1, 7) = 7 then 7
| when element_at(c1, 7) = 8 then 8
| else 10
|end = 8
|""".stripMargin)
Generated predicate '(CASE WHEN (element_at(input[0, array<int>, true], 1, true) = 1) THEN false WHEN (element_at(input[0, array<int>, true], 7, true) = 7) THEN true WHEN (element_at(input[0, array<int>, true], 7, true) = 8) THEN false ELSE false END OR CASE WHEN (element_at(input[0, array<int>, true], 1, true) = 1) THEN false WHEN (element_at(input[0, array<int>, true], 7, true) = 7) THEN false WHEN (element_at(input[0, array<int>, true], 7, true) = 8) THEN true ELSE false END)':
/* 001 */ public SpecificPredicate generate(Object[] references) {
/* 002 */ return new SpecificPredicate(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificPredicate extends org.apache.spark.sql.catalyst.expressions.BasePredicate {
/* 006 */ private final Object[] references;
/* 007 */ private boolean subExprIsNull_0;
/* 008 */ private int subExprValue_0;
/* 009 */ private boolean subExprIsNull_1;
/* 010 */ private boolean subExprValue_1;
/* 011 */ private boolean globalIsNull_0;
/* 012 */ private boolean value_10_0;
/* 013 */ private boolean globalIsNull_1;
/* 014 */ private boolean value_20_0;
/* 015 */ private boolean globalIsNull_2;
/* 016 */
/* 017 */ public SpecificPredicate(Object[] references) {
/* 018 */ this.references = references;
/* 019 */
/* 020 */
/* 021 */ }
/* 022 */
/* 023 */ public void initialize(int partitionIndex) {
/* 024 */
/* 025 */ }
/* 026 */
/* 027 */ public boolean eval(InternalRow i) {
/* 028 */ subExpr_0(i);subExpr_1(i);
/* 029 */ boolean value_19 = CaseWhen_0(i);
/* 030 */ boolean value_9 = true;
/* 031 */
/* 032 */ if (!value_19) {
/* 033 */ boolean value_29 = CaseWhen_1(i);
/* 034 */ value_9 = value_29;
/* 035 */ }
/* 036 */ return !false && value_9;
/* 037 */ }
/* 038 */
/* 039 */
/* 040 */ private boolean CaseWhen_0(InternalRow i) {
/* 041 */ byte caseWhenResultState_0 = -1;
/* 042 */ do {
/* 043 */
/* 044 */
/* 045 */ if (!subExprIsNull_1 && subExprValue_1) {
/* 046 */
/* 047 */ caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 048 */ value_10_0 = false;
/* 049 */ continue;
/* 050 */ }
/* 051 */
/* 052 */ boolean isNull_11 = true;
/* 053 */ boolean value_12 = false;
/* 054 */
/* 055 */ if (!subExprIsNull_0) {
/* 056 */
/* 057 */
/* 058 */ isNull_11 = false; // resultCode could change nullability.
/* 059 */ value_12 = subExprValue_0 == 7;
/* 060 */
/* 061 */ }
/* 062 */ if (!isNull_11 && value_12) {
/* 063 */
/* 064 */ caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 065 */ value_10_0 = true;
/* 066 */ continue;
/* 067 */ }
/* 068 */
/* 069 */ boolean isNull_14 = true;
/* 070 */ boolean value_15 = false;
/* 071 */
/* 072 */ if (!subExprIsNull_0) {
/* 073 */
/* 074 */
/* 075 */ isNull_14 = false; // resultCode could change nullability.
/* 076 */ value_15 = subExprValue_0 == 8;
/* 077 */
/* 078 */ }
/* 079 */ if (!isNull_14 && value_15) {
/* 080 */
/* 081 */ caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 082 */ value_10_0 = false;
/* 083 */ continue;
/* 084 */ }
/* 085 */
/* 086 */
/* 087 */ caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 088 */ value_10_0 = false;
/* 089 */
/* 090 */ } while (false);
/* 091 */ // TRUE if any condition is met and the result is null, or no any condition is met.
/* 092 */ final boolean isNull_9 = (caseWhenResultState_0 != 0);
/* 093 */ globalIsNull_1 = isNull_9;
/* 094 */ return value_10_0;
/* 095 */ }
/* 096 */
/* 097 */
/* 098 */ private void subExpr_1(InternalRow i) {
/* 099 */ boolean value_8 = EqualTo_0(i);
/* 100 */ subExprIsNull_1 = globalIsNull_0;
/* 101 */ subExprValue_1 = value_8;
/* 102 */ }
/* 103 */
/* 104 */
/* 105 */ private void subExpr_0(InternalRow i) {
/* 106 */ boolean isNull_0 = true;
/* 107 */ int value_0 = -1;
/* 108 */ boolean isNull_1 = i.isNullAt(0);
/* 109 */ ArrayData value_1 = isNull_1 ?
/* 110 */ null : (i.getArray(0));
/* 111 */ if (!isNull_1) {
/* 112 */
/* 113 */
/* 114 */ isNull_0 = false; // resultCode could change nullability.
/* 115 */
/* 116 */ int elementAtIndex_0 = (int) 7;
/* 117 */ if (value_1.numElements() < Math.abs(elementAtIndex_0)) {
/* 118 */ throw QueryExecutionErrors.invalidArrayIndexError(elementAtIndex_0, value_1.numElements());
/* 119 */ } else {
/* 120 */ if (elementAtIndex_0 == 0) {
/* 121 */ throw QueryExecutionErrors.sqlArrayIndexNotStartAtOneError();
/* 122 */ } else if (elementAtIndex_0 > 0) {
/* 123 */ elementAtIndex_0--;
/* 124 */ } else {
/* 125 */ elementAtIndex_0 += value_1.numElements();
/* 126 */ }
/* 127 */
/* 128 */ if (value_1.isNullAt(elementAtIndex_0)) {
/* 129 */ isNull_0 = true;
/* 130 */ } else
/* 131 */
/* 132 */ {
/* 133 */ value_0 = value_1.getInt(elementAtIndex_0);
/* 134 */ }
/* 135 */ }
/* 136 */
/* 137 */
/* 138 */ }
/* 139 */ subExprIsNull_0 = isNull_0;
/* 140 */ subExprValue_0 = value_0;
/* 141 */ }
/* 142 */
/* 143 */
/* 144 */ private boolean EqualTo_0(InternalRow i) {
/* 145 */ boolean isNull_3 = true;
/* 146 */ boolean value_3 = false;
/* 147 */ boolean isNull_4 = true;
/* 148 */ int value_4 = -1;
/* 149 */ boolean isNull_5 = i.isNullAt(0);
/* 150 */ ArrayData value_5 = isNull_5 ?
/* 151 */ null : (i.getArray(0));
/* 152 */ if (!isNull_5) {
/* 153 */
/* 154 */
/* 155 */ isNull_4 = false; // resultCode could change nullability.
/* 156 */
/* 157 */ int elementAtIndex_1 = (int) 1;
/* 158 */ if (value_5.numElements() < Math.abs(elementAtIndex_1)) {
/* 159 */ throw QueryExecutionErrors.invalidArrayIndexError(elementAtIndex_1, value_5.numElements());
/* 160 */ } else {
/* 161 */ if (elementAtIndex_1 == 0) {
/* 162 */ throw QueryExecutionErrors.sqlArrayIndexNotStartAtOneError();
/* 163 */ } else if (elementAtIndex_1 > 0) {
/* 164 */ elementAtIndex_1--;
/* 165 */ } else {
/* 166 */ elementAtIndex_1 += value_5.numElements();
/* 167 */ }
/* 168 */
/* 169 */ if (value_5.isNullAt(elementAtIndex_1)) {
/* 170 */ isNull_4 = true;
/* 171 */ } else
/* 172 */
/* 173 */ {
/* 174 */ value_4 = value_5.getInt(elementAtIndex_1);
/* 175 */ }
/* 176 */ }
/* 177 */
/* 178 */
/* 179 */ }
/* 180 */ if (!isNull_4) {
/* 181 */
/* 182 */
/* 183 */ isNull_3 = false; // resultCode could change nullability.
/* 184 */ value_3 = value_4 == 1;
/* 185 */
/* 186 */ }
/* 187 */ globalIsNull_0 = isNull_3;
/* 188 */ return value_3;
/* 189 */ }
/* 190 */
/* 191 */
/* 192 */ private boolean CaseWhen_1(InternalRow i) {
/* 193 */ byte caseWhenResultState_1 = -1;
/* 194 */ do {
/* 195 */
/* 196 */
/* 197 */ if (!subExprIsNull_1 && subExprValue_1) {
/* 198 */
/* 199 */ caseWhenResultState_1 = (byte)(false ? 1 : 0);
/* 200 */ value_20_0 = false;
/* 201 */ continue;
/* 202 */ }
/* 203 */
/* 204 */ boolean isNull_20 = true;
/* 205 */ boolean value_22 = false;
/* 206 */
/* 207 */ if (!subExprIsNull_0) {
/* 208 */
/* 209 */
/* 210 */ isNull_20 = false; // resultCode could change nullability.
/* 211 */ value_22 = subExprValue_0 == 7;
/* 212 */
/* 213 */ }
/* 214 */ if (!isNull_20 && value_22) {
/* 215 */
/* 216 */ caseWhenResultState_1 = (byte)(false ? 1 : 0);
/* 217 */ value_20_0 = false;
/* 218 */ continue;
/* 219 */ }
/* 220 */
/* 221 */ boolean isNull_23 = true;
/* 222 */ boolean value_25 = false;
/* 223 */
/* 224 */ if (!subExprIsNull_0) {
/* 225 */
/* 226 */
/* 227 */ isNull_23 = false; // resultCode could change nullability.
/* 228 */ value_25 = subExprValue_0 == 8;
/* 229 */
/* 230 */ }
/* 231 */ if (!isNull_23 && value_25) {
/* 232 */
/* 233 */ caseWhenResultState_1 = (byte)(false ? 1 : 0);
/* 234 */ value_20_0 = true;
/* 235 */ continue;
/* 236 */ }
/* 237 */
/* 238 */
/* 239 */ caseWhenResultState_1 = (byte)(false ? 1 : 0);
/* 240 */ value_20_0 = false;
/* 241 */
/* 242 */ } while (false);
/* 243 */ // TRUE if any condition is met and the result is null, or no any condition is met.
/* 244 */ final boolean isNull_18 = (caseWhenResultState_1 != 0);
/* 245 */ globalIsNull_2 = isNull_18;
/* 246 */ return value_20_0;
/* 247 */ }
/* 248 */
/* 249 */ }
Line 028 calls the subexpression functions eagerly. |
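The effect of the eager call on line 028 can be reproduced outside Spark. The following is a minimal sketch, not Spark code: the class `EagerSubexprSketch` and its methods are hypothetical, and `elementAt` is a simplified stand-in for ANSI-mode `element_at` that only handles positive indexes. It compares a short-circuit evaluation of the first CASE WHEN with a version that computes the shared `element_at(c1, 7)` up front.

```java
// Illustrative only: names are hypothetical and this is not Spark's generated code.
final class EagerSubexprSketch {
    // Simplified stand-in for ANSI-mode element_at: 1-based, throws when out of range.
    static int elementAt(int[] arr, int index) {
        if (index < 1 || index > arr.length) {
            throw new ArrayIndexOutOfBoundsException(
                "Invalid index: " + index + ", numElements: " + arr.length);
        }
        return arr[index - 1];
    }

    // Short-circuit form of "CASE ... END = 7": element_at(c1, 7) is reached
    // only when the first WHEN branch does not match.
    static boolean shortCircuit(int[] c1) {
        if (elementAt(c1, 1) == 1) return false; // THEN 1, and 1 = 7 is false
        if (elementAt(c1, 7) == 7) return true;  // THEN 7, and 7 = 7 is true
        return false;                            // THEN 8 or ELSE 10
    }

    // Eager form: the common subexpression is computed before any branch runs,
    // in the same way subExpr_0(i) is called at the top of eval().
    static boolean eager(int[] c1) {
        int subExprValue0 = elementAt(c1, 7);    // throws for arrays shorter than 7
        if (elementAt(c1, 1) == 1) return false;
        if (subExprValue0 == 7) return true;
        return false;
    }
}
```

For a three-element array whose first element is 1, `shortCircuit` returns false without touching index 7, while `eager` throws before the first branch is checked.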
|
Hmm, I tried with CaseWhen but it doesn't fail. Do you run on current master? |
Yes, you need to set these properties:
sql("set spark.sql.ansi.enabled = true")
sql("set spark.sql.codegen.wholeStage = false") |
|
Oh, I see. Don't repeat the following as it will be considered a subexpression too.
case
when element_at(c1, 1) = 1 then 1
when element_at(c1, 7) = 7 then 7
when element_at(c1, 7) = 8 then 8
else 10
end = 7
If you change the second case when to the following, it will fail.
case
when element_at(c1, 1) = 1 then 1
when element_at(c1, 7) = 7 then 7
when element_at(c1, 7) = 8 then 8
else 11
end = 7 |
|
But does the predicate make sense?
case
when element_at(c1, 1) = 1 then 1
when element_at(c1, 7) = 7 then 7
when element_at(c1, 7) = 8 then 8
else 10
end = 7
Technically, |
|
It's just an example to illustrate that a subexpression may throw an exception when it is eagerly executed inside a conditional predicate, especially a subexpression that contains UDFs. It's fine with me to keep the current implementation (except |
FWIW this is a super edge-case'y bug with subexpression elimination that I attempt to fix as part of #32987 to properly handle conditional expressions. In general subexpressions should only be created for expressions that 100% will be evaluated, so the eager execution doesn't matter. If that's not the case, then that could be a bug. For the |
Based on what is proposed in #32987, isn't it to consider an expression that conditionally runs twice as a subexpression? Why is it a fix for this? Don't you think "100% will be evaluated" conflicts with "conditionally evaluated twice"? Besides |
It's not to consider an expression conditionally run twice as a subexpression, it's to consider an expression definitely run once and conditionally run once as a subexpression, so still will only run if it will always run at least once |
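The distinction drawn here can be illustrated with a small counter. This is a sketch under assumptions, not the logic of #32987: the class `AlwaysEvaluatedSketch` and the predicate `expensive` are hypothetical. The expression runs once unconditionally and a second time only when `flag` is set, so hoisting it never evaluates anything the original would have skipped.

```java
// Illustrative only: names are hypothetical and unrelated to Spark's classes.
final class AlwaysEvaluatedSketch {
    static int calls = 0;

    // Stand-in for a costly expression; counts how often it runs.
    static boolean expensive(int x) {
        calls++;
        return x > 0;
    }

    // Original shape: one unconditional evaluation, one conditional evaluation.
    static boolean original(int x, boolean flag) {
        boolean first = expensive(x);
        boolean second = flag ? expensive(x) : true;
        return first && second;
    }

    // Hoisted shape: the shared expression is evaluated eagerly exactly once.
    // This is safe because the original always evaluated it at least once.
    static boolean hoisted(int x, boolean flag) {
        boolean sub = expensive(x);
        boolean second = flag ? sub : true;
        return sub && second;
    }
}
```

With `flag` set, `original` calls `expensive` twice and `hoisted` calls it once; with `flag` unset, both call it once.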
|
So you are saying |
|
IIUC, SQL definition does not guarantee the evaluation order of case-when. In practice SQL system may apply short-circuit evaluation so there will be an evaluation order. |
I'm saying |
Well, I think it depends on how we evaluate case-when. Personally, I prefer not to rely on short-circuit evaluation to guard an expression. If it is not defined in the SQL definition, it is not reliable. For example, what if we evaluate all conditions in parallel? Let's wait for other voices. If there is consensus that we should follow short-circuit evaluation, then we should remove the subexpression.
I totally agree with you. But on the other hand, lazy evaluation may improve performance when |
viirya
left a comment
Okay, I looked through the change here. I got what is proposed as so-called lazy evaluation.
It is basically to wrap subexpression value/isNull variables as two function calls.
In each function call, it checks a flag indicating whether the subexpression has been evaluated. If not, it evaluates the subexpression and records the value. If yes, it just returns the value.
I think this has a few issues. First, I'm not sure whether using a function call as a variable name might cause problems. We may not assign the subexpression value/isNull again, but by definition isNull and value are terms holding values. Mixing variable terms and function calls may become an issue in the future.
Second, it adds more functions and more function calls. The number of methods is not unlimited.
I think we should deal with the root cause here. Is such a subexpression reasonable for conditional expressions such as CaseWhen? If we respect the short-circuit evaluation practice, then we should get rid of such subexpressions, setting a higher bar for subexpressions in conditional expressions.
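The lazy evaluation summarized in this review (a flag check, evaluate on first use, return the recorded value afterwards) can be sketched as follows. This is an assumption-laden illustration rather than the PR's generated code: `LazySubexprSketch`, `subExprInitialized`, `cachedValue`, and the `evaluations` counter are hypothetical names, and `elementAt` is a simplified stand-in for ANSI-mode `element_at`.

```java
// Illustrative only: names are hypothetical and do not match the PR's generated code.
final class LazySubexprSketch {
    private final int[] c1;
    private boolean subExprInitialized = false; // has the subexpression run for this row?
    private int cachedValue;
    int evaluations = 0;                        // counts real evaluations, for demonstration

    LazySubexprSketch(int[] c1) {
        this.c1 = c1;
    }

    // Simplified stand-in for ANSI-mode element_at: 1-based, throws when out of range.
    static int elementAt(int[] arr, int index) {
        if (index < 1 || index > arr.length) {
            throw new ArrayIndexOutOfBoundsException("Invalid index: " + index);
        }
        return arr[index - 1];
    }

    // Accessor used where the eager version would read the value variable directly.
    private int subExprValue() {
        if (!subExprInitialized) {
            cachedValue = elementAt(c1, 7);     // evaluated on first use only
            subExprInitialized = true;
            evaluations++;
        }
        return cachedValue;
    }

    // CASE WHEN e1 = 1 THEN false WHEN e7 = 7 THEN true WHEN e7 = 8 THEN true ELSE false
    boolean eval() {
        if (elementAt(c1, 1) == 1) return false; // short-circuits before the subexpression
        if (subExprValue() == 7) return true;
        return subExprValue() == 8;              // second read reuses the cached value
    }
}
```

The extra flag check on every read is the per-access cost, and each lazily evaluated subexpression needs its own accessor method, which is the method-count concern raised above.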
kindly ping @cloud-fan @maropu |
|
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. |
|
is this still an issue? |
What changes were proposed in this pull request?
Currently, subexpression elimination in GeneratePredicate uses eager execution. This PR adds a lazy evaluation mode for subexpression elimination. It fixes an error that occurs when the eliminated subexpression is inside a conditional branch and depends on the other conditions.
For instance:
In this case, element_at depends on the condition size(c1) > 5. Before this PR, an exception is thrown when we disable codegen.
As lazy evaluation is expensive (we need to extract a variable and check whether the subexpr is initialized before we use it), we keep eager subexpression execution in GenerateUnsafeProjection/GenerateMutableProjection.
Why are the changes needed?
Fix a bug when subexpression elimination is enabled.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests and a new test.