From 1b42cc4a449248da65402a6ea2112c55a3bb8501 Mon Sep 17 00:00:00 2001
From: Chris Horn
Date: Fri, 29 Jun 2018 18:54:45 -0400
Subject: [PATCH 1/3] a failing test case

---
 .../streaming/EventTimeWatermarkSuite.scala   | 109 +++++++++++++++++-
 1 file changed, 108 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 7e8fde1ff8e5..28b06aea6a4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -24,7 +24,7 @@ import java.util.{Calendar, Date}
 import org.scalatest.{BeforeAndAfter, Matchers}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
@@ -458,6 +458,96 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-24699: watermark should behave the same for Trigger ProcessingTime / Once") {
+    val watermarkSeconds = 2
+    val windowSeconds = 5
+    def windowAggregation(body: (MemoryStream[Int], Dataset[(Long, Long)]) => Unit): Unit = {
+      val source = MemoryStream[Int]
+      val sink = {
+        source
+          .toDF()
+          .withColumn("eventTime", 'value cast "timestamp")
+          .withWatermark("eventTime", s"$watermarkSeconds seconds")
+          .groupBy(window($"eventTime", s"$windowSeconds seconds") as 'window)
+          .count()
+          .select('window.getField("start").cast("long").as[Long], 'count.as[Long])
+      }
+      body(source, sink)
+    }
+    val (one, two, three) = (
+      Seq(1, 1, 2, 3, 4, 4, 6),
+      Seq(7, 8, 9),
+      Seq(11, 12, 13, 14, 14)
+    )
+    val (resultsAfterOne, resultsAfterTwo, resultsAfterThree) = (
+      CheckAnswer(),
+      CheckAnswer(0 -> 6),
+      CheckAnswer(0 -> 6, 5 -> 4)
+    )
+    val (statsAfterOne, statsAfterTwo, statsAfterThree) = (
+      checkEventStats(
+        min = one.min,
+        max = one.max,
+        avg = one.sum.toDouble / one.size,
+        watermark = 0,
+        "first"
+      ),
+      checkEventStats(
+        min = two.min,
+        max = two.max,
+        avg = two.sum.toDouble / two.size,
+        watermark = one.max - watermarkSeconds,
+        "second"
+      ),
+      checkEventStats(
+        min = three.min,
+        max = three.max,
+        avg = three.sum.toDouble / three.size,
+        watermark = two.max - watermarkSeconds,
+        "third"
+      )
+    )
+
+    // check assertions using normal Trigger.Processing
+    windowAggregation {
+      (source, sink) => testStream(sink)(
+        AddData(source, one: _*),
+        resultsAfterOne,
+        statsAfterOne,
+
+        AddData(source, two: _*),
+        resultsAfterTwo,
+        statsAfterTwo,
+
+        AddData(source, three: _*),
+        resultsAfterThree,
+        statsAfterThree
+      )
+    }
+
+    // check assertions while using Trigger.Once
+    windowAggregation {
+      (source, sink) => testStream(sink)(
+        AddData(source, one: _*),
+        StartStream(Trigger.Once),
+        resultsAfterOne,
+        statsAfterOne,
+        StopStream,
+
+        AddData(source, two: _*),
+        StartStream(Trigger.Once),
+        resultsAfterTwo,
+        statsAfterTwo,
+        StopStream,
+
+        AddData(source, three: _*),
+        StartStream(Trigger.Once),
+        resultsAfterThree,
+        statsAfterThree
+      )
+    }
+  }
+
   test("test no-data flag") {
     val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key
 
@@ -484,6 +574,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     testWithFlag(false)
   }
 
+  private def checkEventStats(
+      min: Long,
+      max: Long,
+      avg: Double,
+      watermark: Long,
+      name: String = "event stats"
+  ): AssertOnQuery = assertEventStats { e =>
+    assert(e.get("min") === formatTimestamp(min), s"[$name]: min value")
+    assert(e.get("max") === formatTimestamp(max), s"[$name]: max value")
+    assert(e.get("avg") === formatTimestamp(avg), s"[$name]: avg value")
+    assert(e.get("watermark") === formatTimestamp(watermark), s"[$name]: watermark value")
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
     q.processAllAvailable()
     val progressWithData = q.recentProgress.lastOption.get
@@ -504,4 +607,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
   private def formatTimestamp(sec: Long): String = {
     timestampFormat.format(new ju.Date(sec * 1000))
   }
+
+  private def formatTimestamp(sec: Double): String = {
+    timestampFormat.format(new ju.Date((sec * 1000).toLong))
+  }
 }

From c746a15fa86be3d43bf8b36f4d7fae8cf1e836b5 Mon Sep 17 00:00:00 2001
From: Chris Horn
Date: Mon, 2 Jul 2018 17:13:26 -0400
Subject: [PATCH 2/3] make it clear that Trigger variations are being tested

---
 .../streaming/EventTimeWatermarkSuite.scala   | 62 ++++++-----------
 1 file changed, 25 insertions(+), 37 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 28b06aea6a4f..48c1177cd6fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -508,43 +508,31 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
       )
     )
 
