From 0cc0257769070843e05ec010220a78827001f519 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Sat, 28 Feb 2015 14:49:32 +0800
Subject: [PATCH 1/3] Fix a potential OOM issue when StorageLevel is
 MEMORY_AND_DISK_SER

---
 .../apache/spark/storage/BlockManager.scala   |  9 ++++---
 .../apache/spark/storage/MemoryStore.scala    | 16 ++++++++++--
 .../spark/storage/BlockManagerSuite.scala     | 26 +++++++++++++++++++
 3 files changed, 45 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 86dbd89f0ffb8..07ef0395ef786 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -535,9 +535,10 @@ private[spark] class BlockManager(
           /* We'll store the bytes in memory if the block's storage level includes
            * "memory serialized", or if it should be cached as objects in memory
            * but we only requested its serialized bytes. */
-          val copyForMemory = ByteBuffer.allocate(bytes.limit)
-          copyForMemory.put(bytes)
-          memoryStore.putBytes(blockId, copyForMemory, level)
+          memoryStore.putBytes(blockId, bytes.limit) {
+            val copyForMemory = ByteBuffer.allocate(bytes.limit)
+            copyForMemory.put(bytes)
+          }
           bytes.rewind()
         }
         if (!asBlockResult) {
@@ -999,7 +1000,7 @@ private[spark] class BlockManager(
    */
  def dropFromMemory(
      blockId: BlockId,
-      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
    logInfo(s"Dropping block $blockId from memory")
    val info = blockInfo.get(blockId).orNull
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 71305a46bf570..ef538da0a15cc 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -90,6 +90,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
+  /**
+   * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
+   * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
+   */
+  def putBytes(blockId: BlockId, size: Long)(_bytes: => ByteBuffer): PutResult = {
+    // Work on a duplicate - since the original input might be used elsewhere.
+    lazy val bytes = _bytes.duplicate().rewind().asInstanceOf[ByteBuffer]
+    val putAttempt = tryToPut(blockId, bytes, size, deserialized = false)
+    val data = if (putAttempt.success) Right(bytes.duplicate()) else null
+    PutResult(size, data, putAttempt.droppedBlocks)
+  }
+
   override def putArray(
       blockId: BlockId,
       values: Array[Any],
@@ -314,7 +326,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    */
   private def tryToPut(
       blockId: BlockId,
-      value: Any,
+      value: => Any,
       size: Long,
       deserialized: Boolean): ResultWithDroppedBlocks = {
 
@@ -345,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       } else {
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
-        val data = if (deserialized) {
+        lazy val data = if (deserialized) {
           Left(value.asInstanceOf[Array[Any]])
         } else {
           Right(value.asInstanceOf[ByteBuffer].duplicate())
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ffe6f039145ea..a2d0bd6b25a02 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1221,4 +1221,30 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
+
+  test("lazily create a big ByteBuffer to avoid OOM if it cannot be put into MemoryStore") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    val blockId = BlockId("rdd_3_10")
+    val result = memoryStore.putBytes(blockId, 13000) {
+      fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
+    }
+    assert(result.size === 13000)
+    assert(result.data === null)
+    assert(result.droppedBlocks === Nil)
+  }
+
+  test("put a small ByteBuffer to MemoryStore") {
+    store = makeBlockManager(12000)
+    val memoryStore = store.memoryStore
+    val blockId = BlockId("rdd_3_10")
+    var bytes: ByteBuffer = null
+    val result = memoryStore.putBytes(blockId, 10000) {
+      bytes = ByteBuffer.allocate(10000)
+      bytes
+    }
+    assert(result.size === 10000)
+    assert(result.data === Right(bytes))
+    assert(result.droppedBlocks === Nil)
+  }
 }
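The mechanism PATCH 1/3 relies on, reduced to a standalone sketch (TinyStore
and its members are illustrative stand-ins, not Spark's API): because the
buffer argument is by-name (`=> ByteBuffer`), it is only evaluated after the
size check passes, so a buffer that cannot fit is never allocated.

  import java.nio.ByteBuffer

  object TinyStore {
    private var free: Long = 12000L

    def putBytes(size: Long)(bytes: => ByteBuffer): Option[ByteBuffer] = {
      if (size <= free) {
        val b = bytes  // the by-name argument is evaluated only here
        free -= size
        Some(b)
      } else {
        None           // never evaluated: the big buffer is never allocated
      }
    }

    def main(args: Array[String]): Unit = {
      // Mirrors the two new tests: too big, so the block never runs.
      assert(putBytes(13000)(sys.error("should not be evaluated")).isEmpty)
      // Small enough, so it is allocated and accepted.
      assert(putBytes(10000)(ByteBuffer.allocate(10000)).isDefined)
    }
  }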
From 1100a5469e6ac16e3246748912b61e864c0dbe4a Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Mon, 23 Mar 2015 10:45:43 +0800
Subject: [PATCH 2/3] Replace call-by-name with () => T

---
 .../apache/spark/storage/BlockManager.scala   | 14 +++++--
 .../apache/spark/storage/MemoryStore.scala    | 37 ++++++++++++-------
 .../spark/storage/BlockManagerSuite.scala     |  8 ++--
 3 files changed, 38 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 07ef0395ef786..9d1ce678c3e74 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -535,10 +535,14 @@ private[spark] class BlockManager(
           /* We'll store the bytes in memory if the block's storage level includes
            * "memory serialized", or if it should be cached as objects in memory
            * but we only requested its serialized bytes. */
-          memoryStore.putBytes(blockId, bytes.limit) {
+          memoryStore.putBytes(blockId, bytes.limit, () => {
+            // https://issues.apache.org/jira/browse/SPARK-6076
+            // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+            // put it into MemoryStore, copyForMemory should not be created. That's why this
+            // action is put into a `() => ByteBuffer` and created lazily.
             val copyForMemory = ByteBuffer.allocate(bytes.limit)
             copyForMemory.put(bytes)
-          }
+          })
           bytes.rewind()
         }
         if (!asBlockResult) {
@@ -996,11 +1000,13 @@ private[spark] class BlockManager(
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
    *
+   * If `data` is not put on disk, it won't be created.
+   *
    * Return the block status if the given block has been updated, else None.
    */
  def dropFromMemory(
      blockId: BlockId,
-      data: => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
    logInfo(s"Dropping block $blockId from memory")
    val info = blockInfo.get(blockId).orNull
@@ -1024,7 +1030,7 @@ private[spark] class BlockManager(
       // Drop to disk, if storage level requires
       if (level.useDisk && !diskStore.contains(blockId)) {
         logInfo(s"Writing block $blockId to disk")
-        data match {
+        data() match {
           case Left(elements) =>
             diskStore.putArray(blockId, elements, level, returnValues = false)
           case Right(bytes) =>
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index ef538da0a15cc..ad2425a6b3d34 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -85,7 +85,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val values = blockManager.dataDeserialize(blockId, bytes)
       putIterator(blockId, values, level, returnValues = true)
     } else {
-      val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      val putAttempt = tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
@@ -93,12 +93,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   /**
    * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
    * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
+   *
+   * The caller should guarantee that `size` is correct.
    */
-  def putBytes(blockId: BlockId, size: Long)(_bytes: => ByteBuffer): PutResult = {
+  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
     // Work on a duplicate - since the original input might be used elsewhere.
-    lazy val bytes = _bytes.duplicate().rewind().asInstanceOf[ByteBuffer]
-    val putAttempt = tryToPut(blockId, bytes, size, deserialized = false)
-    val data = if (putAttempt.success) Right(bytes.duplicate()) else null
+    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
+    val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
+    val data =
+      if (putAttempt.success) {
+        assert(bytes.limit == size)
+        Right(bytes.duplicate())
+      } else {
+        null
+      }
     PutResult(size, data, putAttempt.droppedBlocks)
   }
 
@@ -109,11 +117,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       returnValues: Boolean): PutResult = {
     if (level.deserialized) {
       val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-      val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
+      val putAttempt = tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
       PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
     } else {
       val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
+      val putAttempt = tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
@@ -317,6 +325,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
    * must also be passed by the caller.
    *
+   * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
+   * created to avoid OOM since it may be a big ByteBuffer.
+   *
    * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
    * dropping is done by only on thread at a time. Otherwise while one thread is dropping
    * blocks to free memory for one block, another thread may use up the freed space for
@@ -326,7 +337,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
    */
   private def tryToPut(
       blockId: BlockId,
-      value: => Any,
+      value: () => Any,
       size: Long,
       deserialized: Boolean): ResultWithDroppedBlocks = {
 
@@ -345,7 +356,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       droppedBlocks ++= freeSpaceResult.droppedBlocks
 
       if (enoughFreeSpace) {
-        val entry = new MemoryEntry(value, size, deserialized)
+        val entry = new MemoryEntry(value(), size, deserialized)
         entries.synchronized {
           entries.put(blockId, entry)
           currentMemory += size
@@ -358,11 +369,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       } else {
         // Tell the block manager that we couldn't put it in memory so that it can drop it to
         // disk if the block allows disk storage.
         lazy val data = if (deserialized) {
-          Left(value.asInstanceOf[Array[Any]])
+          Left(value().asInstanceOf[Array[Any]])
         } else {
-          Right(value.asInstanceOf[ByteBuffer].duplicate())
+          Right(value().asInstanceOf[ByteBuffer].duplicate())
         }
-        val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+        val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
         droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
       }
     }
@@ -428,7 +439,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           } else {
             Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
           }
-          val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+          val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
           droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
         }
       }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a2d0bd6b25a02..ec3065be03772 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1226,9 +1226,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     store = makeBlockManager(12000)
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
-    val result = memoryStore.putBytes(blockId, 13000) {
+    val result = memoryStore.putBytes(blockId, 13000, () => {
       fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
-    }
+    })
     assert(result.size === 13000)
     assert(result.data === null)
     assert(result.droppedBlocks === Nil)
@@ -1239,10 +1239,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     val memoryStore = store.memoryStore
     val blockId = BlockId("rdd_3_10")
     var bytes: ByteBuffer = null
-    val result = memoryStore.putBytes(blockId, 10000) {
+    val result = memoryStore.putBytes(blockId, 10000, () => {
       bytes = ByteBuffer.allocate(10000)
       bytes
-    }
+    })
     assert(result.size === 10000)
     assert(result.data === Right(bytes))
     assert(result.droppedBlocks === Nil)
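Why PATCH 2/3 trades the by-name parameter for an explicit `() => T` thunk, as
a standalone illustration (ByNameVsThunk and expensive are made-up names, not
Spark code): a by-name argument is silently re-evaluated at every reference,
while a function value makes each evaluation a visible `value()` call, which is
easier to audit in code like tryToPut that must evaluate at most once.

  object ByNameVsThunk {
    private var evaluations = 0
    private def expensive(): Int = { evaluations += 1; 42 }

    def useByName(value: => Int): Int = value + value  // two hidden evaluations

    def useThunk(value: () => Int): Int = {
      lazy val v = value()  // the single evaluation is explicit and cached
      v + v
    }

    def main(args: Array[String]): Unit = {
      evaluations = 0
      useByName(expensive())
      assert(evaluations == 2)  // easy to trigger by accident

      evaluations = 0
      useThunk(() => expensive())
      assert(evaluations == 1)  // every use is a visible call site
    }
  }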
From 7d25545a78aee5f5833c88667e438bb8fcf891ab Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 25 Mar 2015 10:01:02 +0800
Subject: [PATCH 3/3] Add alias for tryToPut and dropFromMemory

---
 .../apache/spark/storage/BlockManager.scala   | 12 ++++++++
 .../apache/spark/storage/MemoryStore.scala    | 28 ++++++++++++++++---
 .../spark/storage/BlockManagerSuite.scala     |  8 +++---
 3 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9d1ce678c3e74..551d6a26f9765 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -996,6 +996,18 @@ private[spark] class BlockManager(
     putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
+  /**
+   * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
+   * store reaches its limit and needs to free up space.
+   *
+   * Return the block status if the given block has been updated, else None.
+   */
+  def dropFromMemory(
+      blockId: BlockId,
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+    dropFromMemory(blockId, () => data)
+  }
+
   /**
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index ad2425a6b3d34..ce57ac4d48e68 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -85,7 +85,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       val values = blockManager.dataDeserialize(blockId, bytes)
       putIterator(blockId, values, level, returnValues = true)
     } else {
-      val putAttempt = tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
+      val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
@@ -117,11 +117,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       returnValues: Boolean): PutResult = {
     if (level.deserialized) {
       val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
-      val putAttempt = tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
+      val putAttempt = tryToPut(blockId, values, sizeEstimate, deserialized = true)
       PutResult(sizeEstimate, Left(values.iterator), putAttempt.droppedBlocks)
     } else {
       val bytes = blockManager.dataSerialize(blockId, values.iterator)
-      val putAttempt = tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
+      val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
       PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
     }
   }
@@ -320,6 +320,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     blockId.asRDDId.map(_.rddId)
   }
 
+  /**
+   * Try to put in a set of values, if we can free up enough space. The value should either be
+   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
+   * must also be passed by the caller.
+   *
+   * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
+   * dropping is done by only one thread at a time. Otherwise while one thread is dropping
+   * blocks to free memory for one block, another thread may use up the freed space for
+   * another block.
+   *
+   * Return whether put was successful, along with the blocks dropped in the process.
+   */
+  private def tryToPut(
+      blockId: BlockId,
+      value: Any,
+      size: Long,
+      deserialized: Boolean): ResultWithDroppedBlocks = {
+    tryToPut(blockId, () => value, size, deserialized)
+  }
+
   /**
    * Try to put in a set of values, if we can free up enough space. The value should either be
    * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
@@ -439,7 +459,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           } else {
             Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
           }
-          val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
+          val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
           droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
         }
       }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ec3065be03772..3ed3e12f48ec3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -170,8 +170,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     assert(master.getLocations("a3").size === 0, "master was told about a3")
 
     // Drop a1 and a2 from memory; this should be reported back to the master
-    store.dropFromMemory("a1", null)
-    store.dropFromMemory("a2", null)
+    store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
     assert(store.getSingle("a1") === None, "a1 not removed from store")
     assert(store.getSingle("a2") === None, "a2 not removed from store")
     assert(master.getLocations("a1").size === 0, "master did not remove a1")
@@ -413,8 +413,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfterEach
     t2.join()
     t3.join()
 
-    store.dropFromMemory("a1", null)
-    store.dropFromMemory("a2", null)
+    store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer])
+    store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer])
     store.waitForAsyncReregister()
   }
 }
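Read end to end, PATCH 3/3 leaves each class with two overloads: the lazy
`() => T` variant that does the real work, plus an eager alias that keeps old
call sites compiling. The pattern in miniature (Dropper and DropperDemo are
illustrative names, not Spark's classes); the type ascription on `null` is why
the suite spells it `null: Either[Array[Any], ByteBuffer]`, since a bare
`null` would be ambiguous between the two overloads.

  import java.nio.ByteBuffer

  class Dropper {
    // Eager alias: delegates to the lazy variant.
    def dropFromMemory(id: String, data: Either[Array[Any], ByteBuffer]): Unit =
      dropFromMemory(id, () => data)

    // Lazy variant: data() runs only if the block is actually written out.
    def dropFromMemory(id: String, data: () => Either[Array[Any], ByteBuffer]): Unit = {
      val writeToDisk = false // stand-in for level.useDisk && !diskStore.contains(id)
      if (writeToDisk) {
        data() match {
          case Left(elements) => () // would write the deserialized array
          case Right(bytes)   => () // would write the serialized buffer
        }
      }
    }
  }

  object DropperDemo {
    def main(args: Array[String]): Unit = {
      val d = new Dropper
      d.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer]) // eager overload
      d.dropFromMemory("a2", () => sys.error("not evaluated: nothing goes to disk"))
    }
  }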