diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 14f99a464b6e..76739bc07772 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -292,8 +292,8 @@ private[spark] class Executor( } finally { // Release memory used by this thread for shuffles env.shuffleMemoryManager.releaseMemoryForThisThread() - // Release memory used by this thread for unrolling blocks - env.blockManager.memoryStore.releaseUnrollMemoryForThisThread() + // cleanup maintained memory infomation for this thread + env.blockManager.memoryStore.cleanupForThisThread() // Release memory used by this thread for accumulators Accumulators.clear() runningTasks.remove(taskId) diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index ed609772e697..4dc900616ac2 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.util.LinkedHashMap import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, HashSet} import org.apache.spark.util.{SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector @@ -37,47 +37,66 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) private val conf = blockManager.conf private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) + // a Set maintains old blocks that will be dropped + private val tobeDroppedBlocksSet = new HashSet[BlockId] + // currentMemory is actually memory that is already used for caching blocks @volatile private var currentMemory = 0L - // Ensure only one thread is putting, and if necessary, dropping blocks at any given time + // Ensure only one thread updating the information, information including memory to drop, + // memory unrolled, etc. private val accountingLock = new Object - // A mapping from thread ID to amount of memory used for unrolling a block (in bytes) + // A mapping from thread ID to amount of memory used for unrolling a block (in bytes). + // The memory is only for unrolling, not actually occupied by blocks. // All accesses of this map are assumed to have manually synchronized on `accountingLock` private val unrollMemoryMap = mutable.HashMap[Long, Long]() - // Same as `unrollMemoryMap`, but for pending unroll memory as defined below. - // Pending unroll memory refers to the intermediate memory occupied by a thread - // after the unroll but before the actual putting of the block in the cache. - // This chunk of memory is expected to be released *as soon as* we finish - // caching the corresponding block as opposed to until after the task finishes. - // This is only used if a block is successfully unrolled in its entirety in - // memory (SPARK-4777). - private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]() + + // A mapping from thread ID to amount of memory that is free but is reserved for Unroll + // or tryToPut. The memory is free but has been reserved by current thread so that other + // thread would not know this free memory. After Unroll or tryToPut, this free memory will + // be used or released for other threads. + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val reservedFreeMemoryMap = mutable.HashMap[Long, Long]() + + // A mapping from thread ID to amount of memory reserved for unrolling part of a block (in + // bytes), when the block is not able to fully put into memory, will return an iterator when + // unrolling, but the memory still need to reserved before the block is dropping from memory. + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val iteratorUnrollMemoryMap = mutable.HashMap[Long, Long]() + + // A mapping from thread ID to a blockId Set that to be dropped for new blocks. + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val toDropBlocksMap = mutable.HashMap[Long, HashSet[BlockId]]() + + // A mapping from thread ID to amount of memory reserved by new blocks to put (in bytes). + // The memory is reserved before blocks actually put into. + // All accesses of this map are assumed to have manually synchronized on `accountingLock` + private val tryToPutMemoryMap = mutable.HashMap[Long, Long]() + + logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) + /** + * Free memory not occupied by existing blocks. Note that this includes all memory that is + * in free state logically. + */ + def freeMemory: Long = maxMemory - currentMemory + /** - * The amount of space ensured for unrolling values in memory, shared across all cores. - * This space is not reserved in advance, but allocated dynamically by dropping existing blocks. + * Free memory that can be used when new blocks are trying to put into memory. The value + * includes unroll memory. */ - private val maxUnrollMemory: Long = { - val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2) - (maxMemory * unrollFraction).toLong - } + def freeMemoryForTryToPut: Long = maxMemory - ( + currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + + currentReservedFreeMemory) - // Initial memory to request before unrolling any block - private val unrollMemoryThreshold: Long = - conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) - - if (maxMemory < unrollMemoryThreshold) { - logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " + - s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " + - s"memory. Please configure Spark with more memory.") - } - - logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory))) - - /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. */ - def freeMemory: Long = maxMemory - currentMemory + /** + * Free memory that can used when new blocks are unrolling to the memory. The memory only + * includes the memory that is in free state physically. + */ + def freeMemoryForUnroll: Long = maxMemory - ( + currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory + + currentReservedFreeMemory + currentUnrollMemory) override def getSize(blockId: BlockId): Long = { entries.synchronized { @@ -172,7 +191,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") - val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) + val res = blockManager.diskStore.putIterator(blockId, + iteratorValues, level, returnValues) PutResult(res.size, res.data, droppedBlocks) } else { PutResult(0, Left(iteratorValues), droppedBlocks) @@ -209,10 +229,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def remove(blockId: BlockId): Boolean = { entries.synchronized { + // Every time when removing blocks from memory, the information about blocks that to be + // dropped need to be refreshed. + tobeDroppedBlocksSet.remove(blockId) + delToDropBlocksMapForThisThread(blockId) val entry = entries.remove(blockId) if (entry != null) { + // all memory that obtained by dropping old blocks should be reserved if necessary. + if (reservedFreeMemoryMap.contains(Thread.currentThread().getId)) { + increaseReservedFreeMemoryForThisThread(entry.size) + } currentMemory -= entry.size - logDebug(s"Block $blockId of size ${entry.size} dropped from memory (free $freeMemory)") + logInfo(s"Block $blockId of size ${entry.size} dropped from memory") true } else { false @@ -223,7 +251,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def clear() { entries.synchronized { entries.clear() + tobeDroppedBlocksSet.clear() currentMemory = 0 + unrollMemoryMap.clear() + reservedFreeMemoryMap.clear() + iteratorUnrollMemoryMap.clear() + toDropBlocksMap.clear() + tryToPutMemoryMap.clear() } logInfo("MemoryStore cleared") } @@ -237,9 +271,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * stopping immediately if not. This check is a safeguard against the scenario in which * there is not enough free memory to accommodate the entirety of a single block. * + * When there is not enough memory for unrolling blocks, old blocks will be dropped from + * memory. The dropping operation is in parallel to fully utilized the disk throughput + * when there are multiple disks. Each thread will drop blocks selected by itself, and the + * freed memory by dropping old blocks can only be used by this thread before it finish + * unrolling. + * * This method returns either an array with the contents of the entire block or an iterator * containing the values of the block (if the array would have exceeded available memory). */ + def unrollSafely( blockId: BlockId, values: Iterator[Any], @@ -247,61 +288,72 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) : Either[Array[Any], Iterator[Any]] = { // Number of elements unrolled so far - var elementsUnrolled = 0 + var elementsUnrolled = 0L // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true - // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing. - val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory val memoryCheckPeriod = 16 // Memory currently reserved by this thread for this particular unrolling operation - var memoryThreshold = initialMemoryThreshold + // Initial value is 0 means don't reserve memory originally, only reserve dynamically + var memoryThreshold = 0L // Memory to request as a multiple of current vector size val memoryGrowthFactor = 1.5 - // Previous unroll memory held by this thread, for releasing later (only at the very end) - val previousMemoryReserved = currentUnrollMemoryForThisThread // Underlying vector for unrolling the block var vector = new SizeTrackingVector[Any] - - // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold) - - if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + - s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") - } - + // Unroll this block safely, checking whether we have exceeded our threshold periodically try { while (values.hasNext && keepUnrolling) { vector += values.next() - if (elementsUnrolled % memoryCheckPeriod == 0) { + // Every checking period reaches or the iterator is exhausted, we check whether extra + // memory is needed. + if (elementsUnrolled % memoryCheckPeriod == 0 || !values.hasNext) { // If our vector's size has exceeded the threshold, request more memory val currentSize = vector.estimateSize() - if (currentSize >= memoryThreshold) { - val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - // Hold the accounting lock, in case another thread concurrently puts a block that - // takes up the unrolling space we just ensured here - accountingLock.synchronized { - if (!reserveUnrollMemoryForThisThread(amountToRequest)) { - // If the first request is not granted, try again after ensuring free space - // If there is still not enough space, give up and drop the partition - val spaceToEnsure = maxUnrollMemory - currentUnrollMemory - if (spaceToEnsure > 0) { - val result = ensureFreeSpace(blockId, spaceToEnsure) - droppedBlocks ++= result.droppedBlocks + if (currentSize > memoryThreshold) { + val amountToRequest = values.hasNext match { + case true => + (currentSize * memoryGrowthFactor - memoryThreshold).toLong + case false => + // no need to request more memory than needed if iterator is exhausted + (currentSize - memoryThreshold).toLong + } + if (freeMemoryForUnroll < amountToRequest) { + keepUnrolling = false + val selectedBlocks = new ArrayBuffer[BlockId]() + val ensureSpaceResult = ensureFreeSpace( + blockId, amountToRequest, freeMemoryForUnroll) + val enoughFreeSpace = ensureSpaceResult.success + + if (enoughFreeSpace) { + selectedBlocks ++= ensureSpaceResult.toDropBlocksId + if (!selectedBlocks.isEmpty) { + droppedBlocks ++= doDrop(selectedBlocks) } - keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest) + // blocks that selected "to-be-dropped" has been dropped, and memory is free for + // unroll. At the same time, memory that reserved for unroll ("to-be-dropped" + // block memory size) should be released. + increaseUnrollMemoryForThisThread(amountToRequest) + removeReservedFreeMemoryForThisThread() + keepUnrolling = true } + } else { + increaseUnrollMemoryForThisThread(amountToRequest) + } + if (keepUnrolling) { + memoryThreshold += amountToRequest } - // New threshold is currentSize * memoryGrowthFactor - memoryThreshold += amountToRequest } } elementsUnrolled += 1 } if (keepUnrolling) { + // to free up memory that requested more than needed, the value might be negative + decreaseUnrollMemoryForThisThread(memoryThreshold - SizeEstimator.estimate( + vector.toArray.asInstanceOf[AnyRef])) + logInfo(s"Successfully unrolloing the block ${blockId} to memory, block size is " + + s"${SizeEstimator.estimate(vector.toArray.asInstanceOf[AnyRef])}") // We successfully unrolled the entirety of this block Left(vector.toArray) } else { @@ -309,18 +361,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) logUnrollFailureMessage(blockId, vector.estimateSize()) Right(vector.iterator ++ values) } - } finally { - // If we return an array, the values returned will later be cached in `tryToPut`. - // In this case, we should release the memory after we cache the block there. - // Otherwise, if we return an iterator, we release the memory reserved here - // later when the task finishes. - if (keepUnrolling) { - accountingLock.synchronized { - val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved - releaseUnrollMemoryForThisThread(amountToRelease) - reservePendingUnrollMemoryForThisThread(amountToRelease) + accountingLock.synchronized { + // If we return an iterator, that means the blocks is not able to put into memory, and + // will be dropped to disk if it can or just dropped from memory. The memory reserved + // for unrolling will not be released because it depending on the underlying vector. + // The memory size will be maintained from unrollMemoryMap to iteratorUnrollMemoryMap. + if (!keepUnrolling) { + reserveIteratorUnrollMemoryForThisThread() + removeUnrollMemoryForThisThread() } + // whatever we return, blocks that marked"to-be-dropped" should always have been dropped + cleanToDropBlocksMapForThisThread() + // We will finally reset the ReservedUnrollMemory for current thread. The memory should + // always be 0 after unrolling. + removeReservedFreeMemoryForThisThread() } } } @@ -344,60 +399,78 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * Try to put in a set of values, if we can free up enough space. The value should either be * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size * must also be passed by the caller. - * + * * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be * created to avoid OOM since it may be a big ByteBuffer. * - * Synchronize on `accountingLock` to ensure that all the put requests and its associated block - * dropping is done by only on thread at a time. Otherwise while one thread is dropping - * blocks to free memory for one block, another thread may use up the freed space for - * another block. - * + * The tryToPut operation is processed in parallel like Unroll process. In most case, the free + * memory are ready for tryToPut after Unroll. + * * Return whether put was successful, along with the blocks dropped in the process. */ + private def tryToPut( blockId: BlockId, value: () => Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks = { - /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks - * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has - * been released, it must be ensured that those to-be-dropped blocks are not double counted - * for freeing up more space for another block that needs to be put. Only then the actually - * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */ - var putSuccess = false + var enoughFreeSpace = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - - accountingLock.synchronized { - val freeSpaceResult = ensureFreeSpace(blockId, size) - val enoughFreeSpace = freeSpaceResult.success - droppedBlocks ++= freeSpaceResult.droppedBlocks - - if (enoughFreeSpace) { - val entry = new MemoryEntry(value(), size, deserialized) - entries.synchronized { - entries.put(blockId, entry) - currentMemory += size + val selectedBlocks = new ArrayBuffer[BlockId]() + + val freeSpaceResult = ensureFreeSpace(blockId, size, freeMemoryForTryToPut) + enoughFreeSpace = freeSpaceResult.success + if (enoughFreeSpace) { + selectedBlocks ++= freeSpaceResult.toDropBlocksId + try { + if (!selectedBlocks.isEmpty) { + droppedBlocks ++= doDrop(selectedBlocks) } - val valuesOrBytes = if (deserialized) "values" else "bytes" - logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( - blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) - putSuccess = true - } else { - // Tell the block manager that we couldn't put it in memory so that it can drop it to - // disk if the block allows disk storage. - lazy val data = if (deserialized) { - Left(value().asInstanceOf[Array[Any]]) - } else { - Right(value().asInstanceOf[ByteBuffer].duplicate()) + increaseTryToPutMemoryForThisThread(size) + decreaseUnrollMemoryForThisThread(size) + } catch { + // if there is exception, the current block will never put into Memory + case e: Exception => { + decreaseTryToPutMemoryForThisThread(size) + throw e } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + } finally { + // whatever there is exception or not, blocks selected by this thread to drop should + // already been dropped, and Unrolled Memory for this thread should also be 0. + removeUnrollMemoryForThisThread() + removeReservedFreeMemoryForThisThread() + cleanToDropBlocksMapForThisThread() + } + + val entry = new MemoryEntry(value(), size, deserialized) + entries.synchronized { + entries.put(blockId, entry) + decreaseTryToPutMemoryForThisThread(size) + currentMemory += size } - // Release the unroll memory used because we no longer need the underlying Array - releasePendingUnrollMemoryForThisThread() + val valuesOrBytes = if (deserialized) "values" else "bytes" + logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format( + blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory))) + putSuccess = true + } else { + logInfo(s"Failed to put block ${blockId} to memory, block size is ${size}.") + // For some reason, blocks might still not be able to put into memory even unroll + // successfully. If so, we need to clear reserved unroll memory and to-be-dropped blocks for + // this thread. + removeUnrollMemoryForThisThread() + removeReservedFreeMemoryForThisThread() + cleanToDropBlocksMapForThisThread() + // Tell the block manager that we couldn't put it in memory so that it can drop it to + // disk if the block allows disk storage. + lazy val data = if (deserialized) { + Left(value().asInstanceOf[Array[Any]]) + } else { + Right(value().asInstanceOf[ByteBuffer].duplicate()) + } + val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) + droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } } ResultWithDroppedBlocks(putSuccess, droppedBlocks) } @@ -408,147 +481,295 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * + * In this method each thread will select blocks that can be dropped until the selected blocks + * memory is enough, and then in the caller, each thread will drop the blocks itself. The + * freed memory by dropping old blocks can only be used by current thread, other threads are + * not aware of the freed memory. + * * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping * blocks. Otherwise, the freed space may fill up before the caller puts in their new value. * - * Return whether there is enough free space, along with the blocks dropped in the process. + * Return whether there is enough free space, along with the blocks marked as "to-be-dropped" + * and the memory that can be freed if the "to-be-dropped" blocks are actually dropped. */ private def ensureFreeSpace( - blockIdToAdd: BlockId, - space: Long): ResultWithDroppedBlocks = { - logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory") + blockIdToAdd: BlockId, + size: Long, + memoryFree: Long): ResultBlocksIdMemory = { + logInfo(s"ensureFreeSpace($size) called with curMem=$currentMemory, maxMem=$maxMemory") + var putSuccess = false + var enoughFreeSpace = false val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + var selectedMemory = 0L + val selectedBlocks = new ArrayBuffer[BlockId]() - if (space > maxMemory) { + if (size > maxMemory) { logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit") - return ResultWithDroppedBlocks(success = false, droppedBlocks) - } - - // Take into account the amount of memory currently occupied by unrolling blocks - // and minus the pending unroll memory for that block on current thread. - val threadId = Thread.currentThread().getId - val actualFreeMemory = freeMemory - currentUnrollMemory + - pendingUnrollMemoryMap.getOrElse(threadId, 0L) - - if (actualFreeMemory < space) { - val rddToAdd = getRddId(blockIdToAdd) - val selectedBlocks = new ArrayBuffer[BlockId] - var selectedMemory = 0L - - // This is synchronized to ensure that the set of entries is not changed - // (because of getValue or getBytes) while traversing the iterator, as that - // can lead to exceptions. + ResultBlocksIdMemory(enoughFreeSpace, selectedBlocks.toSeq) + } else { + // This is synchronized with two purposes, one is to ensure that the set of entries + // is not changed (because of getValue or getBytes) while traversing the iterator, + // as that can lead to exceptions. The other is to ensure that only one thread is + // traversing the entry to select "to-be-dropped" blocks and update the map information, + // to avoid same block is selected by multiple threads. entries.synchronized { - val iterator = entries.entrySet().iterator() - while (actualFreeMemory + selectedMemory < space && iterator.hasNext) { - val pair = iterator.next() - val blockId = pair.getKey - if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { - selectedBlocks += blockId - selectedMemory += pair.getValue.size + if (memoryFree < size) { + val rddToAdd = getRddId(blockIdToAdd) + val iterator = entries.entrySet().iterator() + while (memoryFree + selectedMemory < size && iterator.hasNext) { + val pair = iterator.next() + val blockId = pair.getKey + // only blocks that has not been selected can be selected + if (!tobeDroppedBlocksSet(blockId)) { + if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) { + selectedBlocks += blockId + selectedMemory += pair.getValue.size + } + } } } + if (memoryFree + selectedMemory >= size) { + tobeDroppedBlocksSet ++= selectedBlocks + addToDropBlocksMapForThisThread(selectedBlocks.toArray) + // reserve the free memory that can only be used by current thread + if (size - selectedMemory > 0) { + increaseReservedFreeMemoryForThisThread(size - selectedMemory) + } + enoughFreeSpace = true + logInfo(selectedBlocks.size + " blocks selected for dropping") + ResultBlocksIdMemory(enoughFreeSpace, selectedBlocks.toSeq) + } else { + logInfo(s"Will not store $blockIdToAdd as it would require" + + s" dropping another block from the same RDD") + ResultBlocksIdMemory(enoughFreeSpace, selectedBlocks.toSeq) + } } + } + } - if (actualFreeMemory + selectedMemory >= space) { - logInfo(s"${selectedBlocks.size} blocks selected for dropping") - for (blockId <- selectedBlocks) { - val entry = entries.synchronized { entries.get(blockId) } - // This should never be null as only one thread should be dropping - // blocks and removing entries. However the check is still here for - // future safety. - if (entry != null) { - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[Array[Any]]) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) - } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } - } + /** + * dropping the blocks from memory, blocks will be dropped to disk if storage is useDisk + */ + private def doDrop(toDropBlocks: Seq[BlockId]): ArrayBuffer[(BlockId, BlockStatus)] = { + val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + for (blocks <- toDropBlocks) { + val entry = entries.synchronized { entries.get(blocks) } + // drop old block to free memory for new blocks to put. + if (entry != null) { + val data = if (entry.deserialized) { + Left(entry.value.asInstanceOf[Array[Any]]) + } else { + Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } - return ResultWithDroppedBlocks(success = true, droppedBlocks) - } else { - logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " + - "from the same RDD") - return ResultWithDroppedBlocks(success = false, droppedBlocks) + val droppedBlockStatus = blockManager.dropFromMemory(blocks, data) + droppedBlockStatus.foreach { status => droppedBlocks += ((blocks, status)) } } } - ResultWithDroppedBlocks(success = true, droppedBlocks) + droppedBlocks } override def contains(blockId: BlockId): Boolean = { entries.synchronized { entries.containsKey(blockId) } } + def cleanupForThisThread(): Unit = { + removeUnrollMemoryForThisThread() + removeIteratorUnrollMemoryForThisThread() + cleanToDropBlocksMapForThisThread() + removeTryToPutMemoryForThisThread() + } + /** - * Reserve additional memory for unrolling blocks used by this thread. - * Return whether the request is granted. + * Add more old blocks as "to-be-dropped" for this thread. */ - def reserveUnrollMemoryForThisThread(memory: Long): Boolean = { + def addToDropBlocksMapForThisThread(blocksId: Array[BlockId]): Unit = { + val threadId = Thread.currentThread().getId accountingLock.synchronized { - val granted = freeMemory > currentUnrollMemory + memory - if (granted) { - val threadId = Thread.currentThread().getId - unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory - } - granted + toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()) ++= blocksId + } + } + + /** + * Remove a specified block from the map that marked as "to-be-dropped" from this thread, + * which means the blocks has been dropped from the memory. + */ + def delToDropBlocksMapForThisThread(blockId: BlockId): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()).remove(blockId) + } + } + + /** + * Remove all blocks that selected as "to-be-dropped" for this thread, which means + * either the blocks has been dropped from memory or there are some exceptions. + */ + def cleanToDropBlocksMapForThisThread(): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + val blockIdSet = toDropBlocksMap.getOrElse(threadId, new HashSet[BlockId]()) + if (!blockIdSet.isEmpty) { + val itr = blockIdSet.iterator + while (itr.hasNext) { + val blockId = itr.next() + tobeDroppedBlocksSet.remove(blockId) + } + toDropBlocksMap.remove(threadId) + } } } /** - * Release memory used by this thread for unrolling blocks. - * If the amount is not specified, remove the current thread's allocation altogether. + * Remove the current thread's allocation for unrolling. */ - def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = { + def releaseUnrollMemoryForThisThread(): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { - if (memory < 0) { + unrollMemoryMap.remove(threadId) + } + } + + /** + * Get additional memory for unrolling for this thread. + */ + def increaseUnrollMemoryForThisThread(memory: Long): Unit = { + accountingLock.synchronized { + val threadId = Thread.currentThread().getId + unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory + } + } + + /** + * Decrease memory reserved by this thread for unrolling blocks. + */ + def decreaseUnrollMemoryForThisThread(memory: Long): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) - memory + // If this thread claims no more unroll memory, release it completely + if (unrollMemoryMap(threadId) < 0) { unrollMemoryMap.remove(threadId) - } else { - unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory - // If this thread claims no more unroll memory, release it completely - if (unrollMemoryMap(threadId) <= 0) { - unrollMemoryMap.remove(threadId) - } } } } + + /** + * Return the amount of memory currently totally reserved for unrolling blocks across + * all threads. The unrolling blocks are blocks that have been confirmed can putting into + * the memory. + */ + def currentUnrollMemory: Long = accountingLock.synchronized { + unrollMemoryMap.values.sum + } /** - * Reserve the unroll memory of current unroll successful block used by this thread - * until actually put the block into memory entry. + * Remove the memory from unrolling from the map, which means either the blocks has been + * selected as "try-to-put" or blocks has been put into memory or the unrolled memory + * is invalid. */ - def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = { - val threadId = Thread.currentThread().getId + def removeUnrollMemoryForThisThread(): Unit = accountingLock.synchronized { + unrollMemoryMap.remove(Thread.currentThread().getId) + } + + /** + * Increase reserved free memory for this thread. This only happen when there is not enough + * space for unrolling new block and need to drop old block for more space. Once the selected + * blocks of this thread is dropped from memory, the freed memory should be reserved by this + * thread to avoid being used by other threads. reservedFreeMemory should also be refreshed + * after finished unrolling or tryToPut + */ + def increaseReservedFreeMemoryForThisThread(memory: Long): Unit = { accountingLock.synchronized { - pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory + val threadId = Thread.currentThread().getId + reservedFreeMemoryMap(threadId) = reservedFreeMemoryMap.getOrElse(threadId, 0L) + memory } } /** - * Release pending unroll memory of current unroll successful block used by this thread + * Return the amount of free memory reserved by all threads + */ + def currentReservedFreeMemory: Long = accountingLock.synchronized { + reservedFreeMemoryMap.values.sum + } + + /** + * clean the reservedFreeMemoryMap for this thread, each time after the unrolling process and + * tryToPutprocess, this method need to be called. + */ + def removeReservedFreeMemoryForThisThread() + : Unit = accountingLock.synchronized { + reservedFreeMemoryMap.remove(Thread.currentThread().getId) + } + + /** + * When a block can not unroll into memory, the memory size that used to store the computed + * part should maintained in iteratorUnrollMemoryMap. + */ + def reserveIteratorUnrollMemoryForThisThread(): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + val unrolledMem = unrollMemoryMap.getOrElse(threadId, 0L) + iteratorUnrollMemoryMap(threadId) = iteratorUnrollMemoryMap.getOrElse( + threadId, 0L) + unrolledMem + } + } + + /** + * Return the amount of memory currently totally used for part of blocks that can not put + * into the memory (will drop to disk or just drop from meory in future) across all threads. + */ + def currentIteratorUnrollMemory: Long = accountingLock.synchronized { + iteratorUnrollMemoryMap.values.sum + } + + /** + * After the block put into disk or dropped from memory, reservedIteratorUnrollMemoryMap should + * refresh the value for this thread. + */ + def removeIteratorUnrollMemoryForThisThread() + : Unit = accountingLock.synchronized { + iteratorUnrollMemoryMap.remove(Thread.currentThread().getId) + } + + /** + * Reserve additional memory for putting blocks for this thread. That means more blocks are + * waiting to put into memory, and before putting into memory, it will reserve some memory first. */ - def releasePendingUnrollMemoryForThisThread(): Unit = { + def increaseTryToPutMemoryForThisThread(memory: Long): Unit = { val threadId = Thread.currentThread().getId accountingLock.synchronized { - pendingUnrollMemoryMap.remove(threadId) + tryToPutMemoryMap(threadId) = tryToPutMemoryMap.getOrElse(threadId, 0L) + memory } } /** - * Return the amount of memory currently occupied for unrolling blocks across all threads. + * Release memory used by this thread for putting new blocks, which means new block has been + * finished putting into memory already. */ - def currentUnrollMemory: Long = accountingLock.synchronized { - unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum + def decreaseTryToPutMemoryForThisThread(memory: Long): Unit = { + val threadId = Thread.currentThread().getId + accountingLock.synchronized { + tryToPutMemoryMap(threadId) = tryToPutMemoryMap.getOrElse(threadId, memory) - memory + // If this thread claims no more unroll memory, release it completely + if (tryToPutMemoryMap(threadId) < 0) { + tryToPutMemoryMap.remove(threadId) + } + } } /** - * Return the amount of memory currently occupied for unrolling blocks by this thread. + * Return the amount of memory currently reserved for putting new blocks across all threads. + */ + def currentTryToPutMemory: Long = accountingLock.synchronized { + tryToPutMemoryMap.values.sum + } + + /** + * Clean all memory reserved for putting new blocks. */ - def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized { - unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L) + def removeTryToPutMemoryForThisThread(): Unit = { + tryToPutMemoryMap.remove(Thread.currentThread().getId) } /** @@ -589,3 +810,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long) private[spark] case class ResultWithDroppedBlocks( success: Boolean, droppedBlocks: Seq[(BlockId, BlockStatus)]) + +private[spark] case class ResultBlocksIdMemory( + success: Boolean, + toDropBlocksId: Seq[BlockId]) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 545722b050ee..5bdb76b80889 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -999,36 +999,33 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store") } - test("reserve/release unroll memory") { + test("increase/decrease unroll memory") { store = makeBlockManager(12000) val memoryStore = store.memoryStore assert(memoryStore.currentUnrollMemory === 0) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) - - // Reserve - memoryStore.reserveUnrollMemoryForThisThread(100) - assert(memoryStore.currentUnrollMemoryForThisThread === 100) - memoryStore.reserveUnrollMemoryForThisThread(200) - assert(memoryStore.currentUnrollMemoryForThisThread === 300) - memoryStore.reserveUnrollMemoryForThisThread(500) - assert(memoryStore.currentUnrollMemoryForThisThread === 800) - memoryStore.reserveUnrollMemoryForThisThread(1000000) - assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted - // Release - memoryStore.releaseUnrollMemoryForThisThread(100) - assert(memoryStore.currentUnrollMemoryForThisThread === 700) - memoryStore.releaseUnrollMemoryForThisThread(100) - assert(memoryStore.currentUnrollMemoryForThisThread === 600) - // Reserve again - memoryStore.reserveUnrollMemoryForThisThread(4400) - assert(memoryStore.currentUnrollMemoryForThisThread === 5000) - memoryStore.reserveUnrollMemoryForThisThread(20000) - assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted - // Release again - memoryStore.releaseUnrollMemoryForThisThread(1000) - assert(memoryStore.currentUnrollMemoryForThisThread === 4000) - memoryStore.releaseUnrollMemoryForThisThread() // release all - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + + // Increase + memoryStore.increaseUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemory === 100) + memoryStore.increaseUnrollMemoryForThisThread(200) + assert(memoryStore.currentUnrollMemory === 300) + memoryStore.increaseUnrollMemoryForThisThread(500) + assert(memoryStore.currentUnrollMemory === 800) + // Decrease + memoryStore.decreaseUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemory === 700) + memoryStore.decreaseUnrollMemoryForThisThread(100) + assert(memoryStore.currentUnrollMemory === 600) + memoryStore.decreaseUnrollMemoryForThisThread(700) + assert(memoryStore.currentUnrollMemory === 0) // decrease too much + // Increase again + memoryStore.increaseUnrollMemoryForThisThread(4400) + assert(memoryStore.currentUnrollMemory === 4400) + // Decrease again + memoryStore.decreaseUnrollMemoryForThisThread(1000) + assert(memoryStore.currentUnrollMemory === 3400) + memoryStore.removeUnrollMemoryForThisThread() // release all + assert(memoryStore.currentUnrollMemory === 0) } /** @@ -1056,37 +1053,47 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach test("safely unroll blocks") { store = makeBlockManager(12000) val smallList = List.fill(40)(new Array[Byte](100)) - val bigList = List.fill(40)(new Array[Byte](1000)) + val midList = List.fill(60)(new Array[Byte](100)) + val bigList = List.fill(400)(new Array[Byte](100)) val memoryStore = store.memoryStore val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) // Unroll with all the space in the world. This should succeed and return an array. var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) - memoryStore.releasePendingUnrollMemoryForThisThread() + assert(memoryStore.currentUnrollMemory > 0) + memoryStore.removeUnrollMemoryForThisThread() + assert(memoryStore.currentUnrollMemory === 0) // Unroll with not enough space. This should succeed after kicking out someBlock1. store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) + unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) assert(droppedBlocks.size === 1) assert(droppedBlocks.head._1 === TestBlockId("someBlock1")) droppedBlocks.clear() - memoryStore.releasePendingUnrollMemoryForThisThread() - - // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = - // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. - // In the mean time, however, we kicked out someBlock2 before giving up. - store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) + memoryStore.removeUnrollMemoryForThisThread() + memoryStore.remove("someBlock2") + assert(!memoryStore.contains("someblock2")) + + // memoryGrowthFactor is 1.5, put midList fist, will must drop someBlock3 when putting + // huge block. + store.putIterator("someBlock3", midList.iterator, StorageLevel.MEMORY_ONLY) + assert(memoryStore.contains("someBlock3")) + // Unroll huge block with not enough space. Even after ensuring free space by dropping old + // block "someBlock3", there is still not enough room to unroll this block. + // This returns an iterator. In the mean time, however, we kicked out "someBlock3" before + // giving up. unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks) verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) - assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + assert(memoryStore.currentUnrollMemory === 0) // we returned an iterator + // reserved memory maintained in IteratorUnrollMemoryMap + assert(memoryStore.currentIteratorUnrollMemory > 0) assert(droppedBlocks.size === 1) - assert(droppedBlocks.head._1 === TestBlockId("someBlock2")) + assert(droppedBlocks.head._1 === TestBlockId("someBlock3")) droppedBlocks.clear() } @@ -1095,10 +1102,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val memOnly = StorageLevel.MEMORY_ONLY val memoryStore = store.memoryStore val smallList = List.fill(40)(new Array[Byte](100)) - val bigList = List.fill(40)(new Array[Byte](1000)) + val midList = List.fill(60)(new Array[Byte](100)) + val bigList = List.fill(400)(new Array[Byte](100)) def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] + def midIterator: Iterator[Any] = midList.iterator.asInstanceOf[Iterator[Any]] def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) // Unroll with plenty of space. This should succeed and cache both blocks. val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true) @@ -1109,7 +1118,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(result2.size > 0) assert(result1.data.isLeft) // unroll did not drop this block to disk assert(result2.data.isLeft) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) // Re-put these two blocks so block manager knows about them too. Otherwise, block manager // would not know how to drop them from memory later. @@ -1125,19 +1134,23 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(!memoryStore.contains("b1")) assert(memoryStore.contains("b2")) assert(memoryStore.contains("b3")) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) + assert(memoryStore.currentIteratorUnrollMemory === 0) memoryStore.remove("b3") - store.putIterator("b3", smallIterator, memOnly) + store.putIterator("b3", midIterator, memOnly) - // Unroll huge block with not enough space. This should fail and kick out b2 in the process. + // Unroll huge block with not enough space. This should fail and kick out b2 and b3 in + // the process. val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true) assert(result4.size === 0) // unroll was unsuccessful assert(result4.data.isLeft) assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) - assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) - assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + assert(memoryStore.currentUnrollMemory === 0) // we returned an iterator + // iterator Unroll memory is not released because unrollSafely returned an iterator + assert(memoryStore.currentIteratorUnrollMemory > 0) } /** @@ -1149,10 +1162,12 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val memoryStore = store.memoryStore val diskStore = store.diskStore val smallList = List.fill(40)(new Array[Byte](100)) - val bigList = List.fill(40)(new Array[Byte](1000)) + val midList = List.fill(60)(new Array[Byte](100)) + val bigList = List.fill(400)(new Array[Byte](100)) def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] + def midIterator: Iterator[Any] = midList.iterator.asInstanceOf[Iterator[Any]] def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]] - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) store.putIterator("b1", smallIterator, memAndDisk) store.putIterator("b2", smallIterator, memAndDisk) @@ -1168,24 +1183,26 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach assert(!diskStore.contains("b2")) assert(!diskStore.contains("b3")) memoryStore.remove("b3") - store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + store.putIterator("b3", midIterator, StorageLevel.MEMORY_ONLY) + assert(memoryStore.currentUnrollMemory === 0) // Unroll huge block with not enough space. This should fail and drop the new block to disk - // directly in addition to kicking out b2 in the process. Memory store should contain only - // b3, while disk store should contain b1, b2 and b4. + // directly in addition to kicking out b2 and b3 in the process. Memory store should contain + // nothing, while disk store should contain b1, b2 and b4. val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true) assert(result4.size > 0) assert(result4.data.isRight) // unroll returned bytes from disk assert(!memoryStore.contains("b1")) assert(!memoryStore.contains("b2")) - assert(memoryStore.contains("b3")) + assert(!memoryStore.contains("b3")) assert(!memoryStore.contains("b4")) assert(diskStore.contains("b1")) assert(diskStore.contains("b2")) assert(!diskStore.contains("b3")) assert(diskStore.contains("b4")) - assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator + assert(memoryStore.currentUnrollMemory === 0) // we returned an iterator + // iterator Unroll memory is not released because unrollSafely returned an iterator + assert(memoryStore.currentIteratorUnrollMemory > 0) } test("multiple unrolls by the same thread") { @@ -1194,32 +1211,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach val memoryStore = store.memoryStore val smallList = List.fill(40)(new Array[Byte](100)) def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]] - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) // All unroll memory used is released because unrollSafely returned an array memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true) - assert(memoryStore.currentUnrollMemoryForThisThread === 0) + assert(memoryStore.currentUnrollMemory === 0) - // Unroll memory is not released because unrollSafely returned an iterator - // that still depends on the underlying vector used in the process + // There will always be enouth space by dropping old blocks memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread - assert(unrollMemoryAfterB3 > 0) + val unrollMemoryAfterB3 = memoryStore.currentUnrollMemory + assert(unrollMemoryAfterB3 === 0) - // The unroll memory owned by this thread builds on top of its value after the previous unrolls memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread - assert(unrollMemoryAfterB4 > unrollMemoryAfterB3) + val unrollMemoryAfterB4 = memoryStore.currentUnrollMemory + assert(unrollMemoryAfterB4 === unrollMemoryAfterB3) - // ... but only to a certain extent (until we run out of free space to grant new unroll memory) memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread + val unrollMemoryAfterB5 = memoryStore.currentUnrollMemory memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread + val unrollMemoryAfterB6 = memoryStore.currentUnrollMemory memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true) - val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread + val unrollMemoryAfterB7 = memoryStore.currentUnrollMemory assert(unrollMemoryAfterB5 === unrollMemoryAfterB4) assert(unrollMemoryAfterB6 === unrollMemoryAfterB4) assert(unrollMemoryAfterB7 === unrollMemoryAfterB4) diff --git a/docs/configuration.md b/docs/configuration.md index 7fe11475212b..6fd605bc29fb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -802,15 +802,6 @@ Apart from these, the following properties are also available, and may be useful mapping has high overhead for blocks close to or below the page size of the operating system. -
spark.storage.unrollFractionspark.storage.memoryFraction to use for unrolling blocks in memory.
- This is dynamically allocated by dropping existing blocks when there is not enough free
- storage space to unroll the new block in its entirety.
- spark.tachyonStore.baseDir