Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ abstract class Expression extends TreeNode[Expression] {
*/
def stateful: Boolean = false

/**
* Returns true if the expression could potentially throw an exception when evaluated.
*/
lazy val throwable: Boolean = children.exists(_.throwable)

/**
* Returns a copy of this expression where all stateful expressions are replaced with fresh
* uninitialized copies. If the expression contains no stateful expressions then the original
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2983,6 +2983,9 @@ case class Sequence(

override def nullable: Boolean = children.exists(_.nullable)

// If step is defined, then an error will be thrown if the start and stop do not satisfy the step.
override lazy val throwable: Boolean = stepOpt.isDefined
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we leverage NoThrow trait?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with NoThrow is that it doesn't mark that many expressions right now (i.e. if we only push down NoThrow expressions then we would not be pushing down most things). We want the opposite here (marking only certain expressions as not eligible for pushdown). Defining it this way also allows us to be more precise in defining what is throwable, compared to just setting it for the whole class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two are similar but are conservative in two opposite directions. Filter pushdown is a very important feature, and we don't want to suddenly disable it for many predicates. We only mark Sequence as throwable for now.

NoThrow was added for a new optimization and only a few expressions are marked as NoThrow that can be optimized.

Since runtime errors are rare, I think the current solution is better, as it produces more optimized plans in most cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's rare, why do we add a property to Expression?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it's indeed an expression property. NoThrow is not the right way to add properties as the caller side needs to do recursion: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala#L53-L59 , which is not good for code reuse (you need to call a util function)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably remove NoThrow then ... I approved it for now.


override def dataType: ArrayType = ArrayType(start.dataType, containsNull = false)

override def checkInputDataTypes(): TypeCheckResult = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1549,18 +1549,19 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {

val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
// The query execution/optimization does not guarantee the expressions are evaluated in order.
// We only can combine them if and only if both are deterministic.
// We only can combine them if and only if both are deterministic and the outer condition is not
// throwable (inner can be throwable as it was going to be evaluated first anyways).
case Filter(fc, nf @ Filter(nc, grandChild)) if nc.deterministic =>
val (combineCandidates, nonDeterministic) =
splitConjunctivePredicates(fc).partition(_.deterministic)
val (combineCandidates, rest) =
splitConjunctivePredicates(fc).partition(p => p.deterministic && !p.throwable)
val mergedFilter = (ExpressionSet(combineCandidates) --
ExpressionSet(splitConjunctivePredicates(nc))).reduceOption(And) match {
case Some(ac) =>
Filter(And(nc, ac), grandChild)
case None =>
nf
}
nonDeterministic.reduceOption(And).map(c => Filter(c, mergedFilter)).getOrElse(mergedFilter)
rest.reduceOption(And).map(c => Filter(c, mergedFilter)).getOrElse(mergedFilter)
}
}

Expand Down Expand Up @@ -1730,16 +1731,12 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe

// For each filter, expand the alias and check if the filter can be evaluated using
// attributes produced by the aggregate operator's child operator.
val (candidates, nonDeterministic) =
splitConjunctivePredicates(condition).partition(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet)
cond.deterministic && !cond.throwable &&
cond.references.nonEmpty && replaced.references.subsetOf(aggregate.child.outputSet)
}

val stayUp = rest ++ nonDeterministic

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
Expand Down Expand Up @@ -1904,13 +1901,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
* @return (canEvaluateInLeft, canEvaluateInRight, haveToEvaluateInBoth)
*/
private def split(condition: Seq[Expression], left: LogicalPlan, right: LogicalPlan) = {
val (pushDownCandidates, nonDeterministic) = condition.partition(_.deterministic)
val (pushDownCandidates, stayUp) =
condition.partition(cond => cond.deterministic && !cond.throwable)
val (leftEvaluateCondition, rest) =
pushDownCandidates.partition(_.references.subsetOf(left.outputSet))
val (rightEvaluateCondition, commonCondition) =
rest.partition(expr => expr.references.subsetOf(right.outputSet))

(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
(leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ stayUp)
}

private def canPushThrough(joinType: JoinType): Boolean = joinType match {
Expand All @@ -1933,8 +1931,9 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
val newRight = rightFilterConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
// don't push throwable expressions into join condition
val (newJoinConditions, others) =
commonFilterCondition.partition(canEvaluateWithinJoin)
commonFilterCondition.partition(cond => canEvaluateWithinJoin(cond) && !cond.throwable)
val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)

val join = Join(newLeft, newRight, joinType, newJoinCond, hint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1433,4 +1433,67 @@ class FilterPushdownSuite extends PlanTest {
val correctAnswer = RebalancePartitions(Seq.empty, testRelation.where($"a" > 3)).analyze
comparePlans(optimized, correctAnswer)
}

test("SPARK-46707: push down predicate with sequence (without step) through joins") {
  val x = testRelation.subquery("x")
  val y = testRelation1.subquery("y")

  // A sequence with an explicit step may raise a runtime error, so the
  // optimizer must leave the filter above the join untouched.
  val steppedQuery = x.join(y, joinType = Inner, condition = Some($"x.c" === $"y.d"))
    .where(IsNotNull(Sequence($"x.a", $"x.b", Some(Literal(1)))))
    .analyze
  comparePlans(Optimize.execute(steppedQuery), steppedQuery)

  // Without a step the sequence is not throwable, so the predicate is
  // expected to be pushed below the join onto the left side.
  val steplessQuery = x.join(y, joinType = Inner, condition = Some($"x.c" === $"y.d"))
    .where(IsNotNull(Sequence($"x.a", $"x.b", None)))
    .analyze
  val expected = x.where(IsNotNull(Sequence($"x.a", $"x.b", None)))
    .join(y, joinType = Inner, condition = Some($"x.c" === $"y.d"))
    .analyze
  comparePlans(Optimize.execute(steplessQuery), expected)
}

test("SPARK-46707: push down predicate with sequence (without step) through aggregates") {
  val x = testRelation.subquery("x")

  // With a step parameter the sequence is treated as throwable: the plan
  // must be left unchanged, keeping the filter above the aggregate.
  val steppedQuery = x.groupBy($"x.a", $"x.b")($"x.a", $"x.b")
    .where(IsNotNull(Sequence($"x.a", $"x.b", Some(Literal(1)))))
    .analyze
  comparePlans(Optimize.execute(steppedQuery), steppedQuery)

  // Without a step the predicate is safe to evaluate early, so it is
  // pushed beneath the aggregate.
  val steplessQuery = x.groupBy($"x.a", $"x.b")($"x.a", $"x.b")
    .where(IsNotNull(Sequence($"x.a", $"x.b", None)))
    .analyze
  val expected = x.where(IsNotNull(Sequence($"x.a", $"x.b", None)))
    .groupBy($"x.a", $"x.b")($"x.a", $"x.b")
    .analyze
  comparePlans(Optimize.execute(steplessQuery), expected)
}

test("SPARK-46707: combine predicate with sequence (without step) with other filters") {
  val x = testRelation.subquery("x")

  // A stepped sequence is throwable, so the two adjacent filters must not
  // be merged: evaluation order has to be preserved.
  val steppedQuery = x.where($"x.c" > 1)
    .where(IsNotNull(Sequence($"x.a", $"x.b", Some(Literal(1)))))
    .analyze
  comparePlans(Optimize.execute(steppedQuery), steppedQuery)

  // A step-less sequence cannot throw, so CombineFilters may fold both
  // predicates into a single conjunctive filter.
  val steplessQuery = x.where($"x.c" > 1)
    .where(IsNotNull(Sequence($"x.a", $"x.b", None)))
    .analyze
  val expected = x.where(IsNotNull(Sequence($"x.a", $"x.b", None)) && $"x.c" > 1)
    .analyze
  comparePlans(Optimize.execute(steplessQuery), expected)
}
}