@@ -49,7 +49,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
-  addShutdownHook()
+  private val shutdownHook = addShutdownHook()
 
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
@@ -134,17 +134,22 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     }
   }
 
-  private def addShutdownHook() {
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
+  private def addShutdownHook(): Thread = {
+    val shutdownHook = new Thread("delete Spark local dirs") {
       override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         DiskBlockManager.this.stop()
       }
-    })
+    }
+    Runtime.getRuntime.addShutdownHook(shutdownHook)
+    shutdownHook
   }
 
   /** Cleanup local dirs and stop shuffle sender. */
   private[spark] def stop() {
+    // Remove the shutdown hook. It causes memory leaks if we leave it around.
+    Runtime.getRuntime.removeShutdownHook(shutdownHook)
+
     // Only perform cleanup if an external service is not serving our shuffle files.
     if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
       localDirs.foreach { localDir =>
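
For reference, the pattern this diff applies can be sketched outside Spark: register a JVM shutdown hook, keep a reference to the registered Thread, and deregister it in stop() so the hook no longer pins the enclosing object until JVM exit. The sketch below is a minimal standalone illustration, not Spark code; LocalDirCleaner, its constructor parameter, and the thread name are hypothetical.

import java.io.File

// Minimal sketch of the shutdown-hook lifecycle used in the diff above.
// LocalDirCleaner is a hypothetical class, not part of Spark.
class LocalDirCleaner(localDirs: Seq[File]) {

  // Register the hook once and keep a reference to the registered Thread,
  // mirroring `private val shutdownHook = addShutdownHook()` in the diff.
  private val shutdownHook: Thread = addShutdownHook()

  private def addShutdownHook(): Thread = {
    val hook = new Thread("delete local dirs") {
      // Qualify with the outer class, as the diff does with
      // DiskBlockManager.this.stop(), so this does not resolve to Thread.stop().
      override def run(): Unit = LocalDirCleaner.this.stop()
    }
    Runtime.getRuntime.addShutdownHook(hook)
    hook
  }

  def stop(): Unit = {
    // Deregister the hook so the JVM no longer holds a reference to this
    // object after an explicit stop(). removeShutdownHook throws
    // IllegalStateException when shutdown is already in progress (i.e. when
    // stop() is being invoked from the hook itself), so tolerate that case.
    try {
      Runtime.getRuntime.removeShutdownHook(shutdownHook)
    } catch {
      case _: IllegalStateException => // JVM is already shutting down
    }
    // Placeholder cleanup; real code would delete directories recursively.
    localDirs.foreach(dir => dir.delete())
  }
}

The try/catch around removeShutdownHook is not part of this diff; it is included in the sketch only because java.lang.Runtime refuses to remove a hook once shutdown has begun, which is the situation when stop() runs inside the hook itself.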