Remove CollectLimitExec.
viirya committed Oct 25, 2016
1 parent 76a3eaf commit 58e8383
Showing 4 changed files with 25 additions and 29 deletions.
@@ -71,10 +71,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, projectList, planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
execution.CollectLimitExec(
limit,
execution.LocalLimitExec(limit, planLater(child))) :: Nil
case other => planLater(other) :: Nil
}
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
@@ -294,6 +294,12 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def executeCollect(): Array[InternalRow] = child match {
// This happens when the user is collecting results back to the driver. We can skip
// the shuffling and incrementally scan the RDD to get the limited items.
case g: GlobalLimitExec => g.executeCollect()
case _ => super.executeCollect()
}

override lazy val metrics = Map(
"pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext,
36 changes: 15 additions & 21 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -27,27 +27,6 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
import org.apache.spark.util.Utils


/**
* Take the first `limit` elements and collect them to a single partition.
*
* This operator will be used when a logical `Limit` operation is the final operator in a
* logical plan, which happens when the user is collecting results back to the driver.
*/
case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = SinglePartition
override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
override def executeCollect(): Array[InternalRow] = {
child.collect {
case l: LocalLimitExec => l
}.head.child.executeTake(limit)
}

protected override def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsInternal(_.take(limit))
}
}

/**
* Helper trait which defines methods that are shared by both
* [[LocalLimitExec]] and [[GlobalLimitExec]].
@@ -57,6 +36,18 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def outputPartitioning: Partitioning = child.outputPartitioning
override def executeCollect(): Array[InternalRow] = {
child match {
// Shuffling is inserted under whole stage codegen.
case InputAdapter(ShuffleExchange(_, WholeStageCodegenExec(l: LocalLimitExec), _)) =>
l.executeCollect()
// Shuffling is inserted without whole stage codegen.
case ShuffleExchange(_, l: LocalLimitExec, _) => l.executeCollect()
// No shuffling happened.
case _ => child.executeTake(limit)
}
}

protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
iter.take(limit)
}
@@ -100,6 +91,9 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {

/**
* Take the first `limit` elements of the child's single output partition.
* If this is the final operator in the physical plan, which happens when the user is
* collecting results back to the driver, we can skip the shuffling and incrementally
* scan the RDD to get the limited items.
*/
case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
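Taken together, the pattern match in BaseLimitExec.executeCollect and the GlobalLimitExec documentation above mean a bare limit can be collected by reaching through the exchange to the LocalLimitExec and taking rows directly. A hedged inspection sketch, again assuming a SparkSession named `spark` (the printed tree is only approximate and varies with codegen settings):

// Hypothetical sketch: inspect the physical plan of a terminal limit. Roughly:
//   GlobalLimit 3
//   +- Exchange SinglePartition
//      +- LocalLimit 3
//         +- Range (0, 10000, ...)
// executeCollect() unwraps the exchange and calls LocalLimitExec.executeCollect(),
// so the three rows are fetched without actually performing the shuffle.
val q = spark.range(0, 10000).limit(3)
println(q.queryExecution.executedPlan)
val firstThree = q.collect()
assert(firstThree.length == 3)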
@@ -178,10 +178,10 @@ class PlannerSuite extends SharedSQLContext {
assert(planned.output === testData.select('value, 'key).logicalPlan.output)
}

test("terminal limits that are not handled by TakeOrderedAndProject should use CollectLimit") {
test("terminal limits that are not handled by TakeOrderedAndProject should use GlobalLimit") {
val query = testData.select('value).limit(2)
val planned = query.queryExecution.sparkPlan
assert(planned.isInstanceOf[CollectLimitExec])
assert(planned.isInstanceOf[GlobalLimitExec])
assert(planned.output === testData.select('value).logicalPlan.output)
}

@@ -191,10 +191,10 @@ class PlannerSuite extends SharedSQLContext {
assert(planned.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined)
}

test("CollectLimit can appear in the middle of a plan when caching is used") {
test("GlobalLimit can appear in the middle of a plan when caching is used") {
val query = testData.select('key, 'value).limit(2).cache()
val planned = query.queryExecution.optimizedPlan.asInstanceOf[InMemoryRelation]
assert(planned.child.isInstanceOf[CollectLimitExec])
assert(planned.child.isInstanceOf[GlobalLimitExec])
}

test("PartitioningCollection") {
