From 6337c773c48fb18f9718bb967c437b66c5852275 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 25 Oct 2019 23:47:18 +0800 Subject: [PATCH 1/5] Improve EliminateOuterJoin performance --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 5 +++-- .../sql/catalyst/plans/ConstraintPropagationSuite.scala | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index ec9bf90247f8..c50d8c54eba8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -170,13 +170,14 @@ abstract class UnaryNode extends LogicalPlan { * original constraint expressions with the corresponding alias */ protected def getAllValidConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { - var allConstraints = child.constraints.asInstanceOf[Set[Expression]] + val childConstraints = child.constraints + var allConstraints = childConstraints.asInstanceOf[Set[Expression]] projectList.foreach { case a @ Alias(l: Literal, _) => allConstraints += EqualNullSafe(a.toAttribute, l) case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. 
- allConstraints ++= allConstraints.map(_ transform { + allConstraints ++= childConstraints.map(_ transform { case expr: Expression if expr.semanticEquals(e) => a.toAttribute }) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 5ad748b6113d..5c1ea47ab2a3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -135,15 +135,13 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest { ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "x")), resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"), - resolveColumn(aliasedRelation.analyze, "z") <=> resolveColumn(aliasedRelation.analyze, "x"), resolveColumn(aliasedRelation.analyze, "z") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "z"))))) val multiAlias = tr.where('a === 'c + 10).select('a.as('x), 'c.as('y)) verifyConstraints(multiAlias.analyze.constraints, ExpressionSet(Seq(IsNotNull(resolveColumn(multiAlias.analyze, "x")), - IsNotNull(resolveColumn(multiAlias.analyze, "y")), - resolveColumn(multiAlias.analyze, "x") === resolveColumn(multiAlias.analyze, "y") + 10)) + IsNotNull(resolveColumn(multiAlias.analyze, "y")))) ) } From 719d81217e99ad15d018d227cd8fe62752080bb6 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 16 Nov 2019 00:04:39 +0800 Subject: [PATCH 2/5] Try 2 --- .../catalyst/plans/logical/LogicalPlan.scala | 24 ++++++++++++++----- .../plans/ConstraintPropagationSuite.scala | 4 +++- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index c50d8c54eba8..1839fadc6a8c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -170,17 +170,29 @@ abstract class UnaryNode extends LogicalPlan { * original constraint expressions with the corresponding alias */ protected def getAllValidConstraints(projectList: Seq[NamedExpression]): Set[Expression] = { - val childConstraints = child.constraints - var allConstraints = childConstraints.asInstanceOf[Set[Expression]] + var allConstraints = child.constraints.asInstanceOf[Set[Expression]] projectList.foreach { case a @ Alias(l: Literal, _) => allConstraints += EqualNullSafe(a.toAttribute, l) case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. - allConstraints ++= childConstraints.map(_ transform { - case expr: Expression if expr.semanticEquals(e) => - a.toAttribute - }) + allConstraints ++= allConstraints.map { + case binaryExpression @ EqualNullSafe(_: BinaryExpression, _) => + binaryExpression + case ternaryExpression @ EqualNullSafe(_: TernaryExpression, _) => + ternaryExpression + case quaternaryExpression @ EqualNullSafe(_: QuaternaryExpression, _) => + quaternaryExpression + case septenaryExpression @ EqualNullSafe(_: SeptenaryExpression, _) => + septenaryExpression + case userDefinedExpression @ EqualNullSafe(_: UserDefinedExpression, _) => + userDefinedExpression + case other => + other transform { + case expr: Expression if expr.semanticEquals(e) => + a.toAttribute + } + } allConstraints += EqualNullSafe(e, a.toAttribute) case _ => // Don't change. 
} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 5c1ea47ab2a3..5ad748b6113d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -135,13 +135,15 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest { ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "x")), resolveColumn(aliasedRelation.analyze, "b") <=> resolveColumn(aliasedRelation.analyze, "y"), + resolveColumn(aliasedRelation.analyze, "z") <=> resolveColumn(aliasedRelation.analyze, "x"), resolveColumn(aliasedRelation.analyze, "z") > 10, IsNotNull(resolveColumn(aliasedRelation.analyze, "z"))))) val multiAlias = tr.where('a === 'c + 10).select('a.as('x), 'c.as('y)) verifyConstraints(multiAlias.analyze.constraints, ExpressionSet(Seq(IsNotNull(resolveColumn(multiAlias.analyze, "x")), - IsNotNull(resolveColumn(multiAlias.analyze, "y")))) + IsNotNull(resolveColumn(multiAlias.analyze, "y")), + resolveColumn(multiAlias.analyze, "x") === resolveColumn(multiAlias.analyze, "y") + 10)) ) } From ca4480e902398f3b32583380e11b85c81970c185 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 16 Nov 2019 17:48:09 +0800 Subject: [PATCH 3/5] Avoid generating too many constraints --- .../catalyst/plans/logical/LogicalPlan.scala | 21 +++++++------------ .../plans/ConstraintPropagationSuite.scala | 8 +++++++ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 1839fadc6a8c..0e1659c87ea7 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -177,20 +177,13 @@ abstract class UnaryNode extends LogicalPlan { case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. allConstraints ++= allConstraints.map { - case binaryExpression @ EqualNullSafe(_: BinaryExpression, _) => - binaryExpression - case ternaryExpression @ EqualNullSafe(_: TernaryExpression, _) => - ternaryExpression - case quaternaryExpression @ EqualNullSafe(_: QuaternaryExpression, _) => - quaternaryExpression - case septenaryExpression @ EqualNullSafe(_: SeptenaryExpression, _) => - septenaryExpression - case userDefinedExpression @ EqualNullSafe(_: UserDefinedExpression, _) => - userDefinedExpression - case other => - other transform { - case expr: Expression if expr.semanticEquals(e) => - a.toAttribute + case e @ EqualNullSafe(l, _: AttributeReference) + if !l.isInstanceOf[AttributeReference] => e + case e @ EqualNullSafe(_: AttributeReference, r) + if !r.isInstanceOf[AttributeReference] => e + case other => other transform { + case expr: Expression if expr.semanticEquals(e) => + a.toAttribute } } allConstraints += EqualNullSafe(e, a.toAttribute) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index 5ad748b6113d..22bd6f78cbd3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -422,4 +422,12 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest { assert(aliasedRelation.analyze.constraints.isEmpty) } } + + test("SPARK-29606 Avoid generating too many 
constraints") { + val aliasedRelation = LocalRelation('a.int, 'b.int, 'c.int) + .select('a, 'b, 'c, ('a + 'b + 'c).as("abc")) + .select('a.as("a1"), 'b.as("b1"), 'c.as("c1"), 'abc.as("abc1")) + assert(aliasedRelation.analyze.isInstanceOf[Project]) + assert(aliasedRelation.analyze.asInstanceOf[Project].validConstraints.size === 5) + } } From 29fe7f06b3626d46bf584190e031517b4f65d471 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 1 Mar 2020 00:17:16 +0800 Subject: [PATCH 4/5] case e @ EqualNullSafe(l, _: AttributeReference) if l.references.size > 1 => e --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0e1659c87ea7..13f0d7b4a461 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -177,10 +177,7 @@ abstract class UnaryNode extends LogicalPlan { case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. 
allConstraints ++= allConstraints.map { - case e @ EqualNullSafe(l, _: AttributeReference) - if !l.isInstanceOf[AttributeReference] => e - case e @ EqualNullSafe(_: AttributeReference, r) - if !r.isInstanceOf[AttributeReference] => e + case e @ EqualNullSafe(l, _: AttributeReference) if l.references.size > 1 => e case other => other transform { case expr: Expression if expr.semanticEquals(e) => a.toAttribute From 50fd37739d2f26c987ebbb72627a955865454164 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 2 May 2020 14:23:42 +0800 Subject: [PATCH 5/5] Add comment --- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 13f0d7b4a461..39514ddaf454 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -177,6 +177,9 @@ abstract class UnaryNode extends LogicalPlan { case a @ Alias(e, _) => // For every alias in `projectList`, replace the reference in constraints by its attribute. allConstraints ++= allConstraints.map { + // To avoid generating too many constraints, for example, + // SELECT a AS a1, b AS b1, ab AS ab1 FROM (SELECT a, b, a + b AS ab FROM tbl) t + // Avoid generating ((a#4 + b1#2) <=> ab#0) for ((a#4 + b#5) <=> ab#0). case e @ EqualNullSafe(l, _: AttributeReference) if l.references.size > 1 => e case other => other transform { case expr: Expression if expr.semanticEquals(e) =>