-    // check assertions using normal Trigger.Processing
-    windowAggregation {
-      (source, sink) => testStream(sink)(
-        AddData(source, one: _*),
-        resultsAfterOne,
-        statsAfterOne,
-
-        AddData(source, two: _*),
-        resultsAfterTwo,
-        statsAfterTwo,
-
-        AddData(source, three: _*),
-        resultsAfterThree,
-        statsAfterThree
-      )
-    }
-
-    // check assertions while using Trigger.Once
-    windowAggregation {
-      (source, sink) => testStream(sink)(
-        AddData(source, one: _*),
-        StartStream(Trigger.Once),
-        resultsAfterOne,
-        statsAfterOne,
-        StopStream,
-
-        AddData(source, two: _*),
-        StartStream(Trigger.Once),
-        resultsAfterTwo,
-        statsAfterTwo,
-        StopStream,
-
-        AddData(source, three: _*),
-        StartStream(Trigger.Once),
-        resultsAfterThree,
-        statsAfterThree
-      )
+    Seq(Trigger.ProcessingTime(0), Trigger.Once) foreach { trigger =>
+      windowAggregation {
+        (source, sink) => testStream(sink)(
+          StartStream(trigger),
+          StopStream,
+
+          AddData(source, one: _*),
+          StartStream(trigger),
+          resultsAfterOne,
+          statsAfterOne,
+          StopStream,
+
+          AddData(source, two: _*),
+          StartStream(trigger),
+          resultsAfterTwo,
+          statsAfterTwo,
+          StopStream,
+
+          AddData(source, three: _*),
+          StartStream(trigger),
+          resultsAfterThree,
+          statsAfterThree,
+          StopStream
+        )
+      }
     }
   }
 

From 7e54a89558543f81dd8f251cd8b4e7f60d56ddde Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 11 Jul 2018 02:14:32 -0700
Subject: [PATCH 3/3] Fixed bug

Co-authored-by: Tathagata Das
Co-authored-by: c-horn
---
 .../sql/execution/streaming/CommitLog.scala   |  33 +++---
 .../streaming/MicroBatchExecution.scala       |  10 +-
 .../continuous/ContinuousExecution.scala      |   2 +-
 .../streaming/EventTimeWatermarkSuite.scala   | 101 ++++++++++++++++++
 4 files changed, 127 insertions(+), 19 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
index 5b114242558d..0063318db332 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala
@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets._
 
 import scala.io.{Source => IOSource}
 
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -43,36 +46,28 @@ import org.apache.spark.sql.SparkSession
  * line 2: metadata (optional json string)
  */
 class CommitLog(sparkSession: SparkSession, path: String)
