[SPARK-27986][SQL] Support ANSI SQL filter predicate for aggregate expression. #26420
Conversation
|
cc @gatorsmile |
|
Test build #113365 has finished for PR 26420 at commit
|
|
Retest this please. |
|
Test build #113370 has finished for PR 26420 at commit
|
| !aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate]) | ||
| // ImperativeAggregate and filter predicate are not supported right now | ||
| !(aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate]) || | ||
| aggregateExpressions.exists(_.filter.isDefined)) |
Can't we support this in codegen mode? Is it technically hard? If we support this filter expr in aggregates, I personally think we'd better support it in codegen mode first.
Thanks for your review. I think that is a bigger change, and I want to split it into two PRs to keep things simple.
Another reason is that my time is limited by other work.
|
I just looked over the current implementation and I have one question: have you checked whether we could just transform these filter exprs into existing Spark exprs/operators instead of the current approach (hard-coded in each Agg operator)? |
docs/sql-keywords.md
Outdated
| <tr><td>FALSE</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr> | ||
| <tr><td>FETCH</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr> | ||
| <tr><td>FIELDS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr> | ||
| <tr><td>FILTER</td><td>reserved</td><td>non-reserved</td><td>non-reserved</td></tr> |
FILTER is reserved in SQL:2011. Also, could you please update TableIdentifierParserSuite too?
@maropu Thanks for the reminder. I will add it.
| } | ||
| } | ||
| case u @ UnresolvedFunction(funcId, children, isDistinct) => | ||
| case u @ UnresolvedFunction(funcId, children, isDistinct, filter) => |
Could you throw an analysis exception if a filter is given for non-aggregate functions?
@maropu Thanks for the reminder. I will add it.
|
This is a nice feature! I'd like to know how it's implemented. Seems like we can't transform it into another logical form that we support, and we need to adjust our backend engine. |
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
Outdated
|
I just thought of a super simple case like this; the query above might be transformed into... |
|
@maropu Looks like a good idea, but we need to make sure the aggregate function ignores nulls. It may not work for |
|
Ah, I see... nice suggestion. I need more time to think about how to implement it. |
|
@maropu |
|
I meant we might not need to modify the physical plans for aggregates (e.g., HashAggregateExec). Instead, in the analyzer phase, we might be able to transform filter expressions into projections as shown above (Aggregate with Filter => Project + Aggregate); |
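For illustration, a minimal sketch of that rewrite in the DataFrame API (made-up column names; the equivalence relies on the aggregate ignoring nulls, which is exactly why count(*) is the tricky case mentioned above):
// Hypothetical rewrite of SELECT sum(x) FILTER (WHERE p) FROM t into Project + Aggregate.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum, when}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val t = Seq((1, true), (2, false), (3, true)).toDF("x", "p")
val rewritten = t
  .select(when(col("p"), col("x")).as("masked")) // Project: NULL out rows failing the filter
  .agg(sum("masked"))                            // Aggregate: sum ignores the NULLs
rewritten.show() // 4, i.e. only the rows where p is true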
|
@maropu This is a good idea. As @cloud-fan said, the question is how to treat |
|
I'd like to propose a solution for the codegen part that'll augment this PR. The overall direction this PR is taking sounds good to me, although I haven't reviewed the full details yet (would like to do that some time this week). I'll prepare a separate PR for demo purposes to show how it'll augment the codegen part. It's actually fairly easy and could also serve as a bit of code clean up for a lot of the declarative aggregate functions. The tl;dr is that I'd like to have explicit support for the user-specified filter clause in the infrastructure, instead of solely relying on a rewrite. |
|
Test build #113566 has finished for PR 26420 at commit
|
|
Test build #113584 has finished for PR 26420 at commit
|
I think so. |
| result | ||
| case UnresolvedExtractValue(child, fieldExpr) if child.resolved => | ||
| ExtractValue(child, fieldExpr, resolver) | ||
| case f @ UnresolvedFunction(_, children, _, filter) if filter.isDefined => |
like this?
case f @ UnresolvedFunction(_, children, _, Some(filter)) =>
  val newFilter = filter.mapChildren(resolveExpressionTopDown(_, q))
  val newChildren = children.map(resolveExpressionTopDown(_, q))
  f.copy(children = newChildren, filter = Some(newFilter))
OK. I will try it like this.
| override lazy val references: AttributeSet = { | ||
| mode match { | ||
| case Partial | Complete => aggregateFunction.references | ||
| case Partial | Complete if filter == None => aggregateFunction.references |
I think we don't need this match.
Yeah, you're right.
| case (ae: DeclarativeAggregate, expression) => | ||
| expression.mode match { | ||
| val filterExpressions = expressions.map(_.filter) | ||
| val notExistsFilter = !filterExpressions.exists(_ != None) |
Isn't notExistsFilter just predicates.isEmpty?
Good suggestion!
| expression.mode match { | ||
| val filterExpressions = expressions.map(_.filter) | ||
| val notExistsFilter = !filterExpressions.exists(_ != None) | ||
| var isFinalOrMerge = false |
Can you use val for isFinalOrMerge like this?
val isFinalOrMerge = functions.exists(....)
isFinalOrMerge is derived from expressions, not functions.
If you want to check if they have PartialMerge or Final;
val isFinalOrMerge = expressions.map(_.mode)
.collect { case PartialMerge | Final => true }.nonEmpty
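As an aside, an equivalent check with exists avoids the intermediate collection (a sketch, assuming the surrounding Seq[AggregateExpression]):
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Final, PartialMerge}

// Same check as above without building an intermediate collection.
def isFinalOrMerge(expressions: Seq[AggregateExpression]): Boolean =
  expressions.exists(e => e.mode == PartialMerge || e.mode == Final)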
Then, please move this variable to line 223.
good idea
| (currentBuffer: InternalRow, row: InternalRow) => { | ||
| // Process all expression-based aggregate functions. | ||
| updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row)) | ||
| if (notExistsFilter || isFinalOrMerge) { |
like this?
if (notExistsFilter || isFinalOrMerge) {
(currentBuffer: InternalRow, row: InternalRow) => {...}
} else {
(currentBuffer: InternalRow, row: InternalRow) => {...}
}
Good idea!
| case Partial | Complete => | ||
| ae.filter.foreach { filterExpr => | ||
| val filterAttrs = filterExpr.references.toSeq | ||
| val predicate = newPredicate(filterExpr, child.output ++ filterAttrs) |
genInterpretedPredicate instead of newPredicate?
BTW, in doExecute() of FilterExec, it seems we currently use generated code from newPredicate for evaluating predicates even in interpreted mode. Any reason we cannot turn off codegen there via CODEGEN_FACTORY_MODE? @viirya @cloud-fan
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
Line 230 in e46e487
| val predicate = newPredicate(condition, child.output) |
The background of adding CODEGEN_FACTORY_MODE is to have a config for testing only; it makes it easier for us to test the interpreted and codegen paths separately.
For non-test use, I think we always go codegen first and fall back to interpreted if codegen fails.
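For context, this is roughly how that test-only config is exercised, assuming a suite that mixes in SharedSparkSession/SQLTestUtils so spark and withSQLConf are in scope:
// "spark.sql.codegen.factoryMode" is meant for tests: NO_CODEGEN forces the interpreted
// path, CODEGEN_ONLY forces codegen, and the default FALLBACK tries codegen first and
// falls back to interpretation if code generation fails.
Seq("NO_CODEGEN", "CODEGEN_ONLY").foreach { mode =>
  withSQLConf("spark.sql.codegen.factoryMode" -> mode) {
    // Exercise FilterExec's predicate under each factory mode.
    assert(spark.range(10).filter("id > 5").count() == 4)
  }
}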
| override def supportCodegen: Boolean = { | ||
| // ImperativeAggregate is not supported right now | ||
| !aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate]) | ||
| // ImperativeAggregate and filter predicate are not supported right now |
Can you file a new JIRA for the codegen support of filters in aggregates? Then put the JIRA ID here.
No problem! But let us wait for @rednaxelafx
| // so return an empty kvIterator. | ||
| Iterator.empty | ||
| } else { | ||
| val filterPredicates = new HashMap[Int, GenPredicate] |
nit: mutable.HashMap
OK.
| aggregateAttributes: Seq[Attribute], | ||
| initialInputBufferOffset: Int, | ||
| resultExpressions: Seq[NamedExpression], | ||
| predicates: HashMap[Int, GenPredicate], |
Map instead of HashMap?
OK.
| Iterator.empty | ||
| } else { | ||
| val filterPredicates = new mutable.HashMap[Int, GenPredicate] | ||
| aggregateExpressions.zipWithIndex.foreach{ |
nit: formatting: foreach{ -> foreach {
OK
|
I did a quick review and left some comments; could you check them above? I personally think we need to think more about the exact BNF grammar for aggregate filters that we will support in this PR. |
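(For reference, and as best I recall, SQL:2011 defines the clause as <filter clause> ::= FILTER ( WHERE <search condition> ), attached to an aggregate function invocation; PostgreSQL documents the same shape.)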
| ExtractValue(child, fieldExpr, resolver) | ||
| case f @ UnresolvedFunction(_, children, _, filter) if filter.isDefined => | ||
| val newChildren = children.map(resolveExpressionTopDown(_, q)) | ||
| val newFilter = filter.map{ expr => expr.mapChildren(resolveExpressionTopDown(_, q))} |
nit: formatting: .map{ -> .map {
Due to the changes above, this line no longer exists, so the issue is gone.
|
Test build #113872 has finished for PR 26420 at commit
|
|
Test build #113874 has finished for PR 26420 at commit
|
|
Test build #113877 has finished for PR 26420 at commit
|
|
Test build #113878 has finished for PR 26420 at commit
|
|
@maropu The filter predicate now supports subqueries. |
|
Test build #114147 has finished for PR 26420 at commit
|
### What changes were proposed in this pull request?
This is to refactor the Predicate code; it mainly removes `newPredicate` from `SparkPlan`. Modifications are listed below:
- Move `Predicate` from `o.a.s.sql.catalyst.expressions.codegen.GeneratePredicate.scala` to `o.a.s.sql.catalyst.expressions.predicates.scala`
- To resolve the name conflict, rename `o.a.s.sql.catalyst.expressions.codegen.Predicate` to `o.a.s.sql.catalyst.expressions.BasePredicate`
- Extend `CodeGeneratorWithInterpretedFallback` for `BasePredicate`

This comes from the cloud-fan suggestion: #26420 (comment)

### Why are the changes needed?
For better code/test coverage.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #26604 from maropu/RefactorPredicate.
Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
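A minimal sketch of what call sites look like after that refactoring (the Predicate.create/initialize calls match the ones in maropu's snippet further down; the partition index 0 is just a placeholder):
import org.apache.spark.sql.catalyst.expressions.{Attribute, BasePredicate, Expression, Predicate}

// Hypothetical call site: build a row-level predicate bound to a plan's output attributes.
// Predicate.create takes the codegen path and falls back to interpretation if codegen fails.
def buildFilter(condition: Expression, childOutput: Seq[Attribute]): BasePredicate = {
  val predicate = Predicate.create(condition, childOutput)
  predicate.initialize(0) // partition index, needed for nondeterministic expressions
  predicate
}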
|
Test build #114161 has finished for PR 26420 at commit
|
|
Test build #114188 has finished for PR 26420 at commit
|
|
Test build #114200 has finished for PR 26420 at commit
|
|
Retest this please |
|
Test build #114220 has finished for PR 26420 at commit
|
|
@cloud-fan @maropu Could you continue to review this PR? |
| name: FunctionIdentifier, | ||
| children: Seq[Expression], | ||
| isDistinct: Boolean) | ||
| inputs: Seq[Expression], |
nit: input -> arguments
OK
| assert (aggregateExpressions.get.size == 1) | ||
| checkAnswer(df, Row(1, 3, 4) :: Row(2, 3, 4) :: Row(3, 3, 4) :: Nil) | ||
| } | ||
|
|
I think we need more exhaustive tests for the aggregate filter support. Could you add tests in SQLQueryTestSuite, e.g., `inputs/group-by-filter.sql`?
OK. I will add this in a new PR.
| case Partial | Complete => | ||
| case Partial | Complete if filterExpressions(i).isDefined => | ||
| (buffer: InternalRow, row: InternalRow) => | ||
| if (predicates(i).eval(row)) { ae.update(buffer, row) } |
Why did you use two separate variables, predicates and filterExpressions, for the filter?
OK. I will use predicates only.
inputs --> arguments
inputs --> arguments
| if (predicates.isEmpty || isFinalOrMerge) { | ||
| (currentBuffer: InternalRow, row: InternalRow) => { | ||
| updateProjection.target(currentBuffer)(joinedRow(currentBuffer, row)) | ||
| processImperative(currentBuffer, row) |
I'm a bit worried that this closure can cause some performance overhead when processing regular non-filter aggregate functions. cc: @cloud-fan
| expression.mode match { | ||
| val filterExpressions = expressions.map(_.filter) | ||
| var isFinalOrMerge = false | ||
| val mergeExpressions = functions.zipWithIndex.collect { |
Why did you change functions.zip(expressions).flatMap to functions.zipWithIndex.collect here?
Lines 248 and 250 use the index, so I made this change.
| case Partial | Complete if filterExpressions(i).isDefined => | ||
| (buffer: InternalRow, row: InternalRow) => | ||
| if (predicates(i).eval(row)) { ae.update(buffer, row) } | ||
| case Partial | Complete if filterExpressions(i).isEmpty => |
nit: like this?
case Partial | Complete =>
if (predicateOptions(i).isDefined) {
(buffer: InternalRow, row: InternalRow) =>
if (predicateOptions(i).get.eval(row)) { ae.update(buffer, row) }
} else {
(buffer: InternalRow, row: InternalRow) => ae.update(buffer, row)
}
I have improved it in another way.
| filterPredicates | ||
| } | ||
|
|
||
| protected val predicates: mutable.Map[Int, BasePredicate] = |
I think we don't need this variable outside generateProcessRow, so can you move this variable inside it like this?
// Initializing functions used to process a row.
protected def generateProcessRow(
expressions: Seq[AggregateExpression],
functions: Seq[AggregateFunction],
inputAttributes: Seq[Attribute]): (InternalRow, InternalRow) => Unit = {
val joinedRow = new JoinedRow
if (expressions.nonEmpty) {
// Initialize predicates for aggregate functions if necessary
val predicateOptions = expressions.map {
case AggregateExpression(_, mode, _, Some(filter), _) =>
mode match {
case Partial | Complete =>
val filterAttrs = filter.references.toSeq
val predicate = Predicate.create(filter, inputAttributes ++ filterAttrs)
predicate.initialize(partIndex)
Some(predicate)
case _ =>
None
}
case _ =>
None
}
....
| // 2-3. Filter the data row using filter predicate filterC. If the filter predicate | ||
| // filterC is met, then calculate using aggregate expression exprC. | ||
| (currentBuffer: InternalRow, row: InternalRow) => { | ||
| val dynamicMergeExpressions = new mutable.ArrayBuffer[Expression] |
Can you move the predicate processing for expression-based agg functions outside this row-by-row loop? The current code can add excessive overhead when processing rows...
I need some time to think about it.
|
Test build #114314 has finished for PR 26420 at commit
|
|
Test build #114313 has finished for PR 26420 at commit
|
|
Test build #114315 has finished for PR 26420 at commit
|
| checkAnswer(df, Row(1, 3, 4) :: Row(2, 3, 4) :: Row(3, 3, 4) :: Nil) | ||
| } | ||
|
|
||
| test("SPARK-27986: support filter clause for aggregate function with hash") { |
we don't need the prefix now: apache/spark-website#231
OK. I will remove the prefix in a new PR.
|
Can you check the BNF grammar for |
|
Test build #114312 has finished for PR 26420 at commit
|
|
@beliefer Why did you close this? |
What changes were proposed in this pull request?
The filter predicate for aggregate expressions is part of ANSI SQL. Several mainstream databases support this syntax.
PostgreSQL:
https://www.postgresql.org/docs/current/sql-expressions.html#SYNTAX-AGGREGATES
For example:
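(The original example is not preserved here; the following is a minimal illustration of the syntax with made-up table and column names, assuming a SparkSession named spark and a registered table people:)
// One pass over the data, with a different filter per aggregate.
spark.sql(
  """
    |SELECT count(*)                             AS total,
    |       sum(income) FILTER (WHERE age >= 18) AS adult_income,
    |       avg(income) FILTER (WHERE age >= 18) AS adult_avg_income
    |FROM people
    |""".stripMargin).show()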
jOOQ:
https://blog.jooq.org/2014/12/30/the-awesome-postgresql-9-4-sql2003-filter-clause-for-aggregate-functions/
Notice:
This PR only supports the filter predicate without codegen. I will create another PR to support codegen.
Here are some demonstrations of the PR from my production environment.
Why are the changes needed?
Adds a new SQL feature.
Does this PR introduce any user-facing change?
'No'.
How was this patch tested?
Existing UTs and new UTs.