
Commit 820913f

zsxwing authored and tdas committed
[SPARK-10071] [STREAMING] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
Output a warning when serializing QueueInputDStream rather than throwing an exception, so that unit tests can still use it. This PR also throws a better exception when deserializing QueueInputDStream, so that the user can find the problem easily; the previous exception was hard to understand: https://issues.apache.org/jira/browse/SPARK-8553

Author: zsxwing <zsxwing@gmail.com>

Closes apache#8624 from zsxwing/SPARK-10071 and squashes the following commits:

847cfa8 [zsxwing] Output a warning when writing QueueInputDStream and throw a better exception when reading QueueInputDStream
1 parent: ae74c3f · commit: 820913f
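For context, here is a minimal sketch of the usage pattern this commit addresses (the master, app name, and checkpoint path are illustrative, not taken from the commit): queueStream combined with an enabled checkpoint directory. Before this change, checkpointing such a stream failed at serialization time with a bare NotSerializableException; after it, serialization only logs a warning, and the clearer exception is raised on recovery.

    import scala.collection.mutable.Queue
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("QueueStreamDemo")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/queue-stream-checkpoint")  // checkpointing enabled

    // queueStream is backed by an in-memory Queue of RDDs, which cannot be
    // written into a checkpoint; with this commit, serializing it logs a
    // warning instead of failing the job.
    val rdd = ssc.sparkContext.parallelize(1 to 10)
    ssc.queueStream[Int](Queue(rdd)).print()

    ssc.start()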

3 files changed: +30 −13 lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 4 additions & 2 deletions

@@ -321,7 +321,7 @@ object CheckpointReader extends Logging {
 
     // Try to read the checkpoint files in the order
     logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
-    val compressionCodec = CompressionCodec.createCodec(conf)
+    var readError: Exception = null
     checkpointFiles.foreach(file => {
       logInfo("Attempting to load checkpoint from file " + file)
       try {
@@ -332,13 +332,15 @@
         return Some(cp)
       } catch {
         case e: Exception =>
+          readError = e
           logWarning("Error reading checkpoint from file " + file, e)
       }
     })
 
     // If none of checkpoint files could be read, then throw exception
     if (!ignoreReadError) {
-      throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
+      throw new SparkException(
+        s"Failed to read checkpoint from directory $checkpointPath", readError)
     }
     None
   }
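The fix above is ordinary cause chaining: remember the last exception seen while trying candidate files and attach it to the summary SparkException, so the root cause is no longer lost. A generic sketch of the same pattern, with illustrative names rather than Spark internals:

    import java.io.IOException

    // Try each candidate in order; if all fail, rethrow with the last
    // underlying failure attached as the cause.
    def readFirstReadable[T](candidates: Seq[String])(read: String => T): T = {
      var lastError: Exception = null
      for (path <- candidates) {
        try {
          return read(path)
        } catch {
          case e: Exception => lastError = e  // remember the most recent cause
        }
      }
      throw new IOException(s"Failed to read any of: ${candidates.mkString(", ")}", lastError)
    }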

streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala

Lines changed: 7 additions & 2 deletions

@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{NotSerializableException, ObjectOutputStream}
+import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}
 
 import scala.collection.mutable.{ArrayBuffer, Queue}
 import scala.reflect.ClassTag
@@ -37,8 +37,13 @@ class QueueInputDStream[T: ClassTag](
 
   override def stop() { }
 
+  private def readObject(in: ObjectInputStream): Unit = {
+    throw new NotSerializableException("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled.")
+  }
+
   private def writeObject(oos: ObjectOutputStream): Unit = {
-    throw new NotSerializableException("queueStream doesn't support checkpointing")
+    logWarning("queueStream doesn't support checkpointing")
   }
 
   override def compute(validTime: Time): Option[RDD[T]] = {
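The two methods added here are standard Java serialization hooks: a private writeObject(ObjectOutputStream) is invoked reflectively when the object is serialized, and a private readObject(ObjectInputStream) when it is deserialized. A self-contained sketch of the mechanism, with an illustrative class name (in Spark's version, writeObject only logs and readObject always throws):

    import java.io.{ByteArrayOutputStream, NotSerializableException,
      ObjectInputStream, ObjectOutputStream}

    class WarnOnWrite extends Serializable {
      // Invoked by Java serialization in place of the default field dump.
      private def writeObject(oos: ObjectOutputStream): Unit = {
        println("warning: serializing WarnOnWrite")
        oos.defaultWriteObject()
      }
      // Invoked by Java serialization when the object is read back.
      private def readObject(in: ObjectInputStream): Unit = {
        throw new NotSerializableException("WarnOnWrite cannot be restored")
      }
    }

    val buffer = new ByteArrayOutputStream()
    new ObjectOutputStream(buffer).writeObject(new WarnOnWrite)  // warns, then succeeds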

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 19 additions & 9 deletions

@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.storage.StorageLevel
@@ -726,16 +726,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts
   }
 
   test("queueStream doesn't support checkpointing") {
-    val checkpointDir = Utils.createTempDir()
-    ssc = new StreamingContext(master, appName, batchDuration)
-    val rdd = ssc.sparkContext.parallelize(1 to 10)
-    ssc.queueStream[Int](Queue(rdd)).print()
-    ssc.checkpoint(checkpointDir.getAbsolutePath)
-    val e = intercept[NotSerializableException] {
-      ssc.start()
+    val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
+    def creatingFunction(): StreamingContext = {
+      val _ssc = new StreamingContext(conf, batchDuration)
+      val rdd = _ssc.sparkContext.parallelize(1 to 10)
+      _ssc.checkpoint(checkpointDirectory)
+      _ssc.queueStream[Int](Queue(rdd)).register()
+      _ssc
+    }
+    ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
+    ssc.start()
+    eventually(timeout(10000 millis)) {
+      assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
+    }
+    ssc.stop()
+    val e = intercept[SparkException] {
+      ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
     }
     // StreamingContext.validate changes the message, so use "contains" here
-    assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
+    assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
+      "Please don't use queueStream when checkpointing is enabled."))
   }
 
   def addInputStream(s: StreamingContext): DStream[Int] = {
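The rewritten test drives the real recovery path: StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _) deserializes an existing checkpoint if one is found and only calls the creating function otherwise, which is exactly where the new readObject exception surfaces. A hedged sketch of the same entry point as an application would use it (the directory, app name, and batch interval are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/tmp/app-checkpoint"  // illustrative path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("RecoverableApp").setMaster("local[2]")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDir)
      // Set up DStreams here; avoid queueStream when checkpointing is enabled.
      ssc
    }

    // Restores from checkpointDir when a checkpoint exists, otherwise builds
    // a fresh context. With this commit, a checkpointed queueStream fails here
    // with a SparkException whose cause names the queueStream limitation.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()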
