Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
93 commits
Select commit Hold shift + click to select a range
5d130e4
Add block reference counting class.
JoshRosen Jan 8, 2016
423faab
Make the ReferenceCounter generic, since it's not specific to storage…
JoshRosen Jan 8, 2016
1ee665f
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 8, 2016
76cfebd
Integrate reference counter into storage eviction code.
JoshRosen Jan 8, 2016
7265784
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 11, 2016
2fb8c89
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 14, 2016
7cad770
Fix BlockManagerReplicationSuite tests.
JoshRosen Jan 14, 2016
8ae88b0
Add unit test for pinCount > 0 preventing eviction.
JoshRosen Jan 14, 2016
c1a8d85
Minimal changes to release refs on task completion.
JoshRosen Jan 14, 2016
575a47b
Fix Scalastyle.
JoshRosen Jan 15, 2016
0ba8318
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 15, 2016
feb1172
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 19, 2016
90cf403
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 20, 2016
7f28910
Fix CachedTableSuite tests.
JoshRosen Jan 21, 2016
12ed084
Fix TaskResultGetterSuite.
JoshRosen Jan 21, 2016
43e50ed
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 25, 2016
1b18226
Terminology update: reference -> pin.
JoshRosen Jan 25, 2016
8d45da6
More terminology updates.
JoshRosen Jan 25, 2016
8a52f58
Fix flaky BlockManagerSuite test:
JoshRosen Jan 25, 2016
36253df
Update very last occurrences of old terminology.
JoshRosen Jan 25, 2016
77d8c5c
More test flakiness fixes
JoshRosen Jan 25, 2016
e37f003
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Jan 27, 2016
2cf8157
Detect leaked pins at end of tasks.
JoshRosen Jan 27, 2016
150c6e1
Add unpin calls in more places.
JoshRosen Jan 27, 2016
1adbdb9
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Feb 2, 2016
1828757
Disable leak detection in tests for now.
JoshRosen Feb 2, 2016
76fc9f5
More renaming.
JoshRosen Feb 2, 2016
2942b24
WIP.
JoshRosen Feb 3, 2016
62f6671
BlockInfoManager WIP.
JoshRosen Feb 11, 2016
47f3174
Implement lock downgrading
JoshRosen Feb 11, 2016
4591308
Implement remove()
JoshRosen Feb 11, 2016
77939c2
BlockInfoManagerSuite tests now pass.
JoshRosen Feb 11, 2016
a0c5bb3
Fix scalastyle.
JoshRosen Feb 11, 2016
d40e010
Add a bunch of comments.
JoshRosen Feb 11, 2016
3f29595
Add even more comments.
JoshRosen Feb 11, 2016
ef7d885
Update to reflect new semantics for get() of removed block.
JoshRosen Feb 11, 2016
e8d6ec8
Fixes to torrent broadcast block removal.
JoshRosen Feb 12, 2016
9c8d530
Roll back logging change.
JoshRosen Feb 12, 2016
f3fc298
Remove more printlns.
JoshRosen Feb 12, 2016
dd6358c
Add todos.
JoshRosen Feb 12, 2016
ec8cc24
Add missing ManagedBuffer.release() call.
JoshRosen Feb 13, 2016
6134989
Add a test for OneForOneStreamManager.connectionTerminated
JoshRosen Feb 13, 2016
c9726c2
Add tests covering new release() call.
JoshRosen Feb 13, 2016
c629f26
Add defensive check to guard against exiting while loop when info.rem…
JoshRosen Feb 13, 2016
fc19cfd
Merge branch 'add-missing-release-calls-in-network-layer' into pin-pages
JoshRosen Feb 13, 2016
0aa2392
Fix serialization problems in getMatchingBlockIds().
JoshRosen Feb 13, 2016
b273422
Remove bad retain.
JoshRosen Feb 13, 2016
7639e03
Logging improvements that were helpful when debugging tests.
JoshRosen Feb 13, 2016
27e98a3
Fix SparkContext leak in KryoSerializerDistributedSuite
JoshRosen Feb 13, 2016
b72cd7b
Fix block replication bugs.
JoshRosen Feb 15, 2016
5e23177
Fix locking in indirect task result code path.
JoshRosen Feb 15, 2016
f0b6d71
Add a missing release() in ReceivedBlockHandler.
JoshRosen Feb 15, 2016
e549f2f
Add another missing release in WriteAheadLogBasedBlockHandler
JoshRosen Feb 15, 2016
6d09400
Fix SQL test compilation.
JoshRosen Feb 15, 2016
717c476
Add missing lock release in CacheManager.
JoshRosen Feb 15, 2016
e07b62d
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Feb 16, 2016
0c08731
Revert change in network/common/src/test/java/org/apache/spark/networ…
JoshRosen Feb 16, 2016
55b5b19
Check preconditions in remove().
JoshRosen Feb 17, 2016
3a12480
Free locks in dropFromMemory().
JoshRosen Feb 17, 2016
25b09d7
Guard against MemoryStore removing the block first.
JoshRosen Feb 17, 2016
bcb8318
Fix bug in release of locks after network fetch of block data.
JoshRosen Feb 18, 2016
4e11d00
Rename methods.
JoshRosen Feb 18, 2016
ed44f45
Remove outdated block comment.
JoshRosen Feb 18, 2016
7c74591
Merge remote-tracking branch 'origin/master' into pin-pages
JoshRosen Feb 23, 2016
a401adc
Use try-finally in afterEach().
JoshRosen Feb 23, 2016
504986f
Try to clean up confusing release logic related to dropFromMemory().
JoshRosen Feb 23, 2016
66202f2
Push CompletionIterator logic into BlockResult.
JoshRosen Feb 23, 2016
4f620a4
Add scaladoc to BlockManagerManagedBuffer.
JoshRosen Feb 23, 2016
8547841
Document non-blocking tryLock in MemoryStore.
JoshRosen Feb 23, 2016
99c460c
Fix comment typo in BlockDataManager.
JoshRosen Feb 23, 2016
c94984e
Check invariants whenever BlockInfo is mutated.
JoshRosen Feb 23, 2016
ac2b73f
Extract magic writerTask values into constants.
JoshRosen Feb 23, 2016
39b1185
Numerous documentation updates in BlockInfoManager.
JoshRosen Feb 23, 2016
6502047
Add defensive notifyAll() to BlockInfoManager.clear().
JoshRosen Feb 23, 2016
1d903ff
Remove now-redundant info.removed checks in loop body.
JoshRosen Feb 23, 2016
24dbc3d
Clean up BlockInfoManager.entries() typos.
JoshRosen Feb 23, 2016
745c1f9
unlockAllLocksForTask => releaseAllLocksForTask
JoshRosen Feb 23, 2016
5cfbbdb
Address style nit in BlockManager.getMatchingBlockIds().
JoshRosen Feb 23, 2016
9427576
Address confusing "local lock" comment.
JoshRosen Feb 23, 2016
3d377b5
Remove unnecessary notifyAll() in downgradeLock().
JoshRosen Feb 23, 2016
07e0e37
Deduplicate code in TorrentBroadcast and check put() return values.
JoshRosen Feb 23, 2016
697eba2
Torrent broadcast pieces need to be stored in serialized form.
JoshRosen Feb 23, 2016
f5f089d
Don't acquire lock in dropFromMemory().
JoshRosen Feb 23, 2016
0b7281b
Simplify confusing getOrElseUpdate in lockNewBlockForWriting().
JoshRosen Feb 23, 2016
68b9e83
Roll back checking of blockWasSuccessfullyStored in replication code.
JoshRosen Feb 23, 2016
a5ef11b
Explain seemingly-unreachable error handling code.
JoshRosen Feb 23, 2016
b9d6e18
Require tasks to explicitly register themselves with the BlockManager.
JoshRosen Feb 24, 2016
5df7284
DeMorgan.
JoshRosen Feb 24, 2016
eab288c
Synchronize BlockInfoManager.registerTask()
JoshRosen Feb 24, 2016
06ebef5
Minor comment fixes.
JoshRosen Feb 24, 2016
0628a33
Check lockForReading outcome in downgradeLock()
JoshRosen Feb 24, 2016
b963178
More logTrace detail in lockNewBlockForWriting
JoshRosen Feb 24, 2016
9becde3
Move registration of task with BlockManager into Task.run()
JoshRosen Feb 24, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.mutable

