@@ -63,14 +63,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
6363 // A mapping from thread ID to amount of memory reserved for unrolling part of a block (in
6464 // bytes). When a block cannot be fully put into memory, unrolling returns an iterator, but
6565 // the memory still needs to stay reserved until the block is dropped from memory.
66- // All accesses of this map are assumed to have manually synchronized on `accountingLoc
66+ // All accesses of this map are assumed to have manually synchronized on `accountingLock`
6767 private val iteratorUnrollMemoryMap = mutable.HashMap [Long , Long ]()
6868
6969 // A mapping from thread ID to amount of memory to be dropped for new blocks (in bytes).
7070 // All accesses of this map are assumed to have manually synchronized on `accountingLock`
7171 private val toDropMemoryMap = mutable.HashMap [Long , Long ]()
7272
73- // A mapping from thread ID to a blockId Set that to be dropped for new blocks (in bytes) .
73+ // A mapping from thread ID to the set of BlockIds that are to be dropped for new blocks.
7474 // All accesses of this map are assumed to have manually synchronized on `accountingLock`
7575 private val toDropBlocksMap = mutable.HashMap [Long , HashSet [BlockId ]]()
7676
@@ -86,15 +86,15 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
8686 * includes the memory that is marked to be dropped.
8787 */
8888 def freeMemory : Long = maxMemory - (
89- currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory +
90- currentReservedUnrollMemory - currentToDropMemory)
89+ currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory +
90+ currentReservedUnrollMemory - currentToDropMemory)
9191 /**
9292 * Free memory that can be used when new blocks are unrolling into memory. The value includes
9393 * the memory that is marked to be dropped, but does not include the memory for unrolling.
9494 */
9595 def freeMemoryForUnroll : Long = maxMemory - (
96- currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory +
97- currentUnrollMemory - currentToDropMemory)
96+ currentMemory + currentTryToPutMemory + currentIteratorUnrollMemory +
97+ currentUnrollMemory - currentToDropMemory)
9898
9999 override def getSize (blockId : BlockId ): Long = {
100100 entries.synchronized {
@@ -209,7 +209,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
209209
210210 override def remove (blockId : BlockId ): Boolean = {
211211 entries.synchronized {
212- // Every time when removing blocks from memory, the infomation about blocks that to be
212+ // Every time blocks are removed from memory, the information about blocks that are to be
213213 // dropped needs to be refreshed.
214214 tobeDroppedBlocksSet.remove(blockId)
215215 delToDropBlocksMapForThisThread(blockId)
@@ -248,7 +248,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
248248 * checking whether the memory restrictions for unrolling blocks are still satisfied,
249249 * stopping immediately if not. This check is a safeguard against the scenario in which
250250 * there is not enough free memory to accommodate the entirety of a single block.
251- *
251+ *
252252 * When there is not enough memory for unrolling blocks, old blocks will be dropped from
253253 * memory. The dropping operation is done in parallel to fully utilize the disk throughput
254254 * when there are multiple disks. Before dropping, each thread will mark the old blocks
@@ -259,9 +259,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
259259 */
260260
261261 def unrollSafely (
262- blockId : BlockId ,
263- values : Iterator [Any ],
264- droppedBlocks : ArrayBuffer [(BlockId , BlockStatus )]): Either [Array [Any ], Iterator [Any ]] = {
262+ blockId : BlockId ,
263+ values : Iterator [Any ],
264+ droppedBlocks : ArrayBuffer [(BlockId , BlockStatus )])
265+ : Either [Array [Any ], Iterator [Any ]] = {
265266
266267 // Number of elements unrolled so far
267268 var elementsUnrolled = 0L
@@ -383,10 +384,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
383384 */
384385
385386 private def tryToPut (
386- blockId : BlockId ,
387- value : Any ,
388- size : Long ,
389- deserialized : Boolean ): ResultWithDroppedBlocks = {
387+ blockId : BlockId ,
388+ value : Any ,
389+ size : Long ,
390+ deserialized : Boolean ): ResultWithDroppedBlocks = {
390391
391392 var putSuccess = false
392393 var enoughFreeSpace = false
@@ -478,9 +479,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
478479 logInfo(s " Will not store $blockIdToAdd as it is larger than our memory limit " )
479480 ResultBlocksIdMemory (success = false , selectedBlocks.toSeq, selectedMemory)
480481 } else {
481- // This is synchronized to ensure that the set of entries is not changed
482- // (because of getValue or getBytes) while traversing the iterator, as that
483- // can lead to exceptions.
482+ // This is synchronized for two purposes. One is to ensure that the set of entries
483+ // is not changed (because of getValue or getBytes) while traversing the iterator,
484+ // as that can lead to exceptions. The other is to ensure that only one thread at a time
485+ // traverses the entries to select "to-be-dropped" blocks and update the map information,
486+ // so that the same block is not selected by multiple threads.
484487 entries.synchronized {
485488 if (memoryFree < size) {
486489 val rddToAdd = getRddId(blockIdToAdd)
@@ -703,7 +706,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
703706 * Clean the reservedUnrollMemoryMap for this thread. This method needs to be called each
704707 * time after the unrolling process.
705708 */
706- private [spark] def removeReservedUnrollMemoryForThisThread (): Unit = accountingLock.synchronized {
709+ private [spark] def removeReservedUnrollMemoryForThisThread ()
710+ : Unit = accountingLock.synchronized {
707711 reservedUnrollMemoryMap.remove(Thread .currentThread().getId)
708712 }
709713
@@ -731,7 +735,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
731735 * After the block is dropped from memory, the iteratorUnrollMemoryMap should be cleaned for
732736 * this thread, which will free up memory for new blocks to unroll or to tryToPut.
733737 */
734- private [spark] def removeIteratorUnrollMemoryForThisThread (): Unit = accountingLock.synchronized {
738+ private [spark] def removeIteratorUnrollMemoryForThisThread ()
739+ : Unit = accountingLock.synchronized {
735740 iteratorUnrollMemoryMap.remove(Thread .currentThread().getId)
736741 }
737742
0 commit comments