Commit 2c66432

Follow up PR for SPARK-9853
1 parent 239ee3f · commit 2c66432

File tree

3 files changed: 11 additions, 16 deletions


core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
     serializerManager: SerializerManager = SparkEnv.get.serializerManager,
     blockManager: BlockManager = SparkEnv.get.blockManager,
     mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker,
-    shouldBatchFetch: Boolean)
+    shouldBatchFetch: Boolean = false)
   extends ShuffleReader[K, C] with Logging {
 
   private val dep = handle.dependency
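Because shouldBatchFetch now defaults to false, existing call sites (like the test suite below) can simply drop the argument. As a quick illustration, here is a minimal, self-contained sketch of the Scala default-parameter idiom the change relies on; Reader and ReaderDemo are hypothetical stand-ins, not the real Spark classes:

// Hypothetical stand-in types, not the real Spark classes.
class Reader(
    val blockManager: String,
    val shouldBatchFetch: Boolean = false) // default added, mirroring the diff

object ReaderDemo extends App {
  val r1 = new Reader("bm-1")                          // omits the flag, gets false
  val r2 = new Reader("bm-1", shouldBatchFetch = true) // explicit override
  println(s"r1=${r1.shouldBatchFetch}, r2=${r2.shouldBatchFetch}") // r1=false, r2=true
}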

core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala

Lines changed: 1 addition & 2 deletions
@@ -138,8 +138,7 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext
       taskContext,
       metrics,
       serializerManager,
-      blockManager,
-      shouldBatchFetch = false)
+      blockManager)
 
     assert(shuffleReader.read().length === keyValuePairsPerMap * numMaps)

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 13 deletions
@@ -355,9 +355,8 @@ object SQLConf {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(64 * 1024 * 1024)
 
-
   val FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED =
-    buildConf("spark.sql.adaptive.fetchShuffleBlocksInBatch.enabled")
+    buildConf("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled")
       .doc("Whether to fetch the continuous shuffle blocks in batch. Instead of fetching blocks " +
         "one by one, fetching continuous shuffle blocks for the same map task in batch can " +
         "reduce IO and improve performance. Note, this feature also depends on a relocatable " +
@@ -371,7 +370,7 @@ object SQLConf {
       .createWithDefault(false)
 
   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
-    buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
+    buildConf("spark.sql.adaptive.shuffle.nonEmptyPartitionRatioForBroadcastJoin")
       .doc("The relation with a non-empty partition ratio lower than this config will not be " +
         "considered as the build side of a broadcast-hash join in adaptive execution regardless " +
         "of its size.")
@@ -380,22 +379,22 @@ object SQLConf {
       .createWithDefault(0.2)
 
   val REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED =
-    buildConf("spark.sql.adaptive.reducePostShufflePartitions.enabled")
+    buildConf("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled")
       .doc("When true and adaptive execution is enabled, this enables reducing the number of " +
         "post-shuffle partitions based on map output statistics.")
       .booleanConf
       .createWithDefault(true)
 
   val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.minNumPostShufflePartitions")
+    buildConf("spark.sql.adaptive.shuffle.minNumPostShufflePartitions")
       .doc("The advisory minimum number of post-shuffle partitions used in adaptive execution.")
       .intConf
       .checkValue(_ > 0, "The minimum shuffle partition number " +
         "must be a positive integer.")
       .createWithDefault(1)
 
   val SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS =
-    buildConf("spark.sql.adaptive.maxNumPostShufflePartitions")
+    buildConf("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions")
       .doc("The advisory maximum number of post-shuffle partitions used in adaptive execution. " +
         "This is used as the initial number of pre-shuffle partitions. By default it equals to " +
         "spark.sql.shuffle.partitions")
@@ -405,7 +404,7 @@ object SQLConf {
       .createOptional
 
   val OPTIMIZE_LOCAL_SHUFFLE_READER_ENABLED =
-    buildConf("spark.sql.adaptive.optimizedLocalShuffleReader.enabled")
+    buildConf("spark.sql.adaptive.shuffle.optimizedLocalShuffleReader.enabled")
       .doc("When true and adaptive execution is enabled, this enables the optimization of" +
         " converting the shuffle reader to local shuffle reader for the shuffle exchange" +
         " of the broadcast hash join in probe side.")
@@ -2148,11 +2147,9 @@ class SQLConf extends Serializable with Logging {
 
   def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
-  def targetPostShuffleInputSize: Long =
-    getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+  def targetPostShuffleInputSize: Long = getConf(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
 
-  def fetchShuffleBlocksInBatchEnabled: Boolean =
-    getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
+  def fetchShuffleBlocksInBatchEnabled: Boolean = getConf(FETCH_SHUFFLE_BLOCKS_IN_BATCH_ENABLED)
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
@@ -2161,8 +2158,7 @@ class SQLConf extends Serializable with Logging {
 
   def reducePostShufflePartitionsEnabled: Boolean = getConf(REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
 
-  def minNumPostShufflePartitions: Int =
-    getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
+  def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS)
 
   def maxNumPostShufflePartitions: Int =
     getConf(SHUFFLE_MAX_NUM_POSTSHUFFLE_PARTITIONS).getOrElse(numShufflePartitions)
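All of the adaptive-execution keys renamed above now sit under the spark.sql.adaptive.shuffle namespace. As a minimal sketch of exercising them from user code, assuming a build of Spark that includes this commit, the snippet below sets a few of the renamed keys through the standard SparkSession runtime config (the values are arbitrary examples, not recommendations):

import org.apache.spark.sql.SparkSession

object AdaptiveShuffleConfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("adaptive-shuffle-conf-example")
      .config("spark.sql.adaptive.enabled", "true") // renamed keys only matter with AQE on
      .getOrCreate()

    // Renamed keys from the diff, now under spark.sql.adaptive.shuffle.*
    spark.conf.set("spark.sql.adaptive.shuffle.fetchShuffleBlocksInBatch.enabled", "true")
    spark.conf.set("spark.sql.adaptive.shuffle.reducePostShufflePartitions.enabled", "true")
    spark.conf.set("spark.sql.adaptive.shuffle.minNumPostShufflePartitions", "1")
    // Per the doc string above, with the 0.2 default a relation where fewer than
    // 20% of shuffle partitions are non-empty is never chosen as the broadcast build side.
    spark.conf.set("spark.sql.adaptive.shuffle.nonEmptyPartitionRatioForBroadcastJoin", "0.2")

    spark.stop()
  }
}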
