From 2dcecc8a934d6224d7680f5d25b7a98c53779d79 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 26 Feb 2016 00:56:29 +0900
Subject: [PATCH 01/10] Avoid illegal NULL propagation

---
 .../spark/sql/catalyst/analysis/Analyzer.scala    | 15 ++++++++++++++-
 .../org/apache/spark/sql/DataFrameJoinSuite.scala | 11 +++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9a92330f75f6..d1497d37298a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -625,7 +625,7 @@ class Analyzer(
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString}")
-        q transformExpressionsUp {
+        val resolvedPlan = q transformExpressionsUp {
           case u @ UnresolvedAttribute(nameParts) =>
             // Leave unchanged if resolution fails. Hopefully will be resolved next round.
             val result =
@@ -635,6 +635,19 @@ class Analyzer(
           case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
             ExtractValue(child, fieldExpr, resolver)
         }
+
+        resolvedPlan.transform {
+          case f @ Filter(filterCondition, j @ Join(_, _, _, _)) =>
+            val joinOutput = new ArrayBuffer[(Attribute, Attribute)]
+            j.output.map {
+              case a: AttributeReference => joinOutput += ((a, a))
+            }
+            val joinOutputMap = AttributeMap(joinOutput)
+            val newFilterCond = filterCondition.transform {
+              case a: AttributeReference => joinOutputMap.get(a).getOrElse(a)
+            }
+            Filter(newFilterCond, j)
+        }
 
     def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 031e66b57cbc..d99921c48d05 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -204,4 +204,15 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       leftJoin2Inner,
       Row(1, 2, "1", 1, 3, "1") :: Nil)
   }
+
+  test("filter outer join results using a non-nullable column from a right table") {
+    val df1 = Seq((0, 0), (1, 0), (2, 0), (3, 0), (4, 0)).toDF("id", "count")
+    val df2 = Seq(Tuple1(0), Tuple1(1)).toDF("id").groupBy("id").count
+    checkAnswer(
+      df1.join(df2, df1("id") === df2("id"), "left_outer").filter(df2("count").isNull),
+      Row(2, 0, null, null) ::
+      Row(3, 0, null, null) ::
+      Row(4, 0, null, null) :: Nil
+    )
+  }
 }
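For context on what this first patch works around: df2("count") is resolved against df2's own plan, where the count() aggregate output is non-nullable. After the left outer join the column can be NULL for unmatched rows, but the captured attribute still claims nullable = false, so the Optimizer's NullPropagation rule constant-folds IsNull over a non-nullable input to false and the filter drops every row. A minimal spark-shell sketch of the mismatch (illustrative, not part of the patch):

    // SPARK-13484: the join output's "count" column is nullable even though
    // df2's own schema says otherwise; the filter's captured attribute still
    // carries the stale non-nullable flag.
    val df1 = Seq((0, 0), (1, 0), (2, 0), (3, 0), (4, 0)).toDF("id", "count")
    val df2 = Seq(Tuple1(0), Tuple1(1)).toDF("id").groupBy("id").count()
    assert(!df2.schema("count").nullable)   // count() is non-nullable on its own
    val joined = df1.join(df2, df1("id") === df2("id"), "left_outer")
    assert(joined.schema.last.nullable)     // nullable after the outer join
    // Before this fix, the next line returned no rows instead of the three
    // unmatched left-side rows, because IsNull was folded to false.
    joined.filter(df2("count").isNull).show()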
From 1ebfa80e8d52d959ffc98d191019b9e4242dd4a6 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 26 Feb 2016 16:48:11 +0900
Subject: [PATCH 02/10] Add comments

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d1497d37298a..a0a8c92bec9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -636,6 +636,10 @@ class Analyzer(
             ExtractValue(child, fieldExpr, resolver)
         }
 
+        // Replaces attribute references in a filter if it has a join as a child and it references
+        // some columns on the base relations of the join. This is because outer joins change
+        // nullability on columns and this could cause wrong NULL propagation in the Optimizer.
+        // See SPARK-13484 for the concrete query of this case.
         resolvedPlan.transform {
           case f @ Filter(filterCondition, j @ Join(_, _, _, _)) =>
             val joinOutput = new ArrayBuffer[(Attribute, Attribute)]

From ab5d4f1174839abbd7f33cf587f75ae75cc76b68 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Sun, 28 Feb 2016 00:16:54 +0900
Subject: [PATCH 03/10] Add a new rule to solve illegal references

---
 .../sql/catalyst/analysis/Analyzer.scala | 26 +++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a0a8c92bec9b..25db054d162a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -109,6 +109,8 @@ class Analyzer(
       TimeWindowing ::
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
+    Batch("Solve", Once,
+      SolveIllegalReferences),
     Batch("Nondeterministic", Once,
       PullOutNondeterministic),
     Batch("UDF", Once,
@@ -1464,6 +1466,30 @@ class Analyzer(
     }
   }
 
+  /**
+   * Replaces attribute references in a filter if it has a join as a child and it references some
+   * columns on the base relations of the join. This is because outer joins change nullability on
+   * columns and this could cause wrong NULL propagation in the Optimizer.
+   * See SPARK-13484 for the concrete query of this case.
+   */
+  object SolveIllegalReferences extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case q: LogicalPlan =>
+        q.transform {
+          case f @ Filter(filterCondition, j @ Join(_, _, _, _)) =>
+            val joinOutput = new ArrayBuffer[(Attribute, Attribute)]
+            j.output.map {
+              case a: AttributeReference => joinOutput += ((a, a))
+            }
+            val joinOutputMap = AttributeMap(joinOutput)
+            val newFilterCond = filterCondition.transform {
+              case a: AttributeReference => joinOutputMap.get(a).getOrElse(a)
+            }
+            Filter(newFilterCond, j)
+        }
+    }
+  }
+
   /**
    * Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
    * aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]

From 8850cb3046a302afd012e4508bf8bfbd9f59051d Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Wed, 2 Mar 2016 15:19:59 +0900
Subject: [PATCH 04/10] Use foreach not map

---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 25db054d162a..e3565dee625d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1478,7 +1478,7 @@ class Analyzer(
         q.transform {
           case f @ Filter(filterCondition, j @ Join(_, _, _, _)) =>
             val joinOutput = new ArrayBuffer[(Attribute, Attribute)]
-            j.output.map {
+            j.output.foreach {
               case a: AttributeReference => joinOutput += ((a, a))
             }
             val joinOutputMap = AttributeMap(joinOutput)
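A note on why the attribute swap in this rule works: Catalyst's AttributeMap keys on exprId alone, so a stale reference that carries the wrong nullability still finds the join's current output attribute. A standalone sketch of that property (a sketch against the Catalyst internals of this branch, not code from the patches):

    import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
    import org.apache.spark.sql.types.IntegerType

    val stale = AttributeReference("count", IntegerType, nullable = false)()
    val fixed = stale.withNullability(true)  // same exprId, corrected nullability
    val joinOutputMap = AttributeMap(Seq(fixed -> fixed))
    assert(joinOutputMap.get(stale).contains(fixed))  // lookup ignores nullability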
From b636b34bbe09f4152567b15b1fae30f76f7ffe55 Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 15 Apr 2016 11:17:01 +0900
Subject: [PATCH 05/10] Solve illegal references in Projects

---
 .../sql/catalyst/analysis/Analyzer.scala | 35 ++++++++++---------
 .../sql/catalyst/planning/patterns.scala | 17 +++++++++
 2 files changed, 35 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e3565dee625d..230d2780a09e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
 import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
-import org.apache.spark.sql.catalyst.planning.IntegerIndex
+import org.apache.spark.sql.catalyst.planning.{ExtractJoinOutputAttributes, IntegerIndex}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
 import org.apache.spark.sql.catalyst.rules._
@@ -1467,25 +1467,27 @@ class Analyzer(
   }
 
   /**
-   * Replaces attribute references in a filter if it has a join as a child and it references some
-   * columns on the base relations of the join. This is because outer joins change nullability on
-   * columns and this could cause wrong NULL propagation in the Optimizer.
-   * See SPARK-13484 for the concrete query of this case.
+   * Corrects attribute references in an expression tree of some operators (e.g., filters and
+   * projects) if these operators have a join as a child and the references point to columns on the
+   * input relation of the join. This is because some joins change the nullability of input columns
+   * and this could cause illegal optimization (e.g., NULL propagation) and wrong answers.
+   * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
   */
   object SolveIllegalReferences extends Rule[LogicalPlan] {
+
+    private def replaceReferences(e: Expression, attrMap: AttributeMap[Attribute]) = e.transform {
+      case a: AttributeReference => attrMap.get(a).getOrElse(a)
+    }
+
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case q: LogicalPlan =>
         q.transform {
-          case f @ Filter(filterCondition, j @ Join(_, _, _, _)) =>
-            val joinOutput = new ArrayBuffer[(Attribute, Attribute)]
-            j.output.foreach {
-              case a: AttributeReference => joinOutput += ((a, a))
-            }
-            val joinOutputMap = AttributeMap(joinOutput)
-            val newFilterCond = filterCondition.transform {
-              case a: AttributeReference => joinOutputMap.get(a).getOrElse(a)
-            }
-            Filter(newFilterCond, j)
+          case f @ Filter(filterCondition, ExtractJoinOutputAttributes(join, joinOutputMap)) =>
+            f.copy(condition = replaceReferences(filterCondition, joinOutputMap))
+          case p @ Project(projectList, ExtractJoinOutputAttributes(join, joinOutputMap)) =>
+            p.copy(projectList = projectList.map { e =>
+              replaceReferences(e, joinOutputMap).asInstanceOf[NamedExpression]
+            })
         }
     }
   }
@@ -2170,4 +2172,3 @@
     }
   }
 }
-
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index f42e67ca6ec2..1561de5ebaf0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -181,6 +181,23 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
   }
 }
 
+/**
+ * An extractor for join output attributes directly under a given operator.
+ */
+object ExtractJoinOutputAttributes {
+
+  def unapply(plan: LogicalPlan): Option[(Join, AttributeMap[Attribute])] = {
+    plan.collectFirst {
+      case j: Join => j
+    }.map { join =>
+      val joinOutput = new mutable.ArrayBuffer[(Attribute, Attribute)]
+      join.output.foreach {
+        case a: AttributeReference => joinOutput += ((a, a))
+      }
+      (join, AttributeMap(joinOutput))
+    }
+  }
+}
 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
  */
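For readers less familiar with this Scala idiom: ExtractJoinOutputAttributes is an ordinary extractor object, and unapply returning Some((join, attributeMap)) is what lets the rewritten rule match it inside a case pattern. A toy illustration of the mechanics (generic Scala, unrelated to Spark):

    object Halve {
      def unapply(n: Int): Option[Int] = if (n % 2 == 0) Some(n / 2) else None
    }

    42 match {
      case Halve(k) => println(s"half is $k")  // prints: half is 21
      case _ => println("odd")
    }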
From dfd952870d0b157dc8571ddee053a9fe91d372ed Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 15 Apr 2016 13:28:38 +0900
Subject: [PATCH 06/10] Add tests in DataFrameJoinSuite

---
 .../sql/catalyst/analysis/Analyzer.scala      | 19 +------------------
 .../apache/spark/sql/DataFrameJoinSuite.scala | 12 +++++++++++-
 2 files changed, 12 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 230d2780a09e..f433c4947f41 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -627,7 +627,7 @@ class Analyzer(
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString}")
-        val resolvedPlan = q transformExpressionsUp {
+        q transformExpressionsUp {
           case u @ UnresolvedAttribute(nameParts) =>
             // Leave unchanged if resolution fails. Hopefully will be resolved next round.
             val result =
@@ -637,23 +637,6 @@ class Analyzer(
           case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
             ExtractValue(child, fieldExpr, resolver)
         }
-
-        // Replaces attribute references in a filter if it has a join as a child and it references
-        // some columns on the base relations of the join. This is because outer joins change
-        // nullability on columns and this could cause wrong NULL propagation in the Optimizer.
-        // See SPARK-13484 for the concrete query of this case.
-        resolvedPlan.transform {
-          case f @ Filter(filterCondition, j @ Join(_, _, _, _)) =>
-            val joinOutput = new ArrayBuffer[(Attribute, Attribute)]
-            j.output.map {
-              case a: AttributeReference => joinOutput += ((a, a))
-            }
-            val joinOutputMap = AttributeMap(joinOutput)
-            val newFilterCond = filterCondition.transform {
-              case a: AttributeReference => joinOutputMap.get(a).getOrElse(a)
-            }
-            Filter(newFilterCond, j)
-        }
 
     def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index d99921c48d05..4342c039aefc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -205,7 +205,8 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, 2, "1", 1, 3, "1") :: Nil)
   }
 
-  test("filter outer join results using a non-nullable column from a right table") {
+  test("process outer join results using the non-nullable columns in the join input") {
+    // Filter data using a non-nullable column from a right table
     val df1 = Seq((0, 0), (1, 0), (2, 0), (3, 0), (4, 0)).toDF("id", "count")
     val df2 = Seq(Tuple1(0), Tuple1(1)).toDF("id").groupBy("id").count
     checkAnswer(
@@ -214,5 +215,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(3, 0, null, null) ::
       Row(4, 0, null, null) :: Nil
     )
+
+    // Coalesce data using non-nullable columns in input tables
+    val df3 = Seq((1, 1)).toDF("a", "b")
+    val df4 = Seq((2, 2)).toDF("a", "b")
+    checkAnswer(
+      df3.join(df4, df3("a") === df4("a"), "outer")
+        .select(coalesce(df3("a"), df3("b")), coalesce(df4("a"), df4("b"))),
+      Row(1, null) :: Row(null, 2) :: Nil
+    )
   }
 }
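The new coalesce assertion guards the same failure mode from the projection side: if df3("a") still claims nullable = false after the full outer join, the planner is entitled to elide the null handling around coalesce(df3("a"), df3("b")), in the simplest case folding it to its first argument, and the NULL cells in the expected rows can silently come back wrong. A spark-shell sketch of the scenario the test encodes (illustrative only):

    import org.apache.spark.sql.functions.coalesce

    val df3 = Seq((1, 1)).toDF("a", "b")
    val df4 = Seq((2, 2)).toDF("a", "b")
    val full = df3.join(df4, df3("a") === df4("a"), "outer")
    // Correct output: Row(1, null) and Row(null, 2). With a stale non-nullable
    // df3("a"), the Coalesce could be simplified away or its null checks skipped.
    full.select(coalesce(df3("a"), df3("b")), coalesce(df4("a"), df4("b"))).show()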
From 78ed4ef7a5d6c4565efd468cc48d6bfd1321f08b Mon Sep 17 00:00:00 2001
From: Takeshi YAMAMURO
Date: Fri, 15 Apr 2016 17:44:34 +0900
Subject: [PATCH 07/10] Fix test code in ResolveNaturalJoinSuite

---
 .../spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
index 1423a8705af2..748579df4158 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
@@ -100,7 +100,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
     val naturalPlan = r3.join(r4, NaturalJoin(FullOuter), None)
     val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq(UnresolvedAttribute("b"))), None)
     val expected = r3.join(r4, FullOuter, Some(EqualTo(bNotNull, bNotNull))).select(
-      Alias(Coalesce(Seq(bNotNull, bNotNull)), "b")(), a, c)
+      Alias(Coalesce(Seq(b, b)), "b")(), a, c)
     checkAnalysis(naturalPlan, expected)
     checkAnalysis(usingPlan, expected)
   }
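The expected plan changes here because a natural or USING full outer join emits coalesce(left.b, right.b) for the shared key, and after the join either side's b can be NULL for unmatched rows, so the analyzed plan must coalesce the nullable attributes rather than bNotNull. In DataFrame terms (an illustrative sketch, not taken from the patch):

    val l = Seq((1, "x")).toDF("b", "l")
    val r = Seq((2, "y")).toDF("b", "r")
    // The shared column "b" is coalesced from both sides; each side's b can be
    // NULL, so the coalesced inputs must be treated as nullable.
    l.join(r, Seq("b"), "full_outer").show()
    // rows: (1, x, null) and (2, null, y)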
From fff33829192fd5c35339aeda80d392ae2bfd486c Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Tue, 24 May 2016 23:16:30 -0700
Subject: [PATCH 08/10] Try to have a rule to fix nullability

---
 .../sql/catalyst/analysis/Analyzer.scala | 47 ++++++++++---------
 .../sql/catalyst/planning/patterns.scala | 18 -------
 2 files changed, 26 insertions(+), 39 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f433c4947f41..18008f41484c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.objects.NewInstance
 import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
-import org.apache.spark.sql.catalyst.planning.{ExtractJoinOutputAttributes, IntegerIndex}
+import org.apache.spark.sql.catalyst.planning.IntegerIndex
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
 import org.apache.spark.sql.catalyst.rules._
@@ -109,8 +109,8 @@ class Analyzer(
       TimeWindowing ::
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
-    Batch("Solve", Once,
-      SolveIllegalReferences),
+    Batch("FixNullability", Once,
+      FixNullability),
     Batch("Nondeterministic", Once,
       PullOutNondeterministic),
@@ -1450,27 +1450,32 @@ class Analyzer(
   }
 
   /**
-   * Corrects attribute references in an expression tree of some operators (e.g., filters and
-   * projects) if these operators have a join as a child and the references point to columns on the
-   * input relation of the join. This is because some joins change the nullability of input columns
-   * and this could cause illegal optimization (e.g., NULL propagation) and wrong answers.
+   * Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of
+   * corresponding Attributes of its children output Attributes. This step is needed because
+   * users can use a resolved AttributeReference in the Dataset API and outer joins
+   * can change the nullability of an AttributeReference. Without the fix, a nullable column's
+   * nullable field can actually be set as non-nullable, which can cause illegal optimization
+   * (e.g., NULL propagation) and wrong answers.
    * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
    */
-  object SolveIllegalReferences extends Rule[LogicalPlan] {
-
-    private def replaceReferences(e: Expression, attrMap: AttributeMap[Attribute]) = e.transform {
-      case a: AttributeReference => attrMap.get(a).getOrElse(a)
-    }
-
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case q: LogicalPlan =>
-        q.transform {
-          case f @ Filter(filterCondition, ExtractJoinOutputAttributes(join, joinOutputMap)) =>
-            f.copy(condition = replaceReferences(filterCondition, joinOutputMap))
-          case p @ Project(projectList, ExtractJoinOutputAttributes(join, joinOutputMap)) =>
-            p.copy(projectList = projectList.map { e =>
-              replaceReferences(e, joinOutputMap).asInstanceOf[NamedExpression]
-            })
-        }
-    }
-  }
+  object FixNullability extends Rule[LogicalPlan] {
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+      case q: LogicalPlan if q.resolved =>
+        val childrenOutput = q.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
+          case (exprId, attributes) =>
+            // If there are multiple Attributes having the same ExpirId, we need to resolve
+            // the conflict of nullable field.
+            val nullable = attributes.map(_.nullable).reduce(_ || _)
+            attributes.map(attr => attr.withNullability(nullable))
+        }.toSeq
+        val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
+        // For an Attribute used by the current LogicalPlan, if it is from its children,
+        // we fix the nullable field by using the nullability setting of the corresponding
+        // output Attribute from the children.
+        q.transformExpressions {
+          case attr: Attribute if attributeMap.contains(attr) =>
+            attr.withNullability(attributeMap(attr).nullable)
+        }
+    }
+  }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 1561de5ebaf0..c8f86f1f84d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -181,24 +181,6 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
   }
 }
-
-/**
- * An extractor for join output attributes directly under a given operator.
- */
-object ExtractJoinOutputAttributes {
-
-  def unapply(plan: LogicalPlan): Option[(Join, AttributeMap[Attribute])] = {
-    plan.collectFirst {
-      case j: Join => j
-    }.map { join =>
-      val joinOutput = new mutable.ArrayBuffer[(Attribute, Attribute)]
-      join.output.foreach {
-        case a: AttributeReference => joinOutput += ((a, a))
-      }
-      (join, AttributeMap(joinOutput))
-    }
-  }
-}
 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
 */
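The heart of the new rule is the nullability-union step: the children's output attributes are grouped by exprId, and an attribute is treated as nullable if any occurrence is. A simplified standalone model of that fold (with a hypothetical Attr type standing in for Catalyst's Attribute):

    case class Attr(exprId: Long, name: String, nullable: Boolean)

    val childrenOutput = Seq(Attr(1, "id", false), Attr(1, "id", true), Attr(2, "n", false))
    val fixed = childrenOutput.groupBy(_.exprId).flatMap { case (_, attrs) =>
      val nullable = attrs.map(_.nullable).reduce(_ || _)  // true if any is nullable
      attrs.map(_.copy(nullable = nullable))
    }.toSeq
    // Both exprId = 1 entries become nullable = true; exprId = 2 stays false.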
 */

From 127024da7e1058cd39b71e85c6dcd08b5e3e2b53 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Wed, 25 May 2016 20:24:42 -0700
Subject: [PATCH 09/10] update

---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 18008f41484c..6a6a3dc812bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -109,12 +109,12 @@ class Analyzer(
       TimeWindowing ::
       TypeCoercion.typeCoercionRules ++
       extendedResolutionRules : _*),
-    Batch("FixNullability", Once,
-      FixNullability),
     Batch("Nondeterministic", Once,
       PullOutNondeterministic),
     Batch("UDF", Once,
       HandleNullInputsForUDF),
+    Batch("FixNullability", Once,
+      FixNullability),
     Batch("Cleanup", fixedPoint,
       CleanupAliases)
   )
@@ -1461,10 +1461,11 @@ class Analyzer(
   object FixNullability extends Rule[LogicalPlan] {
 
     def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-      case q: LogicalPlan if q.resolved =>
-        val childrenOutput = q.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
+      case p if !p.resolved => p // Skip unresolved nodes.
+      case p: LogicalPlan if p.resolved =>
+        val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
           case (exprId, attributes) =>
-            // If there are multiple Attributes having the same ExpirId, we need to resolve
+            // If there are multiple Attributes having the same ExprId, we need to resolve
             // the conflict of nullable field.
             val nullable = attributes.map(_.nullable).reduce(_ || _)
             attributes.map(attr => attr.withNullability(nullable))
@@ -1473,7 +1474,7 @@ class Analyzer(
         // For an Attribute used by the current LogicalPlan, if it is from its children,
         // we fix the nullable field by using the nullability setting of the corresponding
         // output Attribute from the children.
-        q.transformExpressions {
+        p.transformExpressions {
           case attr: Attribute if attributeMap.contains(attr) =>
             attr.withNullability(attributeMap(attr).nullable)

From 071b670303489238fda5944c1b10d0cc11277ca9 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Wed, 1 Jun 2016 14:58:42 -0700
Subject: [PATCH 10/10] update

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 ++++--
 .../org/apache/spark/sql/catalyst/planning/patterns.scala | 1 +
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6a6a3dc812bb..977c2f329150 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1466,10 +1466,12 @@ class Analyzer(
         val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
           case (exprId, attributes) =>
             // If there are multiple Attributes having the same ExprId, we need to resolve
-            // the conflict of nullable field.
-            val nullable = attributes.map(_.nullable).reduce(_ || _)
+            // the conflict of nullable field. We do not really expect this to happen.
+            val nullable = attributes.exists(_.nullable)
             attributes.map(attr => attr.withNullability(nullable))
         }.toSeq
+        // Here, we create an AttributeMap that only compares the exprId for the lookup
+        // operation, so we can find the corresponding input attribute's nullability.
         val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
         // For an Attribute used by the current LogicalPlan, if it is from its children,
         // we fix the nullable field by using the nullability setting of the corresponding
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index c8f86f1f84d7..f42e67ca6ec2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -181,6 +181,7 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
   }
 }
+
 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
 */
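A closing note on the last tweak: attributes.exists(_.nullable) computes the same value as attributes.map(_.nullable).reduce(_ || _) for the non-empty groups that groupBy produces, but it short-circuits and, unlike reduce, is also total on an empty collection. For instance:

    val flags = Seq(false, true, false)
    assert(flags.exists(identity) == flags.reduce(_ || _))  // both true
    assert(!Seq.empty[Boolean].exists(identity))            // reduce would throw here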