import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util.CompletionIterator

/**
* Spark class responsible for passing RDDs partition contents to the BlockManager and making
Expand All @@ -47,6 +48,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
existingMetrics.incBytesReadInternal(blockResult.bytes)

val iter = blockResult.data.asInstanceOf[Iterator[T]]

new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsReadInternal(1)
Expand Down Expand Up @@ -156,7 +158,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
arr.iterator.asInstanceOf[Iterator[T]]
CompletionIterator[T, Iterator[T]](
arr.iterator.asInstanceOf[Iterator[T]],
blockManager.releaseLock(key))
case Right(it) =>
// There is not enough space to cache this partition in memory
val returnValues = it.asInstanceOf[Iterator[T]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.Random

import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.ByteArrayChunkOutputStream

Expand Down Expand Up @@ -90,22 +90,29 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

/**
* Divide the object into multiple blocks and put those blocks in the block manager.
*
* @param value the object to divide
* @return number of blocks this broadcast variable is divided into
*/
private def writeBlocks(value: T): Int = {
import StorageLevel._
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
tellMaster = false)
val blockManager = SparkEnv.get.blockManager
if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
blockManager.releaseLock(broadcastId)
} else {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
val pieceId = BroadcastBlockId(id, "piece" + i)
if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
blockManager.releaseLock(pieceId)
} else {
throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
}
blocks.length
}
Expand All @@ -127,16 +134,18 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
SparkEnv.get.blockManager.putBytes(
pieceId,
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(
s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
block
}
val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
// At this point we are guaranteed to hold a read lock, since we either got the block locally
// or stored the remotely-fetched block and automatically downgraded the write lock.
blocks(pid) = block
releaseLock(pieceId)
}
blocks
}
Expand Down Expand Up @@ -165,8 +174,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
private def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
val blockManager = SparkEnv.get.blockManager
blockManager.getLocal(broadcastId).map(_.data.next()) match {
case Some(x) =>
releaseLock(broadcastId)
x.asInstanceOf[T]

case None =>
Expand All @@ -179,13 +190,36 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
blocks, SparkEnv.get.serializer, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
SparkEnv.get.blockManager.putSingle(
broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
val storageLevel = StorageLevel.MEMORY_AND_DISK
if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
releaseLock(broadcastId)
} else {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
obj
}
}
}

