diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 54d736ee5101..5ab90c096d03 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -114,8 +114,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     logDebug("Generating jobs for time " + time)
     val jobs = this.synchronized {
       outputStreams.flatMap { outputStream =>
+        val genStartTime = System.currentTimeMillis()
         val jobOption = outputStream.generateJob(time)
         jobOption.foreach(_.setCallSite(outputStream.creationSite))
+        jobOption.foreach(_.setGenDelay(System.currentTimeMillis() - genStartTime))
         jobOption
       }
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index db0bae9958d6..d75f84c62826 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -241,4 +241,5 @@ private[streaming] case class JavaOutputOperationInfo(
     description: String,
     startTime: Long,
     endTime: Long,
+    jobGenTime: Long,
     failureReason: String)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index b109b9f1cbea..3cbd5e1ff334 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -58,6 +58,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
       outputOperationInfo.description: String,
       outputOperationInfo.startTime.getOrElse(-1),
       outputOperationInfo.endTime.getOrElse(-1),
+      outputOperationInfo.jobGenTime.getOrElse(-1),
       outputOperationInfo.failureReason.orNull
     )
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 5b2b959f8138..b46925557c4e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -34,6 +34,7 @@ import org.apache.spark.streaming.Time
 @DeveloperApi
 case class BatchInfo(
     batchTime: Time,
+    jobSetCreationDelay: Option[Long],
     streamIdToInputInfo: Map[Int, StreamInputInfo],
     submissionTime: Long,
     processingStartTime: Option[Long],
@@ -41,6 +42,7 @@ case class BatchInfo(
     outputOperationInfos: Map[Int, OutputOperationInfo]
   ) {
 
+  def batchJobSetCreationDelay: Long = jobSetCreationDelay.getOrElse(0L)
   /**
    * Time taken for the first job of this batch to start processing from the time this batch
    * was submitted to the streaming scheduler. Essentially, it is
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 7050d7ef4524..2f0dcabb9d7e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -34,6 +34,7 @@ class Job(val time: Time, func: () => _) {
   private var _callSite: CallSite = null
   private var _startTime: Option[Long] = None
   private var _endTime: Option[Long] = None
+  private var _jobGenTime: Option[Long] = None
 
   def run() {
     _result = Try(func())
@@ -85,6 +86,14 @@ class Job(val time: Time, func: () => _) {
     _startTime = Some(startTime)
   }
 
+  def setGenDelay(jobGenTime: Long): Unit = {
+    _jobGenTime = Some(jobGenTime)
+  }
+
+  def getGenDelay(): Option[Long] = {
+    _jobGenTime
+  }
+
   def setEndTime(endTime: Long): Unit = {
     _endTime = Some(endTime)
   }
@@ -96,7 +105,8 @@ class Job(val time: Time, func: () => _) {
       None
     }
     OutputOperationInfo(
-      time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, failureReason)
+      time, outputOpId, callSite.shortForm, callSite.longForm, _startTime, _endTime, _jobGenTime,
+      failureReason)
   }
 
   override def toString: String = id
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 307ff1f7ec23..f87b00f50c0e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -231,7 +231,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       // added but not allocated, are dangling in the queue after recovering, we have to allocate
       // those blocks to the next batch, which is the batch they were supposed to go.
       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
-      jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
+      jobScheduler.submitJobSet(JobSet(time, None, graph.generateJobs(time)))
     }
 
     // Restart the timer
@@ -241,6 +241,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   /** Generate jobs and perform checkpoint for the given `time`. */
   private def generateJobs(time: Time) {
+    val jobSetCreationStartTime = clock.getTimeMillis()
     // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
     // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
     ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
@@ -249,8 +250,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
       graph.generateJobs(time) // generate jobs using allocated block
     } match {
       case Success(jobs) =>
+        val jobSetCreationDelay = Some(clock.getTimeMillis() - jobSetCreationStartTime)
         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
-        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
+        jobScheduler.submitJobSet(JobSet(time, jobSetCreationDelay, jobs, streamIdToInputInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 0baedaf275d6..7b28c21b9113 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -28,6 +28,7 @@ import org.apache.spark.streaming.Time
 private[streaming]
 case class JobSet(
     time: Time,
+    jobSetCreationDelay: Option[Long],
     jobs: Seq[Job],
     streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {
 
@@ -63,6 +64,7 @@ case class JobSet(
   def toBatchInfo: BatchInfo = {
     BatchInfo(
       time,
+      jobSetCreationDelay,
       streamIdToInputInfo,
       submissionTime,
       if (hasStarted) Some(processingStartTime) else None,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
index 137e512a670d..8262370de9e0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
@@ -39,6 +39,7 @@ case class OutputOperationInfo(
     description: String,
     startTime: Option[Long],
     endTime: Option[Long],
+    jobGenTime: Option[Long],
     failureReason: Option[String]) {
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 1af60857bc77..dfc89d500b5c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -116,6 +116,7 @@ private[ui] case class OutputOperationUIData(
     description: String,
     startTime: Option[Long],
     endTime: Option[Long],
+    jobGenTime: Option[Long],
     failureReason: Option[String]) {
 
   def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
@@ -130,6 +131,7 @@ private[ui] object OutputOperationUIData {
       outputOperationInfo.description,
       outputOperationInfo.startTime,
       outputOperationInfo.endTime,
+      outputOperationInfo.jobGenTime,
       outputOperationInfo.failureReason
     )
   }
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
index 0295e059f7bc..8b9268f24c75 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -63,7 +63,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
     assertReceiverInfo(listener.receiverError.receiverInfo, receiverError.receiverInfo)
 
     val batchSubmitted = StreamingListenerBatchSubmitted(BatchInfo(
-      batchTime = Time(1000L),
+      batchTime = Time(1000L), jobSetCreationDelay = None,
       streamIdToInputInfo = Map(
         0 -> StreamInputInfo(
           inputStreamId = 0,
@@ -84,6 +84,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
           description = "operation1",
           startTime = None,
           endTime = None,
+          jobGenTime = None,
           failureReason = None),
         1 -> OutputOperationInfo(
           batchTime = Time(1000L),
@@ -92,13 +93,14 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
           description = "operation2",
           startTime = None,
           endTime = None,
+          jobGenTime = None,
           failureReason = None))
     ))
     listenerWrapper.onBatchSubmitted(batchSubmitted)
     assertBatchInfo(listener.batchSubmitted.batchInfo, batchSubmitted.batchInfo)
 
     val batchStarted = StreamingListenerBatchStarted(BatchInfo(
-      batchTime = Time(1000L),
+      batchTime = Time(1000L), jobSetCreationDelay = None,
       streamIdToInputInfo = Map(
         0 -> StreamInputInfo(
           inputStreamId = 0,
@@ -119,6 +121,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
           description = "operation1",
           startTime = Some(1003L),
           endTime = None,
+          jobGenTime = None,
           failureReason = None),
         1 -> OutputOperationInfo(
           batchTime = Time(1000L),
@@ -127,13 +130,14 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
           description = "operation2",
           startTime = Some(1005L),
           endTime = None,
+          jobGenTime = None,
           failureReason = None))
     ))
     listenerWrapper.onBatchStarted(batchStarted)
     assertBatchInfo(listener.batchStarted.batchInfo, batchStarted.batchInfo)
 
     val batchCompleted = StreamingListenerBatchCompleted(BatchInfo(
-      batchTime = Time(1000L),
+      batchTime = Time(1000L), jobSetCreationDelay = None,
       streamIdToInputInfo = Map(
         0 -> StreamInputInfo(
           inputStreamId = 0,
@@ -154,6 +158,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
           description = "operation1",
           startTime = Some(1003L),
           endTime = Some(1004L),
+          jobGenTime = None,
           failureReason = None),
         1 -> OutputOperationInfo(
           batchTime = Time(1000L),
@@ -162,6 +167,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
           description = "operation2",
           startTime = Some(1005L),
           endTime = Some(1010L),
+          jobGenTime = None,
           failureReason = None))
     ))
     listenerWrapper.onBatchCompleted(batchCompleted)
@@ -174,6 +180,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
       description = "operation1",
       startTime = Some(1003L),
       endTime = None,
+      jobGenTime = None,
       failureReason = None
     ))
     listenerWrapper.onOutputOperationStarted(outputOperationStarted)
@@ -187,6 +194,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
       description = "operation1",
       startTime = Some(1003L),
       endTime = Some(1004L),
+      jobGenTime = None,
       failureReason = None
     ))
     listenerWrapper.onOutputOperationCompleted(outputOperationCompleted)
@@ -243,6 +251,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
     assert(javaOutputOperationInfo.description === outputOperationInfo.description)
     assert(javaOutputOperationInfo.startTime === outputOperationInfo.startTime.getOrElse(-1))
     assert(javaOutputOperationInfo.endTime === outputOperationInfo.endTime.getOrElse(-1))
+    assert(javaOutputOperationInfo.jobGenTime === outputOperationInfo.jobGenTime.getOrElse(-1))
     assert(javaOutputOperationInfo.failureReason === outputOperationInfo.failureReason.orNull)
   }
 }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 26b757cc2d53..5f83a9e698ab 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -63,7 +63,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
       1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
 
     // onBatchSubmitted
-    val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
+    val batchInfoSubmitted = BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
     listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
     listener.runningBatches should be (Nil)
@@ -76,7 +76,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // onBatchStarted
     val batchInfoStarted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
@@ -118,7 +118,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // onBatchCompleted
     val batchInfoCompleted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     listener.waitingBatches should be (Nil)
     listener.runningBatches should be (Nil)
@@ -159,7 +159,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
 
     val batchInfoCompleted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
 
     for(_ <- 0 until (limit + 10)) {
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
@@ -177,7 +177,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     // fulfill completedBatchInfos
     for(i <- 0 until limit) {
       val batchInfoCompleted = BatchInfo(
-        Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
+        Time(1000 + i * 100), None, Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
       val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
       listener.onJobStart(jobStart)
@@ -188,7 +188,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
     listener.onJobStart(jobStart)
 
     val batchInfoSubmitted =
-      BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None, Map.empty)
+      BatchInfo(Time(1000 + limit * 100), None, Map.empty, (1000 + limit * 100), None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
 
     // We still can see the info retrieved from onJobStart
@@ -205,7 +205,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // A lot of "onBatchCompleted"s happen before "onJobStart"
     for(i <- limit + 1 to limit * 2) {
       val batchInfoCompleted = BatchInfo(
-        Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
+        Time(1000 + i * 100), None, Map.empty, 1000 + i * 100, Some(2000 + i * 100), None, Map.empty)
       listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
     }
@@ -231,12 +231,12 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // onBatchSubmitted
     val batchInfoSubmitted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
+      BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, None, None, Map.empty)
     listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
 
     // onBatchStarted
     val batchInfoStarted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
 
     // onJobStart
@@ -254,7 +254,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
 
     // onBatchCompleted
     val batchInfoCompleted =
-      BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
+      BatchInfo(Time(1000), None, streamIdToInputInfo, 1000, Some(2000), None, Map.empty)
     listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
 
   }
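For context, here is a minimal sketch (not part of the patch) of how a user-defined `StreamingListener` could consume the two metrics this diff introduces: the per-batch `jobSetCreationDelay` surfaced through `BatchInfo.batchJobSetCreationDelay`, and the per-output-operation `jobGenTime` recorded by `DStreamGraph.generateJobs`. The class name `JobGenTimeListener` is hypothetical; registration via `ssc.addStreamingListener(...)` is the standard Spark Streaming mechanism.

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener,
  StreamingListenerBatchCompleted, StreamingListenerOutputOperationCompleted}

// Hypothetical listener (illustration only) that logs the metrics added by
// this patch. Register with: ssc.addStreamingListener(new JobGenTimeListener)
class JobGenTimeListener extends StreamingListener {

  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // jobSetCreationDelay is None for batches resubmitted on recovery
    // (JobGenerator.restart passes None), so the helper falls back to 0.
    println(s"Batch ${info.batchTime}: job set created in " +
      s"${info.batchJobSetCreationDelay} ms")
  }

  override def onOutputOperationCompleted(
      event: StreamingListenerOutputOperationCompleted): Unit = {
    val op = event.outputOperationInfo
    // Per-output-operation generation time measured in DStreamGraph.generateJobs.
    println(s"Output op ${op.id} of batch ${op.batchTime}: generated in " +
      s"${op.jobGenTime.getOrElse(-1L)} ms")
  }
}
```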