From 33c10a0994c9802df901f211e1f28c52e34df27f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 29 Nov 2016 00:00:55 -0800 Subject: [PATCH 1/6] fix. --- .../spark/sql/catalyst/expressions/namedExpressions.scala | 2 +- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index c842f85af693..14160b0ef497 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -98,7 +98,7 @@ trait NamedExpression extends Expression { } } -abstract class Attribute extends LeafExpression with NamedExpression with NullIntolerant { +abstract class Attribute extends LeafExpression with NamedExpression { override def references: AttributeSet = AttributeSet(this) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f5bc8785d5a2..58d89ce6e9ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1720,6 +1720,11 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { dates.intersect(widenTypedRows).collect() } + test("SPARK-17897: Attribute is not NullIntolerant") { + val data = Seq[java.lang.Integer](1, null).toDF("key") + checkAnswer(data.filter("not key is not null"), Row(null)) + } + test("SPARK-18070 binary operator should not consider nullability when comparing input types") { val rows = Seq(Row(Seq(1), Seq(1))) val schema = new StructType() From 025632a6897abd4901254688a049079ed7358e93 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 29 Nov 2016 13:26:02 -0800 Subject: [PATCH 2/6] fix. --- .../expressions/namedExpressions.scala | 2 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 25 ++++++++++++++----- .../org/apache/spark/sql/DataFrameSuite.scala | 11 ++++---- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 14160b0ef497..c842f85af693 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -98,7 +98,7 @@ trait NamedExpression extends Expression { } } -abstract class Attribute extends LeafExpression with NamedExpression { +abstract class Attribute extends LeafExpression with NamedExpression with NullIntolerant { override def references: AttributeSet = AttributeSet(this) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 45ee2964d4db..1da092082532 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -40,14 +40,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } /** - * Infers a set of `isNotNull` constraints from a given set of equality/comparison expressions as - * well as non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this + * Infers a set of `isNotNull` constraints from null intolerant expressions as well as + * non-nullable attributes. For e.g., if an expression is of the form (`a > 5`), this * returns a constraint of the form `isNotNull(a)` */ private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = { // First, we propagate constraints from the null intolerant expressions. - var isNotNullConstraints: Set[Expression] = - constraints.flatMap(scanNullIntolerantExpr).map(IsNotNull(_)) + var isNotNullConstraints: Set[Expression] = constraints.flatMap(generateIsNotNullConstraints) // Second, we infer additional constraints from non-nullable attributes that are part of the // operator's output @@ -57,14 +56,28 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT isNotNullConstraints -- constraints } + /** + * Generate IsNotNull constraints from the null intolerant child expressions of constraints. + */ + private def generateIsNotNullConstraints(constraint: Expression): Seq[Expression] = + constraint match { + case IsNotNull(_: Attribute) => constraint :: Nil + // When the root is IsNotNull, we can push IsNotNull through the child null intolerant + // expressions. + case IsNotNull(expr) => scanNullIntolerantExpr(expr).map(IsNotNull(_)) + // Constraints always return true for all the inputs. That means, null will never be returned. + // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child + // null intolerant expressions. + case _ => scanNullIntolerantExpr(constraint).map(IsNotNull(_)) + } + /** * Recursively explores the expressions which are null intolerant and returns all attributes * in these expressions. */ private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match { case a: Attribute => Seq(a) - case _: NullIntolerant | IsNotNull(_: NullIntolerant) => - expr.children.flatMap(scanNullIntolerantExpr) + case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantExpr) case _ => Seq.empty[Attribute] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 58d89ce6e9ed..0d2e749a276f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1697,6 +1697,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { expr = "cast((_1 + _2) as boolean)", expectedNonNullableColumns = Seq("_1", "_2")) } + test("SPARK-17897: Attribute is not NullIntolerant") { + val data = Seq[java.lang.Integer](1, null).toDF("key") + checkAnswer(data.filter("not key is not null"), Row(null)) + checkAnswer(data.filter("not ((- key) is not null)"), Row(null)) + } + test("SPARK-17957: outer join + na.fill") { val df1 = Seq((1, 2), (2, 3)).toDF("a", "b") val df2 = Seq((2, 5), (3, 4)).toDF("a", "c") @@ -1720,11 +1726,6 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { dates.intersect(widenTypedRows).collect() } - test("SPARK-17897: Attribute is not NullIntolerant") { - val data = Seq[java.lang.Integer](1, null).toDF("key") - checkAnswer(data.filter("not key is not null"), Row(null)) - } - test("SPARK-18070 binary operator should not consider nullability when comparing input types") { val rows = Seq(Row(Seq(1), Seq(1))) val schema = new StructType() From 0722ae52d4b4031b4ff2751d22c787b070547fa0 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 29 Nov 2016 13:36:42 -0800 Subject: [PATCH 3/6] improve the comments. --- .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 1da092082532..f3ac3e90fe2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -57,13 +57,14 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } /** - * Generate IsNotNull constraints from the null intolerant child expressions of constraints. + * Generate Attribute-specific IsNotNull constraints from the null intolerant child expressions + * of constraints. */ private def generateIsNotNullConstraints(constraint: Expression): Seq[Expression] = constraint match { case IsNotNull(_: Attribute) => constraint :: Nil // When the root is IsNotNull, we can push IsNotNull through the child null intolerant - // expressions. + // expressions case IsNotNull(expr) => scanNullIntolerantExpr(expr).map(IsNotNull(_)) // Constraints always return true for all the inputs. That means, null will never be returned. // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child From f693040d8bd1bfcf7ddeda7a6eabfce1de08c62a Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 29 Nov 2016 14:22:38 -0800 Subject: [PATCH 4/6] rename the func --- .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index f3ac3e90fe2d..6236e06c69ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -46,7 +46,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT */ private def constructIsNotNullConstraints(constraints: Set[Expression]): Set[Expression] = { // First, we propagate constraints from the null intolerant expressions. - var isNotNullConstraints: Set[Expression] = constraints.flatMap(generateIsNotNullConstraints) + var isNotNullConstraints: Set[Expression] = constraints.flatMap(inferIsNotNullConstraints) // Second, we infer additional constraints from non-nullable attributes that are part of the // operator's output @@ -57,10 +57,10 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } /** - * Generate Attribute-specific IsNotNull constraints from the null intolerant child expressions + * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions * of constraints. */ - private def generateIsNotNullConstraints(constraint: Expression): Seq[Expression] = + private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] = constraint match { case IsNotNull(_: Attribute) => constraint :: Nil // When the root is IsNotNull, we can push IsNotNull through the child null intolerant From a835f804d593151578c7227cc4c240b0c5c27754 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 29 Nov 2016 19:42:33 -0800 Subject: [PATCH 5/6] address comments. --- .../sql/catalyst/plans/ConstraintPropagationSuite.scala | 9 +++++++++ .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 8068ce922e63..a191aa8fee70 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -351,6 +351,15 @@ class ConstraintPropagationSuite extends SparkFunSuite { IsNotNull(IsNotNull(resolveColumn(tr, "b"))), IsNotNull(resolveColumn(tr, "a")), IsNotNull(resolveColumn(tr, "c"))))) + + verifyConstraints( + tr.where('a.attr === 1 && IsNotNull(resolveColumn(tr, "b")) && + IsNotNull(resolveColumn(tr, "c"))).analyze.constraints, + ExpressionSet(Seq( + resolveColumn(tr, "a") === 1, + IsNotNull(resolveColumn(tr, "c")), + IsNotNull(resolveColumn(tr, "a")), + IsNotNull(resolveColumn(tr, "b"))))) } test("infer IsNotNull constraints from non-nullable attributes") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 0d2e749a276f..cdadd7b16504 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1697,7 +1697,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { expr = "cast((_1 + _2) as boolean)", expectedNonNullableColumns = Seq("_1", "_2")) } - test("SPARK-17897: Attribute is not NullIntolerant") { + test("SPARK-17897: Fixed IsNotNull Constraint Inference Rule") { val data = Seq[java.lang.Integer](1, null).toDF("key") checkAnswer(data.filter("not key is not null"), Row(null)) checkAnswer(data.filter("not ((- key) is not null)"), Row(null)) From 54c0dd10d4aabc4700d4a33206c481703c16fb83 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 29 Nov 2016 23:02:51 -0800 Subject: [PATCH 6/6] address comments. --- .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 9 ++++----- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 6236e06c69ea..b108017c4c48 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -62,23 +62,22 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT */ private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] = constraint match { - case IsNotNull(_: Attribute) => constraint :: Nil // When the root is IsNotNull, we can push IsNotNull through the child null intolerant // expressions - case IsNotNull(expr) => scanNullIntolerantExpr(expr).map(IsNotNull(_)) + case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_)) // Constraints always return true for all the inputs. That means, null will never be returned. // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child // null intolerant expressions. - case _ => scanNullIntolerantExpr(constraint).map(IsNotNull(_)) + case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_)) } /** * Recursively explores the expressions which are null intolerant and returns all attributes * in these expressions. */ - private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match { + private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match { case a: Attribute => Seq(a) - case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantExpr) + case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute) case _ => Seq.empty[Attribute] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index cdadd7b16504..312cd17c26d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1699,8 +1699,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("SPARK-17897: Fixed IsNotNull Constraint Inference Rule") { val data = Seq[java.lang.Integer](1, null).toDF("key") - checkAnswer(data.filter("not key is not null"), Row(null)) - checkAnswer(data.filter("not ((- key) is not null)"), Row(null)) + checkAnswer(data.filter(!$"key".isNotNull), Row(null)) + checkAnswer(data.filter(!(- $"key").isNotNull), Row(null)) } test("SPARK-17957: outer join + na.fill") {