From ac17976fd2b024039ee6cd848b864d2d052ec573 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Tue, 13 Mar 2018 14:05:37 -0700 Subject: [PATCH 1/4] SPARK-21479 Outer join filter pushdown in null supplying table when condition is on one of the joined columns --- .../sql/catalyst/optimizer/Optimizer.scala | 61 +++++++++++++++++++ .../plans/logical/QueryPlanConstraints.scala | 7 ++- .../InferFiltersFromConstraintsSuite.scala | 37 +++++++++++ 3 files changed, 103 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 91208479be03..b63468ac24a6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -58,6 +58,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) LimitPushDown, ColumnPruning, InferFiltersFromConstraints, + TransitPredicateInOuterJoin, // Operator combine CollapseRepartition, CollapseProject, @@ -1071,6 +1072,66 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { } } +/** + * Infer and transit predicate from the preserved side to the null-supplying side + * of an outer join. The predicate is inferred from the preserved side based on the + * join condition and will be pushed over to the null-supplying side. For example, + * if the preserved side has constraints of the form 'a > 5' and the join condition + * is 'a = b', in which 'b' is an attribute from the null-supplying side, a [[Filter]] + * operator of 'b > 5' will be applied to the null-supplying side. + * + * Applying this rule will not change the constraints of the [[Join]] operator, so + * aside from its child being transformed, there is no side-effect to the [[Join]] + * operator itself. + */ +object TransitPredicateInOuterJoin extends Rule[LogicalPlan] with PredicateHelper { + def apply(plan: LogicalPlan): LogicalPlan = + if (!plan.conf.constraintPropagationEnabled) { + plan + } else plan transform { + case j@Join(left, right, joinType, joinCondition) => + joinType match { + case RightOuter if joinCondition.isDefined => + val rightConstraints = right.constraints.union( + splitConjunctivePredicates(joinCondition.get).toSet) + val inferredConstraints = ExpressionSet( + QueryPlanConstraints.inferAdditionalConstraints(rightConstraints)) + val leftConditions = inferredConstraints + .filter(_.deterministic) + .filter(_.references.subsetOf(left.outputSet)) + if (leftConditions.isEmpty) { + j + } else { + // push the predicate down to left side sub query. + val newLeft = leftConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = right + + Join(newLeft, newRight, RightOuter, joinCondition) + } + case LeftOuter if joinCondition.isDefined => + val leftConstraints = left.constraints.union( + splitConjunctivePredicates(joinCondition.get).toSet) + val inferredConstraints = ExpressionSet( + QueryPlanConstraints.inferAdditionalConstraints(leftConstraints)) + val rightConditions = inferredConstraints + .filter(_.deterministic) + .filter(_.references.subsetOf(right.outputSet)) + if (rightConditions.isEmpty) { + j + } else { + // push the predicate down to right side sub query. + val newRight = rightConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newLeft = left + + Join(newLeft, newRight, LeftOuter, joinCondition) + } + case _ => j + } + } +} + /** * Combines two adjacent [[Limit]] operators into one, merging the * expressions into one single expression. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 046848875548..de49d749d86e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -29,7 +29,7 @@ trait QueryPlanConstraints { self: LogicalPlan => lazy val allConstraints: ExpressionSet = { if (conf.constraintPropagationEnabled) { ExpressionSet(validConstraints - .union(inferAdditionalConstraints(validConstraints)) + .union(QueryPlanConstraints.inferAdditionalConstraints(validConstraints)) .union(constructIsNotNullConstraints(validConstraints))) } else { ExpressionSet(Set.empty) @@ -96,13 +96,16 @@ trait QueryPlanConstraints { self: LogicalPlan => case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute) case _ => Seq.empty[Attribute] } +} + +object QueryPlanConstraints { /** * Infers an additional set of constraints from a given set of equality constraints. * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an * additional constraint of the form `b = 5`. */ - private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = { + def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = { var inferredConstraints = Set.empty[Expression] constraints.foreach { case eq @ EqualTo(l: Attribute, r: Attribute) => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index f78c2356e35a..c877fffdec57 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -33,6 +33,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest { PushPredicateThroughJoin, PushDownPredicate, InferFiltersFromConstraints, + TransitPredicateInOuterJoin, CombineFilters, SimplifyBinaryComparison, BooleanSimplification) :: Nil @@ -204,4 +205,40 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } + + test("SPARK-21479: Outer join after-join filters push down to null-supplying side") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val condition = Some("x.a".attr === "y.a".attr) + val originalQuery = x.join(y, LeftOuter, condition).where("x.a".attr === 2).analyze + val left = x.where(IsNotNull('a) && 'a === 2) + val right = y.where(IsNotNull('a) && 'a === 2) + val correctAnswer = left.join(right, LeftOuter, condition).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("SPARK-21479: Outer join pre-existing filters push down to null-supplying side") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val condition = Some("x.a".attr === "y.a".attr) + val originalQuery = x.join(y.where("y.a".attr > 5), RightOuter, condition).analyze + val left = x.where(IsNotNull('a) && 'a > 5) + val right = y.where(IsNotNull('a) && 'a > 5) + val correctAnswer = left.join(right, RightOuter, condition).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } + + test("SPARK-21479: Outer join no filter push down to preserved side") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val condition = Some("x.a".attr === "y.a".attr) + val originalQuery = x.join(y.where("y.a".attr === 1), LeftOuter, condition).analyze + val left = x + val right = y.where(IsNotNull('a) && 'a === 1) + val correctAnswer = left.join(right, LeftOuter, condition).analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } } From 1c5dedb6f61cfe754d238c09b670a3b327a4feb8 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Tue, 13 Mar 2018 16:14:56 -0700 Subject: [PATCH 2/4] Code refine: combine TransitPredicateInOuterJoin with InferFiltersFromConstraints --- .../sql/catalyst/optimizer/Optimizer.scala | 111 ++++++++---------- .../InferFiltersFromConstraintsSuite.scala | 1 - 2 files changed, 47 insertions(+), 65 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b63468ac24a6..23d8c7466738 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -58,7 +58,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) LimitPushDown, ColumnPruning, InferFiltersFromConstraints, - TransitPredicateInOuterJoin, // Operator combine CollapseRepartition, CollapseProject, @@ -636,8 +635,11 @@ object CollapseWindow extends Rule[LogicalPlan] { * constraints. These filters are currently inserted to the existing conditions in the Filter * operators and on either side of Join operators. * - * Note: While this optimization is applicable to all types of join, it primarily benefits Inner and - * LeftSemi joins. + * In addition, for left/right outer joins, infer predicate from the preserved side of the Join + * operator and push the inferred filter over to the null-supplying side. For example, if the + * preserved side has constraints of the form 'a > 5' and the join condition is 'a = b', in + * which 'b' is an attribute from the null-supplying side, a [[Filter]] operator of 'b > 5' will + * be applied to the null-supplying side. */ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelper { @@ -674,7 +676,48 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe case None => additionalConstraints.reduceOption(And) } - if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join + val j = if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join + + // Infer filter for left/right outer joins + joinType match { + case RightOuter if j.condition.isDefined => + val rightConstraints = right.constraints.union( + splitConjunctivePredicates(j.condition.get).toSet) + val inferredConstraints = ExpressionSet( + QueryPlanConstraints.inferAdditionalConstraints(rightConstraints)) + val leftConditions = inferredConstraints + .filter(_.deterministic) + .filter(_.references.subsetOf(left.outputSet)) + if (leftConditions.isEmpty) { + j + } else { + // push the predicate down to left side sub query. + val newLeft = leftConditions. + reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) + val newRight = right + + Join(newLeft, newRight, RightOuter, j.condition) + } + case LeftOuter if j.condition.isDefined => + val leftConstraints = left.constraints.union( + splitConjunctivePredicates(j.condition.get).toSet) + val inferredConstraints = ExpressionSet( + QueryPlanConstraints.inferAdditionalConstraints(leftConstraints)) + val rightConditions = inferredConstraints + .filter(_.deterministic) + .filter(_.references.subsetOf(right.outputSet)) + if (rightConditions.isEmpty) { + j + } else { + // push the predicate down to right side sub query. + val newRight = rightConditions. + reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) + val newLeft = left + + Join(newLeft, newRight, LeftOuter, j.condition) + } + case _ => j + } } } @@ -1072,66 +1115,6 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { } } -/** - * Infer and transit predicate from the preserved side to the null-supplying side - * of an outer join. The predicate is inferred from the preserved side based on the - * join condition and will be pushed over to the null-supplying side. For example, - * if the preserved side has constraints of the form 'a > 5' and the join condition - * is 'a = b', in which 'b' is an attribute from the null-supplying side, a [[Filter]] - * operator of 'b > 5' will be applied to the null-supplying side. - * - * Applying this rule will not change the constraints of the [[Join]] operator, so - * aside from its child being transformed, there is no side-effect to the [[Join]] - * operator itself. - */ -object TransitPredicateInOuterJoin extends Rule[LogicalPlan] with PredicateHelper { - def apply(plan: LogicalPlan): LogicalPlan = - if (!plan.conf.constraintPropagationEnabled) { - plan - } else plan transform { - case j@Join(left, right, joinType, joinCondition) => - joinType match { - case RightOuter if joinCondition.isDefined => - val rightConstraints = right.constraints.union( - splitConjunctivePredicates(joinCondition.get).toSet) - val inferredConstraints = ExpressionSet( - QueryPlanConstraints.inferAdditionalConstraints(rightConstraints)) - val leftConditions = inferredConstraints - .filter(_.deterministic) - .filter(_.references.subsetOf(left.outputSet)) - if (leftConditions.isEmpty) { - j - } else { - // push the predicate down to left side sub query. - val newLeft = leftConditions. - reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = right - - Join(newLeft, newRight, RightOuter, joinCondition) - } - case LeftOuter if joinCondition.isDefined => - val leftConstraints = left.constraints.union( - splitConjunctivePredicates(joinCondition.get).toSet) - val inferredConstraints = ExpressionSet( - QueryPlanConstraints.inferAdditionalConstraints(leftConstraints)) - val rightConditions = inferredConstraints - .filter(_.deterministic) - .filter(_.references.subsetOf(right.outputSet)) - if (rightConditions.isEmpty) { - j - } else { - // push the predicate down to right side sub query. - val newRight = rightConditions. - reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newLeft = left - - Join(newLeft, newRight, LeftOuter, joinCondition) - } - case _ => j - } - } -} - /** * Combines two adjacent [[Limit]] operators into one, merging the * expressions into one single expression. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index c877fffdec57..e068f5104458 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -33,7 +33,6 @@ class InferFiltersFromConstraintsSuite extends PlanTest { PushPredicateThroughJoin, PushDownPredicate, InferFiltersFromConstraints, - TransitPredicateInOuterJoin, CombineFilters, SimplifyBinaryComparison, BooleanSimplification) :: Nil From b10879f96aaf7e575fc9aed841171037e2688ba3 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Wed, 14 Mar 2018 15:53:37 -0700 Subject: [PATCH 3/4] Code refine --- .../sql/catalyst/optimizer/Optimizer.scala | 46 ++++++++----------- 1 file changed, 18 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 23d8c7466738..42d3f75ce9d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -672,51 +672,41 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe val newConditionOpt = conditionOpt match { case Some(condition) => val newFilters = additionalConstraints -- splitConjunctivePredicates(condition) - if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else None + if (newFilters.nonEmpty) Option(And(newFilters.reduce(And), condition)) else conditionOpt case None => additionalConstraints.reduceOption(And) } - val j = if (newConditionOpt.isDefined) Join(left, right, joinType, newConditionOpt) else join - // Infer filter for left/right outer joins - joinType match { - case RightOuter if j.condition.isDefined => + val newLeftOpt = joinType match { + case RightOuter if newConditionOpt.isDefined => val rightConstraints = right.constraints.union( - splitConjunctivePredicates(j.condition.get).toSet) + splitConjunctivePredicates(newConditionOpt.get).toSet) val inferredConstraints = ExpressionSet( QueryPlanConstraints.inferAdditionalConstraints(rightConstraints)) val leftConditions = inferredConstraints .filter(_.deterministic) .filter(_.references.subsetOf(left.outputSet)) - if (leftConditions.isEmpty) { - j - } else { - // push the predicate down to left side sub query. - val newLeft = leftConditions. - reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = right - - Join(newLeft, newRight, RightOuter, j.condition) - } - case LeftOuter if j.condition.isDefined => + leftConditions.reduceLeftOption(And).map(Filter(_, left)) + case _ => None + } + val newRightOpt = joinType match { + case LeftOuter if newConditionOpt.isDefined => val leftConstraints = left.constraints.union( - splitConjunctivePredicates(j.condition.get).toSet) + splitConjunctivePredicates(newConditionOpt.get).toSet) val inferredConstraints = ExpressionSet( QueryPlanConstraints.inferAdditionalConstraints(leftConstraints)) val rightConditions = inferredConstraints .filter(_.deterministic) .filter(_.references.subsetOf(right.outputSet)) - if (rightConditions.isEmpty) { - j - } else { - // push the predicate down to right side sub query. - val newRight = rightConditions. - reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newLeft = left + rightConditions.reduceLeftOption(And).map(Filter(_, right)) + case _ => None + } - Join(newLeft, newRight, LeftOuter, j.condition) - } - case _ => j + if ((newConditionOpt.isDefined && (newConditionOpt ne conditionOpt)) + || newLeftOpt.isDefined || newRightOpt.isDefined) { + Join(newLeftOpt.getOrElse(left), newRightOpt.getOrElse(right), joinType, newConditionOpt) + } else { + join } } } From 7fe93295df5627f2fc4e712b71aa9ce75383d410 Mon Sep 17 00:00:00 2001 From: maryannxue Date: Mon, 19 Mar 2018 22:50:02 -0700 Subject: [PATCH 4/4] Code refine --- .../sql/catalyst/optimizer/Optimizer.scala | 32 +++++++++---------- .../plans/logical/QueryPlanConstraints.scala | 32 ++++++++++++++----- 2 files changed, 40 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 42d3f75ce9d2..c947bdd28ab3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -679,26 +679,26 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe // Infer filter for left/right outer joins val newLeftOpt = joinType match { case RightOuter if newConditionOpt.isDefined => - val rightConstraints = right.constraints.union( - splitConjunctivePredicates(newConditionOpt.get).toSet) - val inferredConstraints = ExpressionSet( - QueryPlanConstraints.inferAdditionalConstraints(rightConstraints)) - val leftConditions = inferredConstraints - .filter(_.deterministic) - .filter(_.references.subsetOf(left.outputSet)) - leftConditions.reduceLeftOption(And).map(Filter(_, left)) + val inferredConstraints = left.getRelevantConstraints( + left.constraints + .union(right.constraints) + .union(splitConjunctivePredicates(newConditionOpt.get).toSet)) + val newFilters = inferredConstraints + .filterNot(left.constraints.contains) + .reduceLeftOption(And) + newFilters.map(Filter(_, left)) case _ => None } val newRightOpt = joinType match { case LeftOuter if newConditionOpt.isDefined => - val leftConstraints = left.constraints.union( - splitConjunctivePredicates(newConditionOpt.get).toSet) - val inferredConstraints = ExpressionSet( - QueryPlanConstraints.inferAdditionalConstraints(leftConstraints)) - val rightConditions = inferredConstraints - .filter(_.deterministic) - .filter(_.references.subsetOf(right.outputSet)) - rightConditions.reduceLeftOption(And).map(Filter(_, right)) + val inferredConstraints = right.getRelevantConstraints( + right.constraints + .union(left.constraints) + .union(splitConjunctivePredicates(newConditionOpt.get).toSet)) + val newFilters = inferredConstraints + .filterNot(right.constraints.contains) + .reduceLeftOption(And) + newFilters.map(Filter(_, right)) case _ => None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index de49d749d86e..a29f3d29236c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -29,7 +29,7 @@ trait QueryPlanConstraints { self: LogicalPlan => lazy val allConstraints: ExpressionSet = { if (conf.constraintPropagationEnabled) { ExpressionSet(validConstraints - .union(QueryPlanConstraints.inferAdditionalConstraints(validConstraints)) + .union(inferAdditionalConstraints(validConstraints)) .union(constructIsNotNullConstraints(validConstraints))) } else { ExpressionSet(Set.empty) @@ -41,9 +41,7 @@ trait QueryPlanConstraints { self: LogicalPlan => * example, if this set contains the expression `a = 2` then that expression is guaranteed to * evaluate to `true` for all rows produced. */ - lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c => - c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic - }) + lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter(selfReferenceOnly)) /** * This method can be overridden by any child class of QueryPlan to specify a set of constraints @@ -55,6 +53,23 @@ trait QueryPlanConstraints { self: LogicalPlan => */ protected def validConstraints: Set[Expression] = Set.empty + /** + * Returns an [[ExpressionSet]] that contains an additional set of constraints, such as + * equality constraints and `isNotNull` constraints, etc., and that only contains references + * to this [[LogicalPlan]] node. + */ + def getRelevantConstraints(constraints: Set[Expression]): ExpressionSet = { + val allRelevantConstraints = + if (conf.constraintPropagationEnabled) { + constraints + .union(inferAdditionalConstraints(constraints)) + .union(constructIsNotNullConstraints(constraints)) + } else { + constraints + } + ExpressionSet(allRelevantConstraints.filter(selfReferenceOnly)) + } + /** * Infers a set of `isNotNull` constraints from null intolerant expressions as well as * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this @@ -96,16 +111,13 @@ trait QueryPlanConstraints { self: LogicalPlan => case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute) case _ => Seq.empty[Attribute] } -} - -object QueryPlanConstraints { /** * Infers an additional set of constraints from a given set of equality constraints. * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an * additional constraint of the form `b = 5`. */ - def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = { + private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = { var inferredConstraints = Set.empty[Expression] constraints.foreach { case eq @ EqualTo(l: Attribute, r: Attribute) => @@ -123,4 +135,8 @@ object QueryPlanConstraints { destination: Attribute): Set[Expression] = constraints.map(_ transform { case e: Expression if e.semanticEquals(source) => destination }) + + private def selfReferenceOnly(e: Expression): Boolean = { + e.references.nonEmpty && e.references.subsetOf(outputSet) && e.deterministic + } }