[SPARK-29968][SQL] Remove the Predicate code from SparkPlan #26604
@cloud-fan @viirya How about this refactoring?
Test build #114125 has finished for PR 26604 at commit
Force-pushed from fcc9d22 to 4b1e311
IIUC
Hi, @maropu. So, are you going to make a few PRs for them, one by one?
  /**
-  * Returns an MutableProjection for given sequence of Expressions, which will be bound to
+  * Returns a MutableProjection for given sequence of Expressions, which will be bound to
Let's not touch this file in this PR~
You meant a separate PR for this typo?
/**
 * Interface for generated/interpreted predicate
 */
abstract class BasePredicate {
It seems reasonable because we renamed Predicate => BasePredicate before.
/**
 * Interface for generated/interpreted predicate
It's not wrong, and this comes from the old comment. Shall we use "A base class" instead of "Interface"?
ok
    str
  }
  logWarning(s"Codegen disabled for this expression:\n $logMessage")
  InterpretedPredicate.create(expression, inputSchema)
I didn't follow the context in the previous PR. Is this genInterpretedPredicate function unused?
Yea, CodeGeneratorWithInterpretedFallback#createInterpretedObject does the same thing as genInterpretedPredicate:
Line 63 in 5a70af7
protected def createInterpretedObject(in: IN): OUT
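The fallback idea referenced in this reply can be illustrated with a small, self-contained sketch. This is not Spark's actual class: `FallbackFactory`, `BuiltPredicate`, `ToyPredicateFactory`, and the size limit are hypothetical names invented for illustration. Only the two method names `createCodeGeneratedObject` and `createInterpretedObject` mirror the signature quoted above, and the real implementation may also consult configuration before choosing a path.

```scala
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for a factory that prefers generated code
// and falls back to an interpreted implementation when generation fails.
abstract class FallbackFactory[IN, OUT] {
  // Try the code-generated path first; on any non-fatal error, use the interpreted path.
  def createObject(in: IN): OUT =
    try createCodeGeneratedObject(in)
    catch {
      case NonFatal(_) => createInterpretedObject(in)
    }

  protected def createCodeGeneratedObject(in: IN): OUT
  protected def createInterpretedObject(in: IN): OUT
}

// Records which path produced the predicate so the choice is observable.
final case class BuiltPredicate(kind: String, eval: Int => Boolean)

// Toy factory: "codegen" rejects thresholds above a limit, mimicking a compile failure.
object ToyPredicateFactory extends FallbackFactory[Int, BuiltPredicate] {
  private val MaxCompilableThreshold = 1000

  protected def createCodeGeneratedObject(threshold: Int): BuiltPredicate = {
    if (threshold > MaxCompilableThreshold) {
      throw new IllegalStateException("simulated codegen failure")
    }
    BuiltPredicate("codegen", (x: Int) => x > threshold)
  }

  protected def createInterpretedObject(threshold: Int): BuiltPredicate =
    BuiltPredicate("interpreted", (x: Int) => x > threshold)
}
```

With this shape, a separate helper such as genInterpretedPredicate becomes redundant, because callers of `createObject` already get the interpreted object whenever generation fails.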
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
Thanks for the comment, @dongjoon-hyun. I just wanted to check if this refactoring is right first, and all the tests passed for this fix because this change might increase the test coverage for interpreted predicates (e.g., in |
  val boundPredicate =
-   InterpretedPredicate.create(predicates.reduce(And).transform {
+   Predicate.create(predicates.reduce(And).transform {
Any reason to use the interpreted version here?
Sometimes it's better to use the interpreted version if the input data is known to be small. Codegen can be slower as compiling takes time.
Ah, ok. I'll revert this.
This reverts commit 73588fc.
  case Filter(condition, LocalRelation(output, data, isStreaming))
      if !hasUnevaluableExpr(condition) =>
-   val predicate = InterpretedPredicate.create(condition, output)
+   val predicate = Predicate.create(condition, output)
This is to optimize local relation so perf doesn't matter too much. The change should be fine.
ok
  val predicate = partitionPruningPredicates.reduce(expressions.And)

- val boundPredicate = InterpretedPredicate.create(predicate.transform {
+ val boundPredicate = Predicate.create(predicate.transform {
to be safe, I think we should keep using the interpreted version, in case there are only a few partitions.
ok
def create(expr: Expression): BasePredicate = {
  create(expr)
}
We can add a method def createInterpreted... for places that want to use interpreted predicates.
Yea. ok.
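A rough sketch of the suggestion above, assuming a toy expression tree rather than Catalyst's classes: one factory exposes a default `create` and an explicit `createInterpreted` for call sites that expect very little input. All names here (`Expr`, `CompiledPredicate`, `ToyPredicate`, and the toy `GreaterThan`/`And`/`BasePredicate`/`InterpretedPredicate`) are illustrative stand-ins, and the "compiled" variant only imitates codegen by paying a one-time setup step.

```scala
// Toy expression tree over an integer row; illustrative only, not Spark's classes.
sealed trait Expr
final case class GreaterThan(column: Int, bound: Int) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

abstract class BasePredicate {
  def eval(row: Array[Int]): Boolean
}

// Walks the expression tree on every call: no setup cost, more work per row.
final class InterpretedPredicate(expr: Expr) extends BasePredicate {
  def eval(row: Array[Int]): Boolean = walk(expr, row)

  private def walk(e: Expr, row: Array[Int]): Boolean = e match {
    case GreaterThan(c, b) => row(c) > b
    case And(l, r)         => walk(l, row) && walk(r, row)
  }
}

// Stand-in for codegen: folds the tree into a single closure once, up front.
final class CompiledPredicate(expr: Expr) extends BasePredicate {
  private val compiled: Array[Int] => Boolean = compile(expr)

  def eval(row: Array[Int]): Boolean = compiled(row)

  private def compile(e: Expr): Array[Int] => Boolean = e match {
    case GreaterThan(c, b) => (row: Array[Int]) => row(c) > b
    case And(l, r) =>
      val lf = compile(l)
      val rf = compile(r)
      (row: Array[Int]) => lf(row) && rf(row)
  }
}

object ToyPredicate {
  // Default entry point: pay the setup cost, suited to large inputs.
  def create(expr: Expr): BasePredicate = new CompiledPredicate(expr)

  // Explicit opt-in for call sites that expect only a handful of rows.
  def createInterpreted(expr: Expr): InterpretedPredicate = new InterpretedPredicate(expr)
}
```

Both entry points evaluate to the same result for a given row; the only difference is where the cost is paid, which is why the choice is left to the call site.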
Force-pushed from 37f470e to d10b004
Force-pushed from d10b004 to 0896afe
  val boundPredicate =
-   InterpretedPredicate.create(predicates.reduce(And).transform {
+   Predicate.createInterpretedPredicate(predicates.reduce(And).transform {
nit: double predicate looks weird, how about just Predicate.createInterpreted?
ok
    expressions.GreaterThan(attribute, literal)
  }.reduceOption(expressions.And).getOrElse(Literal(true))
- InterpretedPredicate.create(filterCondition, inputAttributes)
+ Predicate.createInterpretedPredicate(filterCondition, inputAttributes)
This is a testing source, so we don't care whether it's codegen or not. It should be fine to call Predicate.create
Test build #114127 has finished for PR 26604 at commit
  InterpretedPredicate(in)
}

def createInterpreted(e: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate =
this method seems not used.
Test build #114140 has finished for PR 26604 at commit
Test build #114135 has finished for PR 26604 at commit
Test build #114141 has finished for PR 26604 at commit
Test build #114129 has finished for PR 26604 at commit
Test build #114132 has finished for PR 26604 at commit
retest this please
Test build #114142 has finished for PR 26604 at commit
thanks, merging to master!
@maropu can you create JIRA tickets to do the same thing for newMutableProjection, newOrdering and newNaturalAscendingOrdering?
Yea, sure. Better to fix them in a single next PR?
sounds good
ok
@maropu Thanks for your change. This PR makes my work easier.
newPredicate has been removed in Spark 3.0 apache/spark#26604
What changes were proposed in this pull request?
This PR refactors the Predicate code; it mainly removes newPredicate from SparkPlan.
Modifications are listed below:
- Move Predicate from o.a.s.sql.catalyst.expressions.codegen.GeneratePredicate.scala to o.a.s.sql.catalyst.expressions.predicates.scala
- Rename o.a.s.sql.catalyst.expressions.codegen.Predicate to o.a.s.sql.catalyst.expressions.BasePredicate
- Extend CodeGeneratorWithInterpretedFallback for BasePredicate
This comes from @cloud-fan's suggestion: #26420 (comment)
Why are the changes needed?
For better code/test coverage.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.