Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private[spark] abstract class Task[T](
Utils.tryLogNonFatalError {
// Release memory used by this thread for unrolling blocks
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We ended up removing the concept of "pending unroll memory" in Spark 2.x so this line will be omitted from the PR that I'm going to open against master.

// Notify any tasks waiting for execution memory to be freed to wake up and try to
// acquire memory again. This makes impossible the scenario where a task sleeps forever
// because there are no other tasks left to notify it. Since this is safe to do but may
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
unrollMemoryMap(taskAttemptId) -= memoryToRelease
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
memoryManager.releaseUnrollMemory(memoryToRelease)
}
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
}
}
}
Expand All @@ -530,11 +530,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
pendingUnrollMemoryMap.remove(taskAttemptId)
}
memoryManager.releaseUnrollMemory(memoryToRelease)
}
if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
pendingUnrollMemoryMap.remove(taskAttemptId)
}
}
}
}
Expand Down