From 5a0a1f9f94a2ee2640d0c482012ba992cfe88180 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 25 Nov 2015 17:09:05 -0800
Subject: [PATCH 1/4] Preserve partitioner through RDD checkpointing

---
 .../spark/rdd/ReliableCheckpointRDD.scala     | 113 +++++++++++++++++-
 .../spark/rdd/ReliableRDDCheckpointData.scala |  21 +---
 .../org/apache/spark/CheckpointSuite.scala    |  59 ++++++++-
 3 files changed, 163 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index a69be6a068bb..c64b609c6249 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -20,12 +20,12 @@ package org.apache.spark.rdd
 import java.io.IOException
 
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
@@ -33,8 +33,9 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  */
 private[spark] class ReliableCheckpointRDD[T: ClassTag](
     sc: SparkContext,
-    val checkpointPath: String)
-  extends CheckpointRDD[T](sc) {
+    val checkpointPath: String,
+    _partitioner: Option[Partitioner] = None
+  ) extends CheckpointRDD[T](sc) {
 
   @transient private val hadoopConf = sc.hadoopConfiguration
   @transient private val cpath = new Path(checkpointPath)
@@ -49,6 +50,14 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
    */
   override def getCheckpointFile: Option[String] = Some(checkpointPath)
 
+  override val partitioner: Option[Partitioner] = {
+    println("Creating partitioner")
+    _partitioner.orElse {
+      println("Reading partitioner from file")
+      ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath)
+    }
+  }
+
   /**
    * Return partitions described by the files in the checkpoint directory.
    *
@@ -100,10 +109,52 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     "part-%05d".format(partitionIndex)
   }
 
+  private def checkpointPartitionerFileName(): String = {
+    "_partitioner"
+  }
+
   /**
-   * Write this partition's values to a checkpoint file.
+   * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
    */
-  def writeCheckpointFile[T: ClassTag](
+  def createCheckpointedRDD[T: ClassTag](
+      originalRDD: RDD[T],
+      checkpointDir: String,
+      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
+
+    val sc = originalRDD.sparkContext
+
+    // Create the output path for the checkpoint
+    val checkpointDirPath = new Path(checkpointDir)
+    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
+    if (!fs.mkdirs(checkpointDirPath)) {
+      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
+    }
+
+    // Save to file, and reload it as an RDD
+    val broadcastedConf = sc.broadcast(
+      new SerializableConfiguration(sc.hadoopConfiguration))
+    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
+    sc.runJob(originalRDD,
+      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
+
+    if (originalRDD.partitioner.nonEmpty) {
+      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
+    }
+
+    val newRDD = new ReliableCheckpointRDD[T](
+      sc, checkpointDirPath.toString, originalRDD.partitioner)
+    if (newRDD.partitions.length != originalRDD.partitions.length) {
+      throw new SparkException(
+        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
+          s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
+    }
+    newRDD
+  }
+
+  /**
+   * Write an RDD partition's data to a checkpoint file.
+   */
+  def writePartitionToCheckpointFile[T: ClassTag](
       path: String,
       broadcastedConf: Broadcast[SerializableConfiguration],
       blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
@@ -151,6 +202,58 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     }
   }
 
+  def writePartitionerToCheckpointDir(
+      sc: SparkContext, partitioner: Partitioner, checkpointDirPath: Path): Unit = {
+    try {
+      val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
+      val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
+      val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
+      val fileOutputStream = fs.create(partitionerFilePath, false, bufferSize)
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val serializeStream = serializer.serializeStream(fileOutputStream)
+      Utils.tryWithSafeFinally {
+        serializeStream.writeObject(partitioner)
+      } {
+        serializeStream.close()
+      }
+      println(s"Written partitioner to $partitionerFilePath")
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Error writing partitioner $partitioner to $checkpointDirPath")
+    }
+  }
+
+  def readCheckpointedPartitionerFile(
+      sc: SparkContext,
+      checkpointDirPath: String): Option[Partitioner] = {
+    try {
+      val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
+      val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
+      val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
+      println(s"Reading partitioner from $partitionerFilePath")
+      if (fs.exists(partitionerFilePath)) {
+        val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+        val serializer = SparkEnv.get.serializer.newInstance()
+        val deserializeStream = serializer.deserializeStream(fileInputStream)
+        val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+          deserializeStream.readObject[Partitioner]
+        } {
+          deserializeStream.close()
+        }
+        println(s"Read partitioner from $partitionerFilePath")
+        Some(partitioner)
+      } else {
+        println("No partitioner file")
+        None
+      }
+    } catch {
+      case NonFatal(e) =>
+        println(s"Error reading partitioner from $checkpointDirPath, " +
+          s"partitioner will not be recovered which can lead to performance loss", e)
+        None
+    }
+  }
+
   /**
    * Read the content of the specified checkpoint file.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 91cad6662e4d..f619eacfe11b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -55,25 +55,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
    * This is called immediately after the first action invoked on this RDD has completed.
    */
   protected override def doCheckpoint(): CheckpointRDD[T] = {
-
-    // Create the output path for the checkpoint
-    val path = new Path(cpDir)
-    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
-    if (!fs.mkdirs(path)) {
-      throw new SparkException(s"Failed to create checkpoint path $cpDir")
-    }
-
-    // Save to file, and reload it as an RDD
-    val broadcastedConf = rdd.context.broadcast(
-      new SerializableConfiguration(rdd.context.hadoopConfiguration))
-    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
-    rdd.context.runJob(rdd, ReliableCheckpointRDD.writeCheckpointFile[T](cpDir, broadcastedConf) _)
-    val newRDD = new ReliableCheckpointRDD[T](rdd.context, cpDir)
-    if (newRDD.partitions.length != rdd.partitions.length) {
-      throw new SparkException(
-        s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
-          s"number of partitions from original RDD $rdd(${rdd.partitions.length})")
-    }
+    val newRDD = ReliableCheckpointRDD.createCheckpointedRDD(rdd, cpDir)
 
     // Optionally clean our checkpoint files if the reference is out of scope
     if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
@@ -83,7 +65,6 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
     }
 
     logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
-
     newRDD
   }
 
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index ab23326c6c25..0f0cdf12de6a 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -21,7 +21,8 @@ import java.io.File
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.CheckpointSuite._
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.rdd._
 import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
 import org.apache.spark.util.Utils
@@ -74,8 +75,10 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
 
     // Test whether the checkpoint file has been created
     if (reliableCheckpoint) {
-      assert(
-        collectFunc(sparkContext.checkpointFile[U](operatedRDD.getCheckpointFile.get)) === result)
+      assert(operatedRDD.getCheckpointFile.nonEmpty)
+      val recoveredRDD = sparkContext.checkpointFile[U](operatedRDD.getCheckpointFile.get)
+      assert(collectFunc(recoveredRDD) === result)
+      assert(recoveredRDD.partitioner === operatedRDD.partitioner)
     }
 
     // Test whether dependencies have been changed from its earlier parent RDD
@@ -211,9 +214,12 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
   }
 
   /** Run a test twice, once for local checkpointing and once for reliable checkpointing. */
-  protected def runTest(name: String)(body: Boolean => Unit): Unit = {
+  protected def runTest(
+      name: String, skipLocalCheckpoint: Boolean = false)(body: Boolean => Unit): Unit = {
     test(name + " [reliable checkpoint]")(body(true))
-    test(name + " [local checkpoint]")(body(false))
+    if (!skipLocalCheckpoint) {
+      test(name + " [local checkpoint]")(body(false))
+    }
   }
 
   /**
@@ -264,6 +270,49 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
     assert(flatMappedRDD.collect() === result)
   }
 
+  runTest("checkpointing partitioners", skipLocalCheckpoint = true) { _: Boolean =>
+
+    def testPartitionerCheckpointing(
+        partitioner: Partitioner,
+        corruptPartitionerFile: Boolean = false
+      ): Unit = {
+      val rddWithPartitioner = sc.makeRDD(1 to 4).map { _ -> 1 }.partitionBy(partitioner)
+      rddWithPartitioner.checkpoint()
+      rddWithPartitioner.count()
+      assert(rddWithPartitioner.getCheckpointFile.get.nonEmpty,
+        "checkpointing was not successful")
+
+      if (corruptPartitionerFile) {
+        // Overwrite the partitioner file with garbage data
+        val checkpointDir = new Path(rddWithPartitioner.getCheckpointFile.get)
+        val fs = checkpointDir.getFileSystem(sc.hadoopConfiguration)
+        val partitionerFile = fs.listStatus(checkpointDir)
+          .find(_.getPath.getName.contains("partitioner"))
+          .map(_.getPath)
+        require(partitionerFile.nonEmpty, "could not find the partitioner file for testing")
+        val output = fs.create(partitionerFile.get, true)
+        output.write(100)
+        output.close()
+      }
+
+      val newRDD = sc.checkpointFile[(Int, Int)](rddWithPartitioner.getCheckpointFile.get)
+      assert(newRDD.collect().toSet === rddWithPartitioner.collect().toSet, "RDD not recovered")
+
+      if (!corruptPartitionerFile) {
+        assert(newRDD.partitioner != None, "partitioner not recovered")
+        assert(newRDD.partitioner === rddWithPartitioner.partitioner,
+          "recovered partitioner does not match")
+      } else {
+        assert(newRDD.partitioner == None, "partitioner unexpectedly recovered")
+      }
+    }
+
+    testPartitionerCheckpointing(partitioner)
+
+    // Test that corrupted partitioner file does not prevent recovery of RDD
+    testPartitionerCheckpointing(partitioner, corruptPartitionerFile = true)
+  }
+
   runTest("RDDs with one-to-one dependencies") { reliableCheckpoint: Boolean =>
     testRDD(_.map(x => x.toString), reliableCheckpoint)
     testRDD(_.flatMap(x => 1 to x), reliableCheckpoint)
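Note (illustrative, not part of the series): a minimal sketch of the behavior PATCH 1 enables. It assumes a running SparkContext `sc` with a checkpoint directory already set; the ten-partition HashPartitioner and the pair RDD are arbitrary examples. `sc.checkpointFile` is private[spark], so this runs from Spark's own test code (as CheckpointSuite does), not from user code.

    import org.apache.spark.HashPartitioner

    val pairs = sc.makeRDD(1 to 100)
      .map(x => (x % 10, x))
      .partitionBy(new HashPartitioner(10))
    pairs.checkpoint()
    pairs.count()  // first action writes the checkpoint, now including a _partitioner file

    // Recover the RDD purely from the checkpoint files, as the suite does.
    val recovered = sc.checkpointFile[(Int, Int)](pairs.getCheckpointFile.get)
    // Before this patch the recovered RDD reported None here, so joins and
    // reduceByKey against it re-shuffled already-partitioned data.
    assert(recovered.partitioner === Some(new HashPartitioner(10)))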
From 4dfa2659365a8f1e24fec807139f5d504fdbc0c0 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 25 Nov 2015 17:20:15 -0800
Subject: [PATCH 2/4] Minor update

---
 .../apache/spark/rdd/ReliableCheckpointRDD.scala | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index c64b609c6249..51712625e7dc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -51,9 +51,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   override def getCheckpointFile: Option[String] = Some(checkpointPath)
 
   override val partitioner: Option[Partitioner] = {
-    println("Creating partitioner")
     _partitioner.orElse {
-      println("Reading partitioner from file")
       ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath)
     }
   }
@@ -216,7 +214,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       } {
         serializeStream.close()
       }
-      println(s"Written partitioner to $partitionerFilePath")
+      logDebug(s"Written partitioner to $partitionerFilePath")
     } catch {
       case NonFatal(e) =>
         logWarning(s"Error writing partitioner $partitioner to $checkpointDirPath")
@@ -230,7 +228,6 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
       val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
-      println(s"Reading partitioner from $partitionerFilePath")
       if (fs.exists(partitionerFilePath)) {
         val fileInputStream = fs.open(partitionerFilePath, bufferSize)
         val serializer = SparkEnv.get.serializer.newInstance()
@@ -240,16 +237,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         } {
           deserializeStream.close()
         }
-        println(s"Read partitioner from $partitionerFilePath")
+        logDebug(s"Read partitioner from $partitionerFilePath")
         Some(partitioner)
       } else {
-        println("No partitioner file")
+        logDebug("No partitioner file")
         None
       }
     } catch {
       case NonFatal(e) =>
-        println(s"Error reading partitioner from $checkpointDirPath, " +
-          s"partitioner will not be recovered which can lead to performance loss", e)
+        logWarning(s"Error reading partitioner from $checkpointDirPath, " +
+          s"partitioner will not be recovered which may lead to performance loss", e)
         None
     }
   }
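Note (illustrative, not part of the series): PATCH 2 demotes the temporary println calls to logDebug/logWarning because persisting the partitioner is best-effort. A self-contained sketch of that contract, using only the Scala standard library; `bestEffortWrite` and its println stand-in for logWarning are hypothetical names, not part of the patch:

    import java.io.OutputStream
    import scala.util.control.NonFatal

    def bestEffortWrite(open: () => OutputStream)(write: OutputStream => Unit): Boolean =
      try {
        val out = open()
        try write(out) finally out.close()  // roughly mirrors Utils.tryWithSafeFinally
        true
      } catch {
        case NonFatal(e) =>
          // The partitioner file is an optimization, not a correctness
          // requirement, so a failed write is logged and swallowed.
          println(s"Best-effort write failed (ignored): $e")
          false
      }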
From bf7bebf5d4a95d1fce85840a9f3699666357345d Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 30 Nov 2015 18:23:05 -0800
Subject: [PATCH 3/4] Address PR comments

---
 .../spark/rdd/ReliableCheckpointRDD.scala     | 18 ++++++++++++++----
 .../org/apache/spark/CheckpointSuite.scala    |  4 +++-
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 51712625e7dc..fa71b8c26233 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -48,7 +48,7 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   /**
    * Return the path of the checkpoint directory this RDD reads data from.
    */
-  override def getCheckpointFile: Option[String] = Some(checkpointPath)
+  override val getCheckpointFile: Option[String] = Some(checkpointPath)
 
   override val partitioner: Option[Partitioner] = {
     _partitioner.orElse {
@@ -114,7 +114,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
   /**
    * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
    */
-  def createCheckpointedRDD[T: ClassTag](
+  def writeRDDToCheckpointDirectory[T: ClassTag](
       originalRDD: RDD[T],
       checkpointDir: String,
       blockSize: Int = -1): ReliableCheckpointRDD[T] = {
@@ -200,7 +200,11 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     }
   }
 
-  def writePartitionerToCheckpointDir(
+  /**
+   * Write a partitioner to the given RDD checkpoint directory. This is done on a best-effort
+   * basis; any exception while writing the partitioner is caught, logged and ignored.
+   */
+  private def writePartitionerToCheckpointDir(
       sc: SparkContext, partitioner: Partitioner, checkpointDirPath: Path): Unit = {
     try {
       val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
@@ -221,7 +225,13 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     }
   }
 
-  def readCheckpointedPartitionerFile(
+
+  /**
+   * Read a partitioner from the given RDD checkpoint directory, if it exists.
+   * This is done on a best-effort basis; any exception while reading the partitioner is
+   * caught, logged and ignored.
+   */
+  private def readCheckpointedPartitionerFile(
       sc: SparkContext,
       checkpointDirPath: String): Option[Partitioner] = {
     try {
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 0f0cdf12de6a..553d46285ac0 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -215,7 +215,9 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
 
   /** Run a test twice, once for local checkpointing and once for reliable checkpointing. */
   protected def runTest(
-      name: String, skipLocalCheckpoint: Boolean = false)(body: Boolean => Unit): Unit = {
+      name: String,
+      skipLocalCheckpoint: Boolean = false
+    )(body: Boolean => Unit): Unit = {
     test(name + " [reliable checkpoint]")(body(true))
     if (!skipLocalCheckpoint) {
       test(name + " [local checkpoint]")(body(false))

From 9eb7250f639c06449f226eccfa995164a7f46d89 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Mon, 30 Nov 2015 19:19:37 -0800
Subject: [PATCH 4/4] Fix compiler error

---
 .../scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index f619eacfe11b..cac6cbe780e9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -55,7 +55,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
    * This is called immediately after the first action invoked on this RDD has completed.
    */
   protected override def doCheckpoint(): CheckpointRDD[T] = {
-    val newRDD = ReliableCheckpointRDD.createCheckpointedRDD(rdd, cpDir)
+    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
 
     // Optionally clean our checkpoint files if the reference is out of scope
     if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
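Note (illustrative, not part of the series): the read side degrades the same way, which is what the corrupted-partitioner-file case in PATCH 1's test exercises. A hypothetical stand-alone analogue using plain Java serialization (the real code goes through SparkEnv.get.serializer, and `readPartitionerBytes` is an invented name):

    import java.io.{ByteArrayInputStream, ObjectInputStream}
    import scala.util.control.NonFatal

    def readPartitionerBytes(bytes: Array[Byte]): Option[AnyRef] =
      try {
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
        // A corrupt or truncated _partitioner file throws here; recovery of
        // the RDD itself must still succeed, just with partitioner = None.
        try Some(in.readObject()) finally in.close()
      } catch {
        case NonFatal(_) => None
      }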