From 92bfce821c998c0b1a5b7ab3b674d0e59a7c0033 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 2 Jun 2016 13:30:42 -0700
Subject: [PATCH 1/6] Add failing regression test.

---
 .../scala/org/apache/spark/FailureSuite.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 333c23bdaf6d6..99a43367a2437 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.io.{IOException, NotSerializableException, ObjectInputStream}
 
 import org.apache.spark.memory.TestMemoryConsumer
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.NonSerializable
 
 // Common state shared by FailureSuite-launched tasks. We use a global object
@@ -241,6 +242,21 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  test("failure because cached RDD files are missing") {
+    sc = new SparkContext("local[1,2]", "test")
+    val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
+    rdd.count()
+
+    // Directly delete all files from the disk store, triggering failures when reading cached data:
+    sc.parallelize(1 to 8, 8).foreachPartition { _ =>
+      SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
+    }
+
+    // Each task should fail once due to missing cached data, but then should succeed on its second
+    // attempt because the missing cache locations will be purged and the blocks will be recomputed.
+    rdd.count()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }

From a104ac534be3df927a21aeb42fe6c11e1cd03636 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 2 Jun 2016 14:08:43 -0700
Subject: [PATCH 2/6] Add failing unit tests in BlockManagerSuite.

---
 .../spark/storage/BlockManagerSuite.scala | 40 +++++++++++++++++++
 1 file changed, 40 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a2580304c4ed2..ac6ec16d3e50f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1139,6 +1139,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getSingle("a3").isDefined, "a3 was not in store")
   }
 
+  private def testReadWithLossOfOnDiskFiles(
+      storageLevel: StorageLevel,
+      readMethod: BlockManager => Option[_]): Unit = {
+    store = makeBlockManager(12000)
+    assert(store.putSingle("blockId", new Array[Byte](4000), storageLevel))
+    assert(store.getStatus("blockId").isDefined)
+    // Directly delete all files from the disk store, triggering failures when reading blocks:
+    store.diskBlockManager.getAllFiles().foreach(_.delete())
+    // The BlockManager still thinks that these blocks exist:
+    assert(store.getStatus("blockId").isDefined)
+    // Because the BlockManager's metadata claims that the block exists (i.e. that it's present
+    // in at least one store), the read attempts to read it and fails when the on-disk file is
+    // missing.
+    intercept[SparkException] {
+      readMethod(store)
+    }
+    // Subsequent read attempts will succeed; the block isn't present but we return an expected
+    // "block not found" response rather than a fatal error:
+    assert(readMethod(store).isEmpty)
+    // The reason why this second read succeeded is because the metadata entry for the missing
+    // block was removed as a result of the read failure:
+    assert(store.getStatus("blockId").isEmpty)
+  }
+
+  test("remove cached block if a read fails due to missing on-disk files") {
+    val storageLevels = Seq(
+      StorageLevel(useDisk = true, useMemory = false, deserialized = false),
+      StorageLevel(useDisk = true, useMemory = false, deserialized = true))
+    val readMethods = Map[String, BlockManager => Option[_]](
+      "getLocalBytes" -> ((m: BlockManager) => m.getLocalBytes("blockId")),
+      "getLocalValues" -> ((m: BlockManager) => m.getLocalValues("blockId"))
+    )
+    testReadWithLossOfOnDiskFiles(StorageLevel.DISK_ONLY, _.getLocalBytes("blockId"))
+    for ((readMethodName, readMethod) <- readMethods; storageLevel <- storageLevels) {
+      withClue(s"$readMethodName $storageLevel") {
+        testReadWithLossOfOnDiskFiles(storageLevel, readMethod)
+      }
+    }
+  }
+
   test("SPARK-13328: refresh block locations (fetch should fail after hitting a threshold)") {
     val mockBlockTransferService =
       new MockBlockTransferService(conf.getInt("spark.block.failures.beforeLocationRefresh", 5))

From e26b2f6aa0571392730338960666ca66f5901f35 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 2 Jun 2016 14:09:02 -0700
Subject: [PATCH 3/6] Fix actual bug.

---
 .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 2f9473aedc2dc..00fa94fc9fbf4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -442,6 +442,8 @@ private[spark] class BlockManager(
           Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           releaseLock(blockId)
+          // Remove the missing block so that its unavailability is reported to the driver
+          removeBlock(blockId)
           throw new SparkException(s"Block $blockId was not found even though it's read-locked")
         }
     }
@@ -490,6 +492,8 @@ private[spark] class BlockManager(
         serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
       } else {
         releaseLock(blockId)
+        // Remove the missing block so that its unavailability is reported to the driver
+        removeBlock(blockId)
         throw new SparkException(s"Block $blockId was not found even though it's read-locked")
       }
     } else {  // storage level is serialized
@@ -500,6 +504,8 @@ private[spark] class BlockManager(
         maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
       } else {
         releaseLock(blockId)
+        // Remove the missing block so that its unavailability is reported to the driver
+        removeBlock(blockId)
         throw new SparkException(s"Block $blockId was not found even though it's read-locked")
       }
     }

From 0c51bb63e979549f2fad4fc14767ac6a19952e8c Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 2 Jun 2016 14:21:22 -0700
Subject: [PATCH 4/6] Update test names.

---
 core/src/test/scala/org/apache/spark/FailureSuite.scala | 2 +-
 .../test/scala/org/apache/spark/storage/BlockManagerSuite.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 99a43367a2437..38ee62f46bd6a 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -242,7 +242,7 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
-  test("failure because cached RDD files are missing") {
+  test("failure because cached RDD partitions are missing from DiskStore (SPARK-15736)") {
     sc = new SparkContext("local[1,2]", "test")
     val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
     rdd.count()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index ac6ec16d3e50f..6821582254f5b 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1163,7 +1163,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getStatus("blockId").isEmpty)
   }
 
-  test("remove cached block if a read fails due to missing on-disk files") {
+  test("remove block if a read fails due to missing DiskStore files (SPARK-15736)") {
     val storageLevels = Seq(
       StorageLevel(useDisk = true, useMemory = false, deserialized = false),
       StorageLevel(useDisk = true, useMemory = false, deserialized = true))

From b86ff24596beb82c74a5d3daf4480a40597043b4 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 2 Jun 2016 14:50:00 -0700
Subject: [PATCH 5/6] Simplify integration test.

---
 core/src/test/scala/org/apache/spark/FailureSuite.scala | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index 38ee62f46bd6a..132f6361e41e6 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -246,12 +246,8 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext("local[1,2]", "test")
     val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
     rdd.count()
-
     // Directly delete all files from the disk store, triggering failures when reading cached data:
-    sc.parallelize(1 to 8, 8).foreachPartition { _ =>
-      SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
-    }
-
+    SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())
     // Each task should fail once due to missing cached data, but then should succeed on its second
     // attempt because the missing cache locations will be purged and the blocks will be recomputed.
     rdd.count()

From 80bc4867aae98837da8023a0da6f80114e21ff9e Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 2 Jun 2016 14:50:12 -0700
Subject: [PATCH 6/6] De-duplicate copy-pasted code.

---
 .../apache/spark/storage/BlockManager.scala | 26 ++++++++++---------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 00fa94fc9fbf4..83a9cbd63d391 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -402,6 +402,17 @@ private[spark] class BlockManager(
     locations
   }
 
+  /**
+   * Cleanup code run in response to a failed local read.
+   * Must be called while holding a read lock on the block.
+   */
+  private def handleLocalReadFailure(blockId: BlockId): Nothing = {
+    releaseLock(blockId)
+    // Remove the missing block so that its unavailability is reported to the driver
+    removeBlock(blockId)
+    throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+  }
+
   /**
    * Get block from local block manager as an iterator of Java objects.
    */
@@ -441,10 +452,7 @@ private[spark] class BlockManager(
           val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
           Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
-          releaseLock(blockId)
-          // Remove the missing block so that its unavailability is reported to the driver
-          removeBlock(blockId)
-          throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+          handleLocalReadFailure(blockId)
         }
     }
   }
@@ -491,10 +499,7 @@ private[spark] class BlockManager(
         // The block was not found on disk, so serialize an in-memory copy:
         serializerManager.dataSerialize(blockId, memoryStore.getValues(blockId).get)
       } else {
-        releaseLock(blockId)
-        // Remove the missing block so that its unavailability is reported to the driver
-        removeBlock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     } else {  // storage level is serialized
       if (level.useMemory && memoryStore.contains(blockId)) {
@@ -503,10 +508,7 @@ private[spark] class BlockManager(
         val diskBytes = diskStore.getBytes(blockId)
         maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
       } else {
-        releaseLock(blockId)
-        // Remove the missing block so that its unavailability is reported to the driver
-        removeBlock(blockId)
-        throw new SparkException(s"Block $blockId was not found even though it's read-locked")
+        handleLocalReadFailure(blockId)
       }
     }
   }
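
For illustration only, a minimal standalone sketch of the scenario these patches address, using the same APIs that appear in the diffs above. The `Spark15736Demo` object name is hypothetical, and the sketch is placed in the org.apache.spark package only because `SparkEnv.blockManager` and `DiskBlockManager` are private[spark]; it is not part of the patch series.

// Hypothetical demo, not part of the patches above. Assumes the Spark 2.0-era APIs shown
// in the diffs; lives in org.apache.spark because the block-manager internals are private[spark].
package org.apache.spark

import org.apache.spark.storage.StorageLevel

object Spark15736Demo {
  def main(args: Array[String]): Unit = {
    // "local[1,2]" = one core, up to two attempts per task, so a failed read can be retried.
    val sc = new SparkContext("local[1,2]", "SPARK-15736 demo")
    try {
      val rdd = sc.parallelize(1 to 2, 2).persist(StorageLevel.DISK_ONLY)
      rdd.count() // materialize the cached partitions in the DiskStore

      // Simulate losing the on-disk cache files, as the regression test does:
      SparkEnv.get.blockManager.diskBlockManager.getAllFiles().foreach(_.delete())

      // With the fix, each task fails once, the stale block entry is removed (so the driver
      // learns the block is gone), and the retry recomputes the partition from the RDD lineage.
      assert(rdd.count() == 2)
    } finally {
      sc.stop()
    }
  }
}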