From f82b0ab02145cd801660056010e82fa22bf8e913 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Mon, 26 Apr 2021 18:51:02 +0800
Subject: [PATCH 1/4] init

init
---
 .../catalyst/plans/logical/basicLogicalOperators.scala | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 0f5bc7e1f13e..21700931e83c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -742,6 +742,16 @@ case class Range(
     }
   }
 
+  override def maxRowsPerPartition: Option[Long] = {
+    if (numSlices.isDefined) {
+      var m = numElements / numSlices.get
+      if (numElements % numSlices.get != 0) m += 1
+      if (m.isValidLong) Some(m.toLong) else maxRows
+    } else {
+      maxRows
+    }
+  }
+
   override def computeStats(): Statistics = {
     if (numElements == 0) {
       Statistics(sizeInBytes = 0, rowCount = Some(0))
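The override added above is a ceiling division: each of Range's numSlices partitions holds at most ceil(numElements / numSlices) rows, falling back to maxRows when the bound would not fit in a Long. A minimal self-contained sketch of the same arithmetic (plain BigInt/Option arguments stand in for logical.Range's fields; illustrative, not the Spark source):

    // Upper bound on rows per partition: ceil(numElements / slices).
    def maxRowsPerPartitionSketch(
        numElements: BigInt,
        numSlices: Option[Int],
        maxRows: Option[Long]): Option[Long] = numSlices match {
      case Some(slices) =>
        var m = numElements / slices
        if (numElements % slices != 0) m += 1           // round up on any remainder
        if (m.isValidLong) Some(m.toLong) else maxRows  // Long overflow: fall back
      case None => maxRows                              // unknown slicing: no tighter bound
    }

    assert(maxRowsPerPartitionSketch(BigInt(100), Some(3), Some(100L)) == Some(34L))
    assert(maxRowsPerPartitionSketch(BigInt(100), Some(4), Some(100L)) == Some(25L))
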
From 0a69629d25ba3ed7cc4f405ed25452de62f60f0e Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Thu, 29 Apr 2021 10:16:50 +0800
Subject: [PATCH 2/4] add test

---
 .../apache/spark/sql/catalyst/dsl/package.scala    |  2 ++
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  6 ++++++
 .../plans/logical/basicLogicalOperators.scala      |  2 ++
 .../catalyst/optimizer/CombiningLimitsSuite.scala  | 14 ++++++++++++++
 4 files changed, 24 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 626ece33f157..4ab768579f73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -374,6 +374,8 @@ package object dsl {
 
     def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)
 
+    def localLimit(limitExpr: Expression): LogicalPlan = LocalLimit(limitExpr, logicalPlan)
+
     def join(
         otherPlan: LogicalPlan,
         joinType: JoinType = Inner,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 16e3e43356b9..b569feee3c4a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1618,12 +1618,18 @@ object EliminateLimits extends Rule[LogicalPlan] {
   private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
     limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
   }
+  private def canEliminateLocal(localLimitExpr: Expression, child: LogicalPlan): Boolean = {
+    localLimitExpr.foldable &&
+      child.maxRowsPerPartition.exists { _ <= localLimitExpr.eval().asInstanceOf[Int] }
+  }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Limit(l, child) if canEliminate(l, child) =>
       child
     case GlobalLimit(l, child) if canEliminate(l, child) =>
       child
+    case LocalLimit(l, child) if canEliminateLocal(l, child) =>
+      child
 
     case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
       GlobalLimit(Least(Seq(ne, le)), grandChild)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 21700931e83c..7231897020cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -69,6 +69,7 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
   extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
   override def maxRows: Option[Long] = child.maxRows
+  override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition
 
   override lazy val resolved: Boolean = {
     val hasSpecialExpressions = projectList.exists ( _.collect {
@@ -161,6 +162,7 @@ case class Filter(condition: Expression, child: LogicalPlan)
   override def output: Seq[Attribute] = child.output
 
   override def maxRows: Option[Long] = child.maxRows
+  override def maxRowsPerPartition: Option[Long] = child.maxRowsPerPartition
 
   final override val nodePatterns: Seq[TreePattern] = Seq(FILTER)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 423ff81aaebc..355cf0612058 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -150,6 +150,14 @@ class CombiningLimitsSuite extends PlanTest {
     )
   }
 
+  test("SPARK-35231: Eliminate LocalLimit if Range maxRowsPerPartition not larger than Limit") {
+    checkPlanAndMaxRowsPerPartition(
+      Range(0, 100, 1, 3).select().localLimit(34),
+      Range(0, 100, 1, 3).select(),
+      34
+    )
+  }
+
   test("SPARK-33497: Eliminate Limit if Sample max rows not larger than Limit") {
     checkPlanAndMaxRow(
       testRelation.select().sample(0, 0.2, false, 1).limit(10),
@@ -240,6 +248,12 @@ class CombiningLimitsSuite extends PlanTest {
     comparePlans(Optimize.execute(optimized.analyze), expected.analyze)
     assert(expected.maxRows.get == expectedMaxRow)
   }
+
+  private def checkPlanAndMaxRowsPerPartition(
+      optimized: LogicalPlan, expected: LogicalPlan, expectedMaxRowsPerPartition: Long): Unit = {
+    comparePlans(Optimize.execute(optimized.analyze), expected.analyze)
+    assert(expected.maxRowsPerPartition.get == expectedMaxRowsPerPartition)
+  }
 }
 
 case class RelationWithoutMaxRows(output: Seq[Attribute]) extends LeafNode {
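The new LocalLimit case in EliminateLimits fires only when the child's per-partition bound proves the limit redundant: LocalLimit(n) over a child whose partitions each hold at most n rows is a no-op. A simplified sketch of just that predicate (plain values stand in for Expression.eval() and LogicalPlan.maxRowsPerPartition; the rule's Catalyst plumbing is omitted):

    // A LocalLimit is removable iff every partition is already within the limit.
    def isRedundantLocalLimit(limit: Int, maxRowsPerPartition: Option[Long]): Boolean =
      maxRowsPerPartition.exists(_ <= limit)

    assert(isRedundantLocalLimit(34, Some(34)))   // Range(0, 100, 1, 3): at most 34 rows/partition
    assert(!isRedundantLocalLimit(33, Some(34)))  // a partition may hold 34 rows: keep the limit
    assert(!isRedundantLocalLimit(34, None))      // unknown bound: keep the limit
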
From 30db1c2f4852a902b4c34ff33f78cbfbce06ca5e Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Thu, 29 Apr 2021 14:27:46 +0800
Subject: [PATCH 3/4] fix test

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index b569feee3c4a..209abdfb1f99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1618,7 +1618,7 @@ object EliminateLimits extends Rule[LogicalPlan] {
   private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
     limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
   }
-  private def canEliminateLocal(localLimitExpr: Expression, child: LogicalPlan): Boolean = {
+  private def canEliminateLocalLimit(localLimitExpr: Expression, child: LogicalPlan): Boolean = {
     localLimitExpr.foldable &&
       child.maxRowsPerPartition.exists { _ <= localLimitExpr.eval().asInstanceOf[Int] }
   }
@@ -1628,7 +1628,7 @@ def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Limit(l, child) if canEliminate(l, child) =>
       child
     case GlobalLimit(l, child) if canEliminate(l, child) =>
       child
-    case LocalLimit(l, child) if canEliminateLocal(l, child) =>
+    case LocalLimit(l, child) if !plan.isStreaming && canEliminateLocalLimit(l, child) =>
       child
 
     case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
       GlobalLimit(Least(Seq(ne, le)), grandChild)
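This revision also adds a !plan.isStreaming guard (note it consults the plan passed to apply, not just the matched child), keeping the rewrite batch-only; dropping a LocalLimit under a streaming query could change per-batch semantics. That rationale is inferred from the guard itself, since the patch carries no comment. A toy sketch of the guarded decision, with a hypothetical PlanInfo standing in for the few LogicalPlan facts consulted:

    final case class PlanInfo(isStreaming: Boolean, maxRowsPerPartition: Option[Long])

    // Eliminate only when the plan is batch AND the bound proves the limit redundant.
    def shouldEliminate(limit: Int, plan: PlanInfo): Boolean =
      !plan.isStreaming && plan.maxRowsPerPartition.exists(_ <= limit)

    assert(shouldEliminate(34, PlanInfo(isStreaming = false, Some(34L))))
    assert(!shouldEliminate(34, PlanInfo(isStreaming = true, Some(34L))))  // streaming: keep
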
From a3e26c79de9f46aeee82dc4e5bc11cfe2ab69668 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Thu, 6 May 2021 14:35:22 +0800
Subject: [PATCH 4/4] use a simple test

---
 .../apache/spark/sql/catalyst/dsl/package.scala    |  2 --
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  6 ------
 .../catalyst/optimizer/CombiningLimitsSuite.scala  | 14 --------------
 .../sql/catalyst/plans/LogicalPlanSuite.scala      | 11 ++++++++++-
 4 files changed, 10 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 4ab768579f73..626ece33f157 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -374,8 +374,6 @@ package object dsl {
 
     def limit(limitExpr: Expression): LogicalPlan = Limit(limitExpr, logicalPlan)
 
-    def localLimit(limitExpr: Expression): LogicalPlan = LocalLimit(limitExpr, logicalPlan)
-
     def join(
         otherPlan: LogicalPlan,
         joinType: JoinType = Inner,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 209abdfb1f99..16e3e43356b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1618,18 +1618,12 @@ object EliminateLimits extends Rule[LogicalPlan] {
   private def canEliminate(limitExpr: Expression, child: LogicalPlan): Boolean = {
     limitExpr.foldable && child.maxRows.exists { _ <= limitExpr.eval().asInstanceOf[Int] }
   }
-  private def canEliminateLocalLimit(localLimitExpr: Expression, child: LogicalPlan): Boolean = {
-    localLimitExpr.foldable &&
-      child.maxRowsPerPartition.exists { _ <= localLimitExpr.eval().asInstanceOf[Int] }
-  }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case Limit(l, child) if canEliminate(l, child) =>
       child
     case GlobalLimit(l, child) if canEliminate(l, child) =>
       child
-    case LocalLimit(l, child) if !plan.isStreaming && canEliminateLocalLimit(l, child) =>
-      child
 
     case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
       GlobalLimit(Least(Seq(ne, le)), grandChild)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 355cf0612058..423ff81aaebc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -150,14 +150,6 @@ class CombiningLimitsSuite extends PlanTest {
     )
   }
 
-  test("SPARK-35231: Eliminate LocalLimit if Range maxRowsPerPartition not larger than Limit") {
-    checkPlanAndMaxRowsPerPartition(
-      Range(0, 100, 1, 3).select().localLimit(34),
-      Range(0, 100, 1, 3).select(),
-      34
-    )
-  }
-
   test("SPARK-33497: Eliminate Limit if Sample max rows not larger than Limit") {
     checkPlanAndMaxRow(
       testRelation.select().sample(0, 0.2, false, 1).limit(10),
@@ -248,12 +240,6 @@ class CombiningLimitsSuite extends PlanTest {
     comparePlans(Optimize.execute(optimized.analyze), expected.analyze)
     assert(expected.maxRows.get == expectedMaxRow)
   }
-
-  private def checkPlanAndMaxRowsPerPartition(
-      optimized: LogicalPlan, expected: LogicalPlan, expectedMaxRowsPerPartition: Long): Unit = {
-    comparePlans(Optimize.execute(optimized.analyze), expected.analyze)
-    assert(expected.maxRowsPerPartition.get == expectedMaxRowsPerPartition)
-  }
 }
 
 case class RelationWithoutMaxRows(output: Seq[Attribute]) extends LeafNode {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
index 3784f4010170..0cd6d8164fe8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.catalyst.plans
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.IntegerType
 
@@ -96,4 +98,11 @@ class LogicalPlanSuite extends SparkFunSuite {
       OneRowRelation())
     assert(result.sameResult(expected))
   }
+
+  test("SPARK-35231: logical.Range override maxRowsPerPartition") {
+    assert(Range(0, 100, 1, 3).maxRowsPerPartition === Some(34))
+    assert(Range(0, 100, 1, 4).maxRowsPerPartition === Some(25))
+    assert(Range(0, 100, 1, 3).select('id).maxRowsPerPartition === Some(34))
+    assert(Range(0, 100, 1, 3).where('id % 2 === 1).maxRowsPerPartition === Some(34))
+  }
 }
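The expected values in the final LogicalPlanSuite test follow from spreading an n-row Range over its slices: if slice i covers [i*n/slices, (i+1)*n/slices), the largest slice gets ceil(n/slices) rows. A small sketch of that split (an even-partitioning assumption that mirrors the bound above, not Spark's actual Range partitioning code):

    // Size of each slice when slice i covers [i*n/slices, (i+1)*n/slices).
    def sliceSizes(n: Long, slices: Int): Seq[Long] =
      (0 until slices).map(i => (i + 1) * n / slices - i * n / slices)

    assert(sliceSizes(100, 3) == Seq(33, 33, 34))      // max 34 -> maxRowsPerPartition Some(34)
    assert(sliceSizes(100, 4) == Seq(25, 25, 25, 25))  // max 25 -> Some(25)

The .select('id) and .where(...) cases pass because the Project and Filter overrides kept from patch 2 forward the child's bound: neither operator adds rows to a partition.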