Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you committed Sep 11, 2021
1 parent ca63321 commit f5e4b91
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ case class SimpleCost(value: Long) extends Cost {
}

/**
* A skew join aware implementation of [[Cost]], which consider shuffle number and skew join number
* A skew join aware implementation of [[Cost]], which considers the number of shuffles and the
* number of skew joins.
*
* We always pick the cost which has more skew joins, even if it introduces one or more extra
* shuffles. Otherwise, if two costs have the same number of skew joins (or none), we pick the one
* with the smaller number of shuffles.
*/
case class SkewJoinAwareCost(
numShuffles: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,14 @@ case class EnsureRequirements(
extends Rule[SparkPlan] {

private def ensureDistributionAndOrdering(
originChildren: Seq[SparkPlan],
originalChildren: Seq[SparkPlan],
requiredChildDistributions: Seq[Distribution],
requiredChildOrderings: Seq[Seq[SortOrder]],
shuffleOrigin: ShuffleOrigin): Seq[SparkPlan] = {
assert(requiredChildDistributions.length == originChildren.length)
assert(requiredChildOrderings.length == originChildren.length)
assert(requiredChildDistributions.length == originalChildren.length)
assert(requiredChildOrderings.length == originalChildren.length)
// Ensure that the operator's children satisfy their output distribution requirements.
var children = originChildren.zip(requiredChildDistributions).map {
var newChildren = originalChildren.zip(requiredChildDistributions).map {
case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
child
case (child, BroadcastDistribution(mode)) =>
Expand All @@ -74,7 +74,7 @@ case class EnsureRequirements(
}.map(_._2)

val childrenNumPartitions =
childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet
childrenIndexes.map(newChildren(_).outputPartitioning.numPartitions).toSet

if (childrenNumPartitions.size > 1) {
// Get the number of partitions which is explicitly required by the distributions.
Expand All @@ -92,7 +92,7 @@ case class EnsureRequirements(
// 1. We should avoid shuffling these children.
// 2. We should have a reasonable parallelism.
val nonShuffleChildrenNumPartitions =
childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
childrenIndexes.map(newChildren).filterNot(_.isInstanceOf[ShuffleExchangeExec])
.map(_.outputPartitioning.numPartitions)
val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
Expand All @@ -111,7 +111,7 @@ case class EnsureRequirements(

val targetNumPartitions = requiredNumPartitions.getOrElse(expectedChildrenNumPartitions)

children = children.zip(requiredChildDistributions).zipWithIndex.map {
newChildren = newChildren.zip(requiredChildDistributions).zipWithIndex.map {
case ((child, distribution), index) if childrenIndexes.contains(index) =>
if (child.outputPartitioning.numPartitions == targetNumPartitions) {
child
Expand All @@ -129,7 +129,7 @@ case class EnsureRequirements(
}

// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
newChildren = newChildren.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
// If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
child
Expand All @@ -138,7 +138,7 @@ case class EnsureRequirements(
}
}

children
newChildren
}

private def reorder(
Expand Down

0 comments on commit f5e4b91

Please sign in to comment.