From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 16 Nov 2015 18:30:26 -0800 Subject: [PATCH 01/11] SPARK-11633 --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- .../spark/sql/hive/execution/SQLQuerySuite.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2f4670b55bdb..5a5b71e52dd7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -425,7 +425,8 @@ class Analyzer( */ j case Some((oldRelation, newRelation)) => - val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) + val attributeRewrites = + AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2)) val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3427152b2da0..5e00546a74c0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -51,6 +51,8 @@ case class Order( state: String, month: Int) +case class Individual(F1: Integer, F2: Integer) + case class WindowData( month: Int, area: String, @@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } + + test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") { + val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1))) + val df = hiveContext.createDataFrame(rdd1) + df.registerTempTable("foo") + val df2 = sql("select f1, F2 as F2 from foo") + df2.registerTempTable("foo2") + df2.registerTempTable("foo3") + + checkAnswer(sql( + """ + SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2 + """.stripMargin), Row(2) :: Row(1) :: Nil) + } } From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 20 Nov 2015 10:56:45 -0800 Subject: [PATCH 02/11] converge --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 15 --------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7c9512fbd00a..47962ebe6ef8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -417,8 +417,7 @@ class Analyzer( */ j case Some((oldRelation, newRelation)) => - val attributeRewrites = - AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2)) + val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 5e00546a74c0..61d9dcd37572 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -51,8 +51,6 @@ case class Order( state: String, month: Int) -case class Individual(F1: Integer, F2: Integer) - case class WindowData( month: Int, area: String, @@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } - - test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") { - val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1))) - val df = hiveContext.createDataFrame(rdd1) - df.registerTempTable("foo") - val df2 = sql("select f1, F2 as F2 from foo") - df2.registerTempTable("foo2") - df2.registerTempTable("foo3") - - checkAnswer(sql( - """ - SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2 - """.stripMargin), Row(2) :: Row(1) :: Nil) } } From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 20 Nov 2015 10:58:37 -0800 Subject: [PATCH 03/11] converge --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 61d9dcd37572..3427152b2da0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } - } } From a5f1c49127892b5463b0d25f4a1b670899aa616a Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 3 Jan 2016 19:32:39 -0800 Subject: [PATCH 04/11] outer join elimination by Filter condition --- .../sql/catalyst/optimizer/Optimizer.scala | 74 ++++++++++ .../optimizer/OuterJoinEliminationSuite.scala | 138 ++++++++++++++++++ .../apache/spark/sql/DataFrameJoinSuite.scala | 48 ++++++ .../org/apache/spark/sql/JoinSuite.scala | 2 +- 4 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 55c168d552a4..f0477f3d5deb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -62,6 +62,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { SetOperationPushDown, SamplePushDown, ReorderJoin, + OuterJoinElimination, PushPredicateThroughJoin, PushPredicateThroughProject, PushPredicateThroughGenerate, @@ -931,6 +932,79 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { } } +/** + * Elimination of outer joins, if the predicates can restrict the result sets 
so that + * all null-supplying rows are eliminated + * + * - full outer -> inner if both sides have such predicates + * - left outer -> inner if the right side has such predicates + * - right outer -> inner if the left side has such predicates + * - full outer -> left outer if only the left side has such predicates + * - full outer -> right outer if only the right side has such predicates + * + * This rule should be executed before pushing down the Filter + */ +object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { + + private def containsAttr(plan: LogicalPlan, attr: Attribute): Boolean = + plan.outputSet.exists(_.semanticEquals(attr)) + + private def hasNullFilteringPredicate(predicate: Expression, plan: LogicalPlan): Boolean = { + predicate match { + case EqualTo(ar: AttributeReference, _) if containsAttr(plan, ar) => true + case EqualTo(_, ar: AttributeReference) if containsAttr(plan, ar) => true + case EqualNullSafe(ar: AttributeReference, l) + if !l.nullable && containsAttr(plan, ar) => true + case EqualNullSafe(l, ar: AttributeReference) + if !l.nullable && containsAttr(plan, ar) => true + case GreaterThan(ar: AttributeReference, _) if containsAttr(plan, ar) => true + case GreaterThan(_, ar: AttributeReference) if containsAttr(plan, ar) => true + case GreaterThanOrEqual(ar: AttributeReference, _) if containsAttr(plan, ar) => true + case GreaterThanOrEqual(_, ar: AttributeReference) if containsAttr(plan, ar) => true + case LessThan(ar: AttributeReference, _) if containsAttr(plan, ar) => true + case LessThan(_, ar: AttributeReference) if containsAttr(plan, ar) => true + case LessThanOrEqual(ar: AttributeReference, _) if containsAttr(plan, ar) => true + case LessThanOrEqual(_, ar: AttributeReference) if containsAttr(plan, ar) => true + case In(ar: AttributeReference, _) if containsAttr(plan, ar) => true + case IsNotNull(ar: AttributeReference) if containsAttr(plan, ar) => true + case And(l, r) => hasNullFilteringPredicate(l, plan) || hasNullFilteringPredicate(r, plan) + case Or(l, r) => hasNullFilteringPredicate(l, plan) && hasNullFilteringPredicate(r, plan) + case Not(e) => !hasNullFilteringPredicate(e, plan) + case _ => false + } + } + + private def buildNewJoin( + otherCondition: Expression, + left: LogicalPlan, + right: LogicalPlan, + joinType: JoinType, + condition: Option[Expression]): Join = { + val leftHasNonNullPredicate = hasNullFilteringPredicate(otherCondition, left) + val rightHasNonNullPredicate = hasNullFilteringPredicate(otherCondition, right) + + joinType match { + case RightOuter if leftHasNonNullPredicate => + Join(left, right, Inner, condition) + case LeftOuter if rightHasNonNullPredicate => + Join(left, right, Inner, condition) + case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => + Join(left, right, Inner, condition) + case FullOuter if leftHasNonNullPredicate => + Join(left, right, LeftOuter, condition) + case FullOuter if rightHasNonNullPredicate => + Join(left, right, RightOuter, condition) + case _ => Join(left, right, joinType, condition) + } + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + // Only three outer join types are eligible: RightOuter|LeftOuter|FullOuter + case f @ Filter(filterCond, j @ Join(left, right, RightOuter|LeftOuter|FullOuter, joinCond)) => + Filter(filterCond, buildNewJoin(filterCond, left, right, j.joinType, joinCond)) + } +} + /** * Pushes down [[Filter]] operators where the `condition` can be * evaluated using only the attributes of the left or right side of a join. 
Other diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala new file mode 100644 index 000000000000..da3548118f42 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.dsl.expressions._ + +class OuterJoinEliminationSuite extends PlanTest { + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Subqueries", Once, + EliminateSubQueries) :: + Batch("Outer Join Elimination", Once, + OuterJoinElimination, + PushPredicateThroughJoin) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + val testRelation1 = LocalRelation('d.int, 'e.int, 'f.int) + + test("joins: full outer to inner") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)) + .where("x.b".attr >= 1 && "y.d".attr >= 2) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b >= 1) + val right = testRelation1.where('d >= 2) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: full outer to right") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("y.d".attr > 2) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where('d > 2) + val correctAnswer = + left.join(right, RightOuter, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: full outer to left") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)).where("x.a".attr <=> 2) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('a <=> 2) + val right = testRelation1 + val correctAnswer = + left.join(right, LeftOuter, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: right to inner") { + val x = 
testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, RightOuter, Option("x.a".attr === "y.d".attr)).where("x.b".attr > 2) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation.where('b > 2) + val right = testRelation1 + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: left to inner") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + .where("y.e".attr.isNotNull) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where('e.isNotNull) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("joins: left to inner with complicated filter predicates") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + .where(!'e.isNull || ('d.isNotNull && 'f.isNull)) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where(!'e.isNull || ('d.isNotNull && 'f.isNull)) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } +} \ No newline at end of file diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index a5e5f156423c..067a62d011ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter} +import org.apache.spark.sql.catalyst.plans.logical.Join import org.apache.spark.sql.execution.joins.BroadcastHashJoin import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -156,4 +158,50 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { assert(df1.join(broadcast(pf1)).count() === 4) } } + + test("join - outer join conversion") { + val df = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a") + val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b") + + // outer -> left + val outerJoin2Left = df.join(df2, $"a.int" === $"b.int", "outer").where($"a.int" === 3) + assert(outerJoin2Left.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, LeftOuter, _) => j }.size === 1) + checkAnswer( + outerJoin2Left, + Row(3, 4, "3", null, null, null) :: Nil) + + // outer -> right + val outerJoin2Right = df.join(df2, $"a.int" === $"b.int", "outer").where($"b.int" === 5) + assert(outerJoin2Right.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, RightOuter, _) => j }.size === 1) + checkAnswer( + outerJoin2Right, + Row(null, null, null, 5, 6, "5") :: Nil) + + // outer -> inner + val outerJoin2Inner = df.join(df2, $"a.int" === $"b.int", "outer"). 
+ where($"a.int" === 1 && $"b.int2" === 3) + assert(outerJoin2Inner.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, Inner, _) => j }.size === 1) + checkAnswer( + outerJoin2Inner, + Row(1, 2, "1", 1, 3, "1") :: Nil) + + // right -> inner + val rightJoin2Inner = df.join(df2, $"a.int" === $"b.int", "right").where($"a.int" === 1) + assert(rightJoin2Inner.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, Inner, _) => j }.size === 1) + checkAnswer( + rightJoin2Inner, + Row(1, 2, "1", 1, 3, "1") :: Nil) + + // left -> inner + val leftJoin2Inner = df.join(df2, $"a.int" === $"b.int", "left").where($"b.int2" === 3) + assert(leftJoin2Inner.queryExecution.optimizedPlan.collect { + case j @ Join(_, _, Inner, _) => j }.size === 1) + checkAnswer( + leftJoin2Inner, + Row(1, 2, "1", 1, 3, "1") :: Nil) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 92ff7e73fad8..d972822a7dde 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -81,7 +81,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]), ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeOuterJoin]), ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2", - classOf[SortMergeOuterJoin]), + classOf[SortMergeJoin]), // conversion from Right Outer to Inner ("SELECT * FROM testData right join testData2 ON key = a and key = 2", classOf[SortMergeOuterJoin]), ("SELECT * FROM testData full outer join testData2 ON key = a", From a61272a0fae7dfd19e36d77b11d669e89ebaa9bc Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 3 Jan 2016 19:52:10 -0800 Subject: [PATCH 05/11] style fix. --- .../sql/catalyst/optimizer/OuterJoinEliminationSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala index da3548118f42..985cfdfc1541 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -135,4 +135,4 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } -} \ No newline at end of file +} From aadb8e3119fb4a98b96e118f8d3703579f26d466 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 10 Feb 2016 23:20:53 -0800 Subject: [PATCH 06/11] use the latest null propagation. 
--- .../sql/catalyst/optimizer/Optimizer.scala | 60 +++++-------------- .../spark/sql/catalyst/plans/QueryPlan.scala | 2 + .../optimizer/OuterJoinEliminationSuite.scala | 3 +- 3 files changed, 19 insertions(+), 46 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f0477f3d5deb..78fabc0df31f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -946,62 +946,32 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { */ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { - private def containsAttr(plan: LogicalPlan, attr: Attribute): Boolean = - plan.outputSet.exists(_.semanticEquals(attr)) - - private def hasNullFilteringPredicate(predicate: Expression, plan: LogicalPlan): Boolean = { - predicate match { - case EqualTo(ar: AttributeReference, _) if containsAttr(plan, ar) => true - case EqualTo(_, ar: AttributeReference) if containsAttr(plan, ar) => true - case EqualNullSafe(ar: AttributeReference, l) - if !l.nullable && containsAttr(plan, ar) => true - case EqualNullSafe(l, ar: AttributeReference) - if !l.nullable && containsAttr(plan, ar) => true - case GreaterThan(ar: AttributeReference, _) if containsAttr(plan, ar) => true - case GreaterThan(_, ar: AttributeReference) if containsAttr(plan, ar) => true - case GreaterThanOrEqual(ar: AttributeReference, _) if containsAttr(plan, ar) => true - case GreaterThanOrEqual(_, ar: AttributeReference) if containsAttr(plan, ar) => true - case LessThan(ar: AttributeReference, _) if containsAttr(plan, ar) => true - case LessThan(_, ar: AttributeReference) if containsAttr(plan, ar) => true - case LessThanOrEqual(ar: AttributeReference, _) if containsAttr(plan, ar) => true - case LessThanOrEqual(_, ar: AttributeReference) if containsAttr(plan, ar) => true - case In(ar: AttributeReference, _) if containsAttr(plan, ar) => true - case IsNotNull(ar: AttributeReference) if containsAttr(plan, ar) => true - case And(l, r) => hasNullFilteringPredicate(l, plan) || hasNullFilteringPredicate(r, plan) - case Or(l, r) => hasNullFilteringPredicate(l, plan) && hasNullFilteringPredicate(r, plan) - case Not(e) => !hasNullFilteringPredicate(e, plan) - case _ => false - } - } + private def buildNewJoin(filter: Filter, join: Join): Join = { - private def buildNewJoin( - otherCondition: Expression, - left: LogicalPlan, - right: LogicalPlan, - joinType: JoinType, - condition: Option[Expression]): Join = { - val leftHasNonNullPredicate = hasNullFilteringPredicate(otherCondition, left) - val rightHasNonNullPredicate = hasNullFilteringPredicate(otherCondition, right) + val leftHasNonNullPredicate = filter.constraints.filter(_.isInstanceOf[IsNotNull]) + .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty) + val rightHasNonNullPredicate = filter.constraints.filter(_.isInstanceOf[IsNotNull]) + .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty) - joinType match { + join.joinType match { case RightOuter if leftHasNonNullPredicate => - Join(left, right, Inner, condition) + Join(join.left, join.right, Inner, join.condition) case LeftOuter if rightHasNonNullPredicate => - Join(left, right, Inner, condition) + Join(join.left, join.right, Inner, join.condition) case FullOuter if leftHasNonNullPredicate && 
rightHasNonNullPredicate => - Join(left, right, Inner, condition) + Join(join.left, join.right, Inner, join.condition) case FullOuter if leftHasNonNullPredicate => - Join(left, right, LeftOuter, condition) + Join(join.left, join.right, LeftOuter, join.condition) case FullOuter if rightHasNonNullPredicate => - Join(left, right, RightOuter, condition) - case _ => Join(left, right, joinType, condition) + Join(join.left, join.right, RightOuter, join.condition) + case _ => + join } } def apply(plan: LogicalPlan): LogicalPlan = plan transform { - // Only three outer join types are eligible: RightOuter|LeftOuter|FullOuter - case f @ Filter(filterCond, j @ Join(left, right, RightOuter|LeftOuter|FullOuter, joinCond)) => - Filter(filterCond, buildNewJoin(filterCond, left, right, j.joinType, joinCond)) + case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) => + Filter(condition, buildNewJoin(f, j)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 05f5bdbfc076..ef42f839065d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -56,6 +56,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy Set(IsNotNull(l), IsNotNull(r)) case LessThanOrEqual(l, r) => Set(IsNotNull(l), IsNotNull(r)) + case EqualNullSafe(l, r) if !r.nullable || !l.nullable => + Set(IsNotNull(l), IsNotNull(r)) case _ => Set.empty[Expression] }.foldLeft(Set.empty[Expression])(_ union _.toSet) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala index 985cfdfc1541..c8d272588403 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -119,7 +119,8 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - test("joins: left to inner with complicated filter predicates") { + // Need to enhance constructIsNotNullConstraints to support OR + ignore("joins: left to inner with complicated filter predicates") { val x = testRelation.subquery('x) val y = testRelation1.subquery('y) From 75efacec46170380b8d6ef551de64696cd2f555d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 11 Feb 2016 06:43:38 -0800 Subject: [PATCH 07/11] style fix. 
--- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../sql/catalyst/optimizer/OuterJoinEliminationSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 78fabc0df31f..a967aa7a030f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -21,8 +21,8 @@ import scala.collection.immutable.HashSet import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.planning.{ExtractFiltersAndInnerJoins, Unions} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala index c8d272588403..d03a1a23e63a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.dsl.expressions._ class OuterJoinEliminationSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { From 618f128e50ae9dee591822d78b85ea697693e893 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 18 Feb 2016 12:35:53 -0800 Subject: [PATCH 08/11] added canFilterOutNull to the rule --- .../sql/catalyst/optimizer/Optimizer.scala | 26 ++++++-- .../optimizer/OuterJoinEliminationSuite.scala | 60 ++++++++++++++++++- 2 files changed, 80 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a967aa7a030f..48aa666e3982 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -946,12 +946,30 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { */ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { + /** + * Returns whether the expression returns null or false when all inputs are nulls. 
+ */ + private def canFilterOutNull(e: Expression): Boolean = { + val attributes = e.references.toSeq + val emptyRow = new GenericInternalRow(attributes.length) + val v = BindReferences.bindReference(e, attributes).eval(emptyRow) + v == null || v == false + } + private def buildNewJoin(filter: Filter, join: Join): Join = { - val leftHasNonNullPredicate = filter.constraints.filter(_.isInstanceOf[IsNotNull]) - .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty) - val rightHasNonNullPredicate = filter.constraints.filter(_.isInstanceOf[IsNotNull]) - .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty) + val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition) + val leftConditions = splitConjunctiveConditions + .filter(_.references.subsetOf(join.left.outputSet)) + val rightConditions = splitConjunctiveConditions + .filter(_.references.subsetOf(join.right.outputSet)) + + val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) || + filter.constraints.filter(_.isInstanceOf[IsNotNull]) + .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty) + val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) || + filter.constraints.filter(_.isInstanceOf[IsNotNull]) + .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty) join.joinType match { case RightOuter if leftHasNonNullPredicate => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala index d03a1a23e63a..a1dc836a5fd1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -119,8 +119,8 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } - // Need to enhance constructIsNotNullConstraints to support OR - ignore("joins: left to inner with complicated filter predicates") { + // evaluating if mixed OR and NOT expressions can eliminate all null-supplying rows + test("joins: left to inner with complicated filter predicates #1") { val x = testRelation.subquery('x) val y = testRelation1.subquery('y) @@ -136,4 +136,60 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + + // eval(emptyRow) of 'e.in(1, 2) will return null instead of false + test("joins: left to inner with complicated filter predicates #2") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + .where('e.in(1, 2)) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1.where('e.in(1, 2)) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + // evaluating if mixed OR and AND expressions can eliminate all null-supplying rows + test("joins: left to inner with complicated filter predicates #3") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + .where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right 
= testRelation1.where((!'e.isNull || ('d.isNotNull && 'f.isNull)) && 'e.isNull) + val correctAnswer = + left.join(right, Inner, Option("a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + // evaluating if the expressions that have both left and right attributes + // can eliminate all null-supplying rows + // FULL OUTER => INNER + test("joins: left to inner with complicated filter predicates #4") { + val x = testRelation.subquery('x) + val y = testRelation1.subquery('y) + + val originalQuery = + x.join(y, FullOuter, Option("x.a".attr === "y.d".attr)) + .where("x.b".attr + 3 === "y.e".attr) + + val optimized = Optimize.execute(originalQuery.analyze) + val left = testRelation + val right = testRelation1 + val correctAnswer = + left.join(right, Inner, Option("b".attr + 3 === "e".attr && "a".attr === "d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } } From 6977fdf392be4e742425568d098c0a1b4337b789 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 18 Feb 2016 12:38:55 -0800 Subject: [PATCH 09/11] style fix. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 48aa666e3982..4b4d5300d243 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -957,7 +957,6 @@ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { } private def buildNewJoin(filter: Filter, join: Join): Join = { - val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition) val leftConditions = splitConjunctiveConditions .filter(_.references.subsetOf(join.left.outputSet)) From cc0262c909cfb573e578b6adaf19ad4cf58cebd9 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 19 Feb 2016 10:57:30 -0800 Subject: [PATCH 10/11] address comments. --- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 +++- .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 2 -- sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 4b4d5300d243..e56dcd713282 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -950,6 +950,7 @@ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { * Returns whether the expression returns null or false when all inputs are nulls. 
*/ private def canFilterOutNull(e: Expression): Boolean = { + if (!e.deterministic) return false val attributes = e.references.toSeq val emptyRow = new GenericInternalRow(attributes.length) val v = BindReferences.bindReference(e, attributes).eval(emptyRow) @@ -988,7 +989,8 @@ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) => - Filter(condition, buildNewJoin(f, j)) + val newJoin = buildNewJoin(f, j) + if (j.joinType == newJoin.joinType) f else Filter(condition, newJoin) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index ef42f839065d..05f5bdbfc076 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -56,8 +56,6 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy Set(IsNotNull(l), IsNotNull(r)) case LessThanOrEqual(l, r) => Set(IsNotNull(l), IsNotNull(r)) - case EqualNullSafe(l, r) if !r.nullable || !l.nullable => - Set(IsNotNull(l), IsNotNull(r)) case _ => Set.empty[Expression] }.foldLeft(Set.empty[Expression])(_ union _.toSet) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index d972822a7dde..8f2a0c035136 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -81,7 +81,7 @@ class JoinSuite extends QueryTest with SharedSQLContext { ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]), ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeOuterJoin]), ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2", - classOf[SortMergeJoin]), // conversion from Right Outer to Inner + classOf[SortMergeJoin]), // converted from Right Outer to Inner ("SELECT * FROM testData right join testData2 ON key = a and key = 2", classOf[SortMergeOuterJoin]), ("SELECT * FROM testData full outer join testData2 ON key = a", From 82357e0c0e396f5631d53d3d8151e9f406c5337d Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 19 Feb 2016 20:28:25 -0800 Subject: [PATCH 11/11] address comments. 
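This round refactors `buildNewJoin` into `buildNewJoinType`: the helper now only
computes the new `JoinType`, and the rule rebuilds the node with `j.copy`, leaving
the plan untouched when the join type does not change. The null-filtering test from
PATCH 08 stays the same: a predicate filters out null-supplying rows when
evaluating it over an all-null input row yields null or false. A hypothetical REPL
check of that trick (catalyst DSL; the names below are illustrative, not part of
the patch):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions._

    val pred = 'a.int > 2                          // predicate over attribute 'a
    // Bind 'a to ordinal 0, then evaluate against a row of all nulls.
    val bound = BindReferences.bindReference(pred, pred.references.toSeq)
    val allNulls = new GenericInternalRow(1)       // one field, initialized to null
    bound.eval(allNulls)                           // null => 'a > 2 rejects null rows
    // Binding and evaluating IsNull('a) the same way yields true instead, so such
    // a filter keeps null-supplying rows and must not trigger the rewrite.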
--- .../sql/catalyst/optimizer/Optimizer.scala | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e56dcd713282..b7d8d932edfc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -957,7 +957,7 @@ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { v == null || v == false } - private def buildNewJoin(filter: Filter, join: Join): Join = { + private def buildNewJoinType(filter: Filter, join: Join): JoinType = { val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition) val leftConditions = splitConjunctiveConditions .filter(_.references.subsetOf(join.left.outputSet)) @@ -972,25 +972,19 @@ object OuterJoinElimination extends Rule[LogicalPlan] with PredicateHelper { .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty) join.joinType match { - case RightOuter if leftHasNonNullPredicate => - Join(join.left, join.right, Inner, join.condition) - case LeftOuter if rightHasNonNullPredicate => - Join(join.left, join.right, Inner, join.condition) - case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => - Join(join.left, join.right, Inner, join.condition) - case FullOuter if leftHasNonNullPredicate => - Join(join.left, join.right, LeftOuter, join.condition) - case FullOuter if rightHasNonNullPredicate => - Join(join.left, join.right, RightOuter, join.condition) - case _ => - join + case RightOuter if leftHasNonNullPredicate => Inner + case LeftOuter if rightHasNonNullPredicate => Inner + case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner + case FullOuter if leftHasNonNullPredicate => LeftOuter + case FullOuter if rightHasNonNullPredicate => RightOuter + case o => o } } def apply(plan: LogicalPlan): LogicalPlan = plan transform { case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) => - val newJoin = buildNewJoin(f, j) - if (j.joinType == newJoin.joinType) f else Filter(condition, newJoin) + val newJoinType = buildNewJoinType(f, j) + if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) } }
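End to end, the series lets the optimizer downgrade outer joins whose filters
reject null-extended rows. A hypothetical spark-shell session on this branch,
mirroring the DataFrameJoinSuite cases above (plan output elided; the shell
pre-imports `sqlContext.implicits._`):

    val df  = Seq((1, 2, "1"), (3, 4, "3")).toDF("int", "int2", "str").as("a")
    val df2 = Seq((1, 3, "1"), (5, 6, "5")).toDF("int", "int2", "str").as("b")

    // A filter that can never hold for a null-extended left-side row downgrades
    // the full outer join to LEFT OUTER:
    df.join(df2, $"a.int" === $"b.int", "outer")
      .where($"a.int" === 3)
      .queryExecution.optimizedPlan

    // Null-rejecting filters on both sides turn the full outer join into INNER:
    df.join(df2, $"a.int" === $"b.int", "outer")
      .where($"a.int" === 1 && $"b.int2" === 3)
      .queryExecution.optimizedPlan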