From 705ed462bb307871e65199ce02576f12d60d2176 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei <584620569@qq.com> Date: Sun, 25 Feb 2018 14:06:39 +0800 Subject: [PATCH 1/8] add constranits --- .../plans/logical/QueryPlanConstraints.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 5c7b8e5b97883..e26f3e87e28b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -29,12 +29,26 @@ trait QueryPlanConstraints { self: LogicalPlan => */ lazy val constraints: ExpressionSet = { if (conf.constraintPropagationEnabled) { + var relevantOutPutSet: AttributeSet = outputSet + constraints.foreach { + case eq @ EqualTo(l: Attribute, r: Attribute) => + if (l.references.subsetOf(relevantOutPutSet) + && !r.references.subsetOf(relevantOutPutSet)) { + relevantOutPutSet = relevantOutPutSet.++(r.references) + } + if (r.references.subsetOf(relevantOutPutSet) + && !l.references.subsetOf(relevantOutPutSet)) { + relevantOutPutSet = relevantOutPutSet.++(l.references) + } + case _ => // No inference + } + ExpressionSet( validConstraints .union(inferAdditionalConstraints(validConstraints)) .union(constructIsNotNullConstraints(validConstraints)) .filter { c => - c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic + c.references.nonEmpty && c.references.subsetOf(relevantOutPutSet) && c.deterministic } ) } else { From f44a92ad20895a94577cf2b4de54fc320b0f934b Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei <584620569@qq.com> Date: Tue, 27 Feb 2018 10:49:37 +0800 Subject: [PATCH 2/8] change code according to review --- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../plans/logical/QueryPlanConstraints.scala | 29 +++++-------------- 2 files changed, 8 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index a28b6a0feb8f9..91208479be03b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -661,7 +661,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe case join @ Join(left, right, joinType, conditionOpt) => // Only consider constraints that can be pushed down completely to either the left or the // right child - val constraints = join.constraints.filter { c => + val constraints = join.allConstraints.filter { c => c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet) } // Remove those constraints that are already enforced by either the left or the right child diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index e26f3e87e28b9..adfdaf797f6ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -27,30 +27,15 @@ trait QueryPlanConstraints { self: LogicalPlan => * example, if this set contains the expression `a = 2` then that expression is guaranteed to * evaluate to `true` for all rows produced. */ + lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints + .union(inferAdditionalConstraints(validConstraints)) + .union(constructIsNotNullConstraints(validConstraints))) + lazy val constraints: ExpressionSet = { if (conf.constraintPropagationEnabled) { - var relevantOutPutSet: AttributeSet = outputSet - constraints.foreach { - case eq @ EqualTo(l: Attribute, r: Attribute) => - if (l.references.subsetOf(relevantOutPutSet) - && !r.references.subsetOf(relevantOutPutSet)) { - relevantOutPutSet = relevantOutPutSet.++(r.references) - } - if (r.references.subsetOf(relevantOutPutSet) - && !l.references.subsetOf(relevantOutPutSet)) { - relevantOutPutSet = relevantOutPutSet.++(l.references) - } - case _ => // No inference - } - - ExpressionSet( - validConstraints - .union(inferAdditionalConstraints(validConstraints)) - .union(constructIsNotNullConstraints(validConstraints)) - .filter { c => - c.references.nonEmpty && c.references.subsetOf(relevantOutPutSet) && c.deterministic - } - ) + ExpressionSet(allConstraints.filter { c => + c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic + }) } else { ExpressionSet(Set.empty) } From 1e0f78a50bd70a3f94382887a74cc70f7fefe3c6 Mon Sep 17 00:00:00 2001 From: hanghang <584620569@qq.com> Date: Tue, 27 Feb 2018 22:47:03 +0800 Subject: [PATCH 3/8] add test --- .../plans/logical/QueryPlanConstraints.scala | 12 ++++++++---- .../InferFiltersFromConstraintsSuite.scala | 13 +++++++++++++ 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index adfdaf797f6ff..09c37321bb5f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -23,14 +23,18 @@ import org.apache.spark.sql.catalyst.expressions._ trait QueryPlanConstraints { self: LogicalPlan => /** - * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For - * example, if this set contains the expression `a = 2` then that expression is guaranteed to - * evaluate to `true` for all rows produced. - */ + * An [[ExpressionSet]] that contains an additional set of constraints about equality constraints + * and `isNotNull` constraints. + */ lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints .union(inferAdditionalConstraints(validConstraints)) .union(constructIsNotNullConstraints(validConstraints))) + /** + * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For + * example, if this set contains the expression `a = 2` then that expression is guaranteed to + * evaluate to `true` for all rows produced. + */ lazy val constraints: ExpressionSet = { if (conf.constraintPropagationEnabled) { ExpressionSet(allConstraints.filter { c => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 178c4b8c270a0..0737e68beb5e6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -192,4 +192,17 @@ class InferFiltersFromConstraintsSuite extends PlanTest { comparePlans(Optimize.execute(original.analyze), correct.analyze) } + + test("single left-semi join: filter out nulls on either side on equi-join keys") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + val originalQuery = x.join(y, LeftSemi, + condition = Some("x.a".attr === "y.a".attr)).analyze + val left = x.where(IsNotNull('a)) + val right = y.where(IsNotNull('a)) + val correctAnswer = left.join(right, LeftSemi, condition = Some("x.a".attr === "y.a".attr)) + .analyze + val optimized = Optimize.execute(originalQuery) + comparePlans(optimized, correctAnswer) + } } From f7d764efa435327ba34e829da53c16a6ec16f403 Mon Sep 17 00:00:00 2001 From: hanghang <584620569@qq.com> Date: Tue, 27 Feb 2018 23:04:27 +0800 Subject: [PATCH 4/8] code style --- .../sql/catalyst/plans/logical/QueryPlanConstraints.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 09c37321bb5f8..6fb1923c5ed52 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.expressions._ trait QueryPlanConstraints { self: LogicalPlan => /** - * An [[ExpressionSet]] that contains an additional set of constraints about equality constraints - * and `isNotNull` constraints. + * An [[ExpressionSet]] that contains an additional set of constraints about equality + * constraints and `isNotNull` constraints. */ lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints .union(inferAdditionalConstraints(validConstraints)) From b3f2ade5f1dc2ad3349f4dc21fe353590e8bbbfd Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei <584620569@qq.com> Date: Wed, 28 Feb 2018 17:12:26 +0800 Subject: [PATCH 5/8] code style --- .../plans/logical/QueryPlanConstraints.scala | 26 ++++++++++++------- .../InferFiltersFromConstraintsSuite.scala | 2 +- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 6fb1923c5ed52..8fb146155f3cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -23,18 +23,24 @@ import org.apache.spark.sql.catalyst.expressions._ trait QueryPlanConstraints { self: LogicalPlan => /** - * An [[ExpressionSet]] that contains an additional set of constraints about equality - * constraints and `isNotNull` constraints. - */ - lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints - .union(inferAdditionalConstraints(validConstraints)) - .union(constructIsNotNullConstraints(validConstraints))) + * An [[ExpressionSet]] that contains an additional set of constraints about equality + * constraints and `isNotNull` constraints. + */ + lazy val allConstraints: ExpressionSet = { + if (conf.constraintPropagationEnabled) { + ExpressionSet(validConstraints + .union(inferAdditionalConstraints(validConstraints)) + .union(constructIsNotNullConstraints(validConstraints))) + } else { + ExpressionSet(Set.empty) + } + } /** - * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For - * example, if this set contains the expression `a = 2` then that expression is guaranteed to - * evaluate to `true` for all rows produced. - */ + * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For + * example, if this set contains the expression `a = 2` then that expression is guaranteed to + * evaluate to `true` for all rows produced. + */ lazy val constraints: ExpressionSet = { if (conf.constraintPropagationEnabled) { ExpressionSet(allConstraints.filter { c => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 0737e68beb5e6..2f9a329064775 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -193,7 +193,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest { comparePlans(Optimize.execute(original.analyze), correct.analyze) } - test("single left-semi join: filter out nulls on either side on equi-join keys") { + test("SPARK-23405:single left-semi join, filter out nulls on either side on equi-join keys") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) val originalQuery = x.join(y, LeftSemi, From ed5c170c35d8786df241921ac19d95520ace3836 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei <584620569@qq.com> Date: Wed, 28 Feb 2018 19:42:44 +0800 Subject: [PATCH 6/8] change code according to review --- .../sql/catalyst/plans/logical/QueryPlanConstraints.scala | 4 ++-- .../optimizer/InferFiltersFromConstraintsSuite.scala | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 8fb146155f3cf..5d3d44896c818 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.expressions._ trait QueryPlanConstraints { self: LogicalPlan => /** - * An [[ExpressionSet]] that contains an additional set of constraints about equality - * constraints and `isNotNull` constraints. + * An [[ExpressionSet]] that contains an additional set of constraints, such as equality + * constraints and `isNotNull` constraints, etc. */ lazy val allConstraints: ExpressionSet = { if (conf.constraintPropagationEnabled) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 2f9a329064775..2d2ce9e50cac0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -193,14 +193,14 @@ class InferFiltersFromConstraintsSuite extends PlanTest { comparePlans(Optimize.execute(original.analyze), correct.analyze) } - test("SPARK-23405:single left-semi join, filter out nulls on either side on equi-join keys") { + test("SPARK-23405: left-semi equal-join should filter out null join keys on both sides") { val x = testRelation.subquery('x) val y = testRelation.subquery('y) - val originalQuery = x.join(y, LeftSemi, - condition = Some("x.a".attr === "y.a".attr)).analyze + val condition = Some("x.a".attr === "y.a".attr) + val originalQuery = x.join(y, LeftSemi, condition).analyze val left = x.where(IsNotNull('a)) val right = y.where(IsNotNull('a)) - val correctAnswer = left.join(right, LeftSemi, condition = Some("x.a".attr === "y.a".attr)) + val correctAnswer = left.join(right, LeftSemi, condition) .analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) From 023f2f709db484d82cde22b00db0bad33ac72279 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei <584620569@qq.com> Date: Thu, 1 Mar 2018 08:55:37 +0800 Subject: [PATCH 7/8] change code according to review --- .../sql/catalyst/plans/logical/QueryPlanConstraints.scala | 8 +------- .../optimizer/InferFiltersFromConstraintsSuite.scala | 3 +-- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 5d3d44896c818..4d59ebba228d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -26,15 +26,9 @@ trait QueryPlanConstraints { self: LogicalPlan => * An [[ExpressionSet]] that contains an additional set of constraints, such as equality * constraints and `isNotNull` constraints, etc. */ - lazy val allConstraints: ExpressionSet = { - if (conf.constraintPropagationEnabled) { - ExpressionSet(validConstraints + lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints .union(inferAdditionalConstraints(validConstraints)) .union(constructIsNotNullConstraints(validConstraints))) - } else { - ExpressionSet(Set.empty) - } - } /** * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 2d2ce9e50cac0..f78c2356e35a5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -200,8 +200,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val originalQuery = x.join(y, LeftSemi, condition).analyze val left = x.where(IsNotNull('a)) val right = y.where(IsNotNull('a)) - val correctAnswer = left.join(right, LeftSemi, condition) - .analyze + val correctAnswer = left.join(right, LeftSemi, condition).analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) } From 709ed39052a032d0dc2258b2c637ab107d4b4df7 Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei <584620569@qq.com> Date: Thu, 1 Mar 2018 13:50:08 +0800 Subject: [PATCH 8/8] add if --- .../plans/logical/QueryPlanConstraints.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala index 4d59ebba228d5..046848875548b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala @@ -26,24 +26,24 @@ trait QueryPlanConstraints { self: LogicalPlan => * An [[ExpressionSet]] that contains an additional set of constraints, such as equality * constraints and `isNotNull` constraints, etc. */ - lazy val allConstraints: ExpressionSet = ExpressionSet(validConstraints + lazy val allConstraints: ExpressionSet = { + if (conf.constraintPropagationEnabled) { + ExpressionSet(validConstraints .union(inferAdditionalConstraints(validConstraints)) .union(constructIsNotNullConstraints(validConstraints))) + } else { + ExpressionSet(Set.empty) + } + } /** * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For * example, if this set contains the expression `a = 2` then that expression is guaranteed to * evaluate to `true` for all rows produced. */ - lazy val constraints: ExpressionSet = { - if (conf.constraintPropagationEnabled) { - ExpressionSet(allConstraints.filter { c => + lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c => c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic }) - } else { - ExpressionSet(Set.empty) - } - } /** * This method can be overridden by any child class of QueryPlan to specify a set of constraints