From 2c20dcbcf499aee5d6fbbb80f4803b3cad37c17c Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Sun, 17 Sep 2017 17:53:49 +0800 Subject: [PATCH 01/16] refactor memorystore --- .../spark/storage/memory/MemoryStore.scala | 205 +++++++++--------- 1 file changed, 100 insertions(+), 105 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 90e3af2d0ec7..33f8f9f064dc 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -160,27 +160,13 @@ private[spark] class MemoryStore( } } - /** - * Attempt to put the given block in memory store as values. - * - * It's possible that the iterator is too large to materialize and store in memory. To avoid - * OOM exceptions, this method will gradually unroll the iterator while periodically checking - * whether there is enough free memory. If the block is successfully materialized, then the - * temporary unroll memory used during the materialization is "transferred" to storage memory, - * so we won't acquire more memory than is actually needed to store the block. - * - * @return in case of success, the estimated size of the stored data. In case of failure, return - * an iterator containing the values of the block. The returned iterator will be backed - * by the combination of the partially-unrolled block and the remaining elements of the - * original input iterator. The caller must either fully consume this iterator or call - * `close()` on it in order to free the storage memory consumed by the partially-unrolled - * block. - */ - private[storage] def putIteratorAsValues[T]( + private def putIterator[T]( blockId: BlockId, values: Iterator[T], - classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { - + classTag: ClassTag[T], + memoryMode: MemoryMode, + storeValue: (T, Boolean) => Long, + createMemoryEntry: () => MemoryEntry[T]): (Boolean, Long) = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Number of elements unrolled so far @@ -190,19 +176,17 @@ private[spark] class MemoryStore( // Initial per-task memory to request for unrolling blocks (bytes). 
val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory - val memoryCheckPeriod = 16 + val memoryCheckPeriod = conf.getLong("spark.storage.memoryCheckPeriod", 16) // Memory currently reserved by this task for this particular unrolling operation var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size - val memoryGrowthFactor = 1.5 + val memoryGrowthFactor = conf.getDouble("spark.storage.memoryGrowthFactor", 1.5) // Keep track of unroll memory used by this particular block / putIterator() operation var unrollMemoryUsedByThisBlock = 0L - // Underlying vector for unrolling the block - var vector = new SizeTrackingVector[T]()(classTag) // Request enough memory to begin unrolling keepUnrolling = - reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP) + reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + @@ -211,16 +195,17 @@ private[spark] class MemoryStore( unrollMemoryUsedByThisBlock += initialMemoryThreshold } + var needEstimateSize = false // Unroll this block safely, checking whether we have exceeded our threshold periodically while (values.hasNext && keepUnrolling) { - vector += values.next() - if (elementsUnrolled % memoryCheckPeriod == 0) { + needEstimateSize = elementsUnrolled % memoryCheckPeriod == 0 + val currentSize = storeValue(values.next(), needEstimateSize) + if (needEstimateSize) { // If our vector's size has exceeded the threshold, request more memory - val currentSize = vector.estimateSize() if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong keepUnrolling = - reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP) + reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) if (keepUnrolling) { unrollMemoryUsedByThisBlock += amountToRequest } @@ -232,17 +217,13 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = - new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) + val entry = createMemoryEntry() val size = entry.size def transferUnrollToStorage(amount: Long): Unit = { // Synchronize so that transfer is atomic memoryManager.synchronized { - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount) - val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) + releaseUnrollMemoryForThisTask(memoryMode, amount) + val success = memoryManager.acquireStorageMemory(blockId, amount, memoryMode) assert(success, "transferring unroll memory to storage memory failed") } } @@ -251,7 +232,7 @@ private[spark] class MemoryStore( if (unrollMemoryUsedByThisBlock <= size) { val acquiredExtra = memoryManager.acquireStorageMemory( - blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP) + blockId, size - unrollMemoryUsedByThisBlock, memoryMode) if (acquiredExtra) { transferUnrollToStorage(unrollMemoryUsedByThisBlock) } @@ -260,35 +241,84 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. 
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) + releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) transferUnrollToStorage(size) true } } + if (enoughStorageMemory) { entries.synchronized { entries.put(blockId, entry) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) - Right(size) + (true, size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") - Left(new PartiallyUnrolledIterator( - this, - MemoryMode.ON_HEAP, - unrollMemoryUsedByThisBlock, - unrolled = arrayValues.toIterator, - rest = Iterator.empty)) + (false, unrollMemoryUsedByThisBlock) + } + } else { + (false, unrollMemoryUsedByThisBlock) + } + } + + /** + * Attempt to put the given block in memory store as values. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @return in case of success, the estimated size of the stored data. In case of failure, return + * an iterator containing the values of the block. The returned iterator will be backed + * by the combination of the partially-unrolled block and the remaining elements of the + * original input iterator. The caller must either fully consume this iterator or call + * `close()` on it in order to free the storage memory consumed by the partially-unrolled + * block. + */ + private[storage] def putIteratorAsValues[T]( + blockId: BlockId, + values: Iterator[T], + classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { + + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[T]()(classTag) + + var currentSize = 0L + def storeValue(value: T, needEstimateSize: Boolean): Long = { + vector += value + if (needEstimateSize) { + currentSize = vector.estimateSize() } + + currentSize + } + + def createMemoryEntry(): MemoryEntry[T] = { + // We successfully unrolled the entirety of this block + val arrayValues = vector.toArray + vector = null + val entry = + new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) + entry + } + + val (storedSuccess, size) = + putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue, createMemoryEntry) + + if (storedSuccess) { + Right(size) } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, vector.estimateSize()) Left(new PartiallyUnrolledIterator( this, MemoryMode.ON_HEAP, - unrollMemoryUsedByThisBlock, + size, unrolled = vector.iterator, rest = values)) } @@ -323,13 +353,8 @@ private[spark] class MemoryStore( case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ } - // Whether there is still enough memory for us to continue unrolling this block - var keepUnrolling = true // Initial per-task memory to request for unrolling blocks (bytes). 
val initialMemoryThreshold = unrollMemoryThreshold - // Keep track of unroll memory used by this particular block / putIterator() operation - var unrollMemoryUsedByThisBlock = 0L - // Underlying buffer for unrolling the block val redirectableStream = new RedirectableOutputStream val chunkSize = if (initialMemoryThreshold > Int.MaxValue) { logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " + @@ -347,70 +372,40 @@ private[spark] class MemoryStore( ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } - // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) - - if (!keepUnrolling) { - logWarning(s"Failed to reserve initial memory threshold of " + - s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.") - } else { - unrollMemoryUsedByThisBlock += initialMemoryThreshold - } - - def reserveAdditionalMemoryIfNecessary(): Unit = { - if (bbos.size > unrollMemoryUsedByThisBlock) { - val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) - if (keepUnrolling) { - unrollMemoryUsedByThisBlock += amountToRequest - } + var currentSize = 0L + def storeValue(value: T, needEstimateSize: Boolean): Long = { + serializationStream.writeObject(value)(classTag) + if (needEstimateSize) { + currentSize = bbos.size } - } - // Unroll this block safely, checking whether we have exceeded our threshold - while (values.hasNext && keepUnrolling) { - serializationStream.writeObject(values.next())(classTag) - reserveAdditionalMemoryIfNecessary() + currentSize } - // Make sure that we have enough memory to store the block. By this point, it is possible that - // the block's actual memory usage has exceeded the unroll memory by a small amount, so we - // perform one final call to attempt to allocate additional memory if necessary. 
- if (keepUnrolling) { - serializationStream.close() - reserveAdditionalMemoryIfNecessary() + def createMemoryEntry(): MemoryEntry[T] = { + val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) + entry } - if (keepUnrolling) { - val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) - // Synchronize so that transfer is atomic - memoryManager.synchronized { - releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) - val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) - assert(success, "transferring unroll memory to storage memory failed") - } - entries.synchronized { - entries.put(blockId, entry) - } - logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(entry.size), - Utils.bytesToString(maxMemory - blocksMemoryUsed))) - Right(entry.size) + val (storedSuccess, size) = + putIterator(blockId, values, classTag, MemoryMode.OFF_HEAP, storeValue, createMemoryEntry) + + if (storedSuccess) { + Right(size) } else { // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, bbos.size) - Left( - new PartiallySerializedBlock( - this, - serializerManager, - blockId, - serializationStream, - redirectableStream, - unrollMemoryUsedByThisBlock, - memoryMode, - bbos, - values, - classTag)) + Left(new PartiallySerializedBlock( + this, + serializerManager, + blockId, + serializationStream, + redirectableStream, + size, + memoryMode, + bbos, + values, + classTag)) } } From 92e1d51b18a810307a0b6d0cb761925a0429ead2 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 20 Sep 2017 07:45:17 +0800 Subject: [PATCH 02/16] fix bug and add some comments --- .../spark/storage/memory/MemoryStore.scala | 143 ++++++++++-------- 1 file changed, 82 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 2c081d924271..baafd745d4e3 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -161,13 +161,30 @@ private[spark] class MemoryStore( } } + /** + * Attempt to put the given block in memory store as values or bytes. + * + * It's possible that the iterator is too large to materialize and store in memory. To avoid + * OOM exceptions, this method will gradually unroll the iterator while periodically checking + * whether there is enough free memory. If the block is successfully materialized, then the + * temporary unroll memory used during the materialization is "transferred" to storage memory, + * so we won't acquire more memory than is actually needed to store the block. + * + * @param memoryMode The values saved mode. + * @param storeValue Store the record of values to the MemoryStore. + * @param estimateSize Get the memory size which used to unroll the block. The parameters + * determine whether we need precise size. + * @return if the block is stored successfully, return the stored data size. Else return the + * memory has used for unroll the block. 
+ */ private def putIterator[T]( blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode, - storeValue: (T, Boolean) => Long, - createMemoryEntry: () => MemoryEntry[T]): (Boolean, Long) = { + storeValue: T => Unit, + estimateSize: Boolean => Long, + createMemoryEntry: () => MemoryEntry[T]): Either[Long, Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Number of elements unrolled so far @@ -196,12 +213,12 @@ private[spark] class MemoryStore( unrollMemoryUsedByThisBlock += initialMemoryThreshold } - var needEstimateSize = false // Unroll this block safely, checking whether we have exceeded our threshold periodically while (values.hasNext && keepUnrolling) { - needEstimateSize = elementsUnrolled % memoryCheckPeriod == 0 - val currentSize = storeValue(values.next(), needEstimateSize) - if (needEstimateSize) { + storeValue(values.next()) + if (elementsUnrolled % memoryCheckPeriod == 0) { + // we don't need the precise size. + val currentSize = estimateSize(false) // If our vector's size has exceeded the threshold, request more memory if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong @@ -218,8 +235,8 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - val entry = createMemoryEntry() - val size = entry.size + // get the precise size + val size = estimateSize(true) def transferUnrollToStorage(amount: Long): Unit = { // Synchronize so that transfer is atomic memoryManager.synchronized { @@ -250,18 +267,18 @@ private[spark] class MemoryStore( if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, entry) + entries.put(blockId, createMemoryEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) - (true, size) + Right(size) } else { assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, "released too much unroll memory") - (false, unrollMemoryUsedByThisBlock) + Left(unrollMemoryUsedByThisBlock) } } else { - (false, unrollMemoryUsedByThisBlock) + Left(unrollMemoryUsedByThisBlock) } } @@ -288,40 +305,45 @@ private[spark] class MemoryStore( // Underlying vector for unrolling the block var vector = new SizeTrackingVector[T]()(classTag) + var arrayValues: Array[T] = null + var preciseSize: Long = -1 - var currentSize = 0L - def storeValue(value: T, needEstimateSize: Boolean): Long = { + def storeValue(value: T): Unit = { vector += value - if (needEstimateSize) { - currentSize = vector.estimateSize() - } + } - currentSize + def estimateSize(precise: Boolean): Long = { + if (precise) { + // We only call need the precise size after all values unrolled. 
+ arrayValues = vector.toArray + preciseSize = SizeEstimator.estimate(arrayValues) + vector = null + preciseSize + } else { + vector.estimateSize() + } } def createMemoryEntry(): MemoryEntry[T] = { // We successfully unrolled the entirety of this block - val arrayValues = vector.toArray - vector = null - val entry = - new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag) + assert(arrayValues != null, "arrayValue shouldn't be null!") + assert(preciseSize != -1, "preciseSize shouldn't be -1") + val entry = new DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) entry } - val (storedSuccess, size) = - putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue, createMemoryEntry) - - if (storedSuccess) { - Right(size) - } else { - // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, vector.estimateSize()) - Left(new PartiallyUnrolledIterator( - this, - MemoryMode.ON_HEAP, - size, - unrolled = vector.iterator, - rest = values)) + putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue, + estimateSize, createMemoryEntry) match { + case Right(unrolledSize) => Right(unrolledSize) + case Left(unrollMemoryUsedByThisBlock) => + // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, vector.estimateSize()) + Left(new PartiallyUnrolledIterator( + this, + MemoryMode.ON_HEAP, + unrollMemoryUsedByThisBlock, + unrolled = vector.iterator, + rest = values)) } } @@ -373,40 +395,39 @@ private[spark] class MemoryStore( ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } - var currentSize = 0L - def storeValue(value: T, needEstimateSize: Boolean): Long = { + def storeValue(value: T): Unit = { serializationStream.writeObject(value)(classTag) - if (needEstimateSize) { - currentSize = bbos.size - } + } - currentSize + def estimateSize(precise: Boolean): Long = { + // We don't need care about the value of precise, because the cost of obtaining precise + // values is very low. 
+ bbos.size } def createMemoryEntry(): MemoryEntry[T] = { + serializationStream.close() val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) entry } - val (storedSuccess, size) = - putIterator(blockId, values, classTag, MemoryMode.OFF_HEAP, storeValue, createMemoryEntry) - - if (storedSuccess) { - Right(size) - } else { - // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, bbos.size) - Left(new PartiallySerializedBlock( - this, - serializerManager, - blockId, - serializationStream, - redirectableStream, - size, - memoryMode, - bbos, - values, - classTag)) + putIterator(blockId, values, classTag, MemoryMode.OFF_HEAP, + storeValue, estimateSize, createMemoryEntry) match { + case Right(unrolledSize) => Right(unrolledSize) + case Left(unrollMemoryUsedByThisBlock) => + // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, bbos.size) + Left(new PartiallySerializedBlock( + this, + serializerManager, + blockId, + serializationStream, + redirectableStream, + unrollMemoryUsedByThisBlock, + memoryMode, + bbos, + values, + classTag)) } } From 6e2e29be7ad9d4bf3aae2d55fb4bf93c3286009b Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 20 Sep 2017 08:28:35 +0800 Subject: [PATCH 03/16] better variable name --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index baafd745d4e3..3e0a3472a26e 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -334,7 +334,7 @@ private[spark] class MemoryStore( putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue, estimateSize, createMemoryEntry) match { - case Right(unrolledSize) => Right(unrolledSize) + case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, vector.estimateSize()) @@ -413,7 +413,7 @@ private[spark] class MemoryStore( putIterator(blockId, values, classTag, MemoryMode.OFF_HEAP, storeValue, estimateSize, createMemoryEntry) match { - case Right(unrolledSize) => Right(unrolledSize) + case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // We ran out of space while unrolling the values for this block logUnrollFailureMessage(blockId, bbos.size) From d2b8ccd500f0076d281cc402b4a9633fb38562ed Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 20 Sep 2017 10:34:19 +0800 Subject: [PATCH 04/16] small fix --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 3e0a3472a26e..7cda2e960b24 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -411,8 +411,8 @@ private[spark] class MemoryStore( entry } - putIterator(blockId, values, classTag, MemoryMode.OFF_HEAP, - storeValue, estimateSize, createMemoryEntry) match { + putIterator(blockId, values, classTag, memoryMode, storeValue, + estimateSize, createMemoryEntry) 
match { case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // We ran out of space while unrolling the values for this block From 9ea8f49139d22229e88e98a458a4862f229e1a33 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 20 Sep 2017 14:23:27 +0800 Subject: [PATCH 05/16] fix unit test errors --- .../spark/storage/memory/MemoryStore.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 7cda2e960b24..2869e53d2612 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -337,12 +337,18 @@ private[spark] class MemoryStore( case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, vector.estimateSize()) + val (unrolledIterator, size) = if (vector != null) { + (vector.iterator, vector.estimateSize()) + } else { + (arrayValues.toIterator, preciseSize) + } + + logUnrollFailureMessage(blockId, size) Left(new PartiallyUnrolledIterator( this, MemoryMode.ON_HEAP, unrollMemoryUsedByThisBlock, - unrolled = vector.iterator, + unrolled = unrolledIterator, rest = values)) } } @@ -400,8 +406,9 @@ private[spark] class MemoryStore( } def estimateSize(precise: Boolean): Long = { - // We don't need care about the value of precise, because the cost of obtaining precise - // values is very low. + if (precise) { + serializationStream.flush() + } bbos.size } @@ -416,7 +423,7 @@ private[spark] class MemoryStore( case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, bbos.size) + logUnrollFailureMessage(blockId, unrollMemoryUsedByThisBlock) Left(new PartiallySerializedBlock( this, serializerManager, From d0fcf4f1772ca4ab920093442dab51bc5dd08c1c Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Fri, 22 Sep 2017 22:02:15 +0800 Subject: [PATCH 06/16] small gix --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 2869e53d2612..1ce0c045075f 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -407,13 +407,12 @@ private[spark] class MemoryStore( def estimateSize(precise: Boolean): Long = { if (precise) { - serializationStream.flush() + serializationStream.close() } bbos.size } def createMemoryEntry(): MemoryEntry[T] = { - serializationStream.close() val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) entry } From bc3ad4ea11e49b19ef4199642dbc4488f202d928 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 8 Nov 2017 09:37:01 +0800 Subject: [PATCH 07/16] address comments --- .../apache/spark/storage/memory/MemoryStore.scala | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index b8fbe95638cc..e48df4ef4e51 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -170,10 +170,14 @@ private[spark] class MemoryStore( * temporary unroll memory used during the materialization is "transferred" to storage memory, * so we won't acquire more memory than is actually needed to store the block. * + * @param blockId The block id. + * @param values The values which need be stored. + * @param classTag the [[ClassTag]] for the block. * @param memoryMode The values saved mode. * @param storeValue Store the record of values to the MemoryStore. * @param estimateSize Get the memory size which used to unroll the block. The parameters * determine whether we need precise size. + * @param createMemoryEntry Using [[MemoryEntry]] to hold the stored values or bytes. * @return if the block is stored successfully, return the stored data size. Else return the * memory has used for unroll the block. */ @@ -317,7 +321,6 @@ private[spark] class MemoryStore( // We only call need the precise size after all values unrolled. arrayValues = vector.toArray preciseSize = SizeEstimator.estimate(arrayValues) - vector = null preciseSize } else { vector.estimateSize() @@ -326,10 +329,7 @@ private[spark] class MemoryStore( def createMemoryEntry(): MemoryEntry[T] = { // We successfully unrolled the entirety of this block - assert(arrayValues != null, "arrayValue shouldn't be null!") - assert(preciseSize != -1, "preciseSize shouldn't be -1") - val entry = new DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) - entry + DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) } putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue, @@ -413,8 +413,7 @@ private[spark] class MemoryStore( } def createMemoryEntry(): MemoryEntry[T] = { - val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) - entry + SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) } putIterator(blockId, values, classTag, memoryMode, storeValue, From a2b951358e88bbbc0335a7eabda19e36d6586904 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Sun, 21 Jan 2018 15:22:48 +0800 Subject: [PATCH 08/16] address comments --- .../spark/storage/memory/MemoryStore.scala | 167 +++++++++--------- 1 file changed, 81 insertions(+), 86 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index e48df4ef4e51..3e26c67f9640 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -56,6 +56,12 @@ private case class SerializedMemoryEntry[T]( def size: Long = buffer.size } +private trait ValuesHolder[T] { + def storeValue(value: T): Unit + def esitimatedSize(roughly: Boolean): Long + def buildEntry(): MemoryEntry[T] +} + private[storage] trait BlockEvictionHandler { /** * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory @@ -174,10 +180,8 @@ private[spark] class MemoryStore( * @param values The values which need be stored. * @param classTag the [[ClassTag]] for the block. * @param memoryMode The values saved mode. - * @param storeValue Store the record of values to the MemoryStore. - * @param estimateSize Get the memory size which used to unroll the block. The parameters - * determine whether we need precise size. 
- * @param createMemoryEntry Using [[MemoryEntry]] to hold the stored values or bytes. + * @param valuesHolder A holder that supports storing record of values into memory store as + * values of bytes. * @return if the block is stored successfully, return the stored data size. Else return the * memory has used for unroll the block. */ @@ -186,9 +190,7 @@ private[spark] class MemoryStore( values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode, - storeValue: T => Unit, - estimateSize: Boolean => Long, - createMemoryEntry: () => MemoryEntry[T]): Either[Long, Long] = { + valuesHolder: ValuesHolder[T]): Either[Long, Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") // Number of elements unrolled so far @@ -219,10 +221,10 @@ private[spark] class MemoryStore( // Unroll this block safely, checking whether we have exceeded our threshold periodically while (values.hasNext && keepUnrolling) { - storeValue(values.next()) + valuesHolder.storeValue(values.next()) if (elementsUnrolled % memoryCheckPeriod == 0) { - // we don't need the precise size. - val currentSize = estimateSize(false) + // For performance reason, just get the rough value + val currentSize = valuesHolder.esitimatedSize(true) // If our vector's size has exceeded the threshold, request more memory if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong @@ -239,8 +241,8 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - // get the precise size - val size = estimateSize(true) + // We need more precise value + val size = valuesHolder.esitimatedSize(false) def transferUnrollToStorage(amount: Long): Unit = { // Synchronize so that transfer is atomic memoryManager.synchronized { @@ -271,7 +273,7 @@ private[spark] class MemoryStore( if (enoughStorageMemory) { entries.synchronized { - entries.put(blockId, createMemoryEntry()) + entries.put(blockId, valuesHolder.buildEntry()) } logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) @@ -289,12 +291,6 @@ private[spark] class MemoryStore( /** * Attempt to put the given block in memory store as values. * - * It's possible that the iterator is too large to materialize and store in memory. To avoid - * OOM exceptions, this method will gradually unroll the iterator while periodically checking - * whether there is enough free memory. If the block is successfully materialized, then the - * temporary unroll memory used during the materialization is "transferred" to storage memory, - * so we won't acquire more memory than is actually needed to store the block. - * * @return in case of success, the estimated size of the stored data. In case of failure, return * an iterator containing the values of the block. 
The returned iterator will be backed * by the combination of the partially-unrolled block and the remaining elements of the @@ -307,40 +303,43 @@ private[spark] class MemoryStore( values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { - // Underlying vector for unrolling the block - var vector = new SizeTrackingVector[T]()(classTag) - var arrayValues: Array[T] = null - var preciseSize: Long = -1 + val valuesHolder = new ValuesHolder[T] { + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[T]()(classTag) + var arrayValues: Array[T] = null + var preciseSize: Long = -1 - def storeValue(value: T): Unit = { - vector += value - } + override def storeValue(value: T): Unit = { + vector += value + } - def estimateSize(precise: Boolean): Long = { - if (precise) { - // We only call need the precise size after all values unrolled. - arrayValues = vector.toArray - preciseSize = SizeEstimator.estimate(arrayValues) - preciseSize - } else { - vector.estimateSize() + override def esitimatedSize(roughly: Boolean): Long = { + if (!roughly) { + // We only need the more precise size after all values unrolled. + arrayValues = vector.toArray + vector = null + preciseSize = SizeEstimator.estimate(arrayValues) + preciseSize + } else { + // For performance, rough estimate + vector.estimateSize() + } } - } - def createMemoryEntry(): MemoryEntry[T] = { - // We successfully unrolled the entirety of this block - DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) + override def buildEntry(): MemoryEntry[T] = { + // We successfully unrolled the entirety of this block + DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) + } } - putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue, - estimateSize, createMemoryEntry) match { + putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match { case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // We ran out of space while unrolling the values for this block - val (unrolledIterator, size) = if (vector != null) { - (vector.iterator, vector.estimateSize()) + val (unrolledIterator, size) = if (valuesHolder.vector != null) { + (valuesHolder.vector.iterator, valuesHolder.vector.estimateSize()) } else { - (arrayValues.toIterator, preciseSize) + (valuesHolder.arrayValues.toIterator, valuesHolder.preciseSize) } logUnrollFailureMessage(blockId, size) @@ -356,12 +355,6 @@ private[spark] class MemoryStore( /** * Attempt to put the given block in memory store as bytes. * - * It's possible that the iterator is too large to materialize and store in memory. To avoid - * OOM exceptions, this method will gradually unroll the iterator while periodically checking - * whether there is enough free memory. If the block is successfully materialized, then the - * temporary unroll memory used during the materialization is "transferred" to storage memory, - * so we won't acquire more memory than is actually needed to store the block. - * * @return in case of success, the estimated size of the stored data. 
In case of failure, * return a handle which allows the caller to either finish the serialization by * spilling to disk or to deserialize the partially-serialized block and reconstruct @@ -377,47 +370,49 @@ private[spark] class MemoryStore( require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - val allocator = memoryMode match { - case MemoryMode.ON_HEAP => ByteBuffer.allocate _ - case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ - } + val valuesHolder = new ValuesHolder[T] { + val allocator = memoryMode match { + case MemoryMode.ON_HEAP => ByteBuffer.allocate _ + case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ + } - // Initial per-task memory to request for unrolling blocks (bytes). - val initialMemoryThreshold = unrollMemoryThreshold - val redirectableStream = new RedirectableOutputStream - val chunkSize = if (initialMemoryThreshold > Int.MaxValue) { - logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " + - s"is too large to be set as chunk size. Chunk size has been capped to " + - s"${Utils.bytesToString(Int.MaxValue)}") - Int.MaxValue - } else { - initialMemoryThreshold.toInt - } - val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) - redirectableStream.setOutputStream(bbos) - val serializationStream: SerializationStream = { - val autoPick = !blockId.isInstanceOf[StreamBlockId] - val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() - ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) - } + // Initial per-task memory to request for unrolling blocks (bytes). + val initialMemoryThreshold = unrollMemoryThreshold + val redirectableStream = new RedirectableOutputStream + val chunkSize = if (initialMemoryThreshold > Int.MaxValue) { + logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " + + s"is too large to be set as chunk size. 
Chunk size has been capped to " + + s"${Utils.bytesToString(Int.MaxValue)}") + Int.MaxValue + } else { + initialMemoryThreshold.toInt + } - def storeValue(value: T): Unit = { - serializationStream.writeObject(value)(classTag) - } + val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) + redirectableStream.setOutputStream(bbos) + val serializationStream: SerializationStream = { + val autoPick = !blockId.isInstanceOf[StreamBlockId] + val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() + ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) + } - def estimateSize(precise: Boolean): Long = { - if (precise) { - serializationStream.close() + override def storeValue(value: T): Unit = { + serializationStream.writeObject(value)(classTag) } - bbos.size - } - def createMemoryEntry(): MemoryEntry[T] = { - SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) + override def esitimatedSize(roughly: Boolean): Long = { + if (!roughly) { + serializationStream.close() + } + bbos.size + } + + override def buildEntry(): MemoryEntry[T] = { + SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) + } } - putIterator(blockId, values, classTag, memoryMode, storeValue, - estimateSize, createMemoryEntry) match { + putIterator(blockId, values, classTag, memoryMode, valuesHolder) match { case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // We ran out of space while unrolling the values for this block @@ -426,11 +421,11 @@ private[spark] class MemoryStore( this, serializerManager, blockId, - serializationStream, - redirectableStream, + valuesHolder.serializationStream, + valuesHolder.redirectableStream, unrollMemoryUsedByThisBlock, memoryMode, - bbos, + valuesHolder.bbos, values, classTag)) } From 9d7c52d53d98b971ad1ad05c828855f7298b6058 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Sun, 21 Jan 2018 16:41:05 +0800 Subject: [PATCH 09/16] fix compile errors --- .../spark/storage/memory/MemoryStore.scala | 165 ++++++++++-------- 1 file changed, 91 insertions(+), 74 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 3e26c67f9640..fa951e39b984 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -56,12 +56,6 @@ private case class SerializedMemoryEntry[T]( def size: Long = buffer.size } -private trait ValuesHolder[T] { - def storeValue(value: T): Unit - def esitimatedSize(roughly: Boolean): Long - def buildEntry(): MemoryEntry[T] -} - private[storage] trait BlockEvictionHandler { /** * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory @@ -303,34 +297,7 @@ private[spark] class MemoryStore( values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = { - val valuesHolder = new ValuesHolder[T] { - // Underlying vector for unrolling the block - var vector = new SizeTrackingVector[T]()(classTag) - var arrayValues: Array[T] = null - var preciseSize: Long = -1 - - override def storeValue(value: T): Unit = { - vector += value - } - - override def esitimatedSize(roughly: Boolean): Long = { - if (!roughly) { - // We only need the more precise size after all values unrolled. 
- arrayValues = vector.toArray - vector = null - preciseSize = SizeEstimator.estimate(arrayValues) - preciseSize - } else { - // For performance, rough estimate - vector.estimateSize() - } - } - - override def buildEntry(): MemoryEntry[T] = { - // We successfully unrolled the entirety of this block - DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) - } - } + val valuesHolder = new DeserializedValuesHolder[T](classTag) putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match { case Right(storedSize) => Right(storedSize) @@ -370,48 +337,21 @@ private[spark] class MemoryStore( require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - val valuesHolder = new ValuesHolder[T] { - val allocator = memoryMode match { - case MemoryMode.ON_HEAP => ByteBuffer.allocate _ - case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ - } - - // Initial per-task memory to request for unrolling blocks (bytes). - val initialMemoryThreshold = unrollMemoryThreshold - val redirectableStream = new RedirectableOutputStream - val chunkSize = if (initialMemoryThreshold > Int.MaxValue) { - logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " + - s"is too large to be set as chunk size. Chunk size has been capped to " + - s"${Utils.bytesToString(Int.MaxValue)}") - Int.MaxValue - } else { - initialMemoryThreshold.toInt - } - - val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) - redirectableStream.setOutputStream(bbos) - val serializationStream: SerializationStream = { - val autoPick = !blockId.isInstanceOf[StreamBlockId] - val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() - ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) - } - - override def storeValue(value: T): Unit = { - serializationStream.writeObject(value)(classTag) - } - - override def esitimatedSize(roughly: Boolean): Long = { - if (!roughly) { - serializationStream.close() - } - bbos.size - } - - override def buildEntry(): MemoryEntry[T] = { - SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) - } + // Initial per-task memory to request for unrolling blocks (bytes). + val initialMemoryThreshold = unrollMemoryThreshold + val redirectableStream = new RedirectableOutputStream + val chunkSize = if (initialMemoryThreshold > Int.MaxValue) { + logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " + + s"is too large to be set as chunk size. Chunk size has been capped to " + + s"${Utils.bytesToString(Int.MaxValue)}") + Int.MaxValue + } else { + initialMemoryThreshold.toInt } + val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag, + memoryMode, serializerManager) + putIterator(blockId, values, classTag, memoryMode, valuesHolder) match { case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => @@ -703,6 +643,83 @@ private[spark] class MemoryStore( } } +private trait ValuesHolder[T] { + def storeValue(value: T): Unit + def esitimatedSize(roughly: Boolean): Long + def buildEntry(): MemoryEntry[T] +} + +/** + * A holder for storing the deserialized values. 
+ */ +private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] { + // Underlying vector for unrolling the block + var vector = new SizeTrackingVector[T]()(classTag) + var arrayValues: Array[T] = null + var preciseSize: Long = -1 + + override def storeValue(value: T): Unit = { + vector += value + } + + override def esitimatedSize(roughly: Boolean): Long = { + if (!roughly) { + // We only need the more precise size after all values unrolled. + arrayValues = vector.toArray + vector = null + preciseSize = SizeEstimator.estimate(arrayValues) + preciseSize + } else { + // For performance, rough estimate + vector.estimateSize() + } + } + + override def buildEntry(): MemoryEntry[T] = { + // We successfully unrolled the entirety of this block + DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) + } +} + +/** + * A holder for storing the serialized values. + */ +private class SerializedValuesHolder[T]( + blockId: BlockId, + chunkSize: Int, + classTag: ClassTag[T], + memoryMode: MemoryMode, + serializerManager: SerializerManager) extends ValuesHolder[T] { + val allocator = memoryMode match { + case MemoryMode.ON_HEAP => ByteBuffer.allocate _ + case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ + } + + val redirectableStream = new RedirectableOutputStream + val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator) + redirectableStream.setOutputStream(bbos) + val serializationStream: SerializationStream = { + val autoPick = !blockId.isInstanceOf[StreamBlockId] + val ser = serializerManager.getSerializer(classTag, autoPick).newInstance() + ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) + } + + override def storeValue(value: T): Unit = { + serializationStream.writeObject(value)(classTag) + } + + override def esitimatedSize(roughly: Boolean): Long = { + if (!roughly) { + serializationStream.close() + } + bbos.size + } + + override def buildEntry(): MemoryEntry[T] = { + SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) + } +} + /** * The result of a failed [[MemoryStore.putIteratorAsValues()]] call. * From c4424943f5b74f8d1c191228cd8055d5482e7658 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Mon, 22 Jan 2018 10:49:01 +0800 Subject: [PATCH 10/16] address comments --- .../scala/org/apache/spark/storage/memory/MemoryStore.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index fa951e39b984..fdb2215c2744 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -175,7 +175,7 @@ private[spark] class MemoryStore( * @param classTag the [[ClassTag]] for the block. * @param memoryMode The values saved mode. * @param valuesHolder A holder that supports storing record of values into memory store as - * values of bytes. + * values or bytes. * @return if the block is stored successfully, return the stored data size. Else return the * memory has used for unroll the block. 
*/ @@ -356,7 +356,7 @@ private[spark] class MemoryStore( case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, unrollMemoryUsedByThisBlock) + logUnrollFailureMessage(blockId, valuesHolder.bbos.size) Left(new PartiallySerializedBlock( this, serializerManager, From f392217e77f3b7f29f3f0728c29a2e041adb0c0e Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Wed, 24 Jan 2018 14:01:48 +0800 Subject: [PATCH 11/16] address comments --- .../spark/storage/memory/MemoryStore.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index fdb2215c2744..00f8885e895a 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -173,11 +173,14 @@ private[spark] class MemoryStore( * @param blockId The block id. * @param values The values which need be stored. * @param classTag the [[ClassTag]] for the block. - * @param memoryMode The values saved mode. + * @param memoryMode The values saved memory mode(ON_HEAP or OFF_HEAP). * @param valuesHolder A holder that supports storing record of values into memory store as * values or bytes. * @return if the block is stored successfully, return the stored data size. Else return the - * memory has used for unroll the block. + * memory has reserved for unrolling the block (There are two reasons for store failed: + * First, the block is partially-unrolled; second, the block is entirely unrolled and + * the actual stored data size is larger than reserved, but we can't request extra + * memory). */ private def putIterator[T]( blockId: BlockId, @@ -218,7 +221,7 @@ private[spark] class MemoryStore( valuesHolder.storeValue(values.next()) if (elementsUnrolled % memoryCheckPeriod == 0) { // For performance reason, just get the rough value - val currentSize = valuesHolder.esitimatedSize(true) + val currentSize = valuesHolder.estimatedSize(true) // If our vector's size has exceeded the threshold, request more memory if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong @@ -236,7 +239,7 @@ private[spark] class MemoryStore( if (keepUnrolling) { // We need more precise value - val size = valuesHolder.esitimatedSize(false) + val size = valuesHolder.estimatedSize(false) def transferUnrollToStorage(amount: Long): Unit = { // Synchronize so that transfer is atomic memoryManager.synchronized { @@ -339,7 +342,6 @@ private[spark] class MemoryStore( // Initial per-task memory to request for unrolling blocks (bytes). val initialMemoryThreshold = unrollMemoryThreshold - val redirectableStream = new RedirectableOutputStream val chunkSize = if (initialMemoryThreshold > Int.MaxValue) { logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " + s"is too large to be set as chunk size. 
Chunk size has been capped to " + @@ -645,7 +647,7 @@ private[spark] class MemoryStore( private trait ValuesHolder[T] { def storeValue(value: T): Unit - def esitimatedSize(roughly: Boolean): Long + def estimatedSize(roughly: Boolean): Long def buildEntry(): MemoryEntry[T] } @@ -662,7 +664,7 @@ private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends Values vector += value } - override def esitimatedSize(roughly: Boolean): Long = { + override def estimatedSize(roughly: Boolean): Long = { if (!roughly) { // We only need the more precise size after all values unrolled. arrayValues = vector.toArray @@ -708,7 +710,7 @@ private class SerializedValuesHolder[T]( serializationStream.writeObject(value)(classTag) } - override def esitimatedSize(roughly: Boolean): Long = { + override def estimatedSize(roughly: Boolean): Long = { if (!roughly) { serializationStream.close() } From ded080d364faf8395f33f2bb7a4eb2d5332f570f Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Thu, 25 Jan 2018 11:24:23 +0800 Subject: [PATCH 12/16] address comments --- .../spark/storage/memory/MemoryStore.scala | 137 +++++++++--------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 00f8885e895a..cbd2eff0d490 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -220,8 +220,7 @@ private[spark] class MemoryStore( while (values.hasNext && keepUnrolling) { valuesHolder.storeValue(values.next()) if (elementsUnrolled % memoryCheckPeriod == 0) { - // For performance reason, just get the rough value - val currentSize = valuesHolder.estimatedSize(true) + val currentSize = valuesHolder.estimatedSize() // If our vector's size has exceeded the threshold, request more memory if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong @@ -237,51 +236,53 @@ private[spark] class MemoryStore( elementsUnrolled += 1 } + val valuesBuilder = if (keepUnrolling) { + Some(valuesHolder.getBuilder()) + } else { + None + } + + // Make sure that we have enough memory to store the block. By this point, it is possible that + // the block's actual memory usage has exceeded the unroll memory by a small amount, so we + // perform one final call to attempt to allocate additional memory if necessary. if (keepUnrolling) { - // We need more precise value - val size = valuesHolder.estimatedSize(false) - def transferUnrollToStorage(amount: Long): Unit = { - // Synchronize so that transfer is atomic - memoryManager.synchronized { - releaseUnrollMemoryForThisTask(memoryMode, amount) - val success = memoryManager.acquireStorageMemory(blockId, amount, memoryMode) - assert(success, "transferring unroll memory to storage memory failed") + val size = valuesBuilder.get.preciseSize + if (size > unrollMemoryUsedByThisBlock) { + val amountToRequest = size - unrollMemoryUsedByThisBlock + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) + if (keepUnrolling) { + unrollMemoryUsedByThisBlock += amountToRequest } } - // Acquire storage memory if necessary to store this block in memory. 
- val enoughStorageMemory = { - if (unrollMemoryUsedByThisBlock <= size) { - val acquiredExtra = - memoryManager.acquireStorageMemory( - blockId, size - unrollMemoryUsedByThisBlock, memoryMode) - if (acquiredExtra) { - transferUnrollToStorage(unrollMemoryUsedByThisBlock) - } - acquiredExtra - } else { // unrollMemoryUsedByThisBlock > size - // If this task attempt already owns more unroll memory than is necessary to store the - // block, then release the extra memory that will not be used. - val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory) - transferUnrollToStorage(size) - true - } + } + + if (keepUnrolling) { + val entry = valuesBuilder.get.build() + // Synchronize so that transfer is atomic + memoryManager.synchronized { + releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) + val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) + assert(success, "transferring unroll memory to storage memory failed") } - if (enoughStorageMemory) { - entries.synchronized { - entries.put(blockId, valuesHolder.buildEntry()) - } - logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format( - blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) - Right(size) - } else { - assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock, - "released too much unroll memory") - Left(unrollMemoryUsedByThisBlock) + entries.synchronized { + entries.put(blockId, entry) } + + logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId, + Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) + Right(entry.size) } else { + // We ran out of space while unrolling the values for this block + val actualSize = if (valuesBuilder.isEmpty) { + valuesHolder.estimatedSize() + } else { + valuesBuilder.get.preciseSize + } + + logUnrollFailureMessage(blockId, actualSize) Left(unrollMemoryUsedByThisBlock) + } } @@ -305,14 +306,12 @@ private[spark] class MemoryStore( putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match { case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => - // We ran out of space while unrolling the values for this block - val (unrolledIterator, size) = if (valuesHolder.vector != null) { - (valuesHolder.vector.iterator, valuesHolder.vector.estimateSize()) + val unrolledIterator = if (valuesHolder.vector != null) { + valuesHolder.vector.iterator } else { - (valuesHolder.arrayValues.toIterator, valuesHolder.preciseSize) + valuesHolder.arrayValues.toIterator } - logUnrollFailureMessage(blockId, size) Left(new PartiallyUnrolledIterator( this, MemoryMode.ON_HEAP, @@ -357,8 +356,6 @@ private[spark] class MemoryStore( putIterator(blockId, values, classTag, memoryMode, valuesHolder) match { case Right(storedSize) => Right(storedSize) case Left(unrollMemoryUsedByThisBlock) => - // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, valuesHolder.bbos.size) Left(new PartiallySerializedBlock( this, serializerManager, @@ -645,10 +642,15 @@ private[spark] class MemoryStore( } } +private trait ValuesBuilder[T] { + def preciseSize: Long + def build(): MemoryEntry[T] +} + private trait ValuesHolder[T] { def storeValue(value: T): Unit - def estimatedSize(roughly: Boolean): Long - def buildEntry(): MemoryEntry[T] + def estimatedSize(): Long + def getBuilder(): ValuesBuilder[T] } /** @@ 
-658,28 +660,24 @@ private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends Values // Underlying vector for unrolling the block var vector = new SizeTrackingVector[T]()(classTag) var arrayValues: Array[T] = null - var preciseSize: Long = -1 override def storeValue(value: T): Unit = { vector += value } - override def estimatedSize(roughly: Boolean): Long = { - if (!roughly) { - // We only need the more precise size after all values unrolled. - arrayValues = vector.toArray - vector = null - preciseSize = SizeEstimator.estimate(arrayValues) - preciseSize - } else { - // For performance, rough estimate - vector.estimateSize() - } + override def estimatedSize(): Long = { + vector.estimateSize() } - override def buildEntry(): MemoryEntry[T] = { + override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] { // We successfully unrolled the entirety of this block - DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) + arrayValues = vector.toArray + vector = null + + override val preciseSize: Long = SizeEstimator.estimate(arrayValues) + + override def build(): MemoryEntry[T] = + DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag) } } @@ -710,15 +708,18 @@ private class SerializedValuesHolder[T]( serializationStream.writeObject(value)(classTag) } - override def estimatedSize(roughly: Boolean): Long = { - if (!roughly) { - serializationStream.close() - } + override def estimatedSize(): Long = { bbos.size } - override def buildEntry(): MemoryEntry[T] = { - SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) + override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] { + // We successfully unrolled the entirety of this block + serializationStream.close() + + override val preciseSize: Long = bbos.size + + override def build(): MemoryEntry[T] = + SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) } } From b41f1bbb5e774205b321554af1376c4683582a0e Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Thu, 25 Jan 2018 11:27:22 +0800 Subject: [PATCH 13/16] small fix --- .../main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index cbd2eff0d490..7bbeb32528ba 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -282,7 +282,6 @@ private[spark] class MemoryStore( logUnrollFailureMessage(blockId, actualSize) Left(unrollMemoryUsedByThisBlock) - } } From 9e0759fb49eb4994099c10c8f8ec3a05637c915b Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Thu, 25 Jan 2018 13:20:34 +0800 Subject: [PATCH 14/16] address comments --- .../spark/storage/memory/MemoryStore.scala | 56 +++++++++---------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 7bbeb32528ba..31f195662c53 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -236,17 +236,12 @@ private[spark] class MemoryStore( elementsUnrolled += 1 } - val valuesBuilder = if (keepUnrolling) { - Some(valuesHolder.getBuilder()) - } else { - None - } - // Make sure that we have enough memory to store the block. 
By this point, it is possible that // the block's actual memory usage has exceeded the unroll memory by a small amount, so we // perform one final call to attempt to allocate additional memory if necessary. if (keepUnrolling) { - val size = valuesBuilder.get.preciseSize + val valuesBuilder = valuesHolder.getBuilder() + val size = valuesBuilder.preciseSize if (size > unrollMemoryUsedByThisBlock) { val amountToRequest = size - unrollMemoryUsedByThisBlock keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) @@ -254,33 +249,31 @@ private[spark] class MemoryStore( unrollMemoryUsedByThisBlock += amountToRequest } } - } - if (keepUnrolling) { - val entry = valuesBuilder.get.build() - // Synchronize so that transfer is atomic - memoryManager.synchronized { - releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) - val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) - assert(success, "transferring unroll memory to storage memory failed") - } + if (keepUnrolling) { + val entry = valuesBuilder.build() + // Synchronize so that transfer is atomic + memoryManager.synchronized { + releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) + val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) + assert(success, "transferring unroll memory to storage memory failed") + } - entries.synchronized { - entries.put(blockId, entry) - } + entries.synchronized { + entries.put(blockId, entry) + } - logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId, - Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) - Right(entry.size) - } else { - // We ran out of space while unrolling the values for this block - val actualSize = if (valuesBuilder.isEmpty) { - valuesHolder.estimatedSize() + logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId, + Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed))) + Right(entry.size) } else { - valuesBuilder.get.preciseSize + // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, valuesBuilder.preciseSize) + Left(unrollMemoryUsedByThisBlock) } - - logUnrollFailureMessage(blockId, actualSize) + } else { + // We ran out of space while unrolling the values for this block + logUnrollFailureMessage(blockId, valuesHolder.estimatedSize()) Left(unrollMemoryUsedByThisBlock) } } @@ -649,6 +642,9 @@ private trait ValuesBuilder[T] { private trait ValuesHolder[T] { def storeValue(value: T): Unit def estimatedSize(): Long + // Return a ValuesBuilder which is used to build a memory entry and get the stored data size. + // Note: After this method is called, the ValuesHolder is invalid, we can't store data and + // get estimate size again. 
def getBuilder(): ValuesBuilder[T] } @@ -715,7 +711,7 @@ private class SerializedValuesHolder[T]( // We successfully unrolled the entirety of this block serializationStream.close() - override val preciseSize: Long = bbos.size + override def preciseSize(): Long = bbos.size override def build(): MemoryEntry[T] = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) From 40bdcacfc14b24c913c5979e0b2cf8b90154c543 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Thu, 25 Jan 2018 15:39:19 +0800 Subject: [PATCH 15/16] address comments --- .../org/apache/spark/storage/memory/MemoryStore.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 31f195662c53..f127b81e6fd6 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -642,9 +642,12 @@ private trait ValuesBuilder[T] { private trait ValuesHolder[T] { def storeValue(value: T): Unit def estimatedSize(): Long - // Return a ValuesBuilder which is used to build a memory entry and get the stored data size. - // Note: After this method is called, the ValuesHolder is invalid, we can't store data and - // get estimate size again. + + /** + * Note: After this method is called, the ValuesHolder is invalid, we can't store data and + * get estimate size again. + * @return a ValuesBuilder which is used to build a memory entry and get the stored data size. + */ def getBuilder(): ValuesBuilder[T] } From 9d1aeefb829601f5905006f410ce2bacf34a5da6 Mon Sep 17 00:00:00 2001 From: Xianyang Liu Date: Fri, 26 Jan 2018 09:10:27 +0800 Subject: [PATCH 16/16] address comments --- .../spark/storage/memory/MemoryStore.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index f127b81e6fd6..4cc5bcb7f9ba 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -240,8 +240,8 @@ private[spark] class MemoryStore( // the block's actual memory usage has exceeded the unroll memory by a small amount, so we // perform one final call to attempt to allocate additional memory if necessary. 
if (keepUnrolling) { - val valuesBuilder = valuesHolder.getBuilder() - val size = valuesBuilder.preciseSize + val entryBuilder = valuesHolder.getBuilder() + val size = entryBuilder.preciseSize if (size > unrollMemoryUsedByThisBlock) { val amountToRequest = size - unrollMemoryUsedByThisBlock keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) @@ -251,7 +251,7 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - val entry = valuesBuilder.build() + val entry = entryBuilder.build() // Synchronize so that transfer is atomic memoryManager.synchronized { releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) @@ -268,7 +268,7 @@ private[spark] class MemoryStore( Right(entry.size) } else { // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, valuesBuilder.preciseSize) + logUnrollFailureMessage(blockId, entryBuilder.preciseSize) Left(unrollMemoryUsedByThisBlock) } } else { @@ -634,7 +634,7 @@ private[spark] class MemoryStore( } } -private trait ValuesBuilder[T] { +private trait MemoryEntryBuilder[T] { def preciseSize: Long def build(): MemoryEntry[T] } @@ -646,9 +646,10 @@ private trait ValuesHolder[T] { /** * Note: After this method is called, the ValuesHolder is invalid, we can't store data and * get estimate size again. - * @return a ValuesBuilder which is used to build a memory entry and get the stored data size. + * @return a MemoryEntryBuilder which is used to build a memory entry and get the stored data + * size. */ - def getBuilder(): ValuesBuilder[T] + def getBuilder(): MemoryEntryBuilder[T] } /** @@ -667,7 +668,7 @@ private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends Values vector.estimateSize() } - override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] { + override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] { // We successfully unrolled the entirety of this block arrayValues = vector.toArray vector = null @@ -710,7 +711,7 @@ private class SerializedValuesHolder[T]( bbos.size } - override def getBuilder(): ValuesBuilder[T] = new ValuesBuilder[T] { + override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] { // We successfully unrolled the entirety of this block serializationStream.close()
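
---

For reference, the shape this series converges on is a small two-part contract: a ValuesHolder accepts values and answers cheap size estimates while a block is being unrolled, and its getBuilder() call hands ownership to a MemoryEntryBuilder that knows the precise size and builds the final MemoryEntry. The sketch below is a self-contained, simplified illustration of that contract and of the periodic size-check loop in putIterator(); the object name, the per-element byte costs, the initial threshold, and the `reserve` callback are invented stand-ins, not Spark's actual MemoryStore or MemoryManager APIs.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

object UnrollSketch {

  // Toy stand-in for the MemoryEntryBuilder contract: owns the precise size and the final entry.
  trait MemoryEntryBuilder[T] {
    def preciseSize: Long
    def build(): Array[T] // stands in for MemoryEntry[T]
  }

  // Toy stand-in for ValuesHolder: cheap estimates while unrolling, then a one-shot builder.
  trait ValuesHolder[T] {
    def storeValue(value: T): Unit
    def estimatedSize(): Long
    def getBuilder(): MemoryEntryBuilder[T] // the holder must not be used after this call
  }

  class DeserializedValuesHolder[T: ClassTag] extends ValuesHolder[T] {
    private var buffer = new ArrayBuffer[T]
    override def storeValue(value: T): Unit = buffer += value
    // A real holder uses a size-tracking collection; 16 bytes per element is a toy estimate.
    override def estimatedSize(): Long = buffer.length * 16L
    override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
      private val values = buffer.toArray
      buffer = null // invalidate the holder, as the patches' comment requires
      override val preciseSize: Long = values.length.toLong * 16L
      override def build(): Array[T] = values
    }
  }

  // Mirrors the shape of putIterator(): reserve an initial threshold, store values while
  // re-checking the estimate every memoryCheckPeriod elements, grow the reservation by
  // memoryGrowthFactor, then finalize against the builder's precise size.
  def unroll[T](
      values: Iterator[T],
      holder: ValuesHolder[T],
      reserve: Long => Boolean): Either[Long, Array[T]] = {
    val memoryCheckPeriod = 16
    val memoryGrowthFactor = 1.5
    var threshold = 1024L
    var keepUnrolling = reserve(threshold)
    var reserved = if (keepUnrolling) threshold else 0L
    var elements = 0L
    while (values.hasNext && keepUnrolling) {
      holder.storeValue(values.next())
      if (elements % memoryCheckPeriod == 0) {
        val current = holder.estimatedSize()
        if (current >= threshold) {
          val extra = (current * memoryGrowthFactor - threshold).toLong
          keepUnrolling = reserve(extra)
          if (keepUnrolling) { reserved += extra; threshold += extra }
        }
      }
      elements += 1
    }
    if (!keepUnrolling) return Left(reserved)
    val builder = holder.getBuilder()
    if (builder.preciseSize > reserved) {
      if (reserve(builder.preciseSize - reserved)) reserved = builder.preciseSize
      else return Left(reserved)
    }
    Right(builder.build())
  }

  def main(args: Array[String]): Unit = {
    var budget = 1 << 20 // pretend we have 1 MiB of unroll memory
    val reserve: Long => Boolean = n => if (n <= budget) { budget -= n.toInt; true } else false
    val result = unroll((1 to 10000).iterator, new DeserializedValuesHolder[Int], reserve)
    println(result.map(_.length)) // Right(10000) when the budget suffices, otherwise Left(reserved)
  }
}
```

The property the real code relies on is that getBuilder() is called exactly once and only on success, after which the holder is invalid; computing preciseSize inside the builder lets the final memory request use the exact footprint instead of the rough unroll estimate.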
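A second detail the patches keep intact is that releasing unroll memory and acquiring storage memory happen under one memoryManager lock ("Synchronize so that transfer is atomic"). The toy manager below, with invented names and a single shared budget, illustrates why: split into two unsynchronized steps, another task could claim the memory freed by the release before this task re-acquires it as storage, and the assert would trip.

```scala
object TransferSketch {

  // Illustrative only; not Spark's MemoryManager API.
  class ToyMemoryManager(total: Long) {
    private var unrollUsed = 0L
    private var storageUsed = 0L

    def reserveUnroll(n: Long): Boolean = synchronized {
      if (unrollUsed + storageUsed + n <= total) { unrollUsed += n; true } else false
    }

    def releaseUnroll(n: Long): Unit = synchronized { unrollUsed -= n }

    def acquireStorage(n: Long): Boolean = synchronized {
      if (unrollUsed + storageUsed + n <= total) { storageUsed += n; true } else false
    }

    // Holding the lock across both steps guarantees the freed unroll memory is re-booked as
    // storage memory before any other task can see it as available.
    def transferUnrollToStorage(n: Long): Unit = synchronized {
      releaseUnroll(n)
      assert(acquireStorage(n), "transferring unroll memory to storage memory failed")
    }
  }

  def main(args: Array[String]): Unit = {
    val mm = new ToyMemoryManager(1024)
    assert(mm.reserveUnroll(512))
    mm.transferUnrollToStorage(512) // atomically converts 512 bytes of unroll memory to storage
  }
}
```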