Skip to content

Commit fff3382

Browse files
committed
Try to have a rule to fix nullability
1 parent 78ed4ef commit fff3382

File tree

2 files changed

+26
-39
lines changed

2 files changed

+26
-39
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
2828
import org.apache.spark.sql.catalyst.expressions.aggregate._
2929
import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
3030
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
31-
import org.apache.spark.sql.catalyst.planning.{ExtractJoinOutputAttributes, IntegerIndex}
31+
import org.apache.spark.sql.catalyst.planning.IntegerIndex
3232
import org.apache.spark.sql.catalyst.plans._
3333
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
3434
import org.apache.spark.sql.catalyst.rules._
@@ -109,8 +109,8 @@ class Analyzer(
109109
TimeWindowing ::
110110
TypeCoercion.typeCoercionRules ++
111111
extendedResolutionRules : _*),
112-
Batch("Solve", Once,
113-
SolveIllegalReferences),
112+
Batch("FixNullability", Once,
113+
FixNullability),
114114
Batch("Nondeterministic", Once,
115115
PullOutNondeterministic),
116116
Batch("UDF", Once,
@@ -1450,27 +1450,32 @@ class Analyzer(
14501450
}
14511451

14521452
/**
1453-
* Corrects attribute references in an expression tree of some operators (e.g., filters and
1454-
* projects) if these operators have a join as a child and the references point to columns on the
1455-
* input relation of the join. This is because some joins change the nullability of input columns
1456-
* and this could cause illegal optimization (e.g., NULL propagation) and wrong answers.
1453+
* Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of
1454+
* the corresponding Attributes in its children's output. This step is needed because
1455+
* users can use a resolved AttributeReference in the Dataset API and outer joins
1456+
* can change the nullability of an AttributeReference. Without the fix, a nullable column's
1457+
* nullable field can actually be set as non-nullable, which causes illegal optimization
1458+
* (e.g., NULL propagation) and wrong answers.
14571459
* See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
14581460
*/
1459-
object SolveIllegalReferences extends Rule[LogicalPlan] {
1461+
object FixNullability extends Rule[LogicalPlan] {
14601462

1461-
private def replaceReferences(e: Expression, attrMap: AttributeMap[Attribute]) = e.transform {
1462-
case a: AttributeReference => attrMap.get(a).getOrElse(a)
1463-
}
1464-
1465-
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
1466-
case q: LogicalPlan =>
1467-
q.transform {
1468-
case f @ Filter(filterCondition, ExtractJoinOutputAttributes(join, joinOutputMap)) =>
1469-
f.copy(condition = replaceReferences(filterCondition, joinOutputMap))
1470-
case p @ Project(projectList, ExtractJoinOutputAttributes(join, joinOutputMap)) =>
1471-
p.copy(projectList = projectList.map { e =>
1472-
replaceReferences(e, joinOutputMap).asInstanceOf[NamedExpression]
1473-
})
1463+
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
1464+
case q: LogicalPlan if q.resolved =>
1465+
val childrenOutput = q.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
1466+
case (exprId, attributes) =>
1467+
// If there are multiple Attributes having the same ExprId, we need to resolve
1468+
// the conflict of the nullable field.
1469+
val nullable = attributes.map(_.nullable).reduce(_ || _)
1470+
attributes.map(attr => attr.withNullability(nullable))
1471+
}.toSeq
1472+
val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
1473+
// For an Attribute used by the current LogicalPlan, if it is from its children,
1474+
// we fix the nullable field by using the nullability setting of the corresponding
1475+
// output Attribute from the children.
1476+
q.transformExpressions {
1477+
case attr: Attribute if attributeMap.contains(attr) =>
1478+
attr.withNullability(attributeMap(attr).nullable)
14741479
}
14751480
}
14761481
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -181,24 +181,6 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
181181
}
182182
}
183183

184-
/**
185-
* An extractor for join output attributes directly under a given operator.
186-
*/
187-
object ExtractJoinOutputAttributes {
188-
189-
def unapply(plan: LogicalPlan): Option[(Join, AttributeMap[Attribute])] = {
190-
plan.collectFirst {
191-
case j: Join => j
192-
}.map { join =>
193-
val joinOutput = new mutable.ArrayBuffer[(Attribute, Attribute)]
194-
join.output.foreach {
195-
case a: AttributeReference => joinOutput += ((a, a))
196-
}
197-
(join, AttributeMap(joinOutput))
198-
}
199-
}
200-
}
201-
202184
/**
203185
* A pattern that collects all adjacent unions and returns their children as a Seq.
204186
*/

0 commit comments

Comments
 (0)