Skip to content

Commit 3b4c679

Browse files
committed
Drop PartitionCoalesce and use Reparititon
1 parent c0306d3 commit 3b4c679

File tree

5 files changed

+27
-31
lines changed

5 files changed

+27
-31
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -596,14 +596,15 @@ object CollapseProject extends Rule[LogicalPlan] {
596596
object CollapseRepartition extends Rule[LogicalPlan] {
597597
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
598598
// Case 1: When a Repartition has a child of Repartition or RepartitionByExpression,
599-
// 1) When the top node does not enable the shuffle (i.e., coalesce API), but the child
600-
// enables the shuffle. Returns the child node if the last numPartitions is bigger;
601-
// otherwise, keep unchanged.
599+
// 1) When the top node does not enable the shuffle (i.e., coalesce with no user-specified
600+
// strategy), but the child enables the shuffle. Returns the child node if the last
601+
// numPartitions is bigger; otherwise, keep unchanged.
602602
// 2) In the other cases, returns the top node with the child's child
603-
case r @ Repartition(_, _, child: RepartitionOperation) => (r.shuffle, child.shuffle) match {
604-
case (false, true) => if (r.numPartitions >= child.numPartitions) child else r
605-
case _ => r.copy(child = child.child)
606-
}
603+
case r @ Repartition(_, _, child: RepartitionOperation, None) =>
604+
(r.shuffle, child.shuffle) match {
605+
case (false, true) => if (r.numPartitions >= child.numPartitions) child else r
606+
case _ => r.copy(child = child.child)
607+
}
607608
// Case 2: When a RepartitionByExpression has a child of Repartition or RepartitionByExpression
608609
// we can remove the child.
609610
case r @ RepartitionByExpression(_, child: RepartitionOperation, _) =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -746,22 +746,24 @@ abstract class RepartitionOperation extends UnaryNode {
746746
* [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
747747
* asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
748748
* of the output requires some specific ordering or distribution of the data.
749+
*
750+
* If `shuffle` = false (`coalesce` cases), this logical plan can have an user-specified strategy
751+
* to coalesce input partitions.
752+
*
753+
* @param numPartitions How many partitions to use in the output RDD
754+
* @param shuffle Whether to shuffle when repartitioning
755+
* @param child the LogicalPlan
756+
* @param coalescer Optional coalescer that an user specifies
749757
*/
750-
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
758+
case class Repartition(
759+
numPartitions: Int,
760+
shuffle: Boolean,
761+
child: LogicalPlan,
762+
coalescer: Option[PartitionCoalescer] = None)
751763
extends RepartitionOperation {
752764
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
753765
}
754766

755-
/**
756-
* Returns a new RDD that has at most `numPartitions` partitions. This behavior can be modified by
757-
* supplying a `PartitionCoalescer` to control the behavior of the partitioning.
758-
*/
759-
case class PartitionCoalesce(numPartitions: Int, coalescer: PartitionCoalescer, child: LogicalPlan)
760-
extends UnaryNode {
761-
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
762-
override def output: Seq[Attribute] = child.output
763-
}
764-
765767
/**
766768
* This method repartitions data using [[Expression]]s into `numPartitions`, and receives
767769
* information about the number of partitions during execution. Used when a specific ordering or

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2680,14 +2680,9 @@ class Dataset[T] private[sql](
26802680
* @group typedrel
26812681
* @since 2.3.0
26822682
*/
2683-
def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer]): Dataset[T] = {
2684-
userDefinedCoalescer.map { coalescer =>
2685-
withTypedPlan {
2686-
PartitionCoalesce(numPartitions, coalescer, logicalPlan)
2687-
}
2688-
}.getOrElse {
2689-
coalesce(numPartitions)
2690-
}
2683+
def coalesce(numPartitions: Int, userDefinedCoalescer: Option[PartitionCoalescer])
2684+
: Dataset[T] = withTypedPlan {
2685+
Repartition(numPartitions, shuffle = false, logicalPlan, userDefinedCoalescer)
26912686
}
26922687

26932688
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -390,14 +390,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
390390
f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
391391
planLater(left), planLater(right)) :: Nil
392392

393-
case logical.Repartition(numPartitions, shuffle, child) =>
393+
case logical.Repartition(numPartitions, shuffle, child, coalescer) =>
394394
if (shuffle) {
395395
ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
396396
} else {
397-
execution.CoalesceExec(numPartitions, planLater(child), None) :: Nil
397+
execution.CoalesceExec(numPartitions, planLater(child), coalescer) :: Nil
398398
}
399-
case logical.PartitionCoalesce(numPartitions, coalescer, child) =>
400-
execution.CoalesceExec(numPartitions, planLater(child), Some(coalescer)) :: Nil
401399
case logical.Sort(sortExprs, global, child) =>
402400
execution.SortExec(sortExprs, global, planLater(child)) :: Nil
403401
case logical.Project(projectList, child) =>

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ class PlannerSuite extends SharedSQLContext {
244244
assert(countRepartitions(doubleRepartitioned.queryExecution.logical) === 3)
245245
assert(countRepartitions(doubleRepartitioned.queryExecution.optimizedPlan) === 2)
246246
doubleRepartitioned.queryExecution.optimizedPlan match {
247-
case Repartition (numPartitions, shuffle, Repartition(_, shuffleChild, _)) =>
247+
case Repartition(numPartitions, shuffle, Repartition(_, shuffleChild, _, _), _) =>
248248
assert(numPartitions === 5)
249249
assert(shuffle === false)
250250
assert(shuffleChild === true)

0 commit comments

Comments
 (0)