3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -108,6 +108,9 @@ object MimaExcludes {
// [SPARK-18236] Reduce duplicate objects in Spark UI and HistoryServer
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.TaskInfo.accumulables"),

// [SPARK-18657] Add StreamingQuery.runId
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.runId"),

// [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
19 changes: 17 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -51,14 +51,29 @@ def __init__(self, jsq):
@property
@since(2.0)
def id(self):
"""The id of the streaming query.
"""Returns the unique id of this query that persists across restarts from checkpoint data.
That is, this id is generated when a query is started for the first time, and
will be the same every time it is restarted from checkpoint data.
There can only be one query with the same id active in a Spark cluster.
Also see `runId`.
"""
return self._jsq.id().toString()

@property
@since(2.1)
def runId(self):
"""Returns the unique id of this query that does not persist across restarts. That is, every
query that is started (or restarted from checkpoint) will have a different runId.
"""
return self._jsq.runId().toString()

@property
@since(2.0)
def name(self):
"""The name of the streaming query. This name is unique across all active queries.
"""Returns the user-specified name of the query, or null if not specified.
This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
as `dataframe.writeStream.queryName("query").start()`.
This name, if set, must be unique across all active queries.
"""
return self._jsq.name()

@@ -17,13 +17,16 @@

package org.apache.spark.sql.execution.streaming

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization


/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
* [[Source]]s that are present in a streaming query. This is similar to a simplified, single-instance
* vector clock that must progress linearly forward.
*/
case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[String] = None) {
case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {

/**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
@@ -54,6 +54,26 @@ object OffsetSeq {
* `nulls` in the sequence are converted to `None`s.
*/
def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
OffsetSeq(offsets.map(Option(_)), metadata)
OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
}
}


/**
* Contains metadata associated with an [[OffsetSeq]]. This information is
* persisted to the offset log in the checkpoint location via the [[OffsetSeq]] metadata field.
*
* @param batchWatermarkMs: The current eventTime watermark, used to
* bound the lateness of data that will be processed. Time unit: milliseconds
* @param batchTimestampMs: The current batch processing timestamp.
* Time unit: milliseconds
*/
case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
Review comment (Contributor): Can we put this in its own file?

Reply (Contributor Author): Sure. But it's a small class and closely tied with OffsetSeq, so I thought it's not worth having a separate file for it.

Reply (Contributor Author): Not worth moving these 6 lines of code into a new file.

def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
}

object OffsetSeqMetadata {
private implicit val format = Serialization.formats(NoTypeHints)
def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
}
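
Since the serialization here is terse, below is a self-contained sketch (not part of the patch) of the JSON round trip that `json` and `apply` perform, assuming json4s-jackson is on the classpath and using a local stand-in for the case class:

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

object OffsetSeqMetadataRoundTrip {
  // Local stand-in mirroring the case class above.
  case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0)

  private implicit val formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val meta = OffsetSeqMetadata(batchWatermarkMs = 5000L, batchTimestampMs = 1480000000000L)
    val json = Serialization.write(meta)
    // json is roughly {"batchWatermarkMs":5000,"batchTimestampMs":1480000000000}
    val parsed = Serialization.read[OffsetSeqMetadata](json)
    assert(parsed == meta)   // round trip preserves both fields
  }
}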

@@ -74,7 +74,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)

// write metadata
out.write('\n')
out.write(offsetSeq.metadata.getOrElse("").getBytes(UTF_8))
out.write(offsetSeq.metadata.map(_.json).getOrElse("").getBytes(UTF_8))

// write offsets, one per line
offsetSeq.offsets.map(_.map(_.json)).foreach { offset =>
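
For context, a rough REPL-style sketch of the on-disk layout this write path produces. The leading "v1" version line and the "-" placeholder for a missing offset are assumptions not shown in this hunk; only the metadata-line-then-one-offset-per-line shape comes from the code above:

import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8

val out = new ByteArrayOutputStream()
val metadataJson = """{"batchWatermarkMs":0,"batchTimestampMs":1480000000000}"""
val offsets: Seq[Option[String]] = Seq(Some("""{"logOffset":5}"""), None)

out.write("v1".getBytes(UTF_8))          // version header (assumed)
out.write('\n')
out.write(metadataJson.getBytes(UTF_8))  // metadata line, as written above
offsets.foreach { offset =>
  out.write('\n')
  out.write(offset.getOrElse("-").getBytes(UTF_8))  // "-" marks a missing offset (assumed)
}
println(out.toString(UTF_8.name))
// v1
// {"batchWatermarkMs":0,"batchTimestampMs":1480000000000}
// {"logOffset":5}
// -
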
@@ -43,6 +43,7 @@ trait ProgressReporter extends Logging {

// Internal state of the stream, required for computing metrics.
protected def id: UUID
protected def runId: UUID
protected def name: String
protected def triggerClock: Clock
protected def logicalPlan: LogicalPlan
@@ -52,7 +53,7 @@
protected def committedOffsets: StreamProgress
protected def sources: Seq[Source]
protected def sink: Sink
protected def streamExecutionMetadata: StreamExecutionMetadata
protected def offsetSeqMetadata: OffsetSeqMetadata
protected def currentBatchId: Long
protected def sparkSession: SparkSession

@@ -134,11 +135,12 @@ trait ProgressReporter extends Logging {

val newProgress = new StreamingQueryProgress(
id = id,
runId = runId,
name = name,
timestamp = currentTriggerStartTimestamp,
batchId = currentBatchId,
durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
currentWatermark = streamExecutionMetadata.batchWatermarkMs,
currentWatermark = offsetSeqMetadata.batchWatermarkMs,
stateOperators = executionStats.stateOperators.toArray,
sources = sourceProgress.toArray,
sink = sinkProgress)
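
As a hedged usage sketch (not part of the patch), the new field can be read back through the user-facing progress object; `query` is assumed to be an already started StreamingQuery and `lastProgress` to return the progress class populated above:

// Inspect the latest progress of a running query, including the new runId field.
val p = query.lastProgress
println(s"id=${p.id} runId=${p.runId} name=${p.name} batch=${p.batchId}")
println(s"event-time watermark (ms): ${p.currentWatermark}")
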
@@ -25,8 +25,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.sql._
@@ -58,9 +56,6 @@ class StreamExecution(

import org.apache.spark.sql.streaming.StreamingQueryListener._

// TODO: restore this from the checkpoint directory.
override val id: UUID = UUID.randomUUID()

private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay

private val noDataProgressEventInterval =
@@ -98,8 +93,30 @@
/** The current batchId or -1 if execution has not yet been initialized. */
protected var currentBatchId: Long = -1

/** Stream execution metadata */
protected var streamExecutionMetadata = StreamExecutionMetadata()
/** Metadata associated with the whole query */
protected val streamMetadata: StreamMetadata = {
val metadataPath = new Path(checkpointFile("metadata"))
val hadoopConf = sparkSession.sessionState.newHadoopConf()
StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
newMetadata
}
}

/** Metadata associated with the offset seq of a batch in the query. */
protected var offsetSeqMetadata = OffsetSeqMetadata()

override val id: UUID = UUID.fromString(streamMetadata.id)

override val runId: UUID = UUID.randomUUID

/**
* Pretty identifying string used in logs. If the query has a name, the format is
* "queryName [id = xyz, runId = abc]"; otherwise it is "[id = xyz, runId = abc]".
*/
private val prettyIdString =
Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
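
To make the id/runId contract concrete, here is a minimal REPL-style sketch; `FakeCheckpoint` and `startQuery` are illustrative names, not from this patch. The persisted id is read back on restart, while the runId is regenerated on every (re)start:

import java.util.UUID

class FakeCheckpoint { var persistedId: Option[String] = None }

def startQuery(checkpoint: FakeCheckpoint): (UUID, UUID) = {
  val id = checkpoint.persistedId match {
    case Some(s) => UUID.fromString(s)                 // restart: reuse the persisted id
    case None =>
      val fresh = UUID.randomUUID()
      checkpoint.persistedId = Some(fresh.toString)    // first start: persist the new id
      fresh
  }
  val runId = UUID.randomUUID()                        // always unique per run
  (id, runId)
}

val cp = new FakeCheckpoint
val (id1, run1) = startQuery(cp)
val (id2, run2) = startQuery(cp)   // simulated restart from the same checkpoint
assert(id1 == id2 && run1 != run2)
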

/** All stream sources present in the query plan. */
protected val sources =
@@ -128,16 +145,17 @@ class StreamExecution(
/* Get the call site in the caller thread; will pass this into the micro batch thread */
private val callSite = Utils.getCallSite()

/** Used to report metrics to coda-hale. */
lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name")
/** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */
lazy val streamMetrics = new MetricsReporter(
this, s"spark.streaming.${Option(name).getOrElse(id)}")

/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
* [[HDFSMetadataLog]]. See SPARK-14131 for more details.
*/
val microBatchThread =
new StreamExecutionThread(s"stream execution thread for $name") {
new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
override def run(): Unit = {
// To fix call site like "run at <unknown>:0", we bridge the call site from the caller
// thread to this micro batch thread
@@ -191,7 +209,7 @@
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
}

postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception.
postEvent(new QueryStartedEvent(id, runId, name)) // Assumption: Does not throw exception.

// Unblock starting thread
startLatch.countDown()
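
As a usage sketch (not part of this patch), a listener can surface the runId now carried by the events constructed above; the event field names are assumed to match the constructor arguments shown in this diff:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Logs the stable id and the per-run runId for each lifecycle event.
class IdLoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: id=${event.id}, runId=${event.runId}, name=${event.name}")
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query terminated: id=${event.id}, runId=${event.runId}")
}

// spark.streams.addListener(new IdLoggingListener)  // register on an existing SparkSession
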
@@ -261,10 +279,10 @@
case e: Throwable =>
streamDeathCause = new StreamingQueryException(
this,
s"Query $name terminated with exception: ${e.getMessage}",
s"Query $prettyIdString terminated with exception: ${e.getMessage}",
e,
committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
committedOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString,
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata).toString)
logError(s"Query $name terminated with error", e)
updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
// Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
@@ -282,7 +300,7 @@
// Notify others
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(
new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString)))
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
@@ -301,9 +319,9 @@
logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
availableOffsets = nextOffsets.toStreamProgress(sources)
streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse("{}"))
offsetSeqMetadata = nextOffsets.metadata.getOrElse(OffsetSeqMetadata())
logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
s"at batch timestamp ${streamExecutionMetadata.batchTimestampMs}")
s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")

offsetLog.get(batchId - 1).foreach {
case lastOffsets =>
@@ -359,15 +377,15 @@
}
if (hasNewData) {
// Current batch timestamp in milliseconds
streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis()
offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
updateStatusMessage("Writing offsets to log")
reportTimeTaken("walCommit") {
assert(offsetLog.add(
currentBatchId,
availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)),
availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId. " +
s"Metadata ${streamExecutionMetadata.toString}")
s"Metadata ${offsetSeqMetadata.toString}")

// NOTE: The following code is correct because runBatches() processes exactly one
// batch at a time. If we add pipeline parallelism (multiple batches in flight at
@@ -437,21 +455,21 @@
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) => replacementMap(a)
case ct: CurrentTimestamp =>
CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs,
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
ct.dataType)
case cd: CurrentDate =>
CurrentBatchTimestamp(streamExecutionMetadata.batchTimestampMs,
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType)
}

val executedPlan = reportTimeTaken("queryPlanning") {
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSession,
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
currentBatchId,
streamExecutionMetadata.batchWatermarkMs)
offsetSeqMetadata.batchWatermarkMs)
lastExecution.executedPlan // Force the lazy generation of execution plan
}

@@ -468,12 +486,12 @@
logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
(e.maxEventTime.value / 1000) - e.delay.milliseconds()
}.headOption.foreach { newWatermark =>
if (newWatermark > streamExecutionMetadata.batchWatermarkMs) {
if (newWatermark > offsetSeqMetadata.batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermark ms")
streamExecutionMetadata.batchWatermarkMs = newWatermark
offsetSeqMetadata.batchWatermarkMs = newWatermark
} else {
logTrace(s"Event time didn't move: $newWatermark < " +
s"$streamExecutionMetadata.currentEventTimeWatermark")
s"$offsetSeqMetadata.currentEventTimeWatermark")
}
}
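
To spell out the arithmetic above in isolation, a REPL-style sketch of the monotonic update (the unit handling of `maxEventTime` is elided; only the subtract-the-allowed-lateness-and-never-regress rule is shown):

var batchWatermarkMs: Long = 0L

def updateWatermark(maxEventTimeMs: Long, delayMs: Long): Unit = {
  val candidate = maxEventTimeMs - delayMs
  if (candidate > batchWatermarkMs) {
    batchWatermarkMs = candidate       // advance: the watermark only moves forward
  }                                    // else: keep the previous value
}

updateWatermark(maxEventTimeMs = 100000L, delayMs = 10000L)  // watermark -> 90000 ms
updateWatermark(maxEventTimeMs = 95000L, delayMs = 10000L)   // candidate 85000 < 90000, unchanged
assert(batchWatermarkMs == 90000L)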

@@ -503,7 +521,7 @@
microBatchThread.join()
}
uniqueSources.foreach(_.stop())
logInfo(s"Query $name was stopped")
logInfo(s"Query $prettyIdString was stopped")
}

/**
@@ -594,7 +612,7 @@
override def explain(): Unit = explain(extended = false)

override def toString: String = {
s"Streaming Query - $name [state = $state]"
s"Streaming Query $prettyIdString [state = $state]"
}

def toDebugString: String = {
@@ -603,7 +621,7 @@
} else ""
s"""
|=== Streaming Query ===
|Name: $name
|Identifier: $prettyIdString
|Current Offsets: $committedOffsets
|
|Current State: $state
@@ -622,33 +640,6 @@
case object TERMINATED extends State
}

/**
* Contains metadata associated with a stream execution. This information is
* persisted to the offset log via the OffsetSeq metadata field. Current
* information contained in this object includes:
*
* @param batchWatermarkMs: The current eventTime watermark, used to
* bound the lateness of data that will processed. Time unit: milliseconds
* @param batchTimestampMs: The current batch processing timestamp.
* Time unit: milliseconds
*/
case class StreamExecutionMetadata(
var batchWatermarkMs: Long = 0,
var batchTimestampMs: Long = 0) {
private implicit val formats = StreamExecutionMetadata.formats

/**
* JSON string representation of this object.
*/
def json: String = Serialization.write(this)
}

object StreamExecutionMetadata {
private implicit val formats = Serialization.formats(NoTypeHints)

def apply(json: String): StreamExecutionMetadata =
Serialization.read[StreamExecutionMetadata](json)
}

/**
* A special thread to run the stream query. Some code needs to run in the StreamExecutionThread