diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 4cc5bcb7f9ba..5774133fab4e 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -246,18 +246,18 @@ private[spark] class MemoryStore( val amountToRequest = size - unrollMemoryUsedByThisBlock keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest + unrollMemoryUsedByThisBlock = size } + } else if (size < unrollMemoryUsedByThisBlock) { + releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock - size) + unrollMemoryUsedByThisBlock = size } if (keepUnrolling) { val entry = entryBuilder.build() - // Synchronize so that transfer is atomic - memoryManager.synchronized { - releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) - val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) - assert(success, "transferring unroll memory to storage memory failed") - } + // In fact, unroll memory is also storage memory, it is unnecessary to + // release unroll memory really + releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock, false) entries.synchronized { entries.put(blockId, entry) @@ -565,7 +565,8 @@ private[spark] class MemoryStore( * Release memory used by this task for unrolling blocks. * If the amount is not specified, remove the current task's allocation altogether. */ - def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = { + def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue, + releaseMemoryReally: Boolean = true): Unit = { val taskAttemptId = currentTaskAttemptId() memoryManager.synchronized { val unrollMemoryMap = memoryMode match { @@ -576,7 +577,9 @@ private[spark] class MemoryStore( val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId)) if (memoryToRelease > 0) { unrollMemoryMap(taskAttemptId) -= memoryToRelease - memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode) + if (releaseMemoryReally) { + memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode) + } } if (unrollMemoryMap(taskAttemptId) == 0) { unrollMemoryMap.remove(taskAttemptId)