diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
index 5b114242558dc..0063318db332d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets._
 
 import scala.io.{Source => IOSource}
 
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -43,36 +46,28 @@ import org.apache.spark.sql.SparkSession
  * line 2: metadata (optional json string)
  */
 class CommitLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[String](sparkSession, path) {
+  extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {
 
   import CommitLog._
 
-  def add(batchId: Long): Unit = {
-    super.add(batchId, EMPTY_JSON)
-  }
-
-  override def add(batchId: Long, metadata: String): Boolean = {
-    throw new UnsupportedOperationException(
-      "CommitLog does not take any metadata, use 'add(batchId)' instead")
-  }
-
-  override protected def deserialize(in: InputStream): String = {
+  override protected def deserialize(in: InputStream): CommitMetadata = {
     // called inside a try-finally where the underlying stream is closed in the caller
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file in the offset commit log")
     }
     parseVersion(lines.next.trim, VERSION)
-    EMPTY_JSON
+    val metadataJson = if (lines.hasNext) lines.next else EMPTY_JSON
+    CommitMetadata(metadataJson)
   }
 
-  override protected def serialize(metadata: String, out: OutputStream): Unit = {
+  override protected def serialize(metadata: CommitMetadata, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
     out.write(s"v${VERSION}".getBytes(UTF_8))
     out.write('\n')
 
     // write metadata
-    out.write(EMPTY_JSON.getBytes(UTF_8))
+    out.write(metadata.json.getBytes(UTF_8))
   }
 }
 
@@ -81,3 +76,13 @@ object CommitLog {
   private val EMPTY_JSON = "{}"
 }
+
+case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
+  def json: String = Serialization.write(this)(CommitMetadata.format)
+}
+
+object CommitMetadata {
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
+}
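Note: `CommitMetadata` is what turns a commit log entry from a constant `{}` into a carrier for the watermark. json4s writes the case class as a one-line JSON object, and because `nextBatchWatermarkMs` has a default value, a pre-2.4 `{}` entry still deserializes cleanly to watermark 0. A minimal standalone sketch of that round trip, assuming only json4s on the classpath (Spark already ships it); the object name is illustrative, not part of the patch:

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Mirrors the case class added above; not the Spark class itself.
case class CommitMetadata(nextBatchWatermarkMs: Long = 0)

object CommitMetadataRoundTrip extends App {
  implicit val format = Serialization.formats(NoTypeHints)

  // Writing produces the single JSON line stored in the commit file.
  assert(Serialization.write(CommitMetadata(5000L)) == """{"nextBatchWatermarkMs":5000}""")

  // A pre-2.4 commit file carries "{}"; the missing field falls back to
  // the case class default, so old checkpoints read as watermark 0.
  assert(Serialization.read[CommitMetadata]("{}") == CommitMetadata(0))
}
```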
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 16651dd060d73..e218a5e85e05f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -265,7 +265,7 @@ class MicroBatchExecution(
      * latest batch id in the offset log, then we can safely move to the next batch
      * i.e., committedBatchId + 1 */
     commitLog.getLatest() match {
-      case Some((latestCommittedBatchId, _)) =>
+      case Some((latestCommittedBatchId, commitMetadata)) =>
         if (latestBatchId == latestCommittedBatchId) {
           /* The last batch was successfully committed, so we can safely process a
            * new next batch but first:
@@ -283,7 +283,8 @@ class MicroBatchExecution(
           currentBatchId = latestCommittedBatchId + 1
           isCurrentBatchConstructed = false
           committedOffsets ++= availableOffsets
-          // Construct a new batch be recomputing availableOffsets
+          watermarkTracker.setWatermark(
+            math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
         } else if (latestCommittedBatchId < latestBatchId - 1) {
           logWarning(s"Batch completion log latest batch id is " +
             s"${latestCommittedBatchId}, which is not trailing " +
@@ -533,11 +534,11 @@ class MicroBatchExecution(
     }
 
     withProgressLocked {
-      commitLog.add(currentBatchId)
+      watermarkTracker.updateWatermark(lastExecution.executedPlan)
+      commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
       committedOffsets ++= availableOffsets
       awaitProgressLockCondition.signalAll()
     }
-    watermarkTracker.updateWatermark(lastExecution.executedPlan)
     logDebug(s"Completed batch ${currentBatchId}")
   }
 
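Two coordinated changes here: on restart, the recovered `commitMetadata.nextBatchWatermarkMs` is folded into the tracker with a `max`, and on commit, `updateWatermark` now runs *before* `commitLog.add`, inside the lock, so the persisted value reflects the batch that just finished. A runnable distillation of the recovery rule, with hypothetical names:

```scala
object WatermarkRecoverySketch extends App {
  // On restart the watermark must never move backwards, so we keep the
  // larger of the value replayed from the offset log and the value read
  // from the commit log. A pre-2.4 "{}" commit entry contributes 0,
  // which makes the max() a no-op for old checkpoints.
  def recoveredWatermarkMs(fromOffsetLog: Long, fromCommitLog: Long): Long =
    math.max(fromOffsetLog, fromCommitLog)

  assert(recoveredWatermarkMs(5000L, 0L) == 5000L)      // 2.3.x checkpoint
  assert(recoveredWatermarkMs(5000L, 15000L) == 15000L) // commit log is newer
}
```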
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index a0bb8292d7766..b07934ac3a5b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -311,7 +311,7 @@ class ContinuousExecution(
     assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
     synchronized {
       if (queryExecutionThread.isAlive) {
-        commitLog.add(epoch)
+        commitLog.add(epoch, CommitMetadata())
         val offset =
           continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
         committedOffsets ++= Seq(continuousSources(0) -> offset)
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/0
new file mode 100644
index 0000000000000..83321cd95eb0c
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/0
@@ -0,0 +1,2 @@
+v1
+{}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/1
new file mode 100644
index 0000000000000..83321cd95eb0c
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/commits/1
@@ -0,0 +1,2 @@
+v1
+{}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/metadata b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/metadata
new file mode 100644
index 0000000000000..f205857e6876f
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/metadata
@@ -0,0 +1 @@
+{"id":"73f7f943-0a08-4ffb-a504-9fa88ff7612a"}
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/0 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/0
new file mode 100644
index 0000000000000..8fa80bedc2285
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1531991874513,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+0
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/1 b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/1
new file mode 100644
index 0000000000000..2248a58fea006
--- /dev/null
+++ b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":5000,"batchTimestampMs":1531991878604,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
+1
\ No newline at end of file
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/1.delta
new file mode 100644
index 0000000000000..6352978051846
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/2.delta
new file mode 100644
index 0000000000000..6352978051846
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/0/2.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/1.delta
new file mode 100644
index 0000000000000..6352978051846
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/2.delta
new file mode 100644
index 0000000000000..6352978051846
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/1/2.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/1.delta
new file mode 100644
index 0000000000000..171aa58a06e21
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/2.delta
new file mode 100644
index 0000000000000..6352978051846
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/2/2.delta differ
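The checked-in checkpoint is deliberately in the pre-2.4 shape: each commit file is just a `v1` header plus `{}`, while the offset files already carry `batchWatermarkMs` (0, then 5000). A sketch of how both commit-file shapes read under the new `deserialize`, using only the standard library; the helper name is hypothetical:

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.{Source => IOSource}

object CommitFileFormatSketch extends App {
  // Line 1 is the version, line 2 the optional metadata JSON; a file
  // without line 2 (or with "{}") is the pre-2.4 format.
  def readCommitFile(bytes: Array[Byte]): String = {
    val lines = IOSource.fromInputStream(new ByteArrayInputStream(bytes), UTF_8.name()).getLines()
    require(lines.hasNext, "Incomplete log file in the offset commit log")
    require(lines.next().trim == "v1", "unexpected version")
    if (lines.hasNext) lines.next() else "{}"
  }

  assert(readCommitFile("v1\n{}".getBytes(UTF_8)) == "{}")
  assert(readCommitFile("v1".getBytes(UTF_8)) == "{}") // metadata line is optional
  assert(readCommitFile("v1\n{\"nextBatchWatermarkMs\":5000}".getBytes(UTF_8)).contains("5000"))
}
```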
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/1.delta
new file mode 100644
index 0000000000000..6352978051846
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/2.delta
new file mode 100644
index 0000000000000..6352978051846
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/3/2.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/1.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/1.delta
new file mode 100644
index 0000000000000..6352978051846
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/1.delta differ
diff --git a/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/2.delta b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/2.delta
new file mode 100644
index 0000000000000..cfb3a481deb59
Binary files /dev/null and b/sql/core/src/test/resources/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/state/0/4/2.delta differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 58ed9790ea123..026af17c7b23f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -127,31 +127,133 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     testStream(aggWithWatermark)(
       AddData(inputData2, 15),
       CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(15))
-        assert(e.get("min") === formatTimestamp(15))
-        assert(e.get("avg") === formatTimestamp(15))
-        assert(e.get("watermark") === formatTimestamp(0))
-      },
+      assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0),
       AddData(inputData2, 10, 12, 14),
       CheckAnswer(),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(14))
-        assert(e.get("min") === formatTimestamp(10))
-        assert(e.get("avg") === formatTimestamp(12))
-        assert(e.get("watermark") === formatTimestamp(5))
-      },
+      assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5),
       AddData(inputData2, 25),
       CheckAnswer((10, 3)),
-      assertEventStats { e =>
-        assert(e.get("max") === formatTimestamp(25))
-        assert(e.get("min") === formatTimestamp(25))
-        assert(e.get("avg") === formatTimestamp(25))
-        assert(e.get("watermark") === formatTimestamp(5))
-      }
+      assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5)
     )
   }
 
+  test("event time and watermark metrics with Trigger.Once (SPARK-24699)") {
+    // All event time metrics where watermarking is set
+    val inputData = MemoryStream[Int]
+    val aggWithWatermark = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    // Unlike the ProcessingTime trigger, Trigger.Once runs only one trigger each time
+    // the query is started, and it does not run no-data batches. Hence the answer enabled
+    // by the updated watermark is only produced the next time the query is started.
+    // Also, the data to process in the next trigger is added *before* starting the stream in
+    // Trigger.Once to ensure that the first and only trigger picks up the new data.
+
+    testStream(aggWithWatermark)(
+      StartStream(Trigger.Once), // to make sure the query is not running when adding data 1st time
+      awaitTermination(),
+
+      AddData(inputData, 15),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer(),
+      assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0),
+      // watermark should be updated to 15 - 10 = 5
+
+      AddData(inputData, 10, 12, 14),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer(),
+      assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5),
+      // watermark should stay at 5
+
+      AddData(inputData, 25),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer(),
+      assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
+      // watermark should be updated to 25 - 10 = 15
+
+      AddData(inputData, 50),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer((10, 3)), // watermark = 15 is used to generate this
+      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
+      // watermark should be updated to 50 - 10 = 40
+
+      AddData(inputData, 50),
+      StartStream(Trigger.Once),
+      awaitTermination(),
+      CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this
+      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
+  }
+
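For context, this is the user-visible scenario the test above pins down. Before this change, a job run periodically with Trigger.Once reset its watermark to 0 on every start, so windows never aged out; with the watermark in the commit log, each run resumes where the last one stopped. A hypothetical production shape, where `events` stands for any streaming DataFrame with an `eventTime` column and the checkpoint path is illustrative:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.Trigger

object TriggerOnceJobSketch {
  // Runs one batch and stops, e.g. invoked from an external scheduler.
  // Thanks to the persisted watermark, each invocation continues the
  // aggregation instead of restarting it from watermark 0.
  def runOnce(events: DataFrame): Unit = {
    val query = events
      .withWatermark("eventTime", "10 seconds")
      .groupBy(window(events("eventTime"), "5 seconds"))
      .count()
      .writeStream
      .outputMode("append")
      .option("checkpointLocation", "/tmp/checkpoints/agg") // hypothetical path
      .trigger(Trigger.Once)
      .start()
    query.awaitTermination()
  }
}
```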
+  test("recovery from Spark ver 2.3.1 commit log without commit metadata (SPARK-24699)") {
+    // Same watermarked aggregation as in the tests above
+    val inputData = MemoryStream[Int]
+    val aggWithWatermark = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    val resourceUri = this.getClass.getResource(
+      "/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but failing on
+    // subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    inputData.addData(15)
+    inputData.addData(10, 12, 14)
+
+    testStream(aggWithWatermark)(
+      /*
+
+      Note: The checkpoint was generated using the following input in Spark version 2.3.1
+
+      StartStream(checkpointLocation = "./sql/core/src/test/resources/structured-streaming/" +
+        "checkpoint-version-2.3.1-without-commit-log-metadata/")),
+      AddData(inputData, 15),  // watermark should be updated to 15 - 10 = 5
+      CheckAnswer(),
+      AddData(inputData, 10, 12, 14),  // watermark should stay at 5
+      CheckAnswer(),
+      StopStream,
+
+      // Offset log should have watermark recorded as 5.
+      */
+
+      StartStream(Trigger.Once),
+      awaitTermination(),
+
+      AddData(inputData, 25),
+      StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
+      awaitTermination(),
+      CheckNewAnswer(),
+      assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
+      // watermark should be updated to 25 - 10 = 15
+
+      AddData(inputData, 50),
+      StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
+      awaitTermination(),
+      CheckNewAnswer((10, 3)), // watermark = 15 is used to generate this
+      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
+      // watermark should be updated to 50 - 10 = 40
+
+      AddData(inputData, 50),
+      StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
+      awaitTermination(),
+      CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this
+      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
+  }
+
   test("append mode") {
     val inputData = MemoryStream[Int]
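The two `inputData.addData` calls before `testStream` are load-bearing: `MemoryStream` hands out one offset per `addData` call, and the copied checkpoint has offsets 0 and 1 committed (see `offsets/0` ending in `0` and `offsets/1` ending in `1` above), so the source must be repopulated up to offset 1 before recovery can construct batch 2. A hypothetical sketch of that bookkeeping, not `MemoryStream`'s real internals:

```scala
object MemoryStreamOffsetSketch extends App {
  // One offset per addData call, starting from -1, mirroring how the
  // test's data lines up with the checkpoint's committed offsets.
  var currentOffset = -1L
  def addData(batch: Int*): Long = { currentOffset += 1; currentOffset }

  assert(addData(15) == 0L)         // checkpoint offsets/0 ends in "0"
  assert(addData(10, 12, 14) == 1L) // checkpoint offsets/1 ends in "1"
  assert(addData(25) == 2L)         // first uncommitted offset, becomes batch 2
}
```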
@@ -625,10 +727,20 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       true
     }
 
+  /** Assert event stats generated on the last batch with data in it */
   private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
-    AssertOnQuery { q =>
+    Execute("AssertEventStats") { q =>
       body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
-      true
+    }
+  }
+
+  /** Assert event stats generated on the last batch with data in it */
+  private def assertEventStats(min: Long, max: Long, avg: Double, wtrmark: Long): AssertOnQuery = {
+    assertEventStats { e =>
+      assert(e.get("min") === formatTimestamp(min), "min value mismatch")
+      assert(e.get("max") === formatTimestamp(max), "max value mismatch")
+      assert(e.get("avg") === formatTimestamp(avg.toLong), "avg value mismatch")
+      assert(e.get("watermark") === formatTimestamp(wtrmark), "watermark value mismatch")
     }
   }
@@ -638,4 +750,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
   private def formatTimestamp(sec: Long): String = {
     timestampFormat.format(new ju.Date(sec * 1000))
   }
+
+  private def awaitTermination(): AssertOnQuery = Execute("AwaitTermination") { q =>
+    q.awaitTermination()
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 4c3fd58cb2e45..df22bc1315b7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -291,8 +291,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
 
   /** Execute arbitrary code */
   object Execute {
-    def apply(func: StreamExecution => Any): AssertOnQuery =
-      AssertOnQuery(query => { func(query); true }, "Execute")
+    def apply(name: String)(func: StreamExecution => Any): AssertOnQuery =
+      AssertOnQuery(query => { func(query); true }, name)
+
+    def apply(func: StreamExecution => Any): AssertOnQuery = apply("Execute")(func)
   }
 
   object AwaitEpoch {
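The curried overload keeps the existing `Execute { q => ... }` call sites source-compatible while letting an action carry a label that StreamTest prints on failure; the suite's `awaitTermination()` helper above is the first user. A sketch of the two call shapes as they would appear inside a `testStream(...)` action list, not standalone code:

```scala
// `q` is the running StreamExecution for the query under test.
Execute("AwaitTermination") { q => q.awaitTermination() } // labeled action
Execute { q => assert(q.recentProgress.nonEmpty) }        // still labeled "Execute"
```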
@@ -512,7 +514,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
         logInfo(s"Processing test stream action: $action")
         action match {
           case StartStream(trigger, triggerClock, additionalConfs, checkpointLocation) =>
-            verify(currentStream == null, "stream already running")
+            verify(currentStream == null || !currentStream.isActive, "stream already running")
             verify(triggerClock.isInstanceOf[SystemClock] ||
               triggerClock.isInstanceOf[StreamManualClock],
               "Use either SystemClock or StreamManualClock to start the stream")
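The relaxed `verify` is what makes the Trigger.Once tests above expressible: a Trigger.Once query terminates on its own, so the next `StartStream` arrives with `currentStream` non-null but inactive, which the old `currentStream == null` check rejected. The resulting restart pattern, assuming `inputData` and `aggWithWatermark` as defined in the suite above:

```scala
// A Trigger.Once query stops by itself, so a later StartStream without an
// intervening StopStream is now legal in StreamTest action lists.
testStream(aggWithWatermark)(
  StartStream(Trigger.Once),
  Execute("AwaitTermination") { q => q.awaitTermination() },
  AddData(inputData, 15),
  StartStream(Trigger.Once), // allowed: previous query is no longer active
  Execute("AwaitTermination") { q => q.awaitTermination() },
  CheckNewAnswer()
)
```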