/**
* If running in a task, register the given block's locks for release upon task completion.
* Otherwise, if not running in a task then immediately release the lock.
*/
private def releaseLock(blockId: BlockId): Unit = {
val blockManager = SparkEnv.get.blockManager
Option(TaskContext.get()) match {
case Some(taskContext) =>
taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
case None =>
// This should only happen on the driver, where broadcast variables may be accessed
// outside of running tasks (e.g. when computing rdd.partitions()). In order to allow
// broadcast variables to be garbage collected we need to free the reference here
// which is slightly unsafe but is technically okay because broadcast variables aren't
// stored off-heap.
blockManager.releaseLock(blockId)
}
}

}


Expand Down
18 changes: 17 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ private[spark] class Executor(
threwException = false
res
} finally {
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
Expand All @@ -227,6 +229,17 @@ private[spark] class Executor(
logError(errMsg)
}
}

if (releasedLocks.nonEmpty) {
val errMsg =
s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
releasedLocks.mkString("[", ", ", "]")
if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we enable this in tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't because of the limit() / take() issue.

throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
}
val taskFinish = System.currentTimeMillis()

Expand Down Expand Up @@ -266,8 +279,11 @@ private[spark] class Executor(
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= maxRpcMessageSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
val putSucceeded = env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
if (putSucceeded) {
env.blockManager.releaseLock(blockId)
}
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ trait BlockDataManager {

/**
* Put the block locally, using the given storage level.
*
* Returns true if the block was stored and false if the put operation failed or the block
* already existed.
*/
def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Unit
def putBlockData(blockId: BlockId, data: ManagedBuffer, level: StorageLevel): Boolean

/**
* Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
*/
def releaseLock(blockId: BlockId): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ class NettyBlockRpcServer(
val level: StorageLevel =
serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
blockManager.putBlockData(BlockId(uploadBlock.blockId), data, level)
val blockId = BlockId(uploadBlock.blockId)
val putSucceeded = blockManager.putBlockData(blockId, data, level)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You seem to do this in a bunch of places, perhaps consider the switched semantics that put does not retain the lock by default. Not sure if this is better, something to think about for later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's revisit this in a followup. Swapping the default might be a good move since I think the current semantics are only really useful in the CacheManager.

if (putSucceeded) {
blockManager.releaseLock(blockId)
}
responseContext.onSuccess(ByteBuffer.allocate(0))
}
}
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private[spark] abstract class Task[T](
taskAttemptId: Long,
attemptNumber: Int,
metricsSystem: MetricsSystem): T = {
SparkEnv.get.blockManager.registerTask(taskAttemptId)
context = new TaskContextImpl(
stageId,
partitionId,
Expand Down
83 changes: 0 additions & 83 deletions core/src/main/scala/org/apache/spark/storage/BlockInfo.scala

This file was deleted.

Loading