19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -25,6 +25,7 @@ import scala.collection.{Map, mutable}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.DynamicVariable

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
@@ -964,7 +965,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
if (isOutputSpecValidationEnabled) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}
@@ -1042,7 +1043,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")

if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
if (isOutputSpecValidationEnabled) {
// FileOutputFormat ignores the filesystem parameter
[Review comment (Contributor): Can't these two lines be collapsed into a single function call, say isValidationEnabled? Reduces duplication of hard-to-track logic.]

val ignoredFs = FileSystem.get(hadoopConf)
hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
@@ -1117,8 +1118,22 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
private[spark] def valueClass: Class[_] = vt.runtimeClass

private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)

// Note: this needs to be a function instead of a 'val' so that the disableOutputSpecValidation
// setting can take effect:
private def isOutputSpecValidationEnabled: Boolean = {
val validationDisabled = PairRDDFunctions.disableOutputSpecValidation.value
val enabledInConf = self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)
enabledInConf && !validationDisabled
}
}

private[spark] object PairRDDFunctions {
val RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES = 256

/**
* Allows for the `spark.hadoop.validateOutputSpecs` checks to be disabled on a case-by-case
* basis; see SPARK-4835 for more details.
*/
val disableOutputSpecValidation: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false)
}
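
The note above on using a def rather than a val is the key to the mechanism: a val would read the DynamicVariable once, when the PairRDDFunctions instance is constructed, and never observe a later withValue override, while a def re-reads it on every save. A minimal, standalone sketch of that behaviour (illustrative only, not part of the patch; all names here are hypothetical):

import scala.util.DynamicVariable

object DynamicVariableDemo {
  // Hypothetical flag, standing in for PairRDDFunctions.disableOutputSpecValidation.
  val disabled = new DynamicVariable[Boolean](false)

  class Checker {
    val enabledAsVal: Boolean = !disabled.value  // evaluated once, when the Checker is built
    def enabledAsDef: Boolean = !disabled.value  // re-evaluated on every call
  }

  def main(args: Array[String]): Unit = {
    val checker = new Checker
    disabled.withValue(true) {
      println(checker.enabledAsVal) // true: stale, still reflects the default
      println(checker.enabledAsDef) // false: sees the override inside the dynamic scope
    }
    println(checker.enabledAsDef)   // true: withValue restored the previous value on exit
  }
}

Because withValue restores the previous value when the block exits, normally or via an exception, the streaming code changed below can wrap job generation and job execution without having to reset the flag itself.
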
4 changes: 3 additions & 1 deletion docs/configuration.md
@@ -709,7 +709,9 @@ Apart from these, the following properties are also available, and may be useful
<td>If set to true, validates the output specification (e.g. checking if the output directory already exists)
used in saveAsHadoopFile and other variants. This can be disabled to silence exceptions due to pre-existing
output directories. We recommend that users do not disable this except if trying to achieve compatibility with
previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.
This setting is ignored for jobs generated through Spark Streaming's StreamingContext, since
data may need to be rewritten to pre-existing output directories during checkpoint recovery.</td>
</tr>
<tr>
<td><code>spark.hadoop.cloneConf</code></td>
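For reference, a hedged sketch of how the spark.hadoop.validateOutputSpecs setting documented above can be turned off globally when building a SparkConf (e.g. in spark-shell); the app name is illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("legacy-overwrite-job") // hypothetical app name
  .set("spark.hadoop.validateOutputSpecs", "false") // skip the "output directory already exists" check

With the setting left at its default of true, the per-job override introduced in this patch is what allows saves into pre-existing directories, and only for jobs generated by Spark Streaming.
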
Expand Up @@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import scala.util.matching.Regex

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext.rddToFileName
@@ -292,7 +292,13 @@ abstract class DStream[T: ClassTag] (
// set this DStream's creation site, generate RDDs and then restore the previous call site.
val prevCallSite = ssc.sparkContext.getCallSite()
ssc.sparkContext.setCallSite(creationSite)
val rddOption = compute(time)
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
val rddOption = PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
ssc.sparkContext.setCallSite(prevCallSite)

rddOption.foreach { case newRDD =>
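To see why the override has to wrap compute(time) above, and not just Job.run() in the scheduler, consider a transform() whose body performs a save: that body executes on the driver while the batch's RDD is being generated, i.e. inside compute(). A standalone sketch of that pattern (illustrative only; the master, batch interval, and output path are made up):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransformSideEffectDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("transform-side-effect-demo"), Seconds(1))
    val queue = mutable.Queue[RDD[String]](ssc.sparkContext.parallelize(Seq("a", "a", "b")))
    val words = ssc.queueStream(queue)
    val counted = words.transform { (rdd, time) =>
      val counts = rdd.map(w => (w, 1)).reduceByKey(_ + _)
      // This action runs while the batch is being generated, i.e. from within compute():
      counts.saveAsTextFile("/tmp/demo-counts-" + time.milliseconds)
      counts
    }
    counted.print()
    ssc.start()
    Thread.sleep(5000)
    ssc.stop()
  }
}

If the override only surrounded Job.run(), a save like the one above would still hit the output-spec check when the batch is regenerated from a checkpoint, which is exactly what the regression test at the bottom of this change exercises.
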
@@ -17,7 +17,7 @@

package org.apache.spark.streaming.dstream

import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming.{Duration, Time}
import scala.reflect.ClassTag

@@ -22,6 +22,7 @@ import scala.collection.JavaConversions._
import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
import akka.actor.{ActorRef, Actor, Props}
import org.apache.spark.{SparkException, Logging, SparkEnv}
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._


@@ -168,7 +169,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
job.run()
// Disable checks for existing output directories in jobs launched by the streaming scheduler,
// since we may need to write output to an existing directory during checkpoint recovery;
// see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
eventActor ! JobCompleted(job)
}
}
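
One property worth noting about the wrapper around job.run(): the override is confined to the JobHandler thread that calls withValue and is restored when the block exits, so it cannot leak into unrelated work on other threads. A standalone sketch of that thread confinement (illustrative only, not Spark code):

import scala.util.DynamicVariable

object ThreadConfinementDemo {
  val disabled = new DynamicVariable[Boolean](false)

  def main(args: Array[String]): Unit = {
    // Started before the override below, so it keeps seeing the default value.
    val other = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(100)
        println("other thread sees: " + disabled.value) // false
      }
    })
    other.start()
    disabled.withValue(true) {
      println("this thread sees: " + disabled.value)    // true, only inside this dynamic scope
      Thread.sleep(200)
    }
    println("after the block: " + disabled.value)       // false again; the old value is restored
    other.join()
  }
}

DynamicVariable is backed by an InheritableThreadLocal, so a thread started inside the withValue block would inherit the override; threads that already exist, such as the rest of the scheduler's pool, do not.
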
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
}
}

test("recovery with saveAsHadoopFile inside transform operation") {
// Regression test for SPARK-4835.
//
// In that issue, the problem was that `saveAsHadoopFile(s)` would fail when the last batch
// was restarted from a checkpoint since the output directory would already exist. However,
[Review comment (Contributor): nit: extra space before "However".]
// the other saveAsHadoopFile* tests couldn't catch this because they only tested whether the
// output matched correctly and not whether the post-restart batch had successfully finished
// without throwing any errors. The following test reproduces the same bug with a test that
[Review comment (Contributor): nit: extra space before "The following".]
// actually fails because the error in saveAsHadoopFile causes transform() to fail, which
// prevents the expected output from being written to the output stream.
//
// This is not actually a valid use of transform, but it's being used here so that we can test
// the fix for SPARK-4835 independently of additional test cleanup.
//
// After SPARK-5079 is addressed, we should be able to remove this test, since a strengthened
// version of the other saveAsHadoopFile* tests would prevent regressions for this issue.
val tempDir = Files.createTempDir()
try {
testCheckpointedOperation(
Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
(s: DStream[String]) => {
s.transform { (rdd, time) =>
val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
output.saveAsHadoopFile(
new File(tempDir, "result-" + time.milliseconds).getAbsolutePath,
classOf[Text],
classOf[IntWritable],
classOf[TextOutputFormat[Text, IntWritable]])
[Review comment (Contributor): nit: easier to read if the new File is assigned to a variable, and maybe the saveAsHadoopFile call split across two lines.]

output
}
},
Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq(), Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
3
)
} finally {
Utils.deleteRecursively(tempDir)
}
}
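
For readers who want to reproduce the underlying failure outside of streaming, a hedged, standalone sketch of the behaviour the test above guards against: writing twice into the same directory trips FileOutputFormat.checkOutputSpecs with a FileAlreadyExistsException while spark.hadoop.validateOutputSpecs is at its default of true. The local master, app name, and /tmp path are made up:

import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits on Spark 1.x, the era of this patch

object Spark4835Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("spark-4835-repro"))
    val counts = sc.parallelize(Seq("a", "a", "b")).map(w => (w, 1)).reduceByKey(_ + _)
    val out = "/tmp/spark-4835-output"
    counts.saveAsHadoopFile[TextOutputFormat[String, Int]](out) // first write succeeds
    counts.saveAsHadoopFile[TextOutputFormat[String, Int]](out) // throws: output directory already exists
    sc.stop()
  }
}

The same two writes go through when the second one runs with validation disabled, which is what the streaming scheduler now arranges during checkpoint recovery.
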

// This tests whether the StateDStream's RDD checkpoints work correctly such
// that the system can recover from a master failure. This assumes a reliable,
// replayable input source - TestInputDStream.