Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -837,11 +837,11 @@ private[spark] class BlockManager(
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*
* Return the block status if the given block has been updated, else None.
* Return the block status and dropped memory size if the given block has been updated, else None.
*/
def dropFromMemory(
blockId: BlockId,
data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
data: Either[Array[Any], ByteBuffer]): Option[(BlockStatus, Long)] = {

logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
Expand Down Expand Up @@ -873,10 +873,8 @@ private[spark] class BlockManager(
}

// Actually drop from memory store
val droppedMemorySize =
if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val blockIsRemoved = memoryStore.remove(blockId)
if (blockIsRemoved) {
val droppedMemorySize = memoryStore.removeWithoutUpdateMemorySize(blockId)
if (droppedMemorySize > 0) {
blockIsUpdated = true
} else {
logWarning(s"Block $blockId could not be dropped from memory as it does not exist")
Expand All @@ -891,7 +889,7 @@ private[spark] class BlockManager(
blockInfo.remove(blockId)
}
if (blockIsUpdated) {
return Some(status)
return Some(status -> droppedMemorySize)
}
}
}
Expand Down
232 changes: 149 additions & 83 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector

private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean, var dropping: Boolean = false)

/**
* Stores blocks in memory, either as Arrays of deserialized Java objects or as
Expand All @@ -40,6 +40,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)

@volatile private var currentMemory = 0L

private var pendingUnrollMemory = 0L

// Ensure only one thread is putting, and if necessary, dropping blocks at any given time
private val accountingLock = new Object

Expand Down Expand Up @@ -184,6 +186,17 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

/**
 * Remove the entry for the given block from the in-memory map WITHOUT decrementing
 * `currentMemory`. The caller is responsible for applying the returned size to the
 * memory accounting later (see `DroppingTask.updateCurrentMemorySizeInTransaction`),
 * so that the freed space and a subsequent put can be accounted atomically.
 *
 * @param blockId the block whose entry should be removed
 * @return the size in bytes of the removed entry, or 0L if the block was not present
 */
def removeWithoutUpdateMemorySize(blockId: BlockId): Long = {
  entries.synchronized {
    val entry = entries.remove(blockId)
    // `entries` is a Java map, so an absent key comes back as null rather than None.
    if (entry != null) entry.size else 0L
  }
}

override def clear() {
entries.synchronized {
entries.clear()
Expand Down Expand Up @@ -239,17 +252,31 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
// Hold the accounting lock, in case another thread concurrently puts a block that
// takes up the unrolling space we just ensured here
accountingLock.synchronized {
if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
// If the first request is not granted, try again after ensuring free space
// If there is still not enough space, give up and drop the partition
val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
if (spaceToEnsure > 0) {
val result = ensureFreeSpace(blockId, spaceToEnsure)
droppedBlocks ++= result.droppedBlocks
val spaceToEnsure = accountingLock.synchronized {
if (reserveUnrollMemoryForThisThread(amountToRequest)) {
0L
} else if (amountToRequest > maxMemory) {
keepUnrolling = false
0L
} else {
val memoryNeeded = currentUnrollMemory + amountToRequest + pendingUnrollMemory - freeMemory
val memoryAvailable = maxUnrollMemory - currentUnrollMemory - pendingUnrollMemory
pendingUnrollMemory += amountToRequest
math.min(memoryNeeded, memoryAvailable)
}
}
if (spaceToEnsure > 0) {
val task = planFreeSpace(blockId, spaceToEnsure, true)
if (task.isDefined) {
try {
droppedBlocks ++= task.get.runTask()
} finally {
task.get.updateCurrentMemorySizeInTransaction {
pendingUnrollMemory -= amountToRequest
keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
}
}
} else {
keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
}
}
Expand Down Expand Up @@ -304,40 +331,33 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {

/* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
* to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
* been released, it must be ensured that those to-be-dropped blocks are not double counted
* for freeing up more space for another block that needs to be put. Only then the actually
* dropping of blocks (and writing to disk if necessary) can proceed in parallel. */

var putSuccess = false
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

accountingLock.synchronized {
val freeSpaceResult = ensureFreeSpace(blockId, size)
val enoughFreeSpace = freeSpaceResult.success
droppedBlocks ++= freeSpaceResult.droppedBlocks

if (enoughFreeSpace) {
val task = planFreeSpace(blockId, size)
if (task.isDefined) {
droppedBlocks ++= task.get.runTask()
task.get.updateCurrentMemorySizeInTransaction {
val entry = new MemoryEntry(value, size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
}
val valuesOrBytes = if (deserialized) "values" else "bytes"
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
putSuccess = true
entries.put(blockId, entry)
currentMemory += size
}
val valuesOrBytes = if (deserialized) "values" else "bytes"
logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
putSuccess = true
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
Left(value.asInstanceOf[Array[Any]])
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
Left(value.asInstanceOf[Array[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockResult = blockManager.dropFromMemory(blockId, data)
droppedBlockResult.foreach { r =>
assert(r._2 == 0)
droppedBlocks += ((blockId, r._1))
}
}
ResultWithDroppedBlocks(putSuccess, droppedBlocks)
Expand All @@ -354,66 +374,70 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
*
* Return whether there is enough free space, along with the blocks dropped in the process.
*/
private def ensureFreeSpace(
private def planFreeSpace(
blockIdToAdd: BlockId,
space: Long): ResultWithDroppedBlocks = {
space: Long,
mustDrop: Boolean = false): Option[DroppingTask] = {
logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")

val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

if (space > maxMemory) {
logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit")
return ResultWithDroppedBlocks(success = false, droppedBlocks)
return None
}

// Take into account the amount of memory currently occupied by unrolling blocks
val actualFreeMemory = freeMemory - currentUnrollMemory
accountingLock.synchronized {
// Take into account the amount of memory currently occupied by unrolling blocks
val actualFreeMemory = if (mustDrop) 0L else freeMemory - currentUnrollMemory

if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L
if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L

// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
selectedBlocks += blockId
selectedMemory += pair.getValue.size
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
val entry = pair.getValue()
if (!entry.dropping && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))) {
selectedBlocks += blockId
selectedMemory += entry.size
}
}
}
}

if (actualFreeMemory + selectedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one thread should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[Array[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
if (actualFreeMemory + selectedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
val toBeDroppedBlocks = new ArrayBuffer[ToBeDroppedBlock]
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one thread should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[Array[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
toBeDroppedBlocks += ToBeDroppedBlock(blockId, data)
entry.dropping = true
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
Some(new DroppingTask(toBeDroppedBlocks))
} else {
logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " +
"from the same RDD")
None
}
return ResultWithDroppedBlocks(success = true, droppedBlocks)
} else {
logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " +
"from the same RDD")
return ResultWithDroppedBlocks(success = false, droppedBlocks)
Some(new DroppingTask(Nil))
}
}
ResultWithDroppedBlocks(success = true, droppedBlocks)
}

override def contains(blockId: BlockId): Boolean = {
Expand All @@ -426,7 +450,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
*/
private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
accountingLock.synchronized {
val granted = freeMemory > currentUnrollMemory + memory
val granted = freeMemory > currentUnrollMemory + memory + pendingUnrollMemory
if (granted) {
val threadId = Thread.currentThread().getId
unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
Expand Down Expand Up @@ -467,8 +491,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
/** Amount of unroll memory (in bytes) currently reserved by the calling thread. */
private[spark] def currentUnrollMemoryForThisThread: Long = {
  val threadId = Thread.currentThread().getId
  accountingLock.synchronized {
    unrollMemoryMap.get(threadId).getOrElse(0L)
  }
}

/**
 * A deferred eviction plan: drops the given pre-selected blocks from memory, accumulating
 * the number of bytes actually freed. The freed bytes are NOT subtracted from
 * `currentMemory` during `runTask()`; the caller must invoke
 * `updateCurrentMemorySizeInTransaction` afterwards so the subtraction happens atomically
 * with the caller's own accounting update.
 */
private class DroppingTask(blocks: Seq[ToBeDroppedBlock]) {
// Bytes freed so far by runTask(); consumed (and applied) by
// updateCurrentMemorySizeInTransaction, or rolled back on failure.
private var droppedMemorySize = 0L

/**
 * Drop each planned block via blockManager.dropFromMemory, returning the (id, status)
 * pairs of blocks whose status changed. On failure, un-marks the not-yet-dropped blocks
 * (clearing their `dropping` flag) and applies the partial accounting before rethrowing.
 */
def runTask() = {
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
// Count of blocks fully processed, so the catch block knows which remain marked.
var droppingDoneCount = 0
try {
blocks.foreach { block =>
val dropResult = blockManager.dropFromMemory(block.id, block.data)
// dropResult is Option[(BlockStatus, Long)]: updated status plus bytes freed.
dropResult.foreach { r =>
droppedBlocks += block.id -> r._1
droppedMemorySize += r._2
}
droppingDoneCount += 1
}
droppedBlocks
} catch {
// NOTE(review): only Exception is caught, so cleanup below is skipped for Errors /
// other Throwables — confirm whether scala.util.control.NonFatal was intended here.
case e: Exception =>
// Clear the `dropping` marker on blocks we never got to, so other threads may
// select them again.
blocks.drop(droppingDoneCount).foreach { block =>
entries.synchronized{
val entry = entries.get(block.id)
if (entry != null) entry.dropping = false
}
}
// Apply (and reset) the partial accounting for blocks that WERE dropped, since
// updateCurrentMemorySizeInTransaction will not be called after a failure.
if (droppedMemorySize > 0) {
entries.synchronized { currentMemory -= droppedMemorySize }
droppedMemorySize = 0
}
throw e
}
}

/**
 * Atomically subtract the bytes freed by runTask() from `currentMemory` and run `f`
 * (the caller's own accounting update) under the same `entries` lock.
 */
def updateCurrentMemorySizeInTransaction(f: => Unit): Unit = {
entries.synchronized {
currentMemory -= droppedMemorySize
f
}
}
}
}

/**
 * Result of an attempted memory-store put: whether the put succeeded, plus the statuses
 * of any blocks that were dropped from memory in the process.
 */
private[spark] case class ResultWithDroppedBlocks(
success: Boolean,
droppedBlocks: Seq[(BlockId, BlockStatus)])

/**
 * A block selected for eviction: its id and its in-memory payload — deserialized values
 * on the Left, serialized bytes on the Right.
 */
private[spark] case class ToBeDroppedBlock(id: BlockId, data: Either[Array[Any], ByteBuffer])