@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets._

import scala.io.{Source => IOSource}

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.SparkSession

/**
@@ -43,36 +46,28 @@ import org.apache.spark.sql.SparkSession
* line 2: metadata (optional json string)
*/
class CommitLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[String](sparkSession, path) {
extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {

import CommitLog._

def add(batchId: Long): Unit = {
super.add(batchId, EMPTY_JSON)
}

override def add(batchId: Long, metadata: String): Boolean = {
throw new UnsupportedOperationException(
"CommitLog does not take any metadata, use 'add(batchId)' instead")
}

override protected def deserialize(in: InputStream): String = {
override protected def deserialize(in: InputStream): CommitMetadata = {
// called inside a try-finally where the underlying stream is closed in the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file in the offset commit log")
}
parseVersion(lines.next.trim, VERSION)
EMPTY_JSON
val metadataJson = if (lines.hasNext) lines.next else EMPTY_JSON
CommitMetadata(metadataJson)
}

override protected def serialize(metadata: String, out: OutputStream): Unit = {
override protected def serialize(metadata: CommitMetadata, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(s"v${VERSION}".getBytes(UTF_8))
out.write('\n')

// write metadata
out.write(EMPTY_JSON.getBytes(UTF_8))
out.write(metadata.json.getBytes(UTF_8))
}
}

@@ -81,3 +76,13 @@ object CommitLog {
private val EMPTY_JSON = "{}"
}


case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
def json: String = Serialization.write(this)(CommitMetadata.format)
}

object CommitMetadata {
implicit val format = Serialization.formats(NoTypeHints)

def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
}
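With this patch the commit log keeps the two-line layout described in the class comment above (version string on line 1, optional JSON metadata on line 2), but the metadata line now carries the watermark to use for the next batch. Below is a minimal, self-contained sketch of the round trip, reusing the CommitMetadata definition from the diff; the demo object and the sample value 5000 are illustrative only:

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Mirrors the CommitMetadata case class added in this patch.
case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
  def json: String = Serialization.write(this)(CommitMetadata.format)
}

object CommitMetadata {
  implicit val format = Serialization.formats(NoTypeHints)
  def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
}

object CommitLogFormatDemo extends App {
  // What CommitLog.serialize now writes: the version line, then the metadata JSON.
  val fileContent = s"v1\n${CommitMetadata(nextBatchWatermarkMs = 5000).json}"
  println(fileContent)
  // v1
  // {"nextBatchWatermarkMs":5000}

  // A commit file written by Spark 2.3.x carries "{}" (or no second line at all);
  // deserialize falls back to EMPTY_JSON, so the restored watermark defaults to 0.
  println(CommitMetadata("{}").nextBatchWatermarkMs) // 0
}
```

This fallback is why the pre-2.4 checkpoint files added under the test resources below contain only "v1" followed by "{}".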
@@ -265,7 +265,7 @@ class MicroBatchExecution(
* latest batch id in the offset log, then we can safely move to the next batch
* i.e., committedBatchId + 1 */
commitLog.getLatest() match {
case Some((latestCommittedBatchId, _)) =>
case Some((latestCommittedBatchId, commitMetadata)) =>
if (latestBatchId == latestCommittedBatchId) {
/* The last batch was successfully committed, so we can safely process a
* new next batch but first:
Expand All @@ -283,7 +283,8 @@ class MicroBatchExecution(
currentBatchId = latestCommittedBatchId + 1
isCurrentBatchConstructed = false
committedOffsets ++= availableOffsets
// Construct a new batch be recomputing availableOffsets
watermarkTracker.setWatermark(
math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
} else if (latestCommittedBatchId < latestBatchId - 1) {
logWarning(s"Batch completion log latest batch id is " +
s"${latestCommittedBatchId}, which is not trailing " +
@@ -533,11 +534,11 @@ class MicroBatchExecution(
}

withProgressLocked {
commitLog.add(currentBatchId)
watermarkTracker.updateWatermark(lastExecution.executedPlan)
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
committedOffsets ++= availableOffsets
awaitProgressLockCondition.signalAll()
}
watermarkTracker.updateWatermark(lastExecution.executedPlan)
logDebug(s"Completed batch ${currentBatchId}")
}

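Taken together, the two MicroBatchExecution hunks close the loop: at the end of a successful batch the tracker's watermark is written into the commit log (note that updateWatermark now runs before commitLog.add, so the persisted value reflects the just-finished batch), and on restart the committed value is re-applied, taking the max with whatever the tracker already holds so the watermark never moves backwards. A stripped-down sketch of that rule, using hypothetical stand-ins rather than Spark's WatermarkTracker:

```scala
// Hypothetical stand-ins, only to illustrate the persist/restore rule above.
final class SimpleWatermarkTracker {
  private var watermarkMs: Long = 0L
  def currentWatermark: Long = watermarkMs
  def setWatermark(ms: Long): Unit = { watermarkMs = ms }
}

case class CommitMetadata(nextBatchWatermarkMs: Long = 0)

object WatermarkRecoveryDemo extends App {
  // --- Commit path (end of batch N) ---
  val tracker = new SimpleWatermarkTracker
  tracker.setWatermark(5000L) // stands in for updateWatermark(lastExecution.executedPlan)
  val persisted = CommitMetadata(tracker.currentWatermark) // written via commitLog.add(batchId, ...)

  // --- Recovery path (query restart, batch N + 1) ---
  val restored = new SimpleWatermarkTracker
  restored.setWatermark(math.max(restored.currentWatermark, persisted.nextBatchWatermarkMs))
  assert(restored.currentWatermark == 5000L)
}
```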
@@ -311,7 +311,7 @@ class ContinuousExecution(
assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
synchronized {
if (queryExecutionThread.isAlive) {
commitLog.add(epoch)
commitLog.add(epoch, CommitMetadata())
val offset =
continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
committedOffsets ++= Seq(continuousSources(0) -> offset)
@@ -0,0 +1,2 @@
v1
{}
@@ -0,0 +1,2 @@
v1
{}
@@ -0,0 +1 @@
{"id":"73f7f943-0a08-4ffb-a504-9fa88ff7612a"}
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1531991874513,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
0
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":5000,"batchTimestampMs":1531991878604,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
1
[10 binary checkpoint files, not shown]
@@ -127,31 +127,133 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
testStream(aggWithWatermark)(
AddData(inputData2, 15),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(15))
assert(e.get("min") === formatTimestamp(15))
assert(e.get("avg") === formatTimestamp(15))
assert(e.get("watermark") === formatTimestamp(0))
},
assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0),
AddData(inputData2, 10, 12, 14),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(14))
assert(e.get("min") === formatTimestamp(10))
assert(e.get("avg") === formatTimestamp(12))
assert(e.get("watermark") === formatTimestamp(5))
},
assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5),
AddData(inputData2, 25),
CheckAnswer((10, 3)),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(25))
assert(e.get("min") === formatTimestamp(25))
assert(e.get("avg") === formatTimestamp(25))
assert(e.get("watermark") === formatTimestamp(5))
}
assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5)
)
}

test("event time and watermark metrics with Trigger.Once (SPARK-24699)") {
// All event time metrics where watermarking is set
val inputData = MemoryStream[Int]
val aggWithWatermark = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

// Unlike the ProcessingTime trigger, Trigger.Once only runs one trigger every time
// the query is started and it does not run no-data batches. Hence the answer generated
// by the updated watermark is only generated the next time the query is started.
// Also, the data to process in the next trigger is added *before* starting the stream in
// Trigger.Once to ensure that first and only trigger picks up the new data.

testStream(aggWithWatermark)(
StartStream(Trigger.Once), // to make sure the query is not running when adding data 1st time
awaitTermination(),

AddData(inputData, 15),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer(),
assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0),
// watermark should be updated to 15 - 10 = 5

AddData(inputData, 10, 12, 14),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer(),
assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5),
// watermark should stay at 5

AddData(inputData, 25),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer(),
assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
// watermark should be updated to 25 - 10 = 15

AddData(inputData, 50),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer((10, 3)), // watermark = 15 is used to generate this
assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
// watermark should be updated to 50 - 10 = 40

AddData(inputData, 50),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this
assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
}
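The comment at the top of this test spells out the user-visible consequence: a Trigger.Once run executes exactly one micro-batch (and does not run the extra no-data batches), so results unlocked by a raised watermark only appear on the next start against the same checkpoint. A rough sketch of an application hitting this path; the source path, schema, sink, and checkpoint location are placeholders and not part of the patch:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.Trigger

object OnceTriggerWatermarkDemo extends App {
  val spark = SparkSession.builder().master("local[2]").appName("once-demo").getOrCreate()
  import spark.implicits._

  // Placeholder file source; any source with an event-time column works.
  val counts = spark.readStream
    .schema("value INT")
    .json("/tmp/events")
    .withColumn("eventTime", $"value".cast("timestamp"))
    .withWatermark("eventTime", "10 seconds")
    .groupBy(window($"eventTime", "5 seconds"))
    .agg(count("*"))

  // Each invocation runs a single micro-batch and stops. Before this patch the
  // watermark advanced by the previous run was not carried over on restart, so a
  // chain of Trigger.Once runs never emitted append-mode results for closed windows.
  val query = counts.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "/tmp/out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .trigger(Trigger.Once())
    .start()
  query.awaitTermination()
}
```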

test("recovery from Spark ver 2.3.1 commit log without commit metadata (SPARK-24699)") {
// All event time metrics where watermarking is set
val inputData = MemoryStream[Int]
val aggWithWatermark = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])


val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/").toURI

val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Not doing this will lead to the test passing on the first run, but failing on subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)

inputData.addData(15)
inputData.addData(10, 12, 14)

testStream(aggWithWatermark)(
/*

Note: The checkpoint was generated using the following input in Spark version 2.3.1

StartStream(checkpointLocation = "./sql/core/src/test/resources/structured-streaming/" +
"checkpoint-version-2.3.1-without-commit-log-metadata/")),
AddData(inputData, 15), // watermark should be updated to 15 - 10 = 5
CheckAnswer(),
AddData(inputData, 10, 12, 14), // watermark should stay at 5
CheckAnswer(),
StopStream,

// Offset log should have watermark recorded as 5.
*/

StartStream(Trigger.Once),
awaitTermination(),

AddData(inputData, 25),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
awaitTermination(),
CheckNewAnswer(),
assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
// watermark should be updated to 25 - 10 = 15

AddData(inputData, 50),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
awaitTermination(),
CheckNewAnswer((10, 3)), // watermark = 15 is used to generate this
assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
// watermark should be updated to 50 - 10 = 40

AddData(inputData, 50),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
awaitTermination(),
CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this
assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
}

test("append mode") {
val inputData = MemoryStream[Int]

@@ -625,10 +727,20 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
true
}

/** Assert event stats generated on that last batch with data in it */
private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
AssertOnQuery { q =>
Execute("AssertEventStats") { q =>
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
true
}
}

/** Assert event stats generated on that last batch with data in it */
private def assertEventStats(min: Long, max: Long, avg: Double, wtrmark: Long): AssertOnQuery = {
assertEventStats { e =>
assert(e.get("min") === formatTimestamp(min), s"min value mismatch")
assert(e.get("max") === formatTimestamp(max), s"max value mismatch")
assert(e.get("avg") === formatTimestamp(avg.toLong), s"avg value mismatch")
assert(e.get("watermark") === formatTimestamp(wtrmark), s"watermark value mismatch")
}
}

@@ -638,4 +750,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
private def formatTimestamp(sec: Long): String = {
timestampFormat.format(new ju.Date(sec * 1000))
}

private def awaitTermination(): AssertOnQuery = Execute("AwaitTermination") { q =>
q.awaitTermination()
}
}
@@ -291,8 +291,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be

/** Execute arbitrary code */
object Execute {
def apply(func: StreamExecution => Any): AssertOnQuery =
AssertOnQuery(query => { func(query); true }, "Execute")
def apply(name: String)(func: StreamExecution => Any): AssertOnQuery =
AssertOnQuery(query => { func(query); true }, name)

def apply(func: StreamExecution => Any): AssertOnQuery = apply("Execute")(func)
}

object AwaitEpoch {
@@ -512,7 +514,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
logInfo(s"Processing test stream action: $action")
action match {
case StartStream(trigger, triggerClock, additionalConfs, checkpointLocation) =>
verify(currentStream == null, "stream already running")
verify(currentStream == null || !currentStream.isActive, "stream already running")
verify(triggerClock.isInstanceOf[SystemClock]
|| triggerClock.isInstanceOf[StreamManualClock],
"Use either SystemClock or StreamManualClock to start the stream")