ProgressReporter.scala

```diff
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
 
 /**
```
```diff
@@ -55,6 +56,7 @@ trait ProgressReporter extends Logging {
   protected def streamExecutionMetadata: StreamExecutionMetadata
   protected def currentBatchId: Long
   protected def sparkSession: SparkSession
+  protected def postEvent(event: StreamingQueryListener.Event): Unit
 
   // Local timestamps and counters.
   private var currentTriggerStartTimestamp = -1L
```
```diff
@@ -69,6 +71,12 @@
   /** Holds the most recent query progress updates. Accesses must lock on the queue itself. */
   private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
 
+  private val noDataProgressEventInterval =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
+  // The timestamp we report an event that has no input data
+  private var lastNoDataProgressEventTime = Long.MinValue
+
   @volatile
   protected var currentStatus: StreamingQueryStatus = {
     new StreamingQueryStatus(
```
```diff
@@ -99,6 +107,17 @@
     currentDurationsMs.clear()
   }
 
+  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+    postEvent(new QueryProgressEvent(newProgress))
+    logInfo(s"Streaming query made progress: $newProgress")
+  }
+
   /** Finalizes the query progress and adds it to list of recent status updates. */
   protected def finishTrigger(hasNewData: Boolean): Unit = {
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
```
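The buffer logic that `updateProgress` now centralizes is a simple bounded queue: append the newest progress, then evict from the front until the size is back under the configured cap. A minimal, self-contained sketch of the same pattern (class and field names here are illustrative, not Spark's):

```scala
import scala.collection.mutable

// Illustrative bounded buffer mirroring the retention loop in `updateProgress`.
class BoundedBuffer[T](retention: Int) {
  private val buffer = new mutable.Queue[T]()

  def add(elem: T): Unit = buffer.synchronized {
    buffer += elem
    // `>=` mirrors the patch: after each insert at most `retention - 1`
    // elements survive, so the newest entry always fits.
    while (buffer.length >= retention) {
      buffer.dequeue()
    }
  }

  def snapshot: Seq[T] = buffer.synchronized { buffer.toSeq }
}
```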
```diff
@@ -143,14 +162,18 @@
       sources = sourceProgress.toArray,
       sink = sinkProgress)
 
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
+    if (hasNewData) {
+      // Reset noDataEventTimestamp if we processed any data
+      lastNoDataProgressEventTime = Long.MinValue
+      updateProgress(newProgress)
+    } else {
+      val now = triggerClock.getTimeMillis()
+      if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
+        lastNoDataProgressEventTime = now
+        updateProgress(newProgress)
       }
     }
 
-    logInfo(s"Streaming query made progress: $newProgress")
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 
```
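This branch is the behavioral core of the patch: a trigger that processed data always reports progress and resets the throttle, while an empty trigger reports at most once per no-data interval. A sketch of the condition in isolation, with a plain function standing in for Spark's `Clock` (all names here are illustrative):

```scala
// Illustrative rate limiter mirroring the no-data branch of `finishTrigger`.
class NoDataThrottle(intervalMs: Long, nowMs: () => Long) {
  private var lastEmittedMs = Long.MinValue

  /** Returns true if a progress event should be emitted for this trigger. */
  def shouldEmit(hasNewData: Boolean): Boolean = {
    if (hasNewData) {
      lastEmittedMs = Long.MinValue // reset: the next empty trigger emits immediately
      true
    } else {
      val now = nowMs()
      if (now - intervalMs >= lastEmittedMs) {
        lastEmittedMs = now
        true
      } else {
        false
      }
    }
  }
}

// With a manual clock, only the first empty trigger in each interval emits.
var t = 0L
val throttle = new NoDataThrottle(intervalMs = 10000L, nowMs = () => t)
assert(throttle.shouldEmit(hasNewData = false))  // t = 0: emits, throttle was unset
t = 5000L
assert(!throttle.shouldEmit(hasNewData = false)) // suppressed: interval not elapsed
t = 10000L
assert(throttle.shouldEmit(hasNewData = false))  // 10000 - 10000 >= 0: emits again
```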
StreamExecution.scala

```diff
@@ -63,9 +63,6 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val noDataProgressEventInterval =
-    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
-
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
```
```diff
@@ -199,9 +196,6 @@
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
-      // The timestamp we report an event that has no input data
-      var lastNoDataProgressEventTime = Long.MinValue
-
       triggerExecutor.execute(() => {
         startTrigger()
 
```
```diff
@@ -224,18 +218,6 @@
 
           // Report trigger as finished and construct progress object.
           finishTrigger(dataAvailable)
-          if (dataAvailable) {
-            // Reset noDataEventTimestamp if we processed any data
-            lastNoDataProgressEventTime = Long.MinValue
-            postEvent(new QueryProgressEvent(lastProgress))
-          } else {
-            val now = triggerClock.getTimeMillis()
-            if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
-              lastNoDataProgressEventTime = now
-              postEvent(new QueryProgressEvent(lastProgress))
-            }
-          }
-
           if (dataAvailable) {
             // We'll increase currentBatchId after we complete processing current batch's data
             currentBatchId += 1
```
```diff
@@ -486,7 +468,7 @@ class StreamExecution(
     }
   }
 
-  private def postEvent(event: StreamingQueryListener.Event) {
+  override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
     sparkSession.streams.postListenerEvent(event)
   }
 
```
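Together with the abstract `postEvent` declared on `ProgressReporter` earlier in this diff, this override is a small template-method refactor: the trait decides when a progress event fires, and the concrete execution decides how it reaches the listener bus. A toy sketch of the shape (simplified, illustrative names):

```scala
// The trait owns the reporting policy; the subclass owns the transport.
trait Reporter {
  protected def postEvent(event: String): Unit // abstract hook

  protected def finishTrigger(hasNewData: Boolean): Unit = {
    if (hasNewData) postEvent("progress") // policy lives in the trait
  }
}

class Execution extends Reporter {
  override protected def postEvent(event: String): Unit =
    println(s"posting $event to the listener bus")
}
```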
StreamingQueryListenerSuite.scala

```diff
@@ -225,6 +225,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       }
       true
     }
+    // `recentProgresses` should not receive too many no data events
+    actions += AssertOnQuery { q =>
+      q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
+    }
     testStream(input.toDS)(actions: _*)
     spark.sparkContext.listenerBus.waitUntilEmpty(10000)
     // 11 is the max value of the possible numbers of events.
```
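For context on what the assertion checks: applications observe these throttled events through `StreamingQueryListener.onQueryProgress`, so an idle stream now produces far fewer callbacks. A sketch of a listener registration against the Spark API of this era (the callbacks and `addListener` are real API; the app name is an assumption for the example):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder().appName("progress-listener").getOrCreate()

// With this patch, an idle stream delivers at most one QueryProgressEvent
// per no-data interval instead of one per empty trigger.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress.json}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
```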