Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 45 additions & 7 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add after this line that:

// The memory reserved in this map is expected to be released when the task finishes.

// All accesses of this map are assumed to have manually synchronized on `accountingLock`
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
// Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
// Pending unroll memory refers to the intermediate memory occupied by a thread
// after the unroll but before the actual putting of the block in the cache.
// This chunk of memory is expected to be released *as soon as* we finish
// caching the corresponding block as opposed to until after the task finishes.
// This is only used if a block is successfully unrolled in its entirety in
// memory (SPARK-4777).
private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()

/**
* The amount of space ensured for unrolling values in memory, shared across all cores.
Expand Down Expand Up @@ -283,12 +291,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}

} finally {
// If we return an array, the values returned do not depend on the underlying vector and
// we can immediately free up space for other threads. Otherwise, if we return an iterator,
// we release the memory claimed by this thread later on when the task finishes.
// If we return an array, the values returned will later be cached in `tryToPut`.
// In this case, we should release the memory after we cache the block there.
// Otherwise, if we return an iterator, we release the memory reserved here
// later when the task finishes.
if (keepUnrolling) {
val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
releaseUnrollMemoryForThisThread(amountToRelease)
accountingLock.synchronized {
val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
releaseUnrollMemoryForThisThread(amountToRelease)
reservePendingUnrollMemoryForThisThread(amountToRelease)
}
}
}
}
Expand Down Expand Up @@ -353,6 +365,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
// Release the unroll memory used because we no longer need the underlying Array
releasePendingUnrollMemoryForThisThread()
}
ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}
Expand Down Expand Up @@ -381,7 +395,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}

// Take into account the amount of memory currently occupied by unrolling blocks
val actualFreeMemory = freeMemory - currentUnrollMemory
// and minus the pending unroll memory for that block on current thread.
val threadId = Thread.currentThread().getId
val actualFreeMemory = freeMemory - currentUnrollMemory +
pendingUnrollMemoryMap.getOrElse(threadId, 0L)

if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
Expand Down Expand Up @@ -468,11 +485,32 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

/**
* Reserve the unroll memory of current unroll successful block used by this thread
* until actually put the block into memory entry.
*/
def reservePendingUnrollMemoryForThisThread(memory: Long): Unit = {
val threadId = Thread.currentThread().getId
accountingLock.synchronized {
pendingUnrollMemoryMap(threadId) = pendingUnrollMemoryMap.getOrElse(threadId, 0L) + memory
}
}

/**
* Release pending unroll memory of current unroll successful block used by this thread
*/
def releasePendingUnrollMemoryForThisThread(): Unit = {
val threadId = Thread.currentThread().getId
accountingLock.synchronized {
pendingUnrollMemoryMap.remove(threadId)
}
}

/**
* Return the amount of memory currently occupied for unrolling blocks across all threads.
*/
def currentUnrollMemory: Long = accountingLock.synchronized {
unrollMemoryMap.values.sum
unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisThread === 0)
memoryStore.releasePendingUnrollMemoryForThisThread()

// Unroll with not enough space. This should succeed after kicking out someBlock1.
store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
Expand All @@ -1074,6 +1075,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
assert(droppedBlocks.size === 1)
assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
droppedBlocks.clear()
memoryStore.releasePendingUnrollMemoryForThisThread()

// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
Expand Down