Commit cc254f2

xingchaozh authored and GitHub Enterprise committed
[CARMEL-6383] Allow introduce new shuffle in skew handling (#1161)
* [CARMEL-6383] Allow introduce new shuffle in skew handling
* fix code style
* Support multi skew joins in EliminateSkewOptimzeIntroducedShuffle
* fix ut
* fix ut
* Add tag to detect newly introduced shuffle
* Add tags back when updateShuffleReads
1 parent e49b25b commit cc254f2

File tree

13 files changed: +538 -122 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 6 additions & 0 deletions
@@ -696,6 +696,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN =
+    buildConf("spark.sql.adaptive.forceOptimizeSkewedJoin")
+      .doc("When true, force enable OptimizeSkewedJoin even if it introduces extra shuffle.")
+      .version("3.3.0")
+      .fallbackConf(ALLOW_ADDITIONAL_SHUFFLE)
+
   val SKEW_JOIN_SKEWED_PARTITION_FACTOR =
     buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
       .doc("A partition is considered as skewed if its size is larger than this factor " +

sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantSorts.scala

Lines changed: 5 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -42,5 +43,9 @@ object RemoveRedundantSorts extends Rule[SparkPlan] {
       if SortOrder.orderingSatisfies(child.outputOrdering, orders) &&
         child.outputPartitioning.satisfies(s.requiredChildDistribution.head) =>
       child
+    case w @ WindowExec(_, _, _, _, s1 @ WindowSortLimitExec(
+        _, _, _, _, _, _, s2: WindowSortLimitExec, _))
+        if conf.enableWindowLimit && s2.redundantWith(s1) =>
+      w.copy(child = s2)
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala

Lines changed: 22 additions & 0 deletions
@@ -184,6 +184,28 @@ case class WindowSortLimitExec(
   override protected def doProduce(ctx: CodegenContext): String = {
     doProduce(ctx, classOf[AbstractUnsafeExternalRowSorter].getName)
   }
+
+  def redundantWith(o: WindowSortLimitExec): Boolean = {
+    val partitionSpecSame = partitionSpec.length == o.partitionSpec.length &&
+      partitionSpec.zip(o.partitionSpec).forall {
+        case (l, r) => l.semanticEquals(r)
+      }
+
+    val sortOrderInWindowSame = sortOrderInWindow.length == o.sortOrderInWindow.length &&
+      sortOrderInWindow.zip(o.sortOrderInWindow).forall {
+        case (l, r) => l.semanticEquals(r)
+      }
+
+    val sortOrderAcrossWindowsSame =
+      sortOrderAcrossWindows.length == o.sortOrderAcrossWindows.length &&
+        sortOrderAcrossWindows.zip(o.sortOrderAcrossWindows).forall {
+          case (l, r) => l.semanticEquals(r)
+        }
+
+    partitionSpecSame && sortOrderInWindowSame && sortOrderAcrossWindowsSame &&
+      global == o.global && partitionLimit == o.partitionLimit &&
+      maxBufferSize == o.maxBufferSize && testSpillFrequency == o.testSpillFrequency
+  }
 }
 
 abstract class SortExecBase(
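The redundantWith check above repeats one pattern three times: equal length plus pairwise Catalyst semanticEquals. A standalone sketch of that pattern, where sameExprs is a hypothetical helper and not part of the patch:

import org.apache.spark.sql.catalyst.expressions.Expression

// Two expression sequences match when they have the same length and each
// pair is semantically equal (ignoring cosmetic differences such as
// expression IDs or qualifiers).
def sameExprs(a: Seq[Expression], b: Seq[Expression]): Boolean =
  a.length == b.length && a.zip(b).forall { case (l, r) => l.semanticEquals(r) }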

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 8 additions & 5 deletions
@@ -97,7 +97,10 @@ case class AdaptiveSparkPlanExec(
     EnsureRepartitionForWriting,
     EliminateShuffleExec,
     DisableUnnecessaryBucketedScan,
-    AdjustScanPartitionSizeDynamically
+    AdjustScanPartitionSizeDynamically,
+    OptimizeSkewedJoin, // ensureRequirements
+    EliminateSkewOptimzeIntroducedShuffle,
+    removeRedundantSorts
   ) ++ context.session.sessionState.queryStagePrepRules
 
   @transient private val initialPlan = context.session.withActive {
@@ -109,16 +112,15 @@
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     PlanAdaptiveDynamicPruningFilters(initialPlan),
     ReuseAdaptiveSubquery(context.subqueryCache),
-
-    OptimizeSkewedJoin,
     CoalesceShufflePartitions(context.session),
     // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
     // added by `CoalesceShufflePartitions`. So they must be executed after it.
     OptimizeSkewedRangePartition,
     OptimizeSkewedInsert,
     OptimizeLocalShuffleReader,
     ensureRequirements,
-    EliminateSkewOptimzeIntroducedShuffle
+    EliminateSkewOptimzeIntroducedShuffle,
+    removeRedundantSorts
   )
 
   // A list of physical optimizer rules to be applied right after a new stage is created. The input
@@ -128,7 +130,8 @@
     CollapseCodegenStages()
   )
 
-  @transient private val costEvaluator = SimpleCostEvaluator
+  @transient private val costEvaluator =
+    SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
 
   @volatile private[sql] var currentPhysicalPlan = initialPlan
 
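The cost evaluator is what normally rejects a re-optimized plan that adds a shuffle; passing the new flag lets it tolerate the shuffle introduced for skew handling. Below is a rough sketch of that decision, not the actual SimpleCostEvaluator in this branch; numShuffles, hasSkewJoin and preferNewPlan are illustrative helpers.

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

// Cost proxy: number of shuffle exchanges in the physical plan.
def numShuffles(plan: SparkPlan): Int =
  plan.collect { case s: ShuffleExchangeExec => s }.size

// Whether the plan contains a skew-optimized sort-merge join.
def hasSkewJoin(plan: SparkPlan): Boolean =
  plan.find {
    case s: SortMergeJoinExec => s.isSkewJoin
    case _ => false
  }.isDefined

// Without forcing, the new plan must not add shuffles; with forcing, a plan
// that keeps the skew-join optimization is accepted even if it costs one more.
def preferNewPlan(oldPlan: SparkPlan, newPlan: SparkPlan, force: Boolean): Boolean =
  if (force && hasSkewJoin(newPlan)) true
  else numShuffles(newPlan) <= numShuffles(oldPlan)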

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala

Lines changed: 10 additions & 7 deletions
@@ -231,7 +231,7 @@ case class CoalesceShufflePartitions(session: SparkSession)
   }
 
   private def collectShuffleStageInfos(plan: SparkPlan): Seq[ShuffleStageInformation] = plan match {
-    case ShuffleStageInformation(stage, specs) => Seq(new ShuffleStageInformation(stage, specs))
+    case ShuffleStageInformation(stage, specs, _) => Seq(new ShuffleStageInformation(stage, specs))
     case _ => plan.children.flatMap(collectShuffleStageInfos)
   }
 
@@ -242,7 +242,7 @@
     // Even for shuffle exchange whose input RDD has 0 partition, we should still update its
     // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same
     // number of output partitions.
-    case ShuffleStageInformation(stage, _) =>
+    case ShuffleStageInformation(stage, _, optimizeTags) =>
       specsMap.get(stage.id).map { specs =>
         // Since we may not submit tasks which read empty partition, and the MapOutputStatistics
         // above could be all '0' of each partition size. And the coalesced partitionSpecs could
@@ -251,7 +251,9 @@
         val normalizedSpecs = if (specs.isEmpty) {
           specs :+ CoalescedPartitionSpec(0, numPartitions)
         } else specs
-        CustomShuffleReaderExec(stage, normalizedSpecs)
+        val newCustomShuffleReaderExec = CustomShuffleReaderExec(stage, normalizedSpecs)
+        optimizeTags.foreach(newCustomShuffleReaderExec.addOptimizeTag(_))
+        newCustomShuffleReaderExec
       }.getOrElse(plan)
     case other => other.mapChildren(updateShuffleReads(_, specsMap, numPartitions))
   }
@@ -262,11 +264,12 @@ private class ShuffleStageInformation(val shuffleStage: ShuffleQueryStageExec,
 
 private object ShuffleStageInformation {
   def unapply(plan: SparkPlan)
-      : Option[(ShuffleQueryStageExec, Option[Seq[ShufflePartitionSpec]])] = plan match {
+      : Option[(ShuffleQueryStageExec,
+        Option[Seq[ShufflePartitionSpec]], Seq[String])] = plan match {
     case stage: ShuffleQueryStageExec =>
-      Some((stage, None))
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
-      Some((s, Some(partitionSpecs)))
+      Some((stage, None, Seq.empty[String]))
+    case c@CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
+      Some((s, Some(partitionSpecs), c.getOptimizeTags()))
     case _ => None
   }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateSkewOptimzeIntroducedShuffle.scala

Lines changed: 5 additions & 5 deletions
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{CoalescedPartitionSpec, SparkPlan}
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 
 /**
@@ -43,8 +43,8 @@ private[adaptive] object EliminateSkewOptimzeIntroducedShuffle extends Rule[Spar
     val skewedJoins = plan.collect {
       case s: SortMergeJoinExec if s.isSkewJoin => s
     }
-    // Sanity check. Suppose this rule should take effect iff there is one skewed join operator.
-    if (skewedJoins.size != 1) {
+    // Sanity check. This rule should take effect only if there is at least one skewed join operator.
+    if (skewedJoins.isEmpty) {
       plan
     } else {
       plan.transformUp {
@@ -60,8 +60,8 @@ private[adaptive] object EliminateSkewOptimzeIntroducedShuffle extends Rule[Spar
     }
   }
 
-  private def existSkewedReader(plan: SparkPlan): Boolean =
-    plan.find {
+  private def existSkewedReader(child: SparkPlan): Boolean =
+    child.find {
       // All CoalescedPartitionSpec and there are some duplicate specs.
       case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitions) =>
        partitions.forall(_.isInstanceOf[CoalescedPartitionSpec]) &&
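For context, the reader shape this rule looks for: skew handling replicates the non-skewed side, so its CustomShuffleReaderExec ends up with only CoalescedPartitionSpec entries, some of them repeated. A minimal sketch of that check over a spec list; looksSkewReplicated is an illustrative helper and the import path is assumed to match this branch:

import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec}

// All specs are coalesced ranges and at least one range appears twice,
// which is the signature of a skew-join reader on the replicated side.
def looksSkewReplicated(specs: Seq[ShufflePartitionSpec]): Boolean =
  specs.forall(_.isInstanceOf[CoalescedPartitionSpec]) &&
    specs.distinct.length < specs.length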

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala

Lines changed: 9 additions & 7 deletions
@@ -114,12 +114,12 @@ object OptimizeLocalShuffleReader extends Rule[SparkPlan] {
       s match {
         case proj1 @ ProjectExec(_, sort@SortExec(_, true, proj2@ProjectExec(_,
             c@CustomShuffleReaderExec(_: ShuffleQueryStageExec, _)), _)) =>
-          proj1.withNewChildren(Seq(sort.withNewChildren(
-            Seq(proj2.withNewChildren(Seq(createLocalReader(c)))))))
+          proj1.withNewChildren(sort.withNewChildren(
+            proj2.withNewChildren(createLocalReader(c) :: Nil) :: Nil) :: Nil)
         case limit @ LocalLimitExec(_, proj1@ProjectExec(_, sort@SortExec(_, true,
             proj2@ProjectExec(_, c@CustomShuffleReaderExec(_: ShuffleQueryStageExec, _)), _))) =>
-          limit.withNewChildren(Seq(proj1.withNewChildren(Seq(sort.withNewChildren(
-            Seq(proj2.withNewChildren(Seq(createLocalReader(c)))))))))
+          limit.withNewChildren(proj1.withNewChildren(sort.withNewChildren(
+            proj2.withNewChildren(createLocalReader(c) :: Nil) :: Nil) :: Nil) :: Nil)
         case _ => createLocalReader(s)
       }
     case s: SparkPlan =>
@@ -180,13 +180,15 @@ object OptimizeLocalShuffleReader extends Rule[SparkPlan] {
       case _ => false
     }
     case ProjectExec(_, sort @ SortExec(_, true, ProjectExec(_, CustomShuffleReaderExec(
-        _: ShuffleQueryStageExec, _)), _))
+        s: ShuffleQueryStageExec, _)), _))
         if conf.getConf(SQLConf.OPTIMIZE_RANGE_PARTITION_SKEW_ENABLED) =>
+      supportLocalReader(s.shuffle) &&
       sort.getTagValue(OptimizeSkewedRangePartition.SKEWED_RANGE_PARTITION_TAG).nonEmpty
     case LocalLimitExec(_, ProjectExec(_, sort@SortExec(_, true, ProjectExec(_,
-        CustomShuffleReaderExec(_: ShuffleQueryStageExec, _)), _)))
+        CustomShuffleReaderExec(s: ShuffleQueryStageExec, _)), _)))
         if conf.getConf(SQLConf.OPTIMIZE_RANGE_PARTITION_SKEW_ENABLED) =>
-      sort.getTagValue(OptimizeSkewedRangePartition.SKEWED_RANGE_PARTITION_TAG).nonEmpty
+      supportLocalReader(s.shuffle) &&
+        sort.getTagValue(OptimizeSkewedRangePartition.SKEWED_RANGE_PARTITION_TAG).nonEmpty
     case _ => false
   }
 
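The withNewChildren rewrites in the first hunk are purely stylistic: `x :: Nil` and `Seq(x)` construct the same single-element list, so the children passed in are unchanged. A trivial check:

// Both expressions build a one-element immutable list; withNewChildren
// receives the same children either way.
val viaSeq: Seq[Int] = Seq(1)
val viaCons: Seq[Int] = 1 :: Nil
assert(viaSeq == viaCons)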
