Skip to content

Commit 9d210f5

Browse files
committed
Code review feedback: remove a regex we don't need, reduce some log levels, and add a Since annotation.
1 parent eb43f20 commit 9d210f5

File tree

5 files changed

+19
-22
lines changed

5 files changed

+19
-22
lines changed

core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import org.apache.spark.util.Utils
4747
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
4848
private[spark] class IndexShuffleBlockResolver(
4949
conf: SparkConf,
50+
// var for testing
5051
var _blockManager: BlockManager = null)
5152
extends ShuffleBlockResolver
5253
with Logging with MigratableResolver {
@@ -61,19 +62,14 @@ private[spark] class IndexShuffleBlockResolver(
6162
/**
6263
* Get the shuffle files that are stored locally. Used for block migrations.
6364
*/
64-
override def getStoredShuffles(): Set[ShuffleBlockInfo] = {
65-
// Matches ShuffleIndexBlockId name
66-
val pattern = "shuffle_(\\d+)_(\\d+)_.+\\.index".r
67-
val rootDirs = blockManager.diskBlockManager.localDirs
68-
// ExecutorDiskUtil puts things inside one level hashed sub directories
69-
val searchDirs = rootDirs.flatMap(_.listFiles()).filter(_.isDirectory()) ++ rootDirs
70-
val filenames = searchDirs.flatMap(_.list())
71-
logDebug(s"Got block files ${filenames.toList}")
72-
filenames.flatMap { fname =>
73-
pattern.findAllIn(fname).matchData.map {
74-
matched => ShuffleBlockInfo(matched.group(1).toInt, matched.group(2).toLong)
75-
}
76-
}.toSet
65+
override def getStoredShuffles(): Seq[ShuffleBlockInfo] = {
66+
val allBlocks = blockManager.diskBlockManager.getAllBlocks()
67+
allBlocks.flatMap {
68+
case ShuffleIndexBlockId(shuffleId, mapId, _) =>
69+
Some(ShuffleBlockInfo(shuffleId, mapId))
70+
case _ =>
71+
None
72+
}
7773
}
7874

7975
/**

core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.shuffle
1919

20-
import org.apache.spark.annotation.Experimental
20+
import org.apache.spark.annotation.{Experimental, Since}
2121
import org.apache.spark.network.buffer.ManagedBuffer
2222
import org.apache.spark.network.client.StreamCallbackWithID
2323
import org.apache.spark.serializer.SerializerManager
@@ -28,11 +28,12 @@ import org.apache.spark.storage.BlockId
2828
* An experimental trait to allow Spark to migrate shuffle blocks.
2929
*/
3030
@Experimental
31+
@Since("3.1.0")
3132
trait MigratableResolver {
3233
/**
3334
* Get the shuffle ids that are stored locally. Used for block migrations.
3435
*/
35-
def getStoredShuffles(): Set[ShuffleBlockInfo]
36+
def getStoredShuffles(): Seq[ShuffleBlockInfo]
3637

3738
/**
3839
* Write a provided shuffle block as a stream. Used for block migrations.

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ private[spark] class BlockManager(
668668
}
669669

670670
if (blockId.isShuffle) {
671-
logInfo(s"Putting shuffle block ${blockId}")
671+
logDebug(s"Putting shuffle block ${blockId}")
672672
try {
673673
return migratableResolver.putShuffleBlockAsStream(blockId, serializerManager)
674674
} catch {
@@ -677,7 +677,7 @@ private[spark] class BlockManager(
677677
s"resolver ${shuffleManager.shuffleBlockResolver}")
678678
}
679679
}
680-
logInfo(s"Putting regular block ${blockId}")
680+
logDebug(s"Putting regular block ${blockId}")
681681
// All other blocks
682682
val (_, tmpFile) = diskBlockManager.createTempLocalBlock()
683683
val channel = new CountingWritableChannel(

core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ import org.apache.spark.util.ThreadUtils
3535
* It creates a Thread to retry offloading all RDD cache and Shuffle blocks
3636
*/
3737
private[storage] class BlockManagerDecommissioner(
38-
conf: SparkConf,
39-
bm: BlockManager) extends Logging {
38+
conf: SparkConf,
39+
bm: BlockManager) extends Logging {
4040

4141
private val maxReplicationFailuresForDecommission =
4242
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
@@ -195,7 +195,7 @@ private[storage] class BlockManagerDecommissioner(
195195
private[storage] def refreshOffloadingShuffleBlocks(): Unit = {
196196
// Update the queue of shuffles to be migrated
197197
logInfo("Offloading shuffle blocks")
198-
val localShuffles = bm.migratableResolver.getStoredShuffles()
198+
val localShuffles = bm.migratableResolver.getStoredShuffles().toSet
199199
val newShufflesToMigrate = localShuffles.diff(migratingShuffles).toSeq
200200
shufflesToMigrate.addAll(newShufflesToMigrate.map(x => (x, 0)).asJava)
201201
migratingShuffles ++= newShufflesToMigrate
@@ -223,7 +223,7 @@ private[storage] class BlockManagerDecommissioner(
223223
private[storage] def stopOffloadingShuffleBlocks(): Unit = {
224224
logInfo("Stopping offloading shuffle blocks.")
225225
// Stop as gracefully as possible.
226-
migrationPeers.values.foreach{_.running = false}
226+
migrationPeers.values.foreach{ _.running = false }
227227
shuffleMigrationPool.shutdown()
228228
shuffleMigrationPool.shutdownNow()
229229
}

core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
4444
ids: Set[(Int, Long, Int)]): Unit = {
4545

4646
when(mockMigratableShuffleResolver.getStoredShuffles())
47-
.thenReturn(ids.map(triple => ShuffleBlockInfo(triple._1, triple._2)).toSet)
47+
.thenReturn(ids.map(triple => ShuffleBlockInfo(triple._1, triple._2)).toSeq)
4848

4949
ids.foreach { case (shuffleId: Int, mapId: Long, reduceId: Int) =>
5050
when(mockMigratableShuffleResolver.getMigrationBlocks(mc.any()))

0 commit comments

Comments
 (0)