Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 42 additions & 45 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private[spark] class BlockManager(
logInfo(s"Reporting ${blockInfoManager.size} blocks to the master.")
for ((blockId, info) <- blockInfoManager.entries) {
val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
if (info.tellMaster && !tryToReportBlockStatus(blockId, status)) {
logError(s"Failed to report $blockId to master; giving up.")
return
}
Expand Down Expand Up @@ -298,7 +298,7 @@ private[spark] class BlockManager(

/**
* Get the BlockStatus for the block identified by the given ID, if it exists.
* NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
* NOTE: This is mainly for testing.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfoManager.get(blockId).map { info =>
Expand Down Expand Up @@ -333,10 +333,9 @@ private[spark] class BlockManager(
*/
private def reportBlockStatus(
blockId: BlockId,
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Unit = {
val needReregister = !tryToReportBlockStatus(blockId, info, status, droppedMemorySize)
val needReregister = !tryToReportBlockStatus(blockId, status, droppedMemorySize)
if (needReregister) {
logInfo(s"Got told to re-register updating block $blockId")
// Re-registering will report our new block for free.
Expand All @@ -352,17 +351,12 @@ private[spark] class BlockManager(
*/
private def tryToReportBlockStatus(
blockId: BlockId,
info: BlockInfo,
status: BlockStatus,
droppedMemorySize: Long = 0L): Boolean = {
if (info.tellMaster) {
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val onDiskSize = status.diskSize
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
} else {
true
}
val storageLevel = status.storageLevel
val inMemSize = Math.max(status.memSize, droppedMemorySize)
val onDiskSize = status.diskSize
master.updateBlockInfo(blockManagerId, blockId, storageLevel, inMemSize, onDiskSize)
}

/**
Expand All @@ -374,7 +368,7 @@ private[spark] class BlockManager(
info.synchronized {
info.level match {
case null =>
BlockStatus(StorageLevel.NONE, memSize = 0L, diskSize = 0L)
BlockStatus.empty
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
Expand Down Expand Up @@ -807,12 +801,10 @@ private[spark] class BlockManager(
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
if (tellMaster) {
reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
if (tellMaster && info.tellMaster) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit confusing. What is the difference between tellMaster and info.tellMaster ? For instance, why isn't this tellMaster || info.tellMaster. (I understand that && is the current behavior, but I'm not sure why.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In BlockInfo, which tracks metadata about an individual block (such as the desired storage level that the block should be stored at), the tellMaster field tracks whether the master should be informed of state changes to this block. This appears to be false only for blocks which are deserialized copies of TorrentBroadcasts (see the putSingle calls in TorrentBroadcast.scala).

The tellMaster parameter, on the other hand, controls whether this particular block-status-changing operation should send a metadata update to the master. The only place where this seems to be false is in the removeRdd code path, which is used for bulk-removal of an RDD's cached blocks. In this path, the master first performs a bulk deletion of block statuses in its own metadata table and then asynchronously deletes the blocks from block managers. I think the goal here is to avoid sending one status update per deleted block since that might result in a huge flood of RPC traffic at the master and could cause bad message queueing (since the block manager metadata-handling endpoint is single-threaded).

If we go way back, I think that one original rationale of this may have been to avoid sending status updates for map outputs, which at one time may have been persisted on disk via the BlockManager rather than bypassing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, looks like my memory was correct:

Take a look at ShuffleMapTask in Spark v0.6.0:

val blockManager = SparkEnv.get.blockManager
    for (i <- 0 until numOutputSplits) {
      val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
      // Get a Scala iterator from Java map
      val iter: Iterator[(Any, Any)] = bucketIterators(i)
      val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
      compressedSizes(i) = MapOutputTracker.compressSize(size)
    }

Here, false is the tellMaster parameter.

This functionality in the BlockManager dates back to the original Spark Streaming engine improvement patch: 63051dd

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, sounds like this is not trivial to fix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we rename the flag to removedByMaster or similar? Might also consider renaming tellMaster to trackedByMaster but that's probably a bigger change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that a big clarifying change would be to internally refactor this so that the tellMaster parameter is omitted from all calls and so that the tellMaster = false case for cached and reassembled broadcast blocks is inferred automatically from the block id. This would clean up a lot of the function signatures and would make the weird "don't tell the master" exception much more readily apparent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to rename / clarify this, I'd prefer to just do that bigger change separately in a patch which isn't intended for branch-2.0.

reportBlockStatus(blockId, putBlockStatus)
}
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
Expand Down Expand Up @@ -961,15 +953,12 @@ private[spark] class BlockManager(
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory, externalBlockStore, or disk store,
// tell the master about it.
// Now that the block is in either the memory or disk store, tell the master about it.
info.size = size
if (tellMaster) {
reportBlockStatus(blockId, info, putBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> putBlockStatus)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
val remoteStartTime = System.currentTimeMillis
Expand Down Expand Up @@ -1271,12 +1260,10 @@ private[spark] class BlockManager(

val status = getCurrentBlockStatus(blockId, info)
if (info.tellMaster) {
reportBlockStatus(blockId, info, status, droppedMemorySize)
reportBlockStatus(blockId, status, droppedMemorySize)
}
if (blockIsUpdated) {
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
addUpdatedBlockStatusToTaskMetrics(blockId, status)
}
status.storageLevel
}
Expand Down Expand Up @@ -1316,21 +1303,31 @@ private[spark] class BlockManager(
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found in either " +
"the disk, memory, or external block store")
}
blockInfoManager.removeBlock(blockId)
val removeBlockStatus = getCurrentBlockStatus(blockId, info)
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, info, removeBlockStatus)
}
Option(TaskContext.get()).foreach { c =>
c.taskMetrics().incUpdatedBlockStatuses(blockId -> removeBlockStatus)
}
removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
}
}

/**
* Internal version of [[removeBlock()]] which assumes that the caller already holds a write
* lock on the block.
*/
private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
val removedFromMemory = memoryStore.remove(blockId)
val removedFromDisk = diskStore.remove(blockId)
if (!removedFromMemory && !removedFromDisk) {
logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
}
blockInfoManager.removeBlock(blockId)
if (tellMaster) {
reportBlockStatus(blockId, BlockStatus.empty)
}
}

private def addUpdatedBlockStatusToTaskMetrics(blockId: BlockId, status: BlockStatus): Unit = {
Option(TaskContext.get()).foreach { c =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One new change: I decided to move this duplicated logic into this helper function. See c3cc277. This is going to make it easier to skip the block status update in certain places in my next patch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

c.taskMetrics().incUpdatedBlockStatuses(blockId -> status)
}
}

Expand Down