7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -39,6 +39,10 @@ private[spark] object SQLConf {

val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"

// Options that control which operators can be chosen by the query planner. These should be
// considered hints and may be ignored by future versions of Spark SQL.
val EXTERNAL_SORT = "spark.sql.planner.externalSort"

// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"

@@ -96,6 +100,9 @@ private[sql] trait SQLConf {
private[spark] def parquetFilterPushDown =
getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean

/** When true the planner will use the external sort, which may spill to disk. */
private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean

/**
* When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
* that evaluates expressions found in queries. In general this custom code runs much faster
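Before moving on to the planner changes, here is a minimal usage sketch of the new option from user code (not part of the diff); the SparkContext setup, the Record case class, and the "records" table name are illustrative assumptions:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical setup for illustration only; none of these names come from the patch.
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("external-sort-demo"))
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD

case class Record(key: Int, value: String)
sc.parallelize((1 to 1000).map(i => Record(i % 7, s"val_$i"))).registerTempTable("records")

// Enable the new planner hint added by this patch (it defaults to "false").
sqlContext.setConf("spark.sql.planner.externalSort", "true")

// ORDER BY queries may now be planned with the spillable sort operator.
sqlContext.sql("SELECT * FROM records ORDER BY key").collect()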
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -263,9 +263,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Distinct(child) =>
execution.Distinct(partial = false,
execution.Distinct(partial = true, planLater(child))) :: Nil

case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled =>
execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil
case logical.Sort(sortExprs, child) =>
// This sort is a global sort. Its requiredDistribution will be an OrderedDistribution.
execution.Sort(sortExprs, global = true, planLater(child)):: Nil

case logical.SortPartitions(sortExprs, child) =>
// This sort only sorts tuples within a partition. Its requiredDistribution will be
// an UnspecifiedDistribution.
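To see which operator the strategy above actually picks, the physical plan can be inspected (continuing the assumed setup from the earlier sketch); with the flag set to "true" it should contain ExternalSort, otherwise the in-memory Sort:

// Inspect the planned physical operators for an ORDER BY query.
val plan = sqlContext.sql("SELECT * FROM records ORDER BY key").queryExecution.executedPlan
println(plan)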
sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
import org.apache.spark.util.MutablePair
import org.apache.spark.util.collection.ExternalSorter

/**
* :: DeveloperApi ::
@@ -189,6 +190,9 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)

/**
* :: DeveloperApi ::
* Performs a sort on-heap.
* @param global when true performs a global sort of all partitions by shuffling the data first
*               if necessary.
*/

Review comment (Contributor), on the Sort doc comment: can we document the parameters, e.g. "global" for both Sort and ExternalSort?
@DeveloperApi
case class Sort(
@@ -199,12 +203,37 @@ case class Sort(
override def requiredChildDistribution =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

override def execute() = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
iterator.map(_.copy()).toArray.sorted(ordering).iterator
}, preservesPartitioning = true)
}

override def output = child.output
}

/**
* :: DeveloperApi ::
* Performs a sort, spilling to disk as needed.
* @param global when true performs a global sort of all partitions by shuffling the data first
* if necessary.
*/
@DeveloperApi
case class ExternalSort(
sortOrder: Seq[SortOrder],
global: Boolean,
child: SparkPlan)
extends UnaryNode {
override def requiredChildDistribution =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

override def execute() = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
sorter.insertAll(iterator.map(r => (r, null)))
sorter.iterator.map(_._1)
}, preservesPartitioning = true)
}
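A note on the mechanism, as read from the code above: each row is paired with a null value and fed to ExternalSorter with only an ordering supplied, so the sorter orders records by the row itself and may spill sorted runs to disk before merging them; sorter.iterator.map(_._1) then drops the dummy values to yield the rows in sorted order. One hedged observation: the in-memory Sort copies each row (_.copy()) before buffering, while this version inserts rows as-is, so if upstream operators reuse mutable rows a similar defensive copy may be needed here as well.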

16 changes: 15 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -196,7 +196,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
Seq(Seq("1")))
}

test("sorting") {
def sortTest() = {
checkAnswer(
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC"),
Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
@@ -238,6 +238,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
mapData.collect().sortBy(_.data(1)).reverse.toSeq)
}

test("sorting") {
val before = externalSortEnabled
setConf(SQLConf.EXTERNAL_SORT, "false")
sortTest()
setConf(SQLConf.EXTERNAL_SORT, before.toString)
}

test("external sorting") {
val before = externalSortEnabled
setConf(SQLConf.EXTERNAL_SORT, "true")
sortTest()
setConf(SQLConf.EXTERNAL_SORT, before.toString)
}
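One possible hardening, offered here as an editor's sketch rather than part of the patch: restoring the previous setting in a finally block would keep a failure inside sortTest() from leaking the flag into later tests.

test("external sorting") {
  val before = externalSortEnabled
  setConf(SQLConf.EXTERNAL_SORT, "true")
  try {
    sortTest()
  } finally {
    // Restore the caller's configuration even if the shared test body fails.
    setConf(SQLConf.EXTERNAL_SORT, before.toString)
  }
}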

test("limit") {
checkAnswer(
sql("SELECT * FROM testData LIMIT 10"),