-  extends HDFSMetadataLog[String](sparkSession, path) {
+  extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {
 
   import CommitLog._
 
-  def add(batchId: Long): Unit = {
-    super.add(batchId, EMPTY_JSON)
-  }
-
-  override def add(batchId: Long, metadata: String): Boolean = {
-    throw new UnsupportedOperationException(
-      "CommitLog does not take any metadata, use 'add(batchId)' instead")
-  }
-
-  override protected def deserialize(in: InputStream): String = {
+  override protected def deserialize(in: InputStream): CommitMetadata = {
     // called inside a try-finally where the underlying stream is closed in the caller
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file in the offset commit log")
    }
     parseVersion(lines.next.trim, VERSION)
-    EMPTY_JSON
+    val metadataJson = if (lines.hasNext) lines.next else EMPTY_JSON
+    CommitMetadata(metadataJson)
   }
 
-  override protected def serialize(metadata: String, out: OutputStream): Unit = {
+  override protected def serialize(metadata: CommitMetadata, out: OutputStream): Unit = {
     // called inside a try-finally where the underlying stream is closed in the caller
     out.write(s"v${VERSION}".getBytes(UTF_8))
     out.write('\n')
 
     // write metadata
-    out.write(EMPTY_JSON.getBytes(UTF_8))
+    out.write(metadata.json.getBytes(UTF_8))
   }
 }
 
@@ -81,3 +76,13 @@ object CommitLog {
 
   private val EMPTY_JSON = "{}"
 }
+
+case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
+  def json: String = Serialization.write(this)(CommitMetadata.format)
+}
+
+object CommitMetadata {
+  implicit val format = Serialization.formats(NoTypeHints)
+
+  def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 16651dd060d7..e16f1da6493a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -265,7 +265,7 @@ class MicroBatchExecution(
          * latest batch id in the offset log, then we can safely move to the next batch
          * i.e., committedBatchId + 1 */
         commitLog.getLatest() match {
-          case Some((latestCommittedBatchId, _)) =>
+          case Some((latestCommittedBatchId, commitMetadata)) =>
             if (latestBatchId == latestCommittedBatchId) {
               /* The last batch was successfully committed, so we can safely process a
                * new next batch but first:
@@ -283,7 +283,9 @@ class MicroBatchExecution(
               currentBatchId = latestCommittedBatchId + 1
               isCurrentBatchConstructed = false
               committedOffsets ++= availableOffsets
-              // Construct a new batch be recomputing availableOffsets
+              watermarkTracker.setWatermark(
+                math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
+              println(s"Recovered at $currentBatchId with wm ${watermarkTracker.currentWatermark}")
             } else if (latestCommittedBatchId < latestBatchId - 1) {
               logWarning(s"Batch completion log latest batch id is " +
                 s"${latestCommittedBatchId}, which is not trailing " +
@@ -533,11 +535,11 @@ class MicroBatchExecution(
     }
 
     withProgressLocked {
-      commitLog.add(currentBatchId)
+      watermarkTracker.updateWatermark(lastExecution.executedPlan)
+      commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
       committedOffsets ++= availableOffsets
       awaitProgressLockCondition.signalAll()
     }
-    watermarkTracker.updateWatermark(lastExecution.executedPlan)
     logDebug(s"Completed batch ${currentBatchId}")
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index a0bb8292d776..b07934ac3a5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -311,7 +311,7 @@ class ContinuousExecution(
     assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
     synchronized {
       if (queryExecutionThread.isAlive) {
-        commitLog.add(epoch)
+        commitLog.add(epoch, CommitMetadata())
         val offset =
           continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
         committedOffsets ++= Seq(continuousSources(0) -> offset)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 58ed9790ea12..11f8dff66b32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -462,6 +462,91 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-24699: watermark should behave the same for Trigger ProcessingTime / Once") {
+    val watermarkSeconds = 2
+    val windowSeconds = 5
+    val source = MemoryStream[Int]
+    val df = source
+      .toDF()
+      .withColumn("eventTime", 'value cast "timestamp")
+      .withWatermark("eventTime", s"$watermarkSeconds seconds")
+      .groupBy(window($"eventTime", s"$windowSeconds seconds") as 'window)
+      .count()
+      .select('window.getField("start").cast("long").as[Long], 'count.as[Long])
+
+    val (one, two, three, four) = (
+      Seq(1, 1, 2, 3, 4, 4, 6),
+      Seq(7, 8, 9),
+      Seq(11, 12, 13, 14, 14),
+      Seq(15)
+    )
+    val (resultsAfterOne, resultsAfterTwo, resultsAfterThree, resultsAfterFour) = (
+      CheckAnswer(),
+      CheckAnswer(),
+      CheckAnswer(0 -> 6),
+      CheckAnswer(0 -> 6, 5 -> 4)
+    )
+    val (statsAfterOne, statsAfterTwo, statsAfterThree, statsAfterFour) = (
+      assertEventStats(
+        min = one.min,
+        max = one.max,
+        avg = one.sum.toDouble / one.size,
+        watermark = 0,
+        "first"
+      ),
+      assertEventStats(
+        min = two.min,
+        max = two.max,
+        avg = two.sum.toDouble / two.size,
+        watermark = one.max - watermarkSeconds,
+        "second"
+      ),
+      assertEventStats(
+        min = three.min,
+        max = three.max,
+        avg = three.sum.toDouble / three.size,
+        watermark = two.max - watermarkSeconds,
+        "third"
+      ),
+      assertEventStats(
+        min = four.min,
+        max = four.max,
+        avg = four.sum.toDouble / four.size,
+        watermark = three.max - watermarkSeconds,
+        "fourth"
+      )
+    )
+
+    testStream(df)(
+      StartStream(Trigger.Once),
+      StopStream,
+
+      AddData(source, one: _*),
+      StartStream(Trigger.Once),
+      resultsAfterOne,
+      statsAfterOne,
+      StopStream,
+
+      AddData(source, two: _*),
+      StartStream(Trigger.Once),
+      resultsAfterTwo,
+      statsAfterTwo,
+      StopStream,
+
+      AddData(source, three: _*),
+      StartStream(Trigger.Once),
+      resultsAfterThree,
+      statsAfterThree,
+      StopStream,
+
+      AddData(source, four: _*),
+      StartStream(Trigger.Once),
+      resultsAfterFour,
+      statsAfterFour,
+      StopStream
+    )
+  }
+
   test("test no-data flag") {
     val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key
 
@@ -632,10 +717,26 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  private def assertEventStats(
+      min: Long,
+      max: Long,
+      avg: Double,
+      watermark: Long,
+      name: String = "event stats"): AssertOnQuery = assertEventStats { e =>
+    assert(e.get("min") === formatTimestamp(min), s"[$name]: min value")
+    assert(e.get("max") === formatTimestamp(max), s"[$name]: max value")
+    assert(e.get("avg") === formatTimestamp(avg), s"[$name]: avg value")
+    assert(e.get("watermark") === formatTimestamp(watermark), s"[$name]: watermark value")
+  }
+
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
 
   private def formatTimestamp(sec: Long): String = {
     timestampFormat.format(new ju.Date(sec * 1000))
   }
+
+  private def formatTimestamp(sec: Double): String = {
+    timestampFormat.format(new ju.Date((sec * 1000).toLong))
+  }
 }