From da2afad1e9db52e199c25ac624f8426225d86ff7 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Tue, 26 Aug 2014 18:20:30 +0800 Subject: [PATCH 1/9] [SPARK-3000][CORE] drop old blocks to disk in parallel when memory is not large enough for caching new blocks Currently, old blocks dropping for new blocks' caching are processed by one thread at the same time. Which can not fully utilize the disk throughput. If the to be dropped block size is huge, then the dropping time will be very long. We need to make it processed in parallel. In this patch, dropping blocks operation are processed in multiple threads, before dropping, each thread will select the blocks that to be dropped for itself. --- .../org/apache/spark/executor/Executor.scala | 4 +- .../apache/spark/storage/MemoryStore.scala | 650 +++++++++++++----- .../spark/storage/BlockManagerSuite.scala | 161 +++-- docs/configuration.md | 9 - 4 files changed, 552 insertions(+), 272 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 14f99a464b6e..76739bc07772 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -292,8 +292,8 @@ private[spark] class Executor( } finally { // Release memory used by this thread for shuffles env.shuffleMemoryManager.releaseMemoryForThisThread() - // Release memory used by this thread for unrolling blocks - env.blockManager.memoryStore.releaseUnrollMemoryForThisThread() + // cleanup maintained memory infomation for this thread + env.blockManager.memoryStore.cleanupForThisThread() // Release memory used by this thread for accumulators Accumulators.clear() runningTasks.remove(taskId) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index ed609772e697..53e0c825e314 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.util.LinkedHashMap import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, HashSet} import org.apache.spark.util.{SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector @@ -37,32 +37,49 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) private val conf = blockManager.conf private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) + // a Set maintains old blocks that will be dropped + private val tobeDroppedBlocksSet = new HashSet[BlockId] + // currentMemory is actually memory that is already used for caching blocks @volatile private var currentMemory = 0L - // Ensure only one thread is putting, and if necessary, dropping blocks at any given time + // Ensure only one thread updating the information, information including memory to drop, + // memory unrolled, etc. private val accountingLock = new Object - // A mapping from thread ID to amount of memory used for unrolling a block (in bytes) + // A mapping from thread ID to amount of memory used for unrolling a block (in bytes). + // The memory is only reserved for unrolling, not actually occupied by blocks. // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val unrollMemoryMap = mutable.HashMap[Long, Long]() - // Same as `unrollMemoryMap`, but for pending unroll memory as defined below. - // Pending unroll memory refers to the intermediate memory occupied by a thread - // after the unroll but before the actual putting of the block in the cache. - // This chunk of memory is expected to be released *as soon as* we finish - // caching the corresponding block as opposed to until after the task finishes. - // This is only used if a block is successfully unrolled in its entirety in - // memory (SPARK-4777). - private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]() - - /** - * The amount of space ensured for unrolling values in memory, shared across all cores. - * This space is not reserved in advance, but allocated dynamically by dropping existing blocks. - */ - private val maxUnrollMemory: Long = { - val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2) - (maxMemory * unrollFraction).toLong - } + + // A mapping from thread ID to amount of memory used for preUnroll a block in marking phase. + // When unrolling a block when there is not enough free memory, we will mark old blocks + // to dropped to free more memory. The reservedUnrollMemory is the memory reserved, but the + // corresponding marked "tobeDropped" blocks has not beed dropped yet. + // Used for keeping "freeMemory" correctly when actually try to put new blocks. + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val reservedUnrollMemoryMap = mutable.HashMap[Long, Long]() + + // A mapping from thread ID to amount of memory reserved for unrolling part of a block (in + // bytes), when the block is not able to fully put into memory, will return an iterator when + // unrolling, but the memory still need to reserved before the block is dropping from memory. + // All accesses of this map are assumed to have manually synchronized on `accountingLoc + private val iteratorUnrollMemoryMap = mutable.HashMap[Long, Long]() + + // A mapping from thread ID to amount of memory to be dropped for new blocks (in bytes). + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val toDropMemoryMap = mutable.HashMap[Long, Long]() + + // A mapping from thread ID to a blockId Set that to be dropped for new blocks (in bytes). + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val toDropBlocksMap = mutable.HashMap[Long, HashSet[BlockId]]() + + // A mapping from thread ID to amount of memory reserved by new blocks to put (in bytes). + // The memory is reserved before blocks actually put into. + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val tryToPutMemoryMap = mutable.HashMap[Long, Long]() + + logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) // Initial memory to request before unrolling any block private val unrollMemoryThreshold: Long = @@ -77,7 +94,23 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. */ - def freeMemory: Long = maxMemory - currentMemory + def actualFreeMemory: Long = maxMemory - currentMemory + + /** + * Free memory that can be used when new blocks are trying to put into memory. The value + * includes the memory that are marked to be dropped. + */ + def freeMemory: Long = maxMemory - ( + currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + + currentReservedUnrollMemory - currentToDropMemory) + + /** + * Free memory that can used when new blocks are unrolling to the memory. The value includes + * the memory that are marked to be dropped, but not include the memory for Unrolling. + */ + def freeMemoryForUnroll: Long = maxMemory - ( + currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + + currentUnrollMemory - currentToDropMemory) override def getSize(blockId: BlockId): Long = { entries.synchronized { @@ -172,7 +205,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") - val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) + val res = blockManager.diskStore.putIterator(blockId, + iteratorValues, level, returnValues) PutResult(res.size, res.data, droppedBlocks) } else { PutResult(0, Left(iteratorValues), droppedBlocks) @@ -209,10 +243,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def remove(blockId: BlockId): Boolean = { entries.synchronized { + // Every time when removing blocks from memory, the infomation about blocks that to be + // dropped need to be refreshed. + tobeDroppedBlocksSet.remove(blockId) + delToDropBlocksMapForThisThread(blockId) val entry = entries.remove(blockId) if (entry != null) { + decreaseToDropMemoryForThisThread(entry.size) currentMemory -= entry.size - logDebug(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)") true } else { false @@ -223,7 +261,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def clear() { entries.synchronized { entries.clear() + tobeDroppedBlocksSet.clear() currentMemory = 0 + unrollMemoryMap.clear() + reservedUnrollMemoryMap.clear() + iteratorUnrollMemoryMap.clear() + toDropMemoryMap.clear() + toDropBlocksMap.clear() + tryToPutMemoryMap.clear() } logInfo("MemoryStore cleared") } @@ -236,72 +281,96 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * checking whether the memory restrictions for unrolling blocks are still satisfied, * stopping immediately if not. This check is a safeguard against the scenario in which * there is not enough free memory to accommodate the entirety of a single block. + * + * When there is not enough memory for unrolling blocks, old blocks will be dropped from + * memory. The dropping operation is in parallel to fully utilized the disk throughput + * when there are multiple disks. And befor dropping, each thread will mark the old blocks + * that can be dropped. * * This method returns either an array with the contents of the entire block or an iterator * containing the values of the block (if the array would have exceeded available memory). */ + def unrollSafely( - blockId: BlockId, - values: Iterator[Any], - droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) - : Either[Array[Any], Iterator[Any]] = { + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Either[Array[Any], Iterator[Any]] = { // Number of elements unrolled so far - var elementsUnrolled = 0 + var elementsUnrolled = 0L // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true - // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing. - val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory val memoryCheckPeriod = 16 // Memory currently reserved by this thread for this particular unrolling operation - var memoryThreshold = initialMemoryThreshold + // Initial value is 0 means don't reserve memory originally, only reserve dynamically + var memoryThreshold = 0L // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 - // Previous unroll memory held by this thread, for releasing later (only at the very end) - val previousMemoryReserved = currentUnrollMemoryForThisThread // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] - - // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold) - - if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + - s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") - } - - // Unroll this block safely, checking whether we have exceeded our threshold periodically + + // preUnroll this block safely, checking whether we have exceeded our threshold periodically try { while (values.hasNext && keepUnrolling) { vector += values.next() - if (elementsUnrolled % memoryCheckPeriod == 0) { + // Every checking period reaches or the iterator reaches the end, we check whether extra + // memory is needed. + if (elementsUnrolled % memoryCheckPeriod == 0 || !values.hasNext) { // If our vector's size has exceeded the threshold, request more memory val currentSize = vector.estimateSize() - if (currentSize >= memoryThreshold) { + if (currentSize > memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - // Hold the accounting lock, in case another thread concurrently puts a block that - // takes up the unrolling space we just ensured here - accountingLock.synchronized { - if (!reserveUnrollMemoryForThisThread(amountToRequest)) { - // If the first request is not granted, try again after ensuring free space - // If there is still not enough space, give up and drop the partition - val spaceToEnsure = maxUnrollMemory - currentUnrollMemory - if (spaceToEnsure > 0) { - val result = ensureFreeSpace(blockId, spaceToEnsure) - droppedBlocks ++= result.droppedBlocks + if (freeMemoryForUnroll < amountToRequest) { + var selectedMemory = 0L + val selectedBlocks = new ArrayBuffer[BlockId]() + val ensureSpaceResult = ensureFreeSpace( + blockId, amountToRequest, freeMemoryForUnroll, true) + val enoughFreeSpace = ensureSpaceResult.success + + if (enoughFreeSpace) { + selectedBlocks ++= ensureSpaceResult.toDropBlocksId + selectedMemory = ensureSpaceResult.selectedMemory + if (!selectedBlocks.isEmpty) { + // drop old block in parallel to free memory for new blocks to unroll + for (selectedblockId <- selectedBlocks) { + val entry = entries.synchronized { entries.get(selectedblockId) } + if (entry != null) { + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[Array[Any]]) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + } + val droppedBlockStatus = blockManager.dropFromMemory(selectedblockId, data) + droppedBlockStatus.foreach { status => droppedBlocks += ((selectedblockId, + status)) } + } + } } - keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest) + // update reservedUnrollMemoryMap, indicate the tobeDroppedBlocks that marked by + // current thread has been dropped + decreaseReservedUnrollMemoryForThisThread(amountToRequest) + } else { + keepUnrolling = false } + } else { + increaseUnrollMemoryForThisThread(amountToRequest) + } + + if (keepUnrolling) { + memoryThreshold += amountToRequest } - // New threshold is currentSize * memoryGrowthFactor - memoryThreshold += amountToRequest } } elementsUnrolled += 1 } if (keepUnrolling) { + // to free up memory that requested more than needed + decreaseUnrollMemoryForThisThread(memoryThreshold - SizeEstimator.estimate( + vector.toArray.asInstanceOf[AnyRef])) + logInfo(s"Successfully unrolloing the block ${blockId} to memory, block size is " + + s"${SizeEstimator.estimate(vector.toArray.asInstanceOf[AnyRef])}") // We successfully unrolled the entirety of this block Left(vector.toArray) } else { @@ -309,18 +378,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) logUnrollFailureMessage(blockId, vector.estimateSize()) Right(vector.iterator ++ values) } - } finally { - // If we return an array, the values returned will later be cached in `tryToPut`. - // In this case, we should release the memory after we cache the block there. - // Otherwise, if we return an iterator, we release the memory reserved here - // later when the task finishes. - if (keepUnrolling) { - accountingLock.synchronized { - val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved - releaseUnrollMemoryForThisThread(amountToRelease) - reservePendingUnrollMemoryForThisThread(amountToRelease) + accountingLock.synchronized { + // If we return an iterator, that means the blocks is not able to put into memory, and + // will be dropped to disk if it can or just dropped from memory. The memory reserved + // for unrolling will not be released because it depending on the underlying vector. + // The memory size will be maintained from unrollMemoryMap to iteratorUnrollMemoryMap. + if (!keepUnrolling) { + reserveIteratorUnrollMemoryForThisThread() + removeUnrollMemoryForThisThread() } + // whatever we return, blocks that marked "to-be-dropped" should always have been dropped + removeToDropMemoryForThisThread() + // We will finally reset the ReservedUnrollMemory for current thread. The memory should + // always be 0 after dropping the selected blocks. + removeReservedUnrollMemoryForThisThread() } } } @@ -345,59 +417,72 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size * must also be passed by the caller. * - * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be - * created to avoid OOM since it may be a big ByteBuffer. - * - * Synchronize on `accountingLock` to ensure that all the put requests and its associated block - * dropping is done by only on thread at a time. Otherwise while one thread is dropping - * blocks to free memory for one block, another thread may use up the freed space for - * another block. - * + * In order to drop old blocks in parallel, we will first mark the blocks that can be dropped + * when there is not enough memory. + * * Return whether put was successful, along with the blocks dropped in the process. */ - private def tryToPut( - blockId: BlockId, - value: () => Any, - size: Long, - deserialized: Boolean): ResultWithDroppedBlocks = { - /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks - * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has - * been released, it must be ensured that those to-be-dropped blocks are not double counted - * for freeing up more space for another block that needs to be put. Only then the actually - * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */ + private def tryToPut( + blockId: BlockId, + value: Any, + size: Long, + deserialized: Boolean): ResultWithDroppedBlocks = { var putSuccess = false + var enoughFreeSpace = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - accountingLock.synchronized { - val freeSpaceResult = ensureFreeSpace(blockId, size) - val enoughFreeSpace = freeSpaceResult.success - droppedBlocks ++= freeSpaceResult.droppedBlocks - - if (enoughFreeSpace) { - val entry = new MemoryEntry(value(), size, deserialized) - entries.synchronized { - entries.put(blockId, entry) - currentMemory += size + var selectedMemory = 0L + val selectedBlocks = new ArrayBuffer[BlockId]() + + val freeSpaceResult = ensureFreeSpace(blockId, size, freeMemory, false) + enoughFreeSpace = freeSpaceResult.success + if (enoughFreeSpace) { + selectedBlocks ++= freeSpaceResult.toDropBlocksId + selectedMemory = freeSpaceResult.selectedMemory + if (!selectedBlocks.isEmpty) { + for (selectedblockId <- selectedBlocks) { + val entry = entries.synchronized { entries.get(selectedblockId) } + // drop old block in parallel to free memory for new blocks to put + if (entry != null) { + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[Array[Any]]) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + } + val droppedBlockStatus = blockManager.dropFromMemory(selectedblockId, data) + droppedBlockStatus.foreach { status => droppedBlocks += ((selectedblockId, status)) } + } } - val valuesOrBytes = if (deserialized) "values" else "bytes" - logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( - blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) - putSuccess = true + } + + val entry = new MemoryEntry(value, size, deserialized) + entries.synchronized { + entries.put(blockId, entry) + decreaseTryToPutMemoryForThisThread(size) + currentMemory += size + } + val valuesOrBytes = if (deserialized) "values" else "bytes" + logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( + blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) + putSuccess = true + } else { + logInfo(s"Failed to put block ${blockId} to memory, block size is ${size}.") + // For some reason, blocks might still not be able to put into memory even unroll + // successfully.If so, we need to clear reserved unroll memory and to be dropped blocks for + // this thread. + removeUnrollMemoryForThisThread() + removeToDropMemoryForThisThread() + // Tell the block manager that we couldn't put it in memory so that it can drop it to + // disk if the block allows disk storage. + val data = if (deserialized) { + Left(value.asInstanceOf[Array[Any]]) } else { - // Tell the block manager that we couldn't put it in memory so that it can drop it to - // disk if the block allows disk storage. - lazy val data = if (deserialized) { - Left(value().asInstanceOf[Array[Any]]) - } else { - Right(value().asInstanceOf[ByteBuffer].duplicate()) - } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + Right(value.asInstanceOf[ByteBuffer].duplicate()) } - // Release the unroll memory used because we no longer need the underlying Array - releasePendingUnrollMemoryForThisThread() + val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) + droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } ResultWithDroppedBlocks(putSuccess, droppedBlocks) } @@ -408,106 +493,189 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * + * In this method each thread only make the marking operations on blocks to see whether + * there will be enough memory if dropping the selected blocks. The acturally dropping + * operation will begin if the marking operation succeed. + * * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping * blocks. Otherwise, the freed space may fill up before the caller puts in their new value. * - * Return whether there is enough free space, along with the blocks dropped in the process. + * Return whether there is enough free space, along with the blocks marked as "to-be-dropped" + * and the memory that can be freed if the "to-be-dropped" blocks are actually dropped. */ private def ensureFreeSpace( - blockIdToAdd: BlockId, - space: Long): ResultWithDroppedBlocks = { - logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory") + blockIdToAdd: BlockId, + size: Long, + memoryFree: Long, + isUnroll: Boolean): ResultBlocksIdMemory = { + logInfo(s"ensureFreeSpace($size) called with curMem=$currentMemory, maxMem=$maxMemory") + var putSuccess = false + var enoughFreeSpace = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + var selectedMemory = 0L + val selectedBlocks = new ArrayBuffer[BlockId]() - if (space > maxMemory) { + if (size > maxMemory) { logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit") - return ResultWithDroppedBlocks(success = false, droppedBlocks) - } - - // Take into account the amount of memory currently occupied by unrolling blocks - // and minus the pending unroll memory for that block on current thread. - val threadId = Thread.currentThread().getId - val actualFreeMemory = freeMemory - currentUnrollMemory + - pendingUnrollMemoryMap.getOrElse(threadId, 0L) - - if (actualFreeMemory < space) { - val rddToAdd = getRddId(blockIdToAdd) - val selectedBlocks = new ArrayBuffer[BlockId] - var selectedMemory = 0L - + ResultBlocksIdMemory(success = false, selectedBlocks.toSeq, selectedMemory) + } else { // This is synchronized to ensure that the set of entries is not changed // (because of getValue or getBytes) while traversing the iterator, as that // can lead to exceptions. entries.synchronized { - val iterator = entries.entrySet().iterator() - while (actualFreeMemory + selectedMemory < space && iterator.hasNext) { - val pair = iterator.next() - val blockId = pair.getKey - if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { - selectedBlocks += blockId - selectedMemory += pair.getValue.size + if (memoryFree < size) { + val rddToAdd = getRddId(blockIdToAdd) + val iterator = entries.entrySet().iterator() + while (memoryFree + selectedMemory < size && iterator.hasNext) { + val pair = iterator.next() + val blockId = pair.getKey + if (!tobeDroppedBlocksSet.contains(blockId)) { + if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { + selectedBlocks += blockId + selectedMemory += pair.getValue.size + } + } } } - } - - if (actualFreeMemory + selectedMemory >= space) { - logInfo(s"${selectedBlocks.size} blocks selected for dropping") - for (blockId <- selectedBlocks) { - val entry = entries.synchronized { entries.get(blockId) } - // This should never be null as only one thread should be dropping - // blocks and removing entries. However the check is still here for - // future safety. - if (entry != null) { - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[Array[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) - } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + if (memoryFree + selectedMemory >= size) { + tobeDroppedBlocksSet ++= selectedBlocks + addToDropBlocksMapForThisThread(selectedBlocks.toArray) + increaseToDropMemoryForThisThread(selectedMemory) + if (isUnroll) { + increaseUnrollMemoryForThisThread(size) + increaseReservedUnrollMemoryForThisThread(size) + } else { + increaseTryToPutMemoryForThisThread(size) + decreaseUnrollMemoryForThisThread(size) } + enoughFreeSpace = true + logInfo(selectedBlocks.size + " blocks selected for dropping") + ResultBlocksIdMemory(success = true, selectedBlocks.toSeq, selectedMemory) + } else { + logInfo(s"Will not store $blockIdToAdd as it would require" + + s" dropping another block from the same RDD") + ResultBlocksIdMemory(success = false, selectedBlocks.toSeq, selectedMemory) } - return ResultWithDroppedBlocks(success = true, droppedBlocks) - } else { - logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " + - "from the same RDD") - return ResultWithDroppedBlocks(success = false, droppedBlocks) } } - ResultWithDroppedBlocks(success = true, droppedBlocks) } override def contains(blockId: BlockId): Boolean = { entries.synchronized { entries.containsKey(blockId) } } + private[spark] def cleanupForThisThread(): Unit = { + removeUnrollMemoryForThisThread() + removeIteratorUnrollMemoryForThisThread() + removeToDropMemoryForThisThread() + removeTryToPutMemoryForThisThread() + } /** - * Reserve additional memory for unrolling blocks used by this thread. - * Return whether the request is granted. + * Increase memory size that will be dropped by this thread in future, which means more + * old blocks are marked as "to-be-dropped" for this thread. */ - def reserveUnrollMemoryForThisThread(memory: Long): Boolean = { + private[spark] def increaseToDropMemoryForThisThread(memory: Long): Unit = { + val threadId = Thread.currentThread().getId accountingLock.synchronized { - val granted = freeMemory > currentUnrollMemory + memory - if (granted) { - val threadId = Thread.currentThread().getId - unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory + toDropMemoryMap(threadId) = toDropMemoryMap.getOrElse(threadId, 0L) + memory + } + } + + /** + * Decrease memory size that will be dropped by this thread in future, which means some + * old blocks marked as "to-be-dropped" are finished dropping. + */ + private[spark] def decreaseToDropMemoryForThisThread(memory: Long = -1L): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + if (memory > 0) { + toDropMemoryMap(threadId) = toDropMemoryMap.getOrElse(threadId, 0L) - memory + // If this thread claims no more unroll memory, release it completely + if (toDropMemoryMap(threadId) <= 0) { + toDropMemoryMap.remove(threadId) + cleanToDropBlocksMapForThisThread() + } } - granted } } /** - * Release memory used by this thread for unrolling blocks. + * Return the amount of memory currently totally to be dropped for unrolling blocks or for + * putting blocks across all threads. + */ + private[spark] def currentToDropMemory: Long = accountingLock.synchronized { + toDropMemoryMap.values.sum + } + + /** + * Remove all memory that will be dropped for this thread, also the old blocks marked as + * "to-be-dropped" for this thread will remove the marking. + */ + private[spark] def removeToDropMemoryForThisThread(): Unit = accountingLock.synchronized { + toDropMemoryMap.remove(Thread.currentThread().getId) + cleanToDropBlocksMapForThisThread() + } + + /** + * Mark more old blocks as "to-be-dropped" for this thread. + */ + private[spark] def addToDropBlocksMapForThisThread(blocksId: Array[BlockId]): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()) ++= blocksId + } + } + + /** + * Remove a specified block from the map that marked as "to-be-dropped" from this thread, + * which means the blocks has been dropped from the memory. + */ + private[spark] def delToDropBlocksMapForThisThread(blockId: BlockId): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()).remove(blockId) + } + } + + /** + * Remove all block that marked as "to-be-dropped" from the map for this thread, which means + * either the blocks has been dropped from memory or the the marking is invalid. + */ + private[spark] def cleanToDropBlocksMapForThisThread(): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + val blockIdSet = toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()) + if (!blockIdSet.isEmpty) { + val itr = blockIdSet.iterator + while (itr.hasNext) { + val blockId = itr.next() + tobeDroppedBlocksSet.remove(blockId) + } + toDropBlocksMap.remove(threadId) + } + } + } + + /** + * Reserve additional memory for unrolling blocks reserved by this thread. + */ + private[spark] def increaseUnrollMemoryForThisThread(memory: Long): Unit = { + accountingLock.synchronized { + val threadId = Thread.currentThread().getId + unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory + } + } + + /** + * Release memory reserved by this thread for unrolling blocks. * If the amount is not specified, remove the current thread's allocation altogether. */ - def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { + private[spark] def decreaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { - if (memory < 0) { - unrollMemoryMap.remove(threadId) - } else { - unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory + if (memory > 0) { + unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) - memory // If this thread claims no more unroll memory, release it completely if (unrollMemoryMap(threadId) <= 0) { unrollMemoryMap.remove(threadId) @@ -517,38 +685,141 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** - * Reserve the unroll memory of current unroll successful block used by this thread - * until actually put the block into memory entry. + * Return the amount of memory currently totally reserved for unrolling blocks across + * all threads. The urolling blocks are blocks that have been confirmed can putting into + * the memory. + */ + def currentUnrollMemory: Long = accountingLock.synchronized { + unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum + } + + /** + * Rmove the memory from unrolling from the map, which means either the blocks has been marked + * as "try-to-put" or blocks has been put into memory or the unrolled memory is invalid. */ - def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = { + private[spark] def removeUnrollMemoryForThisThread(): Unit = accountingLock.synchronized { + unrollMemoryMap.remove(Thread.currentThread().getId) + } + + /** + * Increase memory for reservedUnroll for this thread. This only happen when there is not enough + * space for unrolling new block and need to drop old block for more space. So, each + * reservedUnrollMemory will correspond to some amount "to-be-dropped" memory, and after the + * corresponding "to-be-dropped" blocks are dropped from memroy, reservedunrollMemory should + * also be refreshed. + */ + private[spark] def increaseReservedUnrollMemoryForThisThread(memory: Long): Unit = { + accountingLock.synchronized { + val threadId = Thread.currentThread().getId + reservedUnrollMemoryMap(threadId) = reservedUnrollMemoryMap.getOrElse(threadId, 0L) + memory + } + } + + /** + * Release memory used for reserveUnroll by this thread. Which means the corresponding + * "to-be-dropped" blocks has been dropped from the memory. + */ + private[spark] def decreaseReservedUnrollMemoryForThisThread(memory: Long = -1L): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { - pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory + if (memory > 0) { + reservedUnrollMemoryMap(threadId) = reservedUnrollMemoryMap.getOrElse( + threadId, 0L) - memory + // If this thread claims no more reservedUnroll memory, release it completely + if (reservedUnrollMemoryMap(threadId) <= 0) { + reservedUnrollMemoryMap.remove(threadId) + } + } } } /** - * Release pending unroll memory of current unroll successful block used by this thread + * Return the amount of memory currently totally reserved for reservedUnrolling across + * all threads. + */ + private[spark] def currentReservedUnrollMemory: Long = accountingLock.synchronized { + reservedUnrollMemoryMap.values.sum + } + + /** + * clean the reservedunrollMemoryMap for this thread, each time after the unrolling process, + * this method need to be called. + */ + private[spark] def removeReservedUnrollMemoryForThisThread(): Unit = accountingLock.synchronized { + reservedUnrollMemoryMap.remove(Thread.currentThread().getId) + } + + /** + * When a block can not unroll into memory, the memory size it has already reserved should + * maintained in iteratorUnrollMemoryMap. */ - def releasePendingUnrollMemoryForThisThread(): Unit = { + private[spark] def reserveIteratorUnrollMemoryForThisThread(): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { - pendingUnrollMemoryMap.remove(threadId) + val unrolledMem = unrollMemoryMap.getOrElse(threadId, 0L) + iteratorUnrollMemoryMap(threadId) = iteratorUnrollMemoryMap.getOrElse( + threadId, 0L) + unrolledMem + } + } + /** + * Return the amount of memory currently totally reserved for part of blocks that can not put + * into the memory (will drop to disk or just drop from meory in future) across all threads. + */ + private[spark] def currentIteratorUnrollMemory: Long = accountingLock.synchronized { + iteratorUnrollMemoryMap.values.sum + } + + /** + * After the block dropped from memory, should clean the reservedUnrollMemoryMap, which will + * free up memory for new blocks to unroll or to tryToPut. + */ + private[spark] def removeIteratorUnrollMemoryForThisThread(): Unit = accountingLock.synchronized { + iteratorUnrollMemoryMap.remove(Thread.currentThread().getId) + } + + + /** + * Reserve additional memory for putting blocks for this thread. That meand more blocks are + * waiting to put into memory, and before putting into memory, it will reserve some memory first. + */ + private[spark] def increaseTryToPutMemoryForThisThread(memory: Long): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + tryToPutMemoryMap(threadId) = tryToPutMemoryMap.getOrElse(threadId, 0L) + memory } } /** - * Return the amount of memory currently occupied for unrolling blocks across all threads. + * Release used by this thread for putting new blocks, which means new block has been put into + * memory already. */ - def currentUnrollMemory: Long = accountingLock.synchronized { - unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum + private[spark] def decreaseTryToPutMemoryForThisThread(memory: Long = -1L): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + if (memory > 0) { + tryToPutMemoryMap(threadId) = tryToPutMemoryMap.getOrElse(threadId, memory) - memory + // If this thread claims no more unroll memory, release it completely + if (tryToPutMemoryMap(threadId) <= 0) { + tryToPutMemoryMap.remove(threadId) + } + } + } } /** - * Return the amount of memory currently occupied for unrolling blocks by this thread. + * Return the amount of memory currently reserved for putting new blocks across all threads. + */ + private[spark] def currentTryToPutMemory: Long = accountingLock.synchronized { + tryToPutMemoryMap.values.sum + } + + /** + * Clean all memory reserved for putting new blocks, at the same time, marking of blocks + * marked as "to-be-dropped" for this thread will be cleaned. */ - def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized { - unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) + private [spark] def removeTryToPutMemoryForThisThread(): Unit = { + tryToPutMemoryMap.remove(Thread.currentThread().getId) + removeToDropMemoryForThisThread() } /** @@ -589,3 +860,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) private[spark] case class ResultWithDroppedBlocks( success: Boolean, droppedBlocks: Seq[(BlockId, BlockStatus)]) + +private[spark] case class ResultBlocksIdMemory( + success: Boolean, + toDropBlocksId: Seq[BlockId], + selectedMemory: Long) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 545722b050ee..a510084ce597 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -999,36 +999,33 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store") } - test("reserve/release unroll memory") { + test("increase/decrease unroll memory") { store = makeBlockManager(12000) val memoryStore = store.memoryStore assert(memoryStore.currentUnrollMemory === 0) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) - - // Reserve - memoryStore.reserveUnrollMemoryForThisThread(100) - assert(memoryStore.currentUnrollMemoryForThisThread === 100) - memoryStore.reserveUnrollMemoryForThisThread(200) - assert(memoryStore.currentUnrollMemoryForThisThread === 300) - memoryStore.reserveUnrollMemoryForThisThread(500) - assert(memoryStore.currentUnrollMemoryForThisThread === 800) - memoryStore.reserveUnrollMemoryForThisThread(1000000) - assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted - // Release - memoryStore.releaseUnrollMemoryForThisThread(100) - assert(memoryStore.currentUnrollMemoryForThisThread === 700) - memoryStore.releaseUnrollMemoryForThisThread(100) - assert(memoryStore.currentUnrollMemoryForThisThread === 600) - // Reserve again - memoryStore.reserveUnrollMemoryForThisThread(4400) - assert(memoryStore.currentUnrollMemoryForThisThread === 5000) - memoryStore.reserveUnrollMemoryForThisThread(20000) - assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted - // Release again - memoryStore.releaseUnrollMemoryForThisThread(1000) - assert(memoryStore.currentUnrollMemoryForThisThread === 4000) - memoryStore.releaseUnrollMemoryForThisThread() // release all - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Increase + memoryStore.increaseUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemory === 100) + memoryStore.increaseUnrollMemoryForThisThread(200) + assert(memoryStore.currentUnrollMemory === 300) + memoryStore.increaseUnrollMemoryForThisThread(500) + assert(memoryStore.currentUnrollMemory === 800) + // Decrease + memoryStore.decreaseUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemory === 700) + memoryStore.decreaseUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemory === 600) + memoryStore.decreaseUnrollMemoryForThisThread(700) + assert(memoryStore.currentUnrollMemory === 0) // decrease too much + // Increase again + memoryStore.increaseUnrollMemoryForThisThread(4400) + assert(memoryStore.currentUnrollMemory === 4400) + // Decrease again + memoryStore.decreaseUnrollMemoryForThisThread(1000) + assert(memoryStore.currentUnrollMemory === 3400) + memoryStore.removeUnrollMemoryForThisThread() // release all + assert(memoryStore.currentUnrollMemory === 0) } /** @@ -1056,37 +1053,46 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach test("safely unroll blocks") { store = makeBlockManager(12000) val smallList = List.fill(40)(new Array[Byte](100)) - val bigList = List.fill(40)(new Array[Byte](1000)) + val midList = List.fill(60)(new Array[Byte](100)) + val bigList = List.fill(400)(new Array[Byte](100)) val memoryStore = store.memoryStore val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) // Unroll with all the space in the world. This should succeed and return an array. var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) - memoryStore.releasePendingUnrollMemoryForThisThread() + assert(memoryStore.currentUnrollMemory > 0) + memoryStore.removeUnrollMemoryForThisThread() + assert(memoryStore.currentUnrollMemory === 0) // Unroll with not enough space. This should succeed after kicking out someBlock1. store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) + unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) assert(droppedBlocks.size === 1) assert(droppedBlocks.head._1 === TestBlockId("someBlock1")) droppedBlocks.clear() - memoryStore.releasePendingUnrollMemoryForThisThread() - - // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = - // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. - // In the mean time, however, we kicked out someBlock2 before giving up. - store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) + memoryStore.removeUnrollMemoryForThisThread() + memoryStore.remove("someBlock2") + assert(!memoryStore.contains("someblock2")) + + // memoryGrowthFactor is 1.5, put midList fist, will must drop someBlock3 when putting + // huge block. + store.putIterator("someBlock3", midList.iterator, StorageLevel.MEMORY_ONLY) + // Unroll huge block with not enough space. Even after ensuring free space by dropping old + // block "someBlock3", there is still not enough room to unroll this block. + // This returns an iterator. In the mean time, however, we kicked out "someBlock3" before + // giving up. unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks) verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) - assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + assert(memoryStore.currentUnrollMemory === 0) // we returned an iterator + // reserved memory maintained in IteratorUnrollMemoryMap + assert(memoryStore.currentIteratorUnrollMemory > 0) assert(droppedBlocks.size === 1) - assert(droppedBlocks.head._1 === TestBlockId("someBlock2")) + assert(droppedBlocks.head._1 === TestBlockId("someBlock3")) droppedBlocks.clear() } @@ -1095,10 +1101,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val memOnly = StorageLevel.MEMORY_ONLY val memoryStore = store.memoryStore val smallList = List.fill(40)(new Array[Byte](100)) - val bigList = List.fill(40)(new Array[Byte](1000)) - def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] - def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + val midList = List.fill(60)(new Array[Byte](100)) + val bigList = List.fill(400)(new Array[Byte](100)) + def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] + def midIterator = midList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemory === 0) // Unroll with plenty of space. This should succeed and cache both blocks. val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true) @@ -1109,7 +1117,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(result2.size > 0) assert(result1.data.isLeft) // unroll did not drop this block to disk assert(result2.data.isLeft) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) // Re-put these two blocks so block manager knows about them too. Otherwise, block manager // would not know how to drop them from memory later. @@ -1125,19 +1133,23 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(memoryStore.contains("b3")) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) + assert(memoryStore.currentIteratorUnrollMemory === 0) memoryStore.remove("b3") - store.putIterator("b3", smallIterator, memOnly) + store.putIterator("b3", midIterator, memOnly) - // Unroll huge block with not enough space. This should fail and kick out b2 in the process. + // Unroll huge block with not enough space. This should fail and kick out b2 and b3 in + // the process. val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true) assert(result4.size === 0) // unroll was unsuccessful assert(result4.data.isLeft) assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) - assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) - assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + assert(memoryStore.currentUnrollMemory === 0) // we returned an iterator + // iterator Unroll memory is not released because unrollSafely returned an iterator + assert(memoryStore.currentIteratorUnrollMemory > 0) } /** @@ -1149,10 +1161,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val memoryStore = store.memoryStore val diskStore = store.diskStore val smallList = List.fill(40)(new Array[Byte](100)) - val bigList = List.fill(40)(new Array[Byte](1000)) - def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] - def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + val midList = List.fill(60)(new Array[Byte](100)) + val bigList = List.fill(400)(new Array[Byte](100)) + def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] + def midIterator = midList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]] + assert(memoryStore.currentUnrollMemory === 0) store.putIterator("b1", smallIterator, memAndDisk) store.putIterator("b2", smallIterator, memAndDisk) @@ -1168,24 +1182,26 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(!diskStore.contains("b2")) assert(!diskStore.contains("b3")) memoryStore.remove("b3") - store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + store.putIterator("b3", midIterator, StorageLevel.MEMORY_ONLY) + assert(memoryStore.currentUnrollMemory === 0) // Unroll huge block with not enough space. This should fail and drop the new block to disk - // directly in addition to kicking out b2 in the process. Memory store should contain only - // b3, while disk store should contain b1, b2 and b4. + // directly in addition to kicking out b2 and b3 in the process. Memory store should contain + // nothing, while disk store should contain b1, b2 and b4. val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true) assert(result4.size > 0) assert(result4.data.isRight) // unroll returned bytes from disk assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) - assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) assert(diskStore.contains("b1")) assert(diskStore.contains("b2")) assert(!diskStore.contains("b3")) assert(diskStore.contains("b4")) - assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + assert(memoryStore.currentUnrollMemory === 0) // we returned an iterator + // iterator Unroll memory is not released because unrollSafely returned an iterator + assert(memoryStore.currentIteratorUnrollMemory > 0) } test("multiple unrolls by the same thread") { @@ -1194,32 +1210,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val memoryStore = store.memoryStore val smallList = List.fill(40)(new Array[Byte](100)) def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) // All unroll memory used is released because unrollSafely returned an array memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) - // Unroll memory is not released because unrollSafely returned an iterator - // that still depends on the underlying vector used in the process + // There will always be enouth space by dropping old blocks memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread - assert(unrollMemoryAfterB3 > 0) + val unrollMemoryAfterB3 = memoryStore.currentUnrollMemory + assert(unrollMemoryAfterB3 === 0) - // The unroll memory owned by this thread builds on top of its value after the previous unrolls memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread - assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) + val unrollMemoryAfterB4 = memoryStore.currentUnrollMemory + assert(unrollMemoryAfterB4 === unrollMemoryAfterB3) - // ... but only to a certain extent (until we run out of free space to grant new unroll memory) memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread + val unrollMemoryAfterB5 = memoryStore.currentUnrollMemory memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread + val unrollMemoryAfterB6 = memoryStore.currentUnrollMemory memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread + val unrollMemoryAfterB7 = memoryStore.currentUnrollMemory assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) diff --git a/docs/configuration.md b/docs/configuration.md index 7fe11475212b..6fd605bc29fb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -802,15 +802,6 @@ Apart from these, the following properties are also available, and may be useful mapping has high overhead for blocks close to or below the page size of the operating system. - - spark.storage.unrollFraction - 0.2 - - Fraction of spark.storage.memoryFraction to use for unrolling blocks in memory. - This is dynamically allocated by dropping existing blocks when there is not enough free - storage space to unroll the new block in its entirety. - - spark.tachyonStore.baseDir System.getProperty("java.io.tmpdir") From 4f76f4cf09367879600c302f69a6da3c377c41df Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Wed, 27 Aug 2014 08:29:49 +0800 Subject: [PATCH 2/9] add logInfo back when dropping a block from memory --- core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 53e0c825e314..7f54a00c76ed 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -251,6 +251,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (entry != null) { decreaseToDropMemoryForThisThread(entry.size) currentMemory -= entry.size + logInfo(s"Block $blockId of size ${entry.size} dropped from memory") true } else { false From cd0f5470606a3365883cf3b8fb632ef1e67c3786 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Fri, 29 Aug 2014 09:27:19 +0800 Subject: [PATCH 3/9] indentation correction and some comments modified --- .../apache/spark/storage/MemoryStore.scala | 47 ++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 7f54a00c76ed..329873ea076b 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -63,14 +63,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // A mapping from thread ID to amount of memory reserved for unrolling part of a block (in // bytes), when the block is not able to fully put into memory, will return an iterator when // unrolling, but the memory still need to reserved before the block is dropping from memory. - // All accesses of this map are assumed to have manually synchronized on `accountingLoc + // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val iteratorUnrollMemoryMap = mutable.HashMap[Long, Long]() // A mapping from thread ID to amount of memory to be dropped for new blocks (in bytes). // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val toDropMemoryMap = mutable.HashMap[Long, Long]() - // A mapping from thread ID to a blockId Set that to be dropped for new blocks (in bytes). + // A mapping from thread ID to a blockId Set that to be dropped for new blocks. // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val toDropBlocksMap = mutable.HashMap[Long, HashSet[BlockId]]() @@ -101,16 +101,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * includes the memory that are marked to be dropped. */ def freeMemory: Long = maxMemory - ( - currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + - currentReservedUnrollMemory - currentToDropMemory) - + currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + + currentReservedUnrollMemory - currentToDropMemory) + /** * Free memory that can used when new blocks are unrolling to the memory. The value includes * the memory that are marked to be dropped, but not include the memory for Unrolling. */ def freeMemoryForUnroll: Long = maxMemory - ( - currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + - currentUnrollMemory - currentToDropMemory) + currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + + currentUnrollMemory - currentToDropMemory) override def getSize(blockId: BlockId): Long = { entries.synchronized { @@ -243,7 +243,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def remove(blockId: BlockId): Boolean = { entries.synchronized { - // Every time when removing blocks from memory, the infomation about blocks that to be + // Every time when removing blocks from memory, the information about blocks that to be // dropped need to be refreshed. tobeDroppedBlocksSet.remove(blockId) delToDropBlocksMapForThisThread(blockId) @@ -282,7 +282,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * checking whether the memory restrictions for unrolling blocks are still satisfied, * stopping immediately if not. This check is a safeguard against the scenario in which * there is not enough free memory to accommodate the entirety of a single block. - * + * * When there is not enough memory for unrolling blocks, old blocks will be dropped from * memory. The dropping operation is in parallel to fully utilized the disk throughput * when there are multiple disks. And befor dropping, each thread will mark the old blocks @@ -293,9 +293,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) */ def unrollSafely( - blockId: BlockId, - values: Iterator[Any], - droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Either[Array[Any], Iterator[Any]] = { + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) + : Either[Array[Any], Iterator[Any]] = { // Number of elements unrolled so far var elementsUnrolled = 0L @@ -425,10 +426,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) */ private def tryToPut( - blockId: BlockId, - value: Any, - size: Long, - deserialized: Boolean): ResultWithDroppedBlocks = { + blockId: BlockId, + value: Any, + size: Long, + deserialized: Boolean): ResultWithDroppedBlocks = { var putSuccess = false var enoughFreeSpace = false @@ -521,9 +522,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit") ResultBlocksIdMemory(success = false, selectedBlocks.toSeq, selectedMemory) } else { - // This is synchronized to ensure that the set of entries is not changed - // (because of getValue or getBytes) while traversing the iterator, as that - // can lead to exceptions. + // This is synchronized with two purpose, one is to ensure that the set of entries + // is not changed (because of getValue or getBytes) while traversing the iterator, + // as that can lead to exceptions. Two is to ensure that only one thread is traversing + // the entry to select "to-be-dropped" blocks and update the map information, to avoid + // same block is selected by multiple threads. entries.synchronized { if (memoryFree < size) { val rddToAdd = getRddId(blockIdToAdd) @@ -746,7 +749,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * clean the reservedunrollMemoryMap for this thread, each time after the unrolling process, * this method need to be called. */ - private[spark] def removeReservedUnrollMemoryForThisThread(): Unit = accountingLock.synchronized { + private[spark] def removeReservedUnrollMemoryForThisThread() + : Unit = accountingLock.synchronized { reservedUnrollMemoryMap.remove(Thread.currentThread().getId) } @@ -774,7 +778,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * After the block dropped from memory, should clean the reservedUnrollMemoryMap, which will * free up memory for new blocks to unroll or to tryToPut. */ - private[spark] def removeIteratorUnrollMemoryForThisThread(): Unit = accountingLock.synchronized { + private[spark] def removeIteratorUnrollMemoryForThisThread() + : Unit = accountingLock.synchronized { iteratorUnrollMemoryMap.remove(Thread.currentThread().getId) } From 5079e41bcb06b3a741957e6352bdd62b56e018cc Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Tue, 2 Sep 2014 13:58:33 +0800 Subject: [PATCH 4/9] handle exception when dropping blocks accroding comments in [SPARK-1888] PR#791 --- .../apache/spark/storage/MemoryStore.scala | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 329873ea076b..78e2b3bf8294 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -443,22 +443,36 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (enoughFreeSpace) { selectedBlocks ++= freeSpaceResult.toDropBlocksId selectedMemory = freeSpaceResult.selectedMemory - if (!selectedBlocks.isEmpty) { - for (selectedblockId <- selectedBlocks) { - val entry = entries.synchronized { entries.get(selectedblockId) } - // drop old block in parallel to free memory for new blocks to put - if (entry != null) { - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[Array[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + try { + if (!selectedBlocks.isEmpty) { + for (selectedblockId <- selectedBlocks) { + val entry = entries.synchronized { entries.get(selectedblockId) } + // drop old block in parallel to free memory for new blocks to put + if (entry != null) { + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[Array[Any]]) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) + } + val droppedBlockStatus = blockManager.dropFromMemory(selectedblockId, data) + droppedBlockStatus.foreach { status => droppedBlocks += ((selectedblockId, status)) } } - val droppedBlockStatus = blockManager.dropFromMemory(selectedblockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((selectedblockId, status)) } } } + } catch { + // if there is exception, the current block will never put into Memory + case e: Exception => + { + decreaseTryToPutMemoryForThisThread(size) + } + throw e + } finally { + // whatever there is exception or not, blocks selected by this thread to drop should + // already been dropped, and Unrolled Memory for this thread should also be 0. + removeUnrollMemoryForThisThread() + removeToDropMemoryForThisThread() } - + val entry = new MemoryEntry(value, size, deserialized) entries.synchronized { entries.put(blockId, entry) @@ -472,7 +486,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } else { logInfo(s"Failed to put block ${blockId} to memory, block size is ${size}.") // For some reason, blocks might still not be able to put into memory even unroll - // successfully.If so, we need to clear reserved unroll memory and to be dropped blocks for + // successfully. If so, we need to clear reserved unroll memory and to-be-dropped blocks for // this thread. removeUnrollMemoryForThisThread() removeToDropMemoryForThisThread() @@ -524,9 +538,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } else { // This is synchronized with two purpose, one is to ensure that the set of entries // is not changed (because of getValue or getBytes) while traversing the iterator, - // as that can lead to exceptions. Two is to ensure that only one thread is traversing - // the entry to select "to-be-dropped" blocks and update the map information, to avoid - // same block is selected by multiple threads. + // as that can lead to exceptions. The other is to ensure that only one thread is + // traversing the entry to select "to-be-dropped" blocks and update the map information, + // to avoid same block is selected by multiple threads. entries.synchronized { if (memoryFree < size) { val rddToAdd = getRddId(blockIdToAdd) @@ -534,6 +548,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) while (memoryFree + selectedMemory < size && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey + // only blocks that has not been selected can be selected if (!tobeDroppedBlocksSet.contains(blockId)) { if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { selectedBlocks += blockId From 0fe07e612386e3af7e3b2a9700b2132588096029 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Tue, 2 Sep 2014 14:10:24 +0800 Subject: [PATCH 5/9] refine code style --- .../main/scala/org/apache/spark/storage/MemoryStore.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 78e2b3bf8294..20ffd3dcc68c 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -461,11 +461,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } catch { // if there is exception, the current block will never put into Memory - case e: Exception => - { - decreaseTryToPutMemoryForThisThread(size) - } + case e: Exception => { + decreaseTryToPutMemoryForThisThread(size) throw e + } } finally { // whatever there is exception or not, blocks selected by this thread to drop should // already been dropped, and Unrolled Memory for this thread should also be 0. From 4390f15b10dab8db5a586e782c7d0b8f1952c1f0 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Wed, 3 Sep 2014 10:27:03 +0800 Subject: [PATCH 6/9] make logic correctly when unrolling memory if encounter exceptions --- .../src/main/scala/org/apache/spark/storage/MemoryStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 20ffd3dcc68c..5129dceaf879 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -324,6 +324,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (currentSize > memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong if (freeMemoryForUnroll < amountToRequest) { + keepUnrolling = false var selectedMemory = 0L val selectedBlocks = new ArrayBuffer[BlockId]() val ensureSpaceResult = ensureFreeSpace( @@ -352,8 +353,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // update reservedUnrollMemoryMap, indicate the tobeDroppedBlocks that marked by // current thread has been dropped decreaseReservedUnrollMemoryForThisThread(amountToRequest) - } else { - keepUnrolling = false + keepUnrolling = true } } else { increaseUnrollMemoryForThisThread(amountToRequest) From ab1f9f8cbf954457c8e47f46cb2a53ebb1acdd59 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Thu, 6 Nov 2014 17:35:57 +0800 Subject: [PATCH 7/9] rebased and isolate each threads operation, the freed memory by droppig old blocks can only be used by current blocks, in this way to avoid OOM risk --- .../apache/spark/storage/MemoryStore.scala | 390 +++++++----------- .../spark/storage/BlockManagerSuite.scala | 1 + 2 files changed, 159 insertions(+), 232 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 5129dceaf879..a153b8da2800 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -48,17 +48,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) private val accountingLock = new Object // A mapping from thread ID to amount of memory used for unrolling a block (in bytes). - // The memory is only reserved for unrolling, not actually occupied by blocks. + // The memory is only for unrolling, not actually occupied by blocks. // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val unrollMemoryMap = mutable.HashMap[Long, Long]() - // A mapping from thread ID to amount of memory used for preUnroll a block in marking phase. - // When unrolling a block when there is not enough free memory, we will mark old blocks - // to dropped to free more memory. The reservedUnrollMemory is the memory reserved, but the - // corresponding marked "tobeDropped" blocks has not beed dropped yet. - // Used for keeping "freeMemory" correctly when actually try to put new blocks. + // A mapping from thread ID to amount of memory that is free but is reserved for Unroll + // or tryToPut. The memory is free but has been reserved by current thread so that other + // thread would not know this free memory. After Unroll or tryToPut, this free memory will + // be used or released for other threads. // All accesses of this map are assumed to have manually synchronized on `accountingLock` - private val reservedUnrollMemoryMap = mutable.HashMap[Long, Long]() + private val reservedFreeMemoryMap = mutable.HashMap[Long, Long]() // A mapping from thread ID to amount of memory reserved for unrolling part of a block (in // bytes), when the block is not able to fully put into memory, will return an iterator when @@ -66,10 +65,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val iteratorUnrollMemoryMap = mutable.HashMap[Long, Long]() - // A mapping from thread ID to amount of memory to be dropped for new blocks (in bytes). - // All accesses of this map are assumed to have manually synchronized on `accountingLock` - private val toDropMemoryMap = mutable.HashMap[Long, Long]() - // A mapping from thread ID to a blockId Set that to be dropped for new blocks. // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val toDropBlocksMap = mutable.HashMap[Long, HashSet[BlockId]]() @@ -81,36 +76,27 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) - // Initial memory to request before unrolling any block - private val unrollMemoryThreshold: Long = - conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) - - if (maxMemory < unrollMemoryThreshold) { - logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " + - s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " + - s"memory. Please configure Spark with more memory.") - } - - logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) - - /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. */ - def actualFreeMemory: Long = maxMemory - currentMemory + /** + * Free memory not occupied by existing blocks. Note that this includes all memory that is + * in free state logically. + */ + def freeMemory: Long = maxMemory - currentMemory /** * Free memory that can be used when new blocks are trying to put into memory. The value - * includes the memory that are marked to be dropped. + * includes unroll memory. */ - def freeMemory: Long = maxMemory - ( + def freeMemoryForTryToPut: Long = maxMemory - ( currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + - currentReservedUnrollMemory - currentToDropMemory) - + currentReservedFreeMemory) + /** - * Free memory that can used when new blocks are unrolling to the memory. The value includes - * the memory that are marked to be dropped, but not include the memory for Unrolling. + * Free memory that can used when new blocks are unrolling to the memory. The memory only + * includes the memory that is in free state physically. */ def freeMemoryForUnroll: Long = maxMemory - ( currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + - currentUnrollMemory - currentToDropMemory) + currentReservedFreeMemory + currentUnrollMemory) override def getSize(blockId: BlockId): Long = { entries.synchronized { @@ -249,7 +235,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) delToDropBlocksMapForThisThread(blockId) val entry = entries.remove(blockId) if (entry != null) { - decreaseToDropMemoryForThisThread(entry.size) + // all memory that obtained by dropping old blocks should be reserved if necessary. + if (reservedFreeMemoryMap.contains(Thread.currentThread().getId)) { + increaseReservedFreeMemoryForThisThread(entry.size) + } currentMemory -= entry.size logInfo(s"Block $blockId of size ${entry.size} dropped from memory") true @@ -265,9 +254,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) tobeDroppedBlocksSet.clear() currentMemory = 0 unrollMemoryMap.clear() - reservedUnrollMemoryMap.clear() + reservedFreeMemoryMap.clear() iteratorUnrollMemoryMap.clear() - toDropMemoryMap.clear() toDropBlocksMap.clear() tryToPutMemoryMap.clear() } @@ -285,8 +273,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * * When there is not enough memory for unrolling blocks, old blocks will be dropped from * memory. The dropping operation is in parallel to fully utilized the disk throughput - * when there are multiple disks. And befor dropping, each thread will mark the old blocks - * that can be dropped. + * when there are multiple disks. Each thread will drop blocks selected by itself, and the + * freed memory by dropping old blocks can only be used by this thread before it finish + * unrolling. * * This method returns either an array with the contents of the entire block or an iterator * containing the values of the block (if the array would have exceeded available memory). @@ -312,53 +301,45 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] - // preUnroll this block safely, checking whether we have exceeded our threshold periodically + // Unroll this block safely, checking whether we have exceeded our threshold periodically try { while (values.hasNext && keepUnrolling) { vector += values.next() - // Every checking period reaches or the iterator reaches the end, we check whether extra + // Every checking period reaches or the iterator is exhausted, we check whether extra // memory is needed. if (elementsUnrolled % memoryCheckPeriod == 0 || !values.hasNext) { // If our vector's size has exceeded the threshold, request more memory val currentSize = vector.estimateSize() if (currentSize > memoryThreshold) { - val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong + val amountToRequest = values.hasNext match { + case true => + (currentSize * memoryGrowthFactor - memoryThreshold).toLong + case false => + // no need to request more memory than needed if iterator is exhausted + (currentSize - memoryThreshold).toLong + } if (freeMemoryForUnroll < amountToRequest) { keepUnrolling = false - var selectedMemory = 0L val selectedBlocks = new ArrayBuffer[BlockId]() val ensureSpaceResult = ensureFreeSpace( - blockId, amountToRequest, freeMemoryForUnroll, true) + blockId, amountToRequest, freeMemoryForUnroll) val enoughFreeSpace = ensureSpaceResult.success if (enoughFreeSpace) { selectedBlocks ++= ensureSpaceResult.toDropBlocksId - selectedMemory = ensureSpaceResult.selectedMemory if (!selectedBlocks.isEmpty) { - // drop old block in parallel to free memory for new blocks to unroll - for (selectedblockId <- selectedBlocks) { - val entry = entries.synchronized { entries.get(selectedblockId) } - if (entry != null) { - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[Array[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) - } - val droppedBlockStatus = blockManager.dropFromMemory(selectedblockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((selectedblockId, - status)) } - } - } + droppedBlocks ++= doDrop(selectedBlocks) } - // update reservedUnrollMemoryMap, indicate the tobeDroppedBlocks that marked by - // current thread has been dropped - decreaseReservedUnrollMemoryForThisThread(amountToRequest) + // blocks that selected "to-be-dropped" has been dropped, and memory is free for + // unroll. At the same time, memory that reserved for unroll ("to-be-dropped" + // block memory size) should be released. + increaseUnrollMemoryForThisThread(amountToRequest) + removeReservedFreeMemoryForThisThread() keepUnrolling = true } } else { increaseUnrollMemoryForThisThread(amountToRequest) } - if (keepUnrolling) { memoryThreshold += amountToRequest } @@ -368,7 +349,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } if (keepUnrolling) { - // to free up memory that requested more than needed + // to free up memory that requested more than needed, the value might be negative decreaseUnrollMemoryForThisThread(memoryThreshold - SizeEstimator.estimate( vector.toArray.asInstanceOf[AnyRef])) logInfo(s"Successfully unrolloing the block ${blockId} to memory, block size is " + @@ -390,11 +371,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) reserveIteratorUnrollMemoryForThisThread() removeUnrollMemoryForThisThread() } - // whatever we return, blocks that marked "to-be-dropped" should always have been dropped - removeToDropMemoryForThisThread() + // whatever we return, blocks that marked"to-be-dropped" should always have been dropped + cleanToDropBlocksMapForThisThread() // We will finally reset the ReservedUnrollMemory for current thread. The memory should - // always be 0 after dropping the selected blocks. - removeReservedUnrollMemoryForThisThread() + // always be 0 after unrolling. + removeReservedFreeMemoryForThisThread() } } } @@ -419,8 +400,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size * must also be passed by the caller. * - * In order to drop old blocks in parallel, we will first mark the blocks that can be dropped - * when there is not enough memory. + * The tryToPut operation is processed in parallel like Unroll process. In most case, the free + * memory are ready for tryToPut after Unroll. * * Return whether put was successful, along with the blocks dropped in the process. */ @@ -434,31 +415,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) var putSuccess = false var enoughFreeSpace = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - - var selectedMemory = 0L val selectedBlocks = new ArrayBuffer[BlockId]() - val freeSpaceResult = ensureFreeSpace(blockId, size, freeMemory, false) + val freeSpaceResult = ensureFreeSpace(blockId, size, freeMemoryForTryToPut) enoughFreeSpace = freeSpaceResult.success if (enoughFreeSpace) { selectedBlocks ++= freeSpaceResult.toDropBlocksId - selectedMemory = freeSpaceResult.selectedMemory try { if (!selectedBlocks.isEmpty) { - for (selectedblockId <- selectedBlocks) { - val entry = entries.synchronized { entries.get(selectedblockId) } - // drop old block in parallel to free memory for new blocks to put - if (entry != null) { - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[Array[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) - } - val droppedBlockStatus = blockManager.dropFromMemory(selectedblockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((selectedblockId, status)) } - } - } + droppedBlocks ++= doDrop(selectedBlocks) } + increaseTryToPutMemoryForThisThread(size) + decreaseUnrollMemoryForThisThread(size) } catch { // if there is exception, the current block will never put into Memory case e: Exception => { @@ -469,7 +437,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // whatever there is exception or not, blocks selected by this thread to drop should // already been dropped, and Unrolled Memory for this thread should also be 0. removeUnrollMemoryForThisThread() - removeToDropMemoryForThisThread() + removeReservedFreeMemoryForThisThread() + cleanToDropBlocksMapForThisThread() } val entry = new MemoryEntry(value, size, deserialized) @@ -488,7 +457,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // successfully. If so, we need to clear reserved unroll memory and to-be-dropped blocks for // this thread. removeUnrollMemoryForThisThread() - removeToDropMemoryForThisThread() + removeReservedFreeMemoryForThisThread() + cleanToDropBlocksMapForThisThread() // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. val data = if (deserialized) { @@ -508,9 +478,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * - * In this method each thread only make the marking operations on blocks to see whether - * there will be enough memory if dropping the selected blocks. The acturally dropping - * operation will begin if the marking operation succeed. + * In this method each thread will select blocks that can be dropped until the selected blocks + * memory is enough, and then in the caller, each thread will drop the blocks itself. The + * freed memory by dropping old blocks can only be used by current thread, other threads are + * not aware of the freed memory. * * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping * blocks. Otherwise, the freed space may fill up before the caller puts in their new value. @@ -521,8 +492,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) private def ensureFreeSpace( blockIdToAdd: BlockId, size: Long, - memoryFree: Long, - isUnroll: Boolean): ResultBlocksIdMemory = { + memoryFree: Long): ResultBlocksIdMemory = { logInfo(s"ensureFreeSpace($size) called with curMem=$currentMemory, maxMem=$maxMemory") var putSuccess = false @@ -533,9 +503,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (size > maxMemory) { logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit") - ResultBlocksIdMemory(success = false, selectedBlocks.toSeq, selectedMemory) + ResultBlocksIdMemory(enoughFreeSpace, selectedBlocks.toSeq) } else { - // This is synchronized with two purpose, one is to ensure that the set of entries + // This is synchronized with two purposes, one is to ensure that the set of entries // is not changed (because of getValue or getBytes) while traversing the iterator, // as that can lead to exceptions. The other is to ensure that only one thread is // traversing the entry to select "to-be-dropped" blocks and update the map information, @@ -548,7 +518,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) val pair = iterator.next() val blockId = pair.getKey // only blocks that has not been selected can be selected - if (!tobeDroppedBlocksSet.contains(blockId)) { + if (!tobeDroppedBlocksSet(blockId)) { if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { selectedBlocks += blockId selectedMemory += pair.getValue.size @@ -559,86 +529,58 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) if (memoryFree + selectedMemory >= size) { tobeDroppedBlocksSet ++= selectedBlocks addToDropBlocksMapForThisThread(selectedBlocks.toArray) - increaseToDropMemoryForThisThread(selectedMemory) - if (isUnroll) { - increaseUnrollMemoryForThisThread(size) - increaseReservedUnrollMemoryForThisThread(size) - } else { - increaseTryToPutMemoryForThisThread(size) - decreaseUnrollMemoryForThisThread(size) + // reserve the free memory that can only be used by current thread + if (size - selectedMemory > 0) { + increaseReservedFreeMemoryForThisThread(size - selectedMemory) } enoughFreeSpace = true logInfo(selectedBlocks.size + " blocks selected for dropping") - ResultBlocksIdMemory(success = true, selectedBlocks.toSeq, selectedMemory) + ResultBlocksIdMemory(enoughFreeSpace, selectedBlocks.toSeq) } else { logInfo(s"Will not store $blockIdToAdd as it would require" + s" dropping another block from the same RDD") - ResultBlocksIdMemory(success = false, selectedBlocks.toSeq, selectedMemory) + ResultBlocksIdMemory(enoughFreeSpace, selectedBlocks.toSeq) } } } } - override def contains(blockId: BlockId): Boolean = { - entries.synchronized { entries.containsKey(blockId) } - } - - private[spark] def cleanupForThisThread(): Unit = { - removeUnrollMemoryForThisThread() - removeIteratorUnrollMemoryForThisThread() - removeToDropMemoryForThisThread() - removeTryToPutMemoryForThisThread() - } - /** - * Increase memory size that will be dropped by this thread in future, which means more - * old blocks are marked as "to-be-dropped" for this thread. - */ - private[spark] def increaseToDropMemoryForThisThread(memory: Long): Unit = { - val threadId = Thread.currentThread().getId - accountingLock.synchronized { - toDropMemoryMap(threadId) = toDropMemoryMap.getOrElse(threadId, 0L) + memory - } - } - /** - * Decrease memory size that will be dropped by this thread in future, which means some - * old blocks marked as "to-be-dropped" are finished dropping. + * dropping the blocks from memory, blocks will be dropped to disk if storage is useDisk */ - private[spark] def decreaseToDropMemoryForThisThread(memory: Long = -1L): Unit = { - val threadId = Thread.currentThread().getId - accountingLock.synchronized { - if (memory > 0) { - toDropMemoryMap(threadId) = toDropMemoryMap.getOrElse(threadId, 0L) - memory - // If this thread claims no more unroll memory, release it completely - if (toDropMemoryMap(threadId) <= 0) { - toDropMemoryMap.remove(threadId) - cleanToDropBlocksMapForThisThread() + private def doDrop(toDropBlocks: Seq[BlockId]): ArrayBuffer[(BlockId, BlockStatus)] = { + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + for (blocks <- toDropBlocks) { + val entry = entries.synchronized { entries.get(blocks) } + // drop old block to free memory for new blocks to put. + if (entry != null) { + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[Array[Any]]) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } + val droppedBlockStatus = blockManager.dropFromMemory(blocks, data) + droppedBlockStatus.foreach { status => droppedBlocks += ((blocks, status)) } } } + droppedBlocks } - /** - * Return the amount of memory currently totally to be dropped for unrolling blocks or for - * putting blocks across all threads. - */ - private[spark] def currentToDropMemory: Long = accountingLock.synchronized { - toDropMemoryMap.values.sum + override def contains(blockId: BlockId): Boolean = { + entries.synchronized { entries.containsKey(blockId) } } - /** - * Remove all memory that will be dropped for this thread, also the old blocks marked as - * "to-be-dropped" for this thread will remove the marking. - */ - private[spark] def removeToDropMemoryForThisThread(): Unit = accountingLock.synchronized { - toDropMemoryMap.remove(Thread.currentThread().getId) + def cleanupForThisThread(): Unit = { + removeUnrollMemoryForThisThread() + removeIteratorUnrollMemoryForThisThread() cleanToDropBlocksMapForThisThread() + removeTryToPutMemoryForThisThread() } - + /** - * Mark more old blocks as "to-be-dropped" for this thread. + * Add more old blocks as "to-be-dropped" for this thread. */ - private[spark] def addToDropBlocksMapForThisThread(blocksId: Array[BlockId]): Unit = { + def addToDropBlocksMapForThisThread(blocksId: Array[BlockId]): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()) ++= blocksId @@ -649,7 +591,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Remove a specified block from the map that marked as "to-be-dropped" from this thread, * which means the blocks has been dropped from the memory. */ - private[spark] def delToDropBlocksMapForThisThread(blockId: BlockId): Unit = { + def delToDropBlocksMapForThisThread(blockId: BlockId): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()).remove(blockId) @@ -657,10 +599,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** - * Remove all block that marked as "to-be-dropped" from the map for this thread, which means - * either the blocks has been dropped from memory or the the marking is invalid. + * Remove all blocks that selected as "to-be-dropped" for this thread, which means + * either the blocks has been dropped from memory or there are some exceptions. */ - private[spark] def cleanToDropBlocksMapForThisThread(): Unit = { + def cleanToDropBlocksMapForThisThread(): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { val blockIdSet = toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()) @@ -674,11 +616,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } } } + + /** + * Remove the current thread's allocation for unrolling. + */ + def releaseUnrollMemoryForThisThread(): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + unrollMemoryMap.remove(threadId) + } + } /** - * Reserve additional memory for unrolling blocks reserved by this thread. + * Get additional memory for unrolling for this thread. */ - private[spark] def increaseUnrollMemoryForThisThread(memory: Long): Unit = { + def increaseUnrollMemoryForThisThread(memory: Long): Unit = { accountingLock.synchronized { val threadId = Thread.currentThread().getId unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory @@ -686,25 +638,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** - * Release memory reserved by this thread for unrolling blocks. - * If the amount is not specified, remove the current thread's allocation altogether. + * Decrease memory reserved by this thread for unrolling blocks. */ - private[spark] def decreaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { + def decreaseUnrollMemoryForThisThread(memory: Long): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { - if (memory > 0) { - unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) - memory - // If this thread claims no more unroll memory, release it completely - if (unrollMemoryMap(threadId) <= 0) { - unrollMemoryMap.remove(threadId) - } + unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) - memory + // If this thread claims no more unroll memory, release it completely + if (unrollMemoryMap(threadId) < 0) { + unrollMemoryMap.remove(threadId) } } } - + /** * Return the amount of memory currently totally reserved for unrolling blocks across - * all threads. The urolling blocks are blocks that have been confirmed can putting into + * all threads. The unrolling blocks are blocks that have been confirmed can putting into * the memory. */ def currentUnrollMemory: Long = accountingLock.synchronized { @@ -712,67 +661,49 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** - * Rmove the memory from unrolling from the map, which means either the blocks has been marked - * as "try-to-put" or blocks has been put into memory or the unrolled memory is invalid. + * Remove the memory from unrolling from the map, which means either the blocks has been + * selected as "try-to-put" or blocks has been put into memory or the unrolled memory + * is invalid. */ - private[spark] def removeUnrollMemoryForThisThread(): Unit = accountingLock.synchronized { + def removeUnrollMemoryForThisThread(): Unit = accountingLock.synchronized { unrollMemoryMap.remove(Thread.currentThread().getId) } /** - * Increase memory for reservedUnroll for this thread. This only happen when there is not enough - * space for unrolling new block and need to drop old block for more space. So, each - * reservedUnrollMemory will correspond to some amount "to-be-dropped" memory, and after the - * corresponding "to-be-dropped" blocks are dropped from memroy, reservedunrollMemory should - * also be refreshed. + * Increase reserved free memory for this thread. This only happen when there is not enough + * space for unrolling new block and need to drop old block for more space. Once the selected + * blocks of this thread is dropped from memory, the freed memory should be reserved by this + * thread to avoid being used by other threads. reservedFreeMemory should also be refreshed + * after finished unrolling or tryToPut */ - private[spark] def increaseReservedUnrollMemoryForThisThread(memory: Long): Unit = { + def increaseReservedFreeMemoryForThisThread(memory: Long): Unit = { accountingLock.synchronized { val threadId = Thread.currentThread().getId - reservedUnrollMemoryMap(threadId) = reservedUnrollMemoryMap.getOrElse(threadId, 0L) + memory - } - } - - /** - * Release memory used for reserveUnroll by this thread. Which means the corresponding - * "to-be-dropped" blocks has been dropped from the memory. - */ - private[spark] def decreaseReservedUnrollMemoryForThisThread(memory: Long = -1L): Unit = { - val threadId = Thread.currentThread().getId - accountingLock.synchronized { - if (memory > 0) { - reservedUnrollMemoryMap(threadId) = reservedUnrollMemoryMap.getOrElse( - threadId, 0L) - memory - // If this thread claims no more reservedUnroll memory, release it completely - if (reservedUnrollMemoryMap(threadId) <= 0) { - reservedUnrollMemoryMap.remove(threadId) - } - } + reservedFreeMemoryMap(threadId) = reservedFreeMemoryMap.getOrElse(threadId, 0L) + memory } } /** - * Return the amount of memory currently totally reserved for reservedUnrolling across - * all threads. + * Return the amount of free memory reserved by all threads */ - private[spark] def currentReservedUnrollMemory: Long = accountingLock.synchronized { - reservedUnrollMemoryMap.values.sum + def currentReservedFreeMemory: Long = accountingLock.synchronized { + reservedFreeMemoryMap.values.sum } /** - * clean the reservedunrollMemoryMap for this thread, each time after the unrolling process, - * this method need to be called. + * clean the reservedFreeMemoryMap for this thread, each time after the unrolling process and + * tryToPutprocess, this method need to be called. */ - private[spark] def removeReservedUnrollMemoryForThisThread() + def removeReservedFreeMemoryForThisThread() : Unit = accountingLock.synchronized { - reservedUnrollMemoryMap.remove(Thread.currentThread().getId) + reservedFreeMemoryMap.remove(Thread.currentThread().getId) } /** - * When a block can not unroll into memory, the memory size it has already reserved should - * maintained in iteratorUnrollMemoryMap. + * When a block can not unroll into memory, the memory size that used to store the computed + * part should maintained in iteratorUnrollMemoryMap. */ - private[spark] def reserveIteratorUnrollMemoryForThisThread(): Unit = { + def reserveIteratorUnrollMemoryForThisThread(): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { val unrolledMem = unrollMemoryMap.getOrElse(threadId, 0L) @@ -780,29 +711,29 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) threadId, 0L) + unrolledMem } } + /** - * Return the amount of memory currently totally reserved for part of blocks that can not put + * Return the amount of memory currently totally used for part of blocks that can not put * into the memory (will drop to disk or just drop from meory in future) across all threads. */ - private[spark] def currentIteratorUnrollMemory: Long = accountingLock.synchronized { + def currentIteratorUnrollMemory: Long = accountingLock.synchronized { iteratorUnrollMemoryMap.values.sum } /** - * After the block dropped from memory, should clean the reservedUnrollMemoryMap, which will - * free up memory for new blocks to unroll or to tryToPut. + * After the block put into disk or dropped from memory, reservedIteratorUnrollMemoryMap should + * refresh the value for this thread. */ - private[spark] def removeIteratorUnrollMemoryForThisThread() + def removeIteratorUnrollMemoryForThisThread() : Unit = accountingLock.synchronized { iteratorUnrollMemoryMap.remove(Thread.currentThread().getId) } - - + /** - * Reserve additional memory for putting blocks for this thread. That meand more blocks are + * Reserve additional memory for putting blocks for this thread. That means more blocks are * waiting to put into memory, and before putting into memory, it will reserve some memory first. */ - private[spark] def increaseTryToPutMemoryForThisThread(memory: Long): Unit = { + def increaseTryToPutMemoryForThisThread(memory: Long): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { tryToPutMemoryMap(threadId) = tryToPutMemoryMap.getOrElse(threadId, 0L) + memory @@ -810,18 +741,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** - * Release used by this thread for putting new blocks, which means new block has been put into - * memory already. + * Release memory used by this thread for putting new blocks, which means new block has been + * finished putting into memory already. */ - private[spark] def decreaseTryToPutMemoryForThisThread(memory: Long = -1L): Unit = { + def decreaseTryToPutMemoryForThisThread(memory: Long): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { - if (memory > 0) { - tryToPutMemoryMap(threadId) = tryToPutMemoryMap.getOrElse(threadId, memory) - memory - // If this thread claims no more unroll memory, release it completely - if (tryToPutMemoryMap(threadId) <= 0) { - tryToPutMemoryMap.remove(threadId) - } + tryToPutMemoryMap(threadId) = tryToPutMemoryMap.getOrElse(threadId, memory) - memory + // If this thread claims no more unroll memory, release it completely + if (tryToPutMemoryMap(threadId) < 0) { + tryToPutMemoryMap.remove(threadId) } } } @@ -829,17 +758,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) /** * Return the amount of memory currently reserved for putting new blocks across all threads. */ - private[spark] def currentTryToPutMemory: Long = accountingLock.synchronized { + def currentTryToPutMemory: Long = accountingLock.synchronized { tryToPutMemoryMap.values.sum } /** - * Clean all memory reserved for putting new blocks, at the same time, marking of blocks - * marked as "to-be-dropped" for this thread will be cleaned. + * Clean all memory reserved for putting new blocks. */ - private [spark] def removeTryToPutMemoryForThisThread(): Unit = { + def removeTryToPutMemoryForThisThread(): Unit = { tryToPutMemoryMap.remove(Thread.currentThread().getId) - removeToDropMemoryForThisThread() } /** @@ -883,5 +810,4 @@ private[spark] case class ResultWithDroppedBlocks( private[spark] case class ResultBlocksIdMemory( success: Boolean, - toDropBlocksId: Seq[BlockId], - selectedMemory: Long) + toDropBlocksId: Seq[BlockId]) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index a510084ce597..16a706d9e931 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1082,6 +1082,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach // memoryGrowthFactor is 1.5, put midList fist, will must drop someBlock3 when putting // huge block. store.putIterator("someBlock3", midList.iterator, StorageLevel.MEMORY_ONLY) + assert(memoryStore.contains("someBlock3")) // Unroll huge block with not enough space. Even after ensuring free space by dropping old // block "someBlock3", there is still not enough room to unroll this block. // This returns an iterator. In the mean time, however, we kicked out "someBlock3" before From 3192a6de7254107c038ab2b0b6868295bee11231 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Thu, 16 Apr 2015 13:21:50 +0800 Subject: [PATCH 8/9] rebase again, and add some changes back after rebase. --- .../org/apache/spark/storage/MemoryStore.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index a153b8da2800..4dc900616ac2 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -399,6 +399,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Try to put in a set of values, if we can free up enough space. The value should either be * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size * must also be passed by the caller. + * + * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be + * created to avoid OOM since it may be a big ByteBuffer. * * The tryToPut operation is processed in parallel like Unroll process. In most case, the free * memory are ready for tryToPut after Unroll. @@ -408,7 +411,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) private def tryToPut( blockId: BlockId, - value: Any, + value: () => Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks = { @@ -441,7 +444,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) cleanToDropBlocksMapForThisThread() } - val entry = new MemoryEntry(value, size, deserialized) + val entry = new MemoryEntry(value(), size, deserialized) entries.synchronized { entries.put(blockId, entry) decreaseTryToPutMemoryForThisThread(size) @@ -461,12 +464,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) cleanToDropBlocksMapForThisThread() // Tell the block manager that we couldn't put it in memory so that it can drop it to // disk if the block allows disk storage. - val data = if (deserialized) { - Left(value.asInstanceOf[Array[Any]]) + lazy val data = if (deserialized) { + Left(value().asInstanceOf[Array[Any]]) } else { - Right(value.asInstanceOf[ByteBuffer].duplicate()) + Right(value().asInstanceOf[ByteBuffer].duplicate()) } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) + val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } ResultWithDroppedBlocks(putSuccess, droppedBlocks) @@ -657,7 +660,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * the memory. */ def currentUnrollMemory: Long = accountingLock.synchronized { - unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum + unrollMemoryMap.values.sum } /** From c24815624f9e51debfe1ca8e4859c1a58b30a42a Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Thu, 16 Apr 2015 14:03:42 +0800 Subject: [PATCH 9/9] fix test code style --- .../org/apache/spark/storage/BlockManagerSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 16a706d9e931..5bdb76b80889 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -1104,9 +1104,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val smallList = List.fill(40)(new Array[Byte](100)) val midList = List.fill(60)(new Array[Byte](100)) val bigList = List.fill(400)(new Array[Byte](100)) - def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] - def midIterator = midList.iterator.asInstanceOf[Iterator[Any]] - def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]] + def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] + def midIterator: Iterator[Any] = midList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemory === 0) // Unroll with plenty of space. This should succeed and cache both blocks. @@ -1164,9 +1164,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val smallList = List.fill(40)(new Array[Byte](100)) val midList = List.fill(60)(new Array[Byte](100)) val bigList = List.fill(400)(new Array[Byte](100)) - def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]] - def midIterator = midList.iterator.asInstanceOf[Iterator[Any]] - def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]] + def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] + def midIterator: Iterator[Any] = midList.iterator.asInstanceOf[Iterator[Any]] + def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] assert(memoryStore.currentUnrollMemory === 0) store.putIterator("b1", smallIterator, memAndDisk)