Skip to content

Commit 7fe9329

Browse files
committed
Code refine
1 parent b10879f commit 7fe9329

File tree

2 files changed

+40
-24
lines changed

2 files changed

+40
-24
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -679,26 +679,26 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
679679
// Infer filter for left/right outer joins
680680
val newLeftOpt = joinType match {
681681
case RightOuter if newConditionOpt.isDefined =>
682-
val rightConstraints = right.constraints.union(
683-
splitConjunctivePredicates(newConditionOpt.get).toSet)
684-
val inferredConstraints = ExpressionSet(
685-
QueryPlanConstraints.inferAdditionalConstraints(rightConstraints))
686-
val leftConditions = inferredConstraints
687-
.filter(_.deterministic)
688-
.filter(_.references.subsetOf(left.outputSet))
689-
leftConditions.reduceLeftOption(And).map(Filter(_, left))
682+
val inferredConstraints = left.getRelevantConstraints(
683+
left.constraints
684+
.union(right.constraints)
685+
.union(splitConjunctivePredicates(newConditionOpt.get).toSet))
686+
val newFilters = inferredConstraints
687+
.filterNot(left.constraints.contains)
688+
.reduceLeftOption(And)
689+
newFilters.map(Filter(_, left))
690690
case _ => None
691691
}
692692
val newRightOpt = joinType match {
693693
case LeftOuter if newConditionOpt.isDefined =>
694-
val leftConstraints = left.constraints.union(
695-
splitConjunctivePredicates(newConditionOpt.get).toSet)
696-
val inferredConstraints = ExpressionSet(
697-
QueryPlanConstraints.inferAdditionalConstraints(leftConstraints))
698-
val rightConditions = inferredConstraints
699-
.filter(_.deterministic)
700-
.filter(_.references.subsetOf(right.outputSet))
701-
rightConditions.reduceLeftOption(And).map(Filter(_, right))
694+
val inferredConstraints = right.getRelevantConstraints(
695+
right.constraints
696+
.union(left.constraints)
697+
.union(splitConjunctivePredicates(newConditionOpt.get).toSet))
698+
val newFilters = inferredConstraints
699+
.filterNot(right.constraints.contains)
700+
.reduceLeftOption(And)
701+
newFilters.map(Filter(_, right))
702702
case _ => None
703703
}
704704

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ trait QueryPlanConstraints { self: LogicalPlan =>
2929
lazy val allConstraints: ExpressionSet = {
3030
if (conf.constraintPropagationEnabled) {
3131
ExpressionSet(validConstraints
32-
.union(QueryPlanConstraints.inferAdditionalConstraints(validConstraints))
32+
.union(inferAdditionalConstraints(validConstraints))
3333
.union(constructIsNotNullConstraints(validConstraints)))
3434
} else {
3535
ExpressionSet(Set.empty)
@@ -41,9 +41,7 @@ trait QueryPlanConstraints { self: LogicalPlan =>
4141
* example, if this set contains the expression `a = 2` then that expression is guaranteed to
4242
* evaluate to `true` for all rows produced.
4343
*/
44-
lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
45-
c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
46-
})
44+
lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter(selfReferenceOnly))
4745

4846
/**
4947
* This method can be overridden by any child class of QueryPlan to specify a set of constraints
@@ -55,6 +53,23 @@ trait QueryPlanConstraints { self: LogicalPlan =>
5553
*/
5654
protected def validConstraints: Set[Expression] = Set.empty
5755

56+
/**
57+
* Returns an [[ExpressionSet]] that contains an additional set of constraints, such as
58+
* equality constraints and `isNotNull` constraints, etc., and that only contains references
59+
* to this [[LogicalPlan]] node.
60+
*/
61+
def getRelevantConstraints(constraints: Set[Expression]): ExpressionSet = {
62+
val allRelevantConstraints =
63+
if (conf.constraintPropagationEnabled) {
64+
constraints
65+
.union(inferAdditionalConstraints(constraints))
66+
.union(constructIsNotNullConstraints(constraints))
67+
} else {
68+
constraints
69+
}
70+
ExpressionSet(allRelevantConstraints.filter(selfReferenceOnly))
71+
}
72+
5873
/**
5974
* Infers a set of `isNotNull` constraints from null intolerant expressions as well as
6075
* non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this
@@ -96,16 +111,13 @@ trait QueryPlanConstraints { self: LogicalPlan =>
96111
case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
97112
case _ => Seq.empty[Attribute]
98113
}
99-
}
100-
101-
object QueryPlanConstraints {
102114

103115
/**
104116
* Infers an additional set of constraints from a given set of equality constraints.
105117
* For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an
106118
* additional constraint of the form `b = 5`.
107119
*/
108-
def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
120+
private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = {
109121
var inferredConstraints = Set.empty[Expression]
110122
constraints.foreach {
111123
case eq @ EqualTo(l: Attribute, r: Attribute) =>
@@ -123,4 +135,8 @@ object QueryPlanConstraints {
123135
destination: Attribute): Set[Expression] = constraints.map(_ transform {
124136
case e: Expression if e.semanticEquals(source) => destination
125137
})
138+
139+
private def selfReferenceOnly(e: Expression): Boolean = {
140+
e.references.nonEmpty && e.references.subsetOf(outputSet) && e.deterministic
141+
}
126142
}

0 commit comments

Comments
 (0)