Changes from all commits · 22 commits
2dfc648
Add failing test illustrating bad exchange planning.
JoshRosen Aug 6, 2015
cc5669c
Adding outputPartitioning to Repartition does not fix the test.
JoshRosen Aug 6, 2015
0675956
Preserving ordering and partitioning in row format converters also do…
JoshRosen Aug 6, 2015
adcc742
Move test to PlannerSuite.
JoshRosen Aug 6, 2015
c9fb231
Rewrite exchange to better handle this case.
JoshRosen Aug 6, 2015
c628daf
Revert accidental ExchangeSuite change.
JoshRosen Aug 6, 2015
752b8de
style fix
JoshRosen Aug 6, 2015
2e0f33a
Write a more generic test for EnsureRequirements.
JoshRosen Aug 6, 2015
5172ac5
Add test for requiresChildrenToProduceSameNumberOfPartitions.
JoshRosen Aug 6, 2015
0725a34
Small assertion cleanup.
JoshRosen Aug 6, 2015
a1c12b9
Add failing test to demonstrate allCompatible bug
JoshRosen Aug 6, 2015
4f08278
Fix the test by adding the compatibility check to EnsureRequirements
JoshRosen Aug 6, 2015
8dbc845
Add even more tests.
JoshRosen Aug 6, 2015
06aba0c
Add more comments
JoshRosen Aug 6, 2015
642b0bb
Further expand comment / reasoning
JoshRosen Aug 6, 2015
fee65c4
Further refinement to comments / reasoning
JoshRosen Aug 6, 2015
2c7e126
Merge remote-tracking branch 'origin/master' into exchange-fixes
JoshRosen Aug 8, 2015
18cddeb
Rename DummyPlan to DummySparkPlan.
JoshRosen Aug 8, 2015
1307c50
Update conditions for requiring child compatibility.
JoshRosen Aug 8, 2015
8784bd9
Giant comment explaining compatibleWith vs. guarantees
JoshRosen Aug 8, 2015
0983f75
More guarantees vs. compatibleWith cleanup; delete BroadcastPartition…
JoshRosen Aug 9, 2015
38006e7
Rewrite EnsureRequirements _yet again_ to make things even simpler
JoshRosen Aug 9, 2015
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -75,6 +75,37 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
def clustering: Set[Expression] = ordering.map(_.child).toSet
}

/**
* Describes how an operator's output is split across partitions. The `compatibleWith`,
* `guarantees`, and `satisfies` methods describe relationships between child partitionings,
* target partitionings, and [[Distribution]]s. These relations are described more precisely in
* their individual method docs, but at a high level:
*
* - `satisfies` is a relationship between partitionings and distributions.
* - `compatibleWith` is a relationship between an operator's child output partitionings.
* - `guarantees` is a relationship between a child's existing output partitioning and a target
* output partitioning.
*
* Diagrammatically:
*
*        +--------------+
*        | Distribution |
*        +--------------+
*               ^
*               |
*           satisfies
*               |
*        +--------------+                  +--------------+
*        |    Child     |                  |    Target    |
*   +----| Partitioning |----guarantees--->| Partitioning |
*   |    +--------------+                  +--------------+
*   |           ^
*   |           |
*   |    compatibleWith
*   |           |
*   +-----------+
*
*/
sealed trait Partitioning {
/** Returns the number of partitions that the data is split across */
val numPartitions: Int
@@ -90,9 +121,66 @@ sealed trait Partitioning {
/**
* Returns true iff the partitioning scheme of this [[Partitioning]] is compatible with the
* partitioning scheme described by `other`.
*
* Compatibility of partitionings is only checked for operators that have multiple children
* and that require a specific child output [[Distribution]], such as joins.
*
* Intuitively, partitionings are compatible if they route the same partitioning key to the same
* partition. For instance, two hash partitionings are only compatible if they produce the same
* number of output partitions and hash records according to the same hash function and the
* same partitioning key schema.
*
* Put another way, two partitionings are compatible with each other if they satisfy all of the
* same distribution guarantees.
*/
// TODO: Add an example once we have the `nullSafe` concept.
def guarantees(other: Partitioning): Boolean
def compatibleWith(other: Partitioning): Boolean

/**
* Returns true iff we can say that the partitioning scheme of this [[Partitioning]] guarantees
* the same partitioning scheme described by `other`. If `A.guarantees(B)`, then repartitioning
* the child's output according to `B` will be unnecessary. `guarantees` is used as a performance
* optimization to allow the exchange planner to avoid redundant repartitionings. By default,
* a partitioning only guarantees partitionings that are equal to itself (i.e. the same number
* of partitions, same strategy (range or hash), etc).
*
* In order to enable more aggressive optimization, this strict equality check can be relaxed.
* For example, say that the planner needs to repartition all of an operator's children so that
* they satisfy the [[AllTuples]] distribution. One way to do this is to repartition all children
* to have the [[SinglePartition]] partitioning. If one of the operator's children already happens
* to be hash-partitioned with a single partition then we do not need to re-shuffle this child;
* this repartitioning can be avoided if a single-partition [[HashPartitioning]] `guarantees`
* [[SinglePartition]].
*
* The SinglePartition example given above is not particularly interesting; the real value of
* `guarantees` emerges with more advanced partitioning strategies. SPARK-7871 will introduce a notion
* of null-safe partitionings, under which partitionings can specify whether rows whose
* partitioning keys contain null values will be grouped into the same partition or whether they
* will have an unknown / random distribution. If a partitioning does not require nulls to be
* clustered, then a partitioning which _does_ cluster nulls will guarantee it. The converse is
* not true, however: a partitioning which clusters nulls cannot
* be guaranteed by one which does not cluster them. Thus, in general `guarantees` is not a
* symmetric relation.
*
* Another way to think about `guarantees`: if `A.guarantees(B)`, then any partitioning of rows
* produced by `A` could have also been produced by `B`.
*/
def guarantees(other: Partitioning): Boolean = this == other
}
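// To make the relations above concrete, here is a small illustrative sketch (not part of
// this diff; it assumes spark-catalyst from this PR on the classpath, and the object name
// is invented for the example):
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.IntegerType

object PartitioningRelationsSketch extends App {
  val a = AttributeReference("a", IntegerType)()

  val hash8  = HashPartitioning(Seq(a), 8)
  val hash8b = HashPartitioning(Seq(a), 8)
  val hash4  = HashPartitioning(Seq(a), 4)

  // satisfies: partitioning vs. distribution
  assert(hash8.satisfies(ClusteredDistribution(Seq(a))))
  // compatibleWith: sibling output partitionings route keys identically
  assert(hash8.compatibleWith(hash8b) && hash8b.compatibleWith(hash8))
  // guarantees: any output produced by hash8 could also have been produced by hash8b,
  // so no re-shuffle is needed to reach hash8b's scheme
  assert(hash8.guarantees(hash8b))
  // different partition counts break both relations
  assert(!hash8.compatibleWith(hash4) && !hash8.guarantees(hash4))
}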

object Partitioning {
def allCompatible(partitionings: Seq[Partitioning]): Boolean = {
// Note: this assumes transitivity
partitionings.sliding(2).map {
Review comment from a contributor on the line above: Nit: we can use forall instead of map here and remove the forall at the end.

case Seq(a) => true
case Seq(a, b) =>
if (a.numPartitions != b.numPartitions) {
assert(!a.compatibleWith(b) && !b.compatibleWith(a))
false
} else {
a.compatibleWith(b) && b.compatibleWith(a)
}
}.forall(_ == true)
}
}
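// The reviewer's nit above, sketched out (a suggested refactor, not the merged code):
// sliding(2).forall subsumes the map plus the trailing .forall(_ == true), with
// identical behavior for empty and single-element inputs:
def allCompatible(partitionings: Seq[Partitioning]): Boolean = {
  // Note: this assumes transitivity
  partitionings.sliding(2).forall {
    case Seq(a) => true
    case Seq(a, b) =>
      if (a.numPartitions != b.numPartitions) {
        assert(!a.compatibleWith(b) && !b.compatibleWith(a))
        false
      } else {
        a.compatibleWith(b) && b.compatibleWith(a)
      }
  }
}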

case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
@@ -101,6 +189,8 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
case _ => false
}

override def compatibleWith(other: Partitioning): Boolean = false

override def guarantees(other: Partitioning): Boolean = false
}

@@ -109,21 +199,9 @@ case object SinglePartition extends Partitioning {

override def satisfies(required: Distribution): Boolean = true

override def guarantees(other: Partitioning): Boolean = other match {
case SinglePartition => true
case _ => false
}
}

case object BroadcastPartitioning extends Partitioning {
val numPartitions = 1
override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1

override def satisfies(required: Distribution): Boolean = true

override def guarantees(other: Partitioning): Boolean = other match {
case BroadcastPartitioning => true
case _ => false
}
override def guarantees(other: Partitioning): Boolean = other.numPartitions == 1
}

/**
@@ -147,6 +225,12 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
case _ => false
}

override def compatibleWith(other: Partitioning): Boolean = other match {
case o: HashPartitioning =>
this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
case _ => false
}

override def guarantees(other: Partitioning): Boolean = other match {
case o: HashPartitioning =>
this.clusteringSet == o.clusteringSet && this.numPartitions == o.numPartitions
@@ -185,6 +269,11 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
case _ => false
}

override def compatibleWith(other: Partitioning): Boolean = other match {
case o: RangePartitioning => this == o
case _ => false
}

override def guarantees(other: Partitioning): Boolean = other match {
case o: RangePartitioning => this == o
case _ => false
@@ -228,6 +317,13 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
override def satisfies(required: Distribution): Boolean =
partitionings.exists(_.satisfies(required))

/**
* Returns true if any `partitioning` of this collection is compatible with
* the given [[Partitioning]].
*/
override def compatibleWith(other: Partitioning): Boolean =
partitionings.exists(_.compatibleWith(other))

/**
* Returns true if any `partitioning` of this collection guarantees
* the given [[Partitioning]].
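// Illustration (hypothetical, not in this diff): an equi-join that reports
// PartitioningCollection(Seq(HashPartitioning(Seq(a), n), HashPartitioning(Seq(b), n)))
// as its output is compatible with, and guarantees, whatever either element does, so a
// downstream operator clustered on either join key can reuse the existing shuffle.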
104 changes: 55 additions & 49 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -190,66 +190,72 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
* of input data meets the
* [[org.apache.spark.sql.catalyst.plans.physical.Distribution Distribution]] requirements for
* each operator by inserting [[Exchange]] Operators where required. Also ensure that the
* required input partition ordering requirements are met.
* input partition ordering requirements are met.
*/
private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[SparkPlan] {
// TODO: Determine the number of partitions.
def numPartitions: Int = sqlContext.conf.numShufflePartitions
private def numPartitions: Int = sqlContext.conf.numShufflePartitions

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator: SparkPlan =>
// Adds Exchange or Sort operators as required
def addOperatorsIfNecessary(
partitioning: Partitioning,
rowOrdering: Seq[SortOrder],
child: SparkPlan): SparkPlan = {

def addShuffleIfNecessary(child: SparkPlan): SparkPlan = {
if (!child.outputPartitioning.guarantees(partitioning)) {
Exchange(partitioning, child)
} else {
child
}
}
/**
* Given a required distribution, returns a partitioning that satisfies that distribution.
*/
private def canonicalPartitioning(requiredDistribution: Distribution): Partitioning = {
requiredDistribution match {
case AllTuples => SinglePartition
case ClusteredDistribution(clustering) => HashPartitioning(clustering, numPartitions)
case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions)
case dist => sys.error(s"Do not know how to satisfy distribution $dist")
}
}
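// For instance (illustration, not part of the diff; a and b are hypothetical
// attributes, and 200 is the default of spark.sql.shuffle.partitions):
//
//   canonicalPartitioning(AllTuples)                     == SinglePartition
//   canonicalPartitioning(ClusteredDistribution(Seq(a))) == HashPartitioning(Seq(a), 200)
//   canonicalPartitioning(OrderedDistribution(Seq(SortOrder(b, Ascending))))
//     == RangePartitioning(Seq(SortOrder(b, Ascending)), 200)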

def addSortIfNecessary(child: SparkPlan): SparkPlan = {

if (rowOrdering.nonEmpty) {
// If child.outputOrdering is [a, b] and rowOrdering is [a], we do not need to sort.
val minSize = Seq(rowOrdering.size, child.outputOrdering.size).min
if (minSize == 0 || rowOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
} else {
child
}
} else {
child
}
}
private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering
var children: Seq[SparkPlan] = operator.children

addSortIfNecessary(addShuffleIfNecessary(child))
// Ensure that the operator's children satisfy their output distribution requirements:
children = children.zip(requiredChildDistributions).map { case (child, distribution) =>
if (child.outputPartitioning.satisfies(distribution)) {
child
} else {
Exchange(canonicalPartitioning(distribution), child)
}
}

val requirements =
(operator.requiredChildDistribution, operator.requiredChildOrdering, operator.children)

val fixedChildren = requirements.zipped.map {
case (AllTuples, rowOrdering, child) =>
addOperatorsIfNecessary(SinglePartition, rowOrdering, child)
case (ClusteredDistribution(clustering), rowOrdering, child) =>
addOperatorsIfNecessary(HashPartitioning(clustering, numPartitions), rowOrdering, child)
case (OrderedDistribution(ordering), rowOrdering, child) =>
addOperatorsIfNecessary(RangePartitioning(ordering, numPartitions), rowOrdering, child)

case (UnspecifiedDistribution, Seq(), child) =>
// If the operator has multiple children and specifies child output distributions (e.g. join),
// then the children's output partitionings must be compatible:
if (children.length > 1
&& requiredChildDistributions.toSet != Set(UnspecifiedDistribution)
&& !Partitioning.allCompatible(children.map(_.outputPartitioning))) {
children = children.zip(requiredChildDistributions).map { case (child, distribution) =>
val targetPartitioning = canonicalPartitioning(distribution)
if (child.outputPartitioning.guarantees(targetPartitioning)) {
child
case (UnspecifiedDistribution, rowOrdering, child) =>
sqlContext.planner.BasicOperators.getSortOperator(rowOrdering, global = false, child)
} else {
Exchange(targetPartitioning, child)
}
}
}

case (dist, ordering, _) =>
sys.error(s"Don't know how to ensure $dist with ordering $ordering")
// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
if (requiredOrdering.nonEmpty) {
// If child.outputOrdering is [a, b] and requiredOrdering is [a], we do not need to sort.
val minSize = Seq(requiredOrdering.size, child.outputOrdering.size).min
if (minSize == 0 || requiredOrdering.take(minSize) != child.outputOrdering.take(minSize)) {
sqlContext.planner.BasicOperators.getSortOperator(requiredOrdering, global = false, child)
} else {
child
}
} else {
child
}
}

operator.withNewChildren(fixedChildren)
operator.withNewChildren(children)
}

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator: SparkPlan => ensureDistributionAndOrdering(operator)
}
}
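// To see how the two passes interact, here is a self-contained toy model of the control
// flow above (every name in it, such as Part, Plan, and ensureDistribution, is an
// invented stand-in for this sketch, not a Spark class):
object EnsureRequirementsModel extends App {
  // Stand-in for a partitioning: a partition count plus the clustering keys.
  case class Part(numPartitions: Int, keys: Set[String]) {
    def satisfies(required: Set[String]): Boolean = keys.nonEmpty && keys.subsetOf(required)
    def compatibleWith(other: Part): Boolean = this == other
    def guarantees(other: Part): Boolean = this == other
  }
  // Stand-in for a physical plan node.
  case class Plan(name: String, partitioning: Part)

  val numShufflePartitions = 200 // models spark.sql.shuffle.partitions
  def canonical(required: Set[String]): Part = Part(numShufflePartitions, required)
  def exchange(child: Plan, target: Part): Plan = Plan(s"Exchange(${child.name})", target)

  def ensureDistribution(children: Seq[Plan], required: Seq[Set[String]]): Seq[Plan] = {
    // Pass 1: shuffle any child whose partitioning does not satisfy its distribution.
    var fixed = children.zip(required).map { case (c, r) =>
      if (c.partitioning.satisfies(r)) c else exchange(c, canonical(r))
    }
    // Pass 2: if siblings are mutually incompatible, re-shuffle every child that
    // does not already guarantee the canonical target partitioning.
    val allCompatible = fixed.sliding(2).forall {
      case Seq(a, b) => a.partitioning.compatibleWith(b.partitioning)
      case _         => true
    }
    if (fixed.length > 1 && !allCompatible) {
      fixed = fixed.zip(required).map { case (c, r) =>
        val target = canonical(r)
        if (c.partitioning.guarantees(target)) c else exchange(c, target)
      }
    }
    fixed
  }

  // A join whose sides both satisfy clustering on "a" but disagree on partition count:
  val left  = Plan("left",  Part(8, Set("a")))
  val right = Plan("right", Part(4, Set("a")))
  // Both sides end up exchanged to the common 200-partition canonical partitioning.
  println(ensureDistribution(Seq(left, right), Seq(Set("a"), Set("a"))))
}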
sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -246,6 +246,11 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output

override def outputPartitioning: Partitioning = {
if (numPartitions == 1) SinglePartition
else UnknownPartitioning(numPartitions)
}
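// Illustration (not part of the diff): with this override,
//   Repartition(numPartitions = 1, shuffle = false, child).outputPartitioning == SinglePartition
// and since SinglePartition satisfies every required distribution (see partitioning.scala
// above), a parent requiring AllTuples no longer forces a redundant Exchange on top of
// the coalesce.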

protected override def doExecute(): RDD[InternalRow] = {
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
}
sql/core/src/main/scala/org/apache/spark/sql/execution/rowFormatConverters.scala
@@ -21,6 +21,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule

/**
@@ -33,6 +34,8 @@ case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode {
require(UnsafeProjection.canSupport(child.schema), s"Cannot convert ${child.schema} to Unsafe")

override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = false
override def canProcessSafeRows: Boolean = true
@@ -51,6 +54,8 @@ case class ConvertToUnsafe(child: SparkPlan) extends UnaryNode {
@DeveloperApi
case class ConvertToSafe(child: SparkPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def outputsUnsafeRows: Boolean = false
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = false
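// Passing the child's partitioning and ordering through matters: without these overrides
// the converters fall back to SparkPlan's defaults (an unknown partitioning and no
// ordering), hiding the child's properties from EnsureRequirements and triggering
// redundant exchanges and sorts around every row-format conversion.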