diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index d1bee3d2c033..03b2f954cf2d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -837,11 +837,11 @@ private[spark] class BlockManager( * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory * store reaches its limit and needs to free up space. * - * Return the block status if the given block has been updated, else None. + * Return the block status and dropped memory size if the given block has been updated, else None. */ def dropFromMemory( blockId: BlockId, - data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = { + data: Either[Array[Any], ByteBuffer]): Option[(BlockStatus, Long)] = { logInfo(s"Dropping block $blockId from memory") val info = blockInfo.get(blockId).orNull @@ -873,10 +873,8 @@ private[spark] class BlockManager( } // Actually drop from memory store - val droppedMemorySize = - if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L - val blockIsRemoved = memoryStore.remove(blockId) - if (blockIsRemoved) { + val droppedMemorySize = memoryStore.removeWithoutUpdateMemorySize(blockId) + if (droppedMemorySize > 0) { blockIsUpdated = true } else { logWarning(s"Block $blockId could not be dropped from memory as it does not exist") @@ -891,7 +889,7 @@ private[spark] class BlockManager( blockInfo.remove(blockId) } if (blockIsUpdated) { - return Some(status) + return Some(status -> droppedMemorySize) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 0a09c24d6187..49dd35b4316d 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.util.{SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector -private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) +private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean, var dropping: Boolean = false) /** * Stores blocks in memory, either as Arrays of deserialized Java objects or as @@ -40,6 +40,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) @volatile private var currentMemory = 0L + private var pendingUnrollMemory = 0L + // Ensure only one thread is putting, and if necessary, dropping blocks at any given time private val accountingLock = new Object @@ -184,6 +186,17 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } + def removeWithoutUpdateMemorySize(blockId: BlockId) = { + entries.synchronized { + val entry = entries.remove(blockId) + if (entry != null) { + entry.size + } else { + 0L + } + } + } + override def clear() { entries.synchronized { entries.clear() @@ -239,17 +252,31 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) val currentSize = vector.estimateSize() if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - // Hold the accounting lock, in case another thread concurrently puts a block that - // takes up the unrolling space we just ensured here - accountingLock.synchronized { - if (!reserveUnrollMemoryForThisThread(amountToRequest)) { - // If the first request is not granted, try again after ensuring free space - // If there is still not enough space, give up and drop the partition - val spaceToEnsure = maxUnrollMemory - currentUnrollMemory - if (spaceToEnsure > 0) { - val result = ensureFreeSpace(blockId, spaceToEnsure) - droppedBlocks ++= result.droppedBlocks + val spaceToEnsure = accountingLock.synchronized { + if (reserveUnrollMemoryForThisThread(amountToRequest)) { + 0L + } else if (amountToRequest > maxMemory) { + keepUnrolling = false + 0L + } else { + val memoryNeeded = currentUnrollMemory + amountToRequest + pendingUnrollMemory - freeMemory + val memoryAvailable = maxUnrollMemory - currentUnrollMemory - pendingUnrollMemory + pendingUnrollMemory += amountToRequest + math.min(memoryNeeded, memoryAvailable) + } + } + if (spaceToEnsure > 0) { + val task = planFreeSpace(blockId, spaceToEnsure, true) + if (task.isDefined) { + try { + droppedBlocks ++= task.get.runTask() + } finally { + task.get.updateCurrentMemorySizeInTransaction { + pendingUnrollMemory -= amountToRequest + keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest) + } } + } else { keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest) } } @@ -304,40 +331,33 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) size: Long, deserialized: Boolean): ResultWithDroppedBlocks = { - /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks - * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has - * been released, it must be ensured that those to-be-dropped blocks are not double counted - * for freeing up more space for another block that needs to be put. Only then the actually - * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */ - var putSuccess = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - accountingLock.synchronized { - val freeSpaceResult = ensureFreeSpace(blockId, size) - val enoughFreeSpace = freeSpaceResult.success - droppedBlocks ++= freeSpaceResult.droppedBlocks - - if (enoughFreeSpace) { + val task = planFreeSpace(blockId, size) + if (task.isDefined) { + droppedBlocks ++= task.get.runTask() + task.get.updateCurrentMemorySizeInTransaction { val entry = new MemoryEntry(value, size, deserialized) - entries.synchronized { - entries.put(blockId, entry) - currentMemory += size - } - val valuesOrBytes = if (deserialized) "values" else "bytes" - logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( - blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) - putSuccess = true + entries.put(blockId, entry) + currentMemory += size + } + val valuesOrBytes = if (deserialized) "values" else "bytes" + logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( + blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) + putSuccess = true + } else { + // Tell the block manager that we couldn't put it in memory so that it can drop it to + // disk if the block allows disk storage. + val data = if (deserialized) { + Left(value.asInstanceOf[Array[Any]]) } else { - // Tell the block manager that we couldn't put it in memory so that it can drop it to - // disk if the block allows disk storage. - val data = if (deserialized) { - Left(value.asInstanceOf[Array[Any]]) - } else { - Right(value.asInstanceOf[ByteBuffer].duplicate()) - } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + Right(value.asInstanceOf[ByteBuffer].duplicate()) + } + val droppedBlockResult = blockManager.dropFromMemory(blockId, data) + droppedBlockResult.foreach { r => + assert(r._2 == 0) + droppedBlocks += ((blockId, r._1)) } } ResultWithDroppedBlocks(putSuccess, droppedBlocks) @@ -354,66 +374,70 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * * Return whether there is enough free space, along with the blocks dropped in the process. */ - private def ensureFreeSpace( + private def planFreeSpace( blockIdToAdd: BlockId, - space: Long): ResultWithDroppedBlocks = { + space: Long, + mustDrop: Boolean = false): Option[DroppingTask] = { logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory") - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - if (space > maxMemory) { logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit") - return ResultWithDroppedBlocks(success = false, droppedBlocks) + return None } - // Take into account the amount of memory currently occupied by unrolling blocks - val actualFreeMemory = freeMemory - currentUnrollMemory + accountingLock.synchronized { + // Take into account the amount of memory currently occupied by unrolling blocks + val actualFreeMemory = if (mustDrop) 0L else freeMemory - currentUnrollMemory - if (actualFreeMemory < space) { - val rddToAdd = getRddId(blockIdToAdd) - val selectedBlocks = new ArrayBuffer[BlockId] - var selectedMemory = 0L + if (actualFreeMemory < space) { + val rddToAdd = getRddId(blockIdToAdd) + val selectedBlocks = new ArrayBuffer[BlockId] + var selectedMemory = 0L - // This is synchronized to ensure that the set of entries is not changed - // (because of getValue or getBytes) while traversing the iterator, as that - // can lead to exceptions. - entries.synchronized { - val iterator = entries.entrySet().iterator() - while (actualFreeMemory + selectedMemory < space && iterator.hasNext) { - val pair = iterator.next() - val blockId = pair.getKey - if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { - selectedBlocks += blockId - selectedMemory += pair.getValue.size + // This is synchronized to ensure that the set of entries is not changed + // (because of getValue or getBytes) while traversing the iterator, as that + // can lead to exceptions. + entries.synchronized { + val iterator = entries.entrySet().iterator() + while (actualFreeMemory + selectedMemory < space && iterator.hasNext) { + val pair = iterator.next() + val blockId = pair.getKey + val entry = pair.getValue() + if (!entry.dropping && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))) { + selectedBlocks += blockId + selectedMemory += entry.size + } } } - } - if (actualFreeMemory + selectedMemory >= space) { - logInfo(s"${selectedBlocks.size} blocks selected for dropping") - for (blockId <- selectedBlocks) { - val entry = entries.synchronized { entries.get(blockId) } - // This should never be null as only one thread should be dropping - // blocks and removing entries. However the check is still here for - // future safety. - if (entry != null) { - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[Array[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + if (actualFreeMemory + selectedMemory >= space) { + logInfo(s"${selectedBlocks.size} blocks selected for dropping") + val toBeDroppedBlocks = new ArrayBuffer[ToBeDroppedBlock] + for (blockId <- selectedBlocks) { + val entry = entries.synchronized { entries.get(blockId) } + // This should never be null as only one thread should be dropping + // blocks and removing entries. However the check is still here for + // future safety. + if (entry != null) { + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[Array[Any]]) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + } + toBeDroppedBlocks += ToBeDroppedBlock(blockId, data) + entry.dropping = true } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } + Some(new DroppingTask(toBeDroppedBlocks)) + } else { + logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " + + "from the same RDD") + None } - return ResultWithDroppedBlocks(success = true, droppedBlocks) } else { - logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " + - "from the same RDD") - return ResultWithDroppedBlocks(success = false, droppedBlocks) + Some(new DroppingTask(Nil)) } } - ResultWithDroppedBlocks(success = true, droppedBlocks) } override def contains(blockId: BlockId): Boolean = { @@ -426,7 +450,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) */ private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = { accountingLock.synchronized { - val granted = freeMemory > currentUnrollMemory + memory + val granted = freeMemory > currentUnrollMemory + memory + pendingUnrollMemory if (granted) { val threadId = Thread.currentThread().getId unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory @@ -467,8 +491,50 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized { unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) } + + private class DroppingTask(blocks: Seq[ToBeDroppedBlock]) { + private var droppedMemorySize = 0L + + def runTask() = { + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + var droppingDoneCount = 0 + try { + blocks.foreach { block => + val dropResult = blockManager.dropFromMemory(block.id, block.data) + dropResult.foreach { r => + droppedBlocks += block.id -> r._1 + droppedMemorySize += r._2 + } + droppingDoneCount += 1 + } + droppedBlocks + } catch { + case e: Exception => + blocks.drop(droppingDoneCount).foreach { block => + entries.synchronized{ + val entry = entries.get(block.id) + if (entry != null) entry.dropping = false + } + } + if (droppedMemorySize > 0) { + entries.synchronized { currentMemory -= droppedMemorySize } + droppedMemorySize = 0 + } + throw e + } + } + + def updateCurrentMemorySizeInTransaction(f: => Unit): Unit = { + entries.synchronized { + currentMemory -= droppedMemorySize + f + } + } + } } private[spark] case class ResultWithDroppedBlocks( success: Boolean, droppedBlocks: Seq[(BlockId, BlockStatus)]) + +private[spark] case class ToBeDroppedBlock(id: BlockId, data: Either[Array[Any], ByteBuffer])