diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e9f8a60978220..226d8e59a402a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -353,41 +353,49 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
-    buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
-      .doc("The target post-shuffle input size in bytes of a task.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(64 * 1024 * 1024)
+  val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
+    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables reducing " +
+        "the number of post-shuffle partitions based on map output statistics.")
+      .booleanConf
+      .createWithDefault(true)
 
   val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
     buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
       .doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
         "one by one, fetching continuous shuffle blocks for the same map task in batch can " +
-        "reduce IO and improve performance. Note, this feature also depends on a relocatable " +
-        "serializer and the concatenation support codec in use.")
+        "reduce IO and improve performance. Note, multiple continuous blocks exist in a " +
+        s"single fetch request only when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' are enabled, and this feature also " +
+        "depends on a relocatable serializer and the concatenation support codec in use.")
       .booleanConf
       .createWithDefault(true)
 
-  val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
-    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
-      .doc("When true and adaptive execution is enabled, this enables reducing the number of " +
-        "post-shuffle partitions based on map output statistics.")
-      .booleanConf
-      .createWithDefault(true)
-
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
-      .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
+      .doc("The advisory minimum number of post-shuffle partitions used when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' are enabled.")
       .intConf
       .checkValue(_ > 0, "The minimum shuffle partition number " +
         "must be a positive integer.")
       .createWithDefault(1)
 
+  val SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE =
+    buildConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize")
+      .doc("The target post-shuffle input size in bytes of a task. This configuration only has " +
+        s"an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' are enabled.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(64 * 1024 * 1024)
+
   val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
     buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
       .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
         "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
-        "spark.sql.shuffle.partitions")
+        "spark.sql.shuffle.partitions. This configuration only has an effect when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+        s"'${REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key}' are enabled.")
       .intConf
       .checkValue(_ > 0, "The maximum shuffle partition number " +
         "must be a positive integer.")
@@ -395,9 +403,9 @@ object SQLConf {
 
   val LOCAL_SHUFFLE_READER_ENABLED =
     buildConf("spark.sql.adaptive.shuffle.localShuffleReader.enabled")
-      .doc("When true and adaptive execution is enabled, this enables the optimization of" +
-        " converting the shuffle reader to local shuffle reader for the shuffle exchange" +
-        " of the broadcast hash join in probe side.")
+      .doc(s"When true and '${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled, this enables the " +
+        "optimization of converting the shuffle reader to local shuffle reader for the shuffle " +
+        "exchange of the broadcast hash join in probe side.")
       .booleanConf
       .createWithDefault(true)
 
@@ -405,7 +413,8 @@ object SQLConf {
     buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
       .doc("The relation with a non-empty partition ratio lower than this config will not be " +
         "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
-        "of its size.")
+        "of its size. This configuration only has an effect when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' is enabled.")
      .doubleConf
      .checkValue(_ >= 0, "The non-empty partition ratio must be positive number.")
      .createWithDefault(0.2)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 866b382a1d808..068e0164443dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   private def defaultNumPreShufflePartitions: Int =
-    if (conf.adaptiveExecutionEnabled) {
+    if (conf.adaptiveExecutionEnabled && conf.reducePostShufflePartitionsEnabled) {
       conf.maxNumPostShufflePartitions
     } else {
       conf.numShufflePartitions
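For reference, a minimal sketch of how a user would set the configs touched by this patch. The config keys come from the diff above; the session setup and the concrete values (app name, `local[*]` master, the 200-partition cap) are illustrative assumptions, not defaults introduced here.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative settings only; the specific values are assumptions for this sketch.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("ReducePostShufflePartitionsDemo")
  // Both flags must be true for post-shuffle partition coalescing to apply,
  // matching the new check in EnsureRequirements.defaultNumPreShufflePartitions.
  .config("spark.sql.adaptive.enabled", "true")
  .config("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled", "true")
  // Advisory target of ~64MB of shuffle input per reduce task (the built-in default).
  .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", (64 * 1024 * 1024).toString)
  // Advisory bounds on the coalesced partition count; the max also seeds the
  // initial number of pre-shuffle partitions.
  .config("spark.sql.adaptive.shuffle.minNumPostShufflePartitions", "1")
  .config("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", "200")
  .getOrCreate()
```

With both flags enabled, EnsureRequirements starts from maxNumPostShufflePartitions as the pre-shuffle parallelism and later coalesces partitions toward targetPostShuffleInputSize; if either flag is off, it falls back to spark.sql.shuffle.partitions.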