Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -434,9 +434,10 @@ object SQLConf {

val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
.doc("The minimum number of shuffle partitions after coalescing. If not set, the default " +
"value is the default parallelism of the Spark cluster. This configuration only " +
s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
.doc("The suggested (not guaranteed) minimum number of shuffle partitions after " +
"coalescing. If not set, the default value is the default parallelism of the " +
"Spark cluster. This configuration only has an effect when " +
s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
.version("3.0.0")
.intConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ case class CustomShuffleReaderExec private(
// If it is a local shuffle reader with one mapper per task, then the output partitioning is
// the same as the plan before shuffle.
// TODO this check is based on assumptions of callers' behavior but is sufficient for now.
if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
if (partitionSpecs.nonEmpty &&
partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size ==
partitionSpecs.length) {
child match {
Expand Down Expand Up @@ -98,7 +99,7 @@ case class CustomShuffleReaderExec private(
}

@transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
if (partitionSpecs.nonEmpty && !isLocalReader && shuffleStage.get.mapStats.isDefined) {
val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId
Some(partitionSpecs.map {
case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ object OptimizeLocalShuffleReader {
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
case s: ShuffleQueryStageExec =>
s.shuffle.canChangeNumPartitions
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
s.shuffle.canChangeNumPartitions
case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty
case _ => false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
// It's impossible that all the partitions are skewed, as we use median size to define skew.
assert(nonSkewSizes.nonEmpty)
math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
if (nonSkewSizes.isEmpty) {
advisorySize
} else {
math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
}
}

/**
Expand Down Expand Up @@ -297,7 +299,7 @@ private object ShuffleStage {
Some(ShuffleStageInfo(s, mapStats, partitions))

case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs)
if s.mapStats.isDefined =>
if s.mapStats.isDefined && partitionSpecs.nonEmpty =>
val mapStats = s.mapStats.get
val sizes = mapStats.bytesByPartitionId
val partitions = partitionSpecs.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ object ShufflePartitionsUtil extends Logging {
var latestSplitPoint = 0
var coalescedSize = 0L
var i = 0

def createPartitionSpec(): Unit = {
// Skip empty inputs, as it is a waste to launch an empty task.
if (coalescedSize > 0) {
partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
}
}

while (i < numPartitions) {
// We calculate the total size of i-th shuffle partitions from all shuffles.
var totalSizeOfCurrentPartition = 0L
Expand All @@ -103,7 +111,7 @@ object ShufflePartitionsUtil extends Logging {
// If including the `totalSizeOfCurrentPartition` would exceed the target size, then start a
// new coalesced partition.
if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition > targetSize) {
partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
createPartitionSpec()
latestSplitPoint = i
// reset postShuffleInputSize.
coalescedSize = totalSizeOfCurrentPartition
Expand All @@ -112,8 +120,7 @@ object ShufflePartitionsUtil extends Logging {
}
i += 1
}
partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)

createPartitionSpec()
partitionSpecs
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
test("1 shuffle") {
val targetSize = 100

{
// All bytes per partition are 0.
val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize)
}

{
// Some bytes per partition are 0 and total size is less than the target size.
// 1 coalesced partition is expected.
Expand All @@ -70,8 +63,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(1, 2),
CoalescedPartitionSpec(2, 3),
CoalescedPartitionSpec(3, 4),
CoalescedPartitionSpec(4, 5))
CoalescedPartitionSpec(3, 4))
checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, targetSize)
}

Expand Down Expand Up @@ -108,17 +100,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
}
}

{
// All bytes per partition are 0.
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionSpecs,
targetSize)
}

{
// Some bytes per partition are 0.
// 1 coalesced partition is expected.
Expand Down Expand Up @@ -217,7 +198,18 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
// the size of data is 0.
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
Seq.empty, targetSize, minNumPartitions)
}


{
// The minimal number of coalesced partitions is not enforced because
// there are too many 0-size partitions.
val bytesByPartitionId1 = Array[Long](200, 0, 0)
val bytesByPartitionId2 = Array[Long](100, 0, 0)
val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 1))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionSpecs,
Expand Down Expand Up @@ -251,6 +243,37 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
}
}

test("do not create partition spec for 0-size partitions") {
val targetSize = 100
val minNumPartitions = 2

{
// 1 shuffle: All bytes per partition are 0, no partition spec created.
val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
checkEstimation(Array(bytesByPartitionId), Seq.empty, targetSize)
}

{
// 2 shuffles: All bytes per partition are 0, no partition spec created.
val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), Seq.empty, targetSize)
}

{
// No partition spec created for the 0-size partitions.
val bytesByPartitionId1 = Array[Long](200, 0, 0, 0, 0)
val bytesByPartitionId2 = Array[Long](100, 0, 300, 0, 0)
val expectedPartitionSpecs = Seq(
CoalescedPartitionSpec(0, 1),
CoalescedPartitionSpec(2, 3))
checkEstimation(
Array(bytesByPartitionId1, bytesByPartitionId2),
expectedPartitionSpecs,
targetSize, minNumPartitions)
}
}

test("splitSizeListByTargetSize") {
val targetSize = 100

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,17 +157,17 @@ class AdaptiveQueryExecSuite
val localShuffleRDD0 = localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
val localShuffleRDD1 = localReaders(1).execute().asInstanceOf[ShuffledRowRDD]
// The pre-shuffle partition size is [0, 0, 0, 72, 0]
// And the partitionStartIndices is [0, 3, 4], so advisoryParallelism = 3.
// We exclude the 0-size partitions, so only one partition, advisoryParallelism = 1
// the final parallelism is
// math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1
// math.max(1, advisoryParallelism / numMappers): math.max(1, 1/2) = 1
// and the partitions length is 1 * numMappers = 2
assert(localShuffleRDD0.getPartitions.length == 2)
// The pre-shuffle partition size is [0, 72, 0, 72, 126]
// And the partitionStartIndices is [0, 1, 2, 3, 4], so advisoryParallelism = 5.
// We exclude the 0-size partitions, so only 3 partition, advisoryParallelism = 3
// the final parallelism is
// math.max(1, advisoryParallelism / numMappers): math.max(1, 5/2) = 2
// and the partitions length is 2 * numMappers = 4
assert(localShuffleRDD1.getPartitions.length == 4)
// math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1
// and the partitions length is 1 * numMappers = 2
assert(localShuffleRDD1.getPartitions.length == 2)
}
}

Expand Down Expand Up @@ -197,6 +197,38 @@ class AdaptiveQueryExecSuite
}
}

test("Empty stage coalesced to 0-partition RDD") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") {
val df1 = spark.range(10).withColumn("a", 'id)
val df2 = spark.range(10).withColumn("b", 'id)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val testDf = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined)
val coalescedReaders = collect(plan) {
case r: CustomShuffleReaderExec => r
}
assert(coalescedReaders.length == 2)
coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
}

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
val testDf = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
checkAnswer(testDf, Seq())
val plan = testDf.queryExecution.executedPlan
assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
val coalescedReaders = collect(plan) {
case r: CustomShuffleReaderExec => r
}
assert(coalescedReaders.length == 2, s"$plan")
coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
}
}
}

test("Scalar subquery") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
Expand Down Expand Up @@ -647,12 +679,13 @@ class AdaptiveQueryExecSuite
// Partition 0: both left and right sides are skewed, left side is divided
// into 2 splits and right side is divided into 4 splits, so
// 2 x 4 sub-partitions.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the
// size is 0.
// Partition 4: only left side is skewed, and divide into 2 splits, so
// 2 sub-partitions.
// So total (8 + 1 + 3) partitions.
// So total (8 + 0 + 2) partitions.
val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
checkSkewJoin(innerSmj, 8 + 1 + 2)
checkSkewJoin(innerSmj, 8 + 0 + 2)

// skewed left outer join optimization
val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
Expand All @@ -661,12 +694,13 @@ class AdaptiveQueryExecSuite
// right stats:[6292, 0, 0, 0, 0]
// Partition 0: both left and right sides are skewed, but left join can't split right side,
// so only left side is divided into 2 splits, and thus 2 sub-partitions.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the
// size is 0.
// Partition 4: only left side is skewed, and divide into 2 splits, so
// 2 sub-partitions.
// So total (2 + 1 + 2) partitions.
// So total (2 + 0 + 2) partitions.
val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
checkSkewJoin(leftSmj, 2 + 1 + 2)
checkSkewJoin(leftSmj, 2 + 0 + 2)

// skewed right outer join optimization
val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
Expand All @@ -675,12 +709,13 @@ class AdaptiveQueryExecSuite
// right stats:[6292, 0, 0, 0, 0]
// Partition 0: both left and right sides are skewed, but right join can't split left side,
// so only right side is divided into 4 splits, and thus 4 sub-partitions.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
// Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but it's ignored as the
// size is 0.
// Partition 4: only left side is skewed, but right join can't split left side, so just
// 1 partition.
// So total (4 + 1 + 1) partitions.
// So total (4 + 0 + 1) partitions.
val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
checkSkewJoin(rightSmj, 4 + 1 + 1)
checkSkewJoin(rightSmj, 4 + 0 + 1)
}
}
}
Expand Down Expand Up @@ -852,7 +887,7 @@ class AdaptiveQueryExecSuite
}.head
assert(!reader.isLocalReader)
assert(reader.hasSkewedPartition)
assert(reader.hasCoalescedPartition)
assert(!reader.hasCoalescedPartition) // 0-size partitions are ignored.
assert(reader.metrics.contains("numSkewedPartitions"))
assert(reader.metrics("numSkewedPartitions").value > 0)
}
Expand Down