From f254f94fdc5e2648d7c1104bf5ec2355de7c6055 Mon Sep 17 00:00:00 2001
From: Doug Rohrer
Date: Mon, 14 May 2018 12:24:00 -0400
Subject: [PATCH 1/4] [SPARK-24225] Support closing AutoCloseable objects in
 MemoryStore so broadcast variables can be released properly

---
 .../spark/storage/memory/MemoryStore.scala    | 31 ++++++-
 .../spark/storage/MemoryStoreSuite.scala      | 80 +++++++++++++++++++
 2 files changed, 107 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 4cc5bcb7f9ba..fa2be88c99d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -21,6 +21,7 @@ import java.io.OutputStream
 import java.nio.ByteBuffer
 import java.util.LinkedHashMap
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
@@ -384,15 +385,36 @@ private[spark] class MemoryStore(
     }
   }
 
+  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
+      case _ =>
+    }
+  }
+
+  private def maybeCloseValues(objs: Array[Any]): Unit = {
+    objs.foreach {
+        case closable: AutoCloseable =>
+          safelyCloseValue(closable)
+        case _ =>
+      }
+  }
+
+  private def safelyCloseValue(closable: AutoCloseable) = {
+    try {
+      closable.close()
+    } catch {
+      case ex: Exception => logWarning(s"Failed to close AutoCloseable $closable", ex)
+    }
+  }
+
   def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
     val entry = entries.synchronized {
       entries.remove(blockId)
     }
     if (entry != null) {
-      entry match {
-        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
-        case _ =>
-      }
+      maybeReleaseResources(entry)
       memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
       logDebug(s"Block $blockId of size ${entry.size} dropped " +
         s"from memory (free ${maxMemory - blocksMemoryUsed})")
@@ -404,6 +426,7 @@ private[spark] class MemoryStore(
 
   def clear(): Unit = memoryManager.synchronized {
     entries.synchronized {
+      entries.values().asScala.foreach(maybeReleaseResources)
       entries.clear()
     }
     onHeapUnrollMemoryMap.clear()
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 7274072e5049..6d47a9bd0528 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -526,4 +526,84 @@ class MemoryStoreSuite
       }
     }
   }
+
+  test("[SPARK-24225]: remove should close AutoCloseable object") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker(true)
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = BroadcastBlockId(0)
+    val tracker = new CloseTracker
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.clear()
+    assert(tracker.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val tracker2 = new CloseTracker
+    val tracker1 = new CloseTracker
+    store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+
+  test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id1 = BroadcastBlockId(1)
+    val id2 = BroadcastBlockId(2)
+    val tracker2 = new CloseTracker(true)
+    val tracker1 = new CloseTracker(true)
+    store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
+    store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
+    assert(store.contains(id1))
+    assert(store.contains(id2))
+    store.clear()
+    assert(tracker1.getClosed())
+    assert(tracker2.getClosed())
+  }
+}
+
+private class CloseTracker(val throwsOnClosed: Boolean = false) extends AutoCloseable {
+  var closed = false
+  override def close(): Unit = {
+    closed = true
+    if (throwsOnClosed) {
+      throw new RuntimeException("Throwing")
+    }
+  }
+  def getClosed(): Boolean = {
+    closed
+  }
 }
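
The scenario motivating this series, as a standalone sketch: a broadcast value that owns state the garbage collector cannot release on its own. The class and object names below are illustrative and not part of the PR; only broadcast() and destroy() are real Spark API, and the close() call on removal is what the patch adds.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical broadcast payload: Serializable so it can be broadcast,
    // AutoCloseable so the patched MemoryStore can release it on remove().
    class TrackedResource extends AutoCloseable with Serializable {
      @transient private var open = true
      override def close(): Unit = { open = false } // release native state here
    }

    object BroadcastCloseSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
        val bc = sc.broadcast(new TrackedResource)
        // ... tasks read bc.value ...
        // destroy() drops the broadcast blocks; with this patch the stored
        // deserialized value also gets close() called on it.
        bc.destroy()
        sc.stop()
      }
    }
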
From 62d46d3bf49ef0393a916d3cafaae4947f374f36 Mon Sep 17 00:00:00 2001
From: Doug Rohrer
Date: Tue, 15 May 2018 11:31:28 -0400
Subject: [PATCH 2/4] Only close `Broadcast` blocks

- Also fixed an indentation issue.
---
 .../apache/spark/storage/memory/MemoryStore.scala | 14 ++++++++------
 .../apache/spark/storage/MemoryStoreSuite.scala   | 12 ++++++++++++
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index fa2be88c99d5..f129e2eeedd1 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -395,13 +395,13 @@ private[spark] class MemoryStore(
   private def maybeCloseValues(objs: Array[Any]): Unit = {
     objs.foreach {
-        case closable: AutoCloseable =>
-          safelyCloseValue(closable)
-        case _ =>
-      }
+      case closable: AutoCloseable =>
+        safelyCloseValue(closable)
+      case _ =>
+    }
   }
 
-  private def safelyCloseValue(closable: AutoCloseable) = {
+  private def safelyCloseValue(closable: AutoCloseable): Unit = {
     try {
       closable.close()
     } catch {
       case ex: Exception => logWarning(s"Failed to close AutoCloseable $closable", ex)
@@ -414,7 +414,9 @@ private[spark] class MemoryStore(
       entries.remove(blockId)
     }
     if (entry != null) {
-      maybeReleaseResources(entry)
+      if (blockId.isBroadcast) {
+        maybeReleaseResources(entry)
+      }
       memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
       logDebug(s"Block $blockId of size ${entry.size} dropped " +
         s"from memory (free ${maxMemory - blocksMemoryUsed})")
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 6d47a9bd0528..8245b24d30d2 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -539,6 +539,18 @@ class MemoryStoreSuite
     assert(tracker.getClosed())
   }
 
+  test("[SPARK-24225]: remove should not close object if it's not a broadcast variable") {
+
+    val (store, _) = makeMemoryStore(12000)
+
+    val id = "a1" // Implicitly converted to a non-broadcast test BlockId
+    val tracker = new CloseTracker()
+    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
+    assert(store.contains(id))
+    store.remove(id)
+    assert(!tracker.getClosed())
+  }
+
   test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
 
     val (store, _) = makeMemoryStore(12000)
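
A quick illustration of the guard this revision adds: BlockId.isBroadcast is true only for BroadcastBlockId, so cached RDD blocks (or the suite's test blocks) holding AutoCloseable values are no longer touched by remove(). A minimal, runnable check (the object name is illustrative):

    import org.apache.spark.storage.{BlockId, BroadcastBlockId, RDDBlockId}

    object IsBroadcastSketch {
      def main(args: Array[String]): Unit = {
        val ids: Seq[BlockId] = Seq(BroadcastBlockId(0), RDDBlockId(1, 2))
        ids.foreach(id => println(s"$id -> isBroadcast = ${id.isBroadcast}"))
        // broadcast_0 -> isBroadcast = true
        // rdd_1_2 -> isBroadcast = false
      }
    }
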
From 94512e67518c1e36a078866d01199fd245cb2f88 Mon Sep 17 00:00:00 2001
From: Doug Rohrer
Date: Tue, 15 May 2018 13:46:32 -0400
Subject: [PATCH 3/4] Address style/variable naming issues

---
 .../spark/storage/memory/MemoryStore.scala    |  6 ++--
 .../spark/storage/MemoryStoreSuite.scala      | 29 ++++++++++++-------
 2 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index f129e2eeedd1..23883486277e 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -388,13 +388,13 @@ private[spark] class MemoryStore(
   private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
     entry match {
       case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
-      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
       case _ =>
     }
   }
 
-  private def maybeCloseValues(objs: Array[Any]): Unit = {
-    objs.foreach {
+  private def maybeCloseValues(values: Array[Any]): Unit = {
+    values.foreach {
       case closable: AutoCloseable =>
         safelyCloseValue(closable)
       case _ =>
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 8245b24d30d2..8a7aa24fa184 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -536,7 +536,7 @@ class MemoryStoreSuite
     store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
     assert(store.contains(id))
     store.remove(id)
-    assert(tracker.getClosed())
+    assert(tracker.getClosed)
   }
 
   test("[SPARK-24225]: remove should not close object if it's not a broadcast variable") {
@@ -548,7 +548,7 @@ class MemoryStoreSuite
     store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
     assert(store.contains(id))
     store.remove(id)
-    assert(!tracker.getClosed())
+    assert(!tracker.getClosed)
   }
 
   test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
@@ -560,7 +560,7 @@ class MemoryStoreSuite
     store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
     assert(store.contains(id))
    store.remove(id)
-    assert(tracker.getClosed())
+    assert(tracker.getClosed)
   }
 
   test("[SPARK-24225]: clear should close AutoCloseable objects") {
@@ -572,7 +572,7 @@ class MemoryStoreSuite
     store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
     assert(store.contains(id))
     store.clear()
-    assert(tracker.getClosed())
+    assert(tracker.getClosed)
   }
 
   test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
@@ -585,8 +585,8 @@ class MemoryStoreSuite
     store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
     assert(store.contains(id1))
     store.clear()
-    assert(tracker1.getClosed())
-    assert(tracker2.getClosed())
+    assert(tracker1.getClosed)
+    assert(tracker2.getClosed)
   }
 
   test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {
@@ -602,20 +602,29 @@ class MemoryStoreSuite
     assert(store.contains(id1))
     assert(store.contains(id2))
     store.clear()
-    assert(tracker1.getClosed())
-    assert(tracker2.getClosed())
+    assert(tracker1.getClosed)
+    assert(tracker1.getExceptionThrown)
+    assert(tracker2.getClosed)
+    assert(tracker2.getExceptionThrown)
   }
 }
 
 private class CloseTracker(val throwsOnClosed: Boolean = false) extends AutoCloseable {
-  var closed = false
+  private var closed = false
+  private var exceptionThrown = false
+
   override def close(): Unit = {
     closed = true
     if (throwsOnClosed) {
+      exceptionThrown = true
       throw new RuntimeException("Throwing")
     }
   }
-  def getClosed(): Boolean = {
+  def getClosed: Boolean = {
     closed
   }
+
+  def getExceptionThrown: Boolean = {
+    exceptionThrown
+  }
 }
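
For reference, the convention this style pass applies: a Scala accessor with no side effects drops its parentheses at both the declaration and the call site, which is why getClosed() becomes getClosed. A tiny standalone example (names are illustrative):

    object AccessorConventionSketch extends App {
      class Counter {
        private var n = 0
        def increment(): Unit = { n += 1 } // side-effecting: keeps parens
        def value: Int = n                 // pure accessor: no parens
      }
      val c = new Counter
      c.increment()
      println(c.value) // prints 1
    }
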
From 790c906062fc55f553a5cecb61147f801875d4b2 Mon Sep 17 00:00:00 2001
From: Doug Rohrer
Date: Fri, 18 May 2018 09:27:13 -0400
Subject: [PATCH 4/4] Fix handling of broadcast blocks

Needed to do the broadcast check further down the call stack so the
original buffer release (dispose) still runs for non-broadcast blocks.
---
 .../spark/broadcast/TorrentBroadcast.scala    |  1 +
 .../spark/storage/memory/MemoryStore.scala    | 30 +++++++------------
 .../scala/org/apache/spark/util/Utils.scala   | 12 ++++++++
 3 files changed, 24 insertions(+), 19 deletions(-)
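
The heart of this revision is the small Utils.tryClose helper (see the Utils.scala hunk below). Its contract, mirrored here under standalone names so the sketch compiles outside Spark: close AutoCloseable values, ignore everything else, and never let a failing close() propagate.

    object TryCloseSketch {
      // Standalone mirror of the helper's behavior; the real one logs the
      // failure instead of printing it.
      def tryClose(value: Any): Unit = value match {
        case closable: AutoCloseable =>
          try {
            closable.close()
          } catch {
            case ex: Exception => println(s"Failed to close AutoCloseable $closable: $ex")
          }
        case _ => // non-closeable values are left alone
      }

      def main(args: Array[String]): Unit = {
        tryClose(new AutoCloseable { def close(): Unit = println("closed") }) // prints "closed"
        tryClose("plain string") // no-op
        tryClose(new AutoCloseable { // failure is reported, not rethrown
          def close(): Unit = throw new RuntimeException("boom")
        })
      }
    }
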
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index e125095cf477..84ef0fcc469a 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -238,6 +238,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       // need to re-fetch it.
       val storageLevel = StorageLevel.MEMORY_AND_DISK
       if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
+        Utils.tryClose(obj)
         throw new SparkException(s"Failed to store $broadcastId in BlockManager")
       }
       obj
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 23883486277e..aa6413707daf 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -385,27 +385,21 @@ private[spark] class MemoryStore(
     }
   }
 
-  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
-    entry match {
-      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
-      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
-      case _ =>
-    }
+  private def maybeReleaseResources(resource: (BlockId, MemoryEntry[_])): Unit = {
+    maybeReleaseResources(resource._1, resource._2)
   }
 
-  private def maybeCloseValues(values: Array[Any]): Unit = {
-    values.foreach {
-      case closable: AutoCloseable =>
-        safelyCloseValue(closable)
+  private def maybeReleaseResources(blockId: BlockId, entry: MemoryEntry[_]): Unit = {
+    entry match {
+      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values, blockId)
       case _ =>
     }
   }
 
-  private def safelyCloseValue(closable: AutoCloseable): Unit = {
-    try {
-      closable.close()
-    } catch {
-      case ex: Exception => logWarning(s"Failed to close AutoCloseable $closable", ex)
+  private def maybeCloseValues(values: Array[Any], blockId: BlockId): Unit = {
+    if (blockId.isBroadcast) {
+      values.foreach(value => Utils.tryClose(value))
     }
   }
 
@@ -414,9 +408,7 @@ private[spark] class MemoryStore(
       entries.remove(blockId)
     }
     if (entry != null) {
-      if (blockId.isBroadcast) {
-        maybeReleaseResources(entry)
-      }
+      maybeReleaseResources(blockId, entry)
       memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
       logDebug(s"Block $blockId of size ${entry.size} dropped " +
         s"from memory (free ${maxMemory - blocksMemoryUsed})")
@@ -428,7 +420,7 @@ private[spark] class MemoryStore(
 
   def clear(): Unit = memoryManager.synchronized {
     entries.synchronized {
-      entries.values().asScala.foreach(maybeReleaseResources)
+      entries.asScala.foreach(maybeReleaseResources)
       entries.clear()
     }
     onHeapUnrollMemoryMap.clear()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 13adaa921dc2..3f9d2f98d875 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1930,6 +1930,18 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  def tryClose(value: Any): Unit = {
+    value match {
+      case closable: AutoCloseable =>
+        try {
+          closable.close()
+        } catch {
+          case ex: Exception => logError(s"Failed to close AutoCloseable $closable", ex)
+        }
+      case _ =>
+    }
+  }
+
   /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
   def isFatalError(e: Throwable): Boolean = {
     e match {
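
To recap the net behavior after this final revision, a self-contained sketch (standalone types, not Spark code): buffer disposal still runs unconditionally for serialized entries, while close() runs only for deserialized values of broadcast blocks.

    sealed trait Entry
    case class Serialized(dispose: () => Unit) extends Entry
    case class Deserialized(values: Array[Any]) extends Entry

    object FinalDispatchSketch {
      def release(isBroadcast: Boolean, entry: Entry): Unit = entry match {
        case Serialized(dispose) => dispose() // unconditional, as before the series
        case Deserialized(values) if isBroadcast =>
          values.foreach {
            case c: AutoCloseable => c.close()
            case _ =>
          }
        case _ => // deserialized, non-broadcast: left untouched
      }

      def main(args: Array[String]): Unit = {
        release(isBroadcast = false, Serialized(() => println("disposed")))
        release(isBroadcast = true,
          Deserialized(Array[Any](new AutoCloseable { def close(): Unit = println("closed") })))
        release(isBroadcast = false, Deserialized(Array[Any]("left alone"))) // nothing happens
      }
    }
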