@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
2626import org .apache .spark .util .{SizeEstimator , Utils }
2727import org .apache .spark .util .collection .SizeTrackingVector
2828
29- private case class MemoryEntry (value : Any , size : Long , deserialized : Boolean )
29+ private case class MemoryEntry (value : Any , size : Long , deserialized : Boolean , var dropping : Boolean = false )
3030
3131/**
3232 * Stores blocks in memory, either as Arrays of deserialized Java objects or as
@@ -184,6 +184,17 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
184184 }
185185 }
186186
187+ def removeWithoutUpdateMemorySize (blockId : BlockId ) = {
188+ entries.synchronized {
189+ val entry = entries.remove(blockId)
190+ if (entry != null ) {
191+ entry.size
192+ } else {
193+ 0L
194+ }
195+ }
196+ }
197+
187198 override def clear () {
188199 entries.synchronized {
189200 entries.clear()
@@ -239,18 +250,18 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
239250 val currentSize = vector.estimateSize()
240251 if (currentSize >= memoryThreshold) {
241252 val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
242- // Hold the accounting lock, in case another thread concurrently puts a block that
243- // takes up the unrolling space we just ensured here
244- accountingLock.synchronized {
245- if (! reserveUnrollMemoryForThisThread(amountToRequest)) {
246- // If the first request is not granted, try again after ensuring free space
247- // If there is still not enough space, give up and drop the partition
248- val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
249- if (spaceToEnsure > 0 ) {
250- val result = ensureFreeSpace(blockId, spaceToEnsure)
251- droppedBlocks ++= result.droppedBlocks
253+ if (! reserveUnrollMemoryForThisThread(amountToRequest)) {
254+ val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
255+ if (spaceToEnsure > 0 ) {
256+ val task = planFreeSpace(blockId, spaceToEnsure)
257+ if (task.isDefined) {
258+ droppedBlocks ++= task.get.runTask()
259+ task.get.updateCurrentMemorySizeInTransaction {
260+ keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
261+ }
262+ } else {
263+ keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
252264 }
253- keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
254265 }
255266 }
256267 // New threshold is currentSize * memoryGrowthFactor
@@ -304,41 +315,31 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
304315 size : Long ,
305316 deserialized : Boolean ): ResultWithDroppedBlocks = {
306317
307- /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
308- * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
309- * been released, it must be ensured that those to-be-dropped blocks are not double counted
310- * for freeing up more space for another block that needs to be put. Only then the actually
311- * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
312-
313318 var putSuccess = false
314319 val droppedBlocks = new ArrayBuffer [(BlockId , BlockStatus )]
315320
316- accountingLock.synchronized {
317- val freeSpaceResult = ensureFreeSpace(blockId, size)
318- val enoughFreeSpace = freeSpaceResult.success
319- droppedBlocks ++= freeSpaceResult.droppedBlocks
320-
321- if (enoughFreeSpace) {
321+ val task = planFreeSpace(blockId, size)
322+ if (task.isDefined) {
323+ droppedBlocks ++= task.get.runTask()
324+ task.get.updateCurrentMemorySizeInTransaction {
322325 val entry = new MemoryEntry (value, size, deserialized)
323- entries.synchronized {
324- entries.put(blockId, entry)
325- currentMemory += size
326- }
327- val valuesOrBytes = if (deserialized) " values" else " bytes"
328- logInfo(" Block %s stored as %s in memory (estimated size %s, free %s)" .format(
329- blockId, valuesOrBytes, Utils .bytesToString(size), Utils .bytesToString(freeMemory)))
330- putSuccess = true
326+ entries.put(blockId, entry)
327+ currentMemory += size
328+ }
329+ val valuesOrBytes = if (deserialized) " values" else " bytes"
330+ logInfo(" Block %s stored as %s in memory (estimated size %s, free %s)" .format(
331+ blockId, valuesOrBytes, Utils .bytesToString(size), Utils .bytesToString(freeMemory)))
332+ putSuccess = true
333+ } else {
334+ // Tell the block manager that we couldn't put it in memory so that it can drop it to
335+ // disk if the block allows disk storage.
336+ val data = if (deserialized) {
337+ Left (value.asInstanceOf [Array [Any ]])
331338 } else {
332- // Tell the block manager that we couldn't put it in memory so that it can drop it to
333- // disk if the block allows disk storage.
334- val data = if (deserialized) {
335- Left (value.asInstanceOf [Array [Any ]])
336- } else {
337- Right (value.asInstanceOf [ByteBuffer ].duplicate())
338- }
339- val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
340- droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
339+ Right (value.asInstanceOf [ByteBuffer ].duplicate())
341340 }
341+ val droppedBlockResult = blockManager.dropFromMemory(blockId, data)
342+ droppedBlockResult.foreach { r => droppedBlocks += ((blockId, r._1)) }
342343 }
343344 ResultWithDroppedBlocks (putSuccess, droppedBlocks)
344345 }
@@ -354,66 +355,69 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
354355 *
355356 * Return whether there is enough free space, along with the blocks dropped in the process.
356357 */
357- private def ensureFreeSpace (
358+ private def planFreeSpace (
358359 blockIdToAdd : BlockId ,
359- space : Long ): ResultWithDroppedBlocks = {
360+ space : Long ): Option [ DroppingTask ] = {
360361 logInfo(s " ensureFreeSpace( $space) called with curMem= $currentMemory, maxMem= $maxMemory" )
361362
362- val droppedBlocks = new ArrayBuffer [(BlockId , BlockStatus )]
363-
364363 if (space > maxMemory) {
365364 logInfo(s " Will not store $blockIdToAdd as it is larger than our memory limit " )
366- return ResultWithDroppedBlocks (success = false , droppedBlocks)
365+ return None
367366 }
368367
369- // Take into account the amount of memory currently occupied by unrolling blocks
370- val actualFreeMemory = freeMemory - currentUnrollMemory
368+ accountingLock.synchronized {
369+ // Take into account the amount of memory currently occupied by unrolling blocks
370+ val actualFreeMemory = freeMemory - currentUnrollMemory
371371
372- if (actualFreeMemory < space) {
373- val rddToAdd = getRddId(blockIdToAdd)
374- val selectedBlocks = new ArrayBuffer [BlockId ]
375- var selectedMemory = 0L
372+ if (actualFreeMemory < space) {
373+ val rddToAdd = getRddId(blockIdToAdd)
374+ val selectedBlocks = new ArrayBuffer [BlockId ]
375+ var selectedMemory = 0L
376376
377- // This is synchronized to ensure that the set of entries is not changed
378- // (because of getValue or getBytes) while traversing the iterator, as that
379- // can lead to exceptions.
380- entries.synchronized {
381- val iterator = entries.entrySet().iterator()
382- while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
383- val pair = iterator.next()
384- val blockId = pair.getKey
385- if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
386- selectedBlocks += blockId
387- selectedMemory += pair.getValue.size
377+ // This is synchronized to ensure that the set of entries is not changed
378+ // (because of getValue or getBytes) while traversing the iterator, as that
379+ // can lead to exceptions.
380+ entries.synchronized {
381+ val iterator = entries.entrySet().iterator()
382+ while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
383+ val pair = iterator.next()
384+ val blockId = pair.getKey
385+ val entry = pair.getValue()
386+ if (! entry.dropping && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))) {
387+ selectedBlocks += blockId
388+ selectedMemory += entry.size
389+ }
388390 }
389391 }
390- }
391392
392- if (actualFreeMemory + selectedMemory >= space) {
393- logInfo(s " ${selectedBlocks.size} blocks selected for dropping " )
394- for (blockId <- selectedBlocks) {
395- val entry = entries.synchronized { entries.get(blockId) }
396- // This should never be null as only one thread should be dropping
397- // blocks and removing entries. However the check is still here for
398- // future safety.
399- if (entry != null ) {
400- val data = if (entry.deserialized) {
401- Left (entry.value.asInstanceOf [Array [Any ]])
402- } else {
403- Right (entry.value.asInstanceOf [ByteBuffer ].duplicate())
393+ if (actualFreeMemory + selectedMemory >= space) {
394+ logInfo(s " ${selectedBlocks.size} blocks selected for dropping " )
395+ val toBeDroppedBlocks = new ArrayBuffer [ToBeDroppedBlock ]
396+ for (blockId <- selectedBlocks) {
397+ val entry = entries.synchronized { entries.get(blockId) }
398+ // This should never be null as only one thread should be dropping
399+ // blocks and removing entries. However the check is still here for
400+ // future safety.
401+ if (entry != null ) {
402+ val data = if (entry.deserialized) {
403+ Left (entry.value.asInstanceOf [Array [Any ]])
404+ } else {
405+ Right (entry.value.asInstanceOf [ByteBuffer ].duplicate())
406+ }
407+ toBeDroppedBlocks += ToBeDroppedBlock (blockId, data)
408+ entry.dropping = true
404409 }
405- val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
406- droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
407410 }
411+ Some (new DroppingTask (toBeDroppedBlocks))
412+ } else {
413+ logInfo(s " Will not store $blockIdToAdd as it would require dropping another block " +
414+ " from the same RDD" )
415+ None
408416 }
409- return ResultWithDroppedBlocks (success = true , droppedBlocks)
410417 } else {
411- logInfo(s " Will not store $blockIdToAdd as it would require dropping another block " +
412- " from the same RDD" )
413- return ResultWithDroppedBlocks (success = false , droppedBlocks)
418+ Some (new DroppingTask (Nil ))
414419 }
415420 }
416- ResultWithDroppedBlocks (success = true , droppedBlocks)
417421 }
418422
419423 override def contains (blockId : BlockId ): Boolean = {
@@ -467,8 +471,49 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
467471 private [spark] def currentUnrollMemoryForThisThread : Long = accountingLock.synchronized {
468472 unrollMemoryMap.getOrElse(Thread .currentThread().getId, 0L )
469473 }
474+
475+ private class DroppingTask (blocks : Seq [ToBeDroppedBlock ]) {
476+ private var droppedMemorySize = 0L
477+
478+ def runTask () = {
479+ val droppedBlocks = new ArrayBuffer [(BlockId , BlockStatus )]
480+ var droppingDoneCount = 0
481+ try {
482+ blocks.foreach { block =>
483+ val dropResult = blockManager.dropFromMemory(block.id, block.data)
484+ dropResult.foreach { r =>
485+ droppedBlocks += block.id -> r._1
486+ droppedMemorySize += r._2
487+ }
488+ droppingDoneCount += 1
489+ }
490+ droppedBlocks
491+ } catch {
492+ case e : Exception =>
493+ blocks.drop(droppingDoneCount).foreach { block =>
494+ entries.synchronized {
495+ val entry = entries.get(block.id)
496+ if (entry != null ) entry.dropping = false
497+ }
498+ }
499+ if (droppedMemorySize > 0 ) {
500+ entries.synchronized { currentMemory -= droppedMemorySize }
501+ }
502+ throw e
503+ }
504+ }
505+
506+ def updateCurrentMemorySizeInTransaction (f : => Unit ): Unit = {
507+ entries.synchronized {
508+ currentMemory -= droppedMemorySize
509+ f
510+ }
511+ }
512+ }
470513}
471514
472515private [spark] case class ResultWithDroppedBlocks (
473516 success : Boolean ,
474517 droppedBlocks : Seq [(BlockId , BlockStatus )])
518+
519+ private [spark] case class ToBeDroppedBlock (id : BlockId , data : Either [Array [Any ], ByteBuffer ])
0 commit comments