
Commit 030e635

Update tests. Remove var in OffsetSeqMetadata.
1 parent 1cacd32 commit 030e635

File tree

4 files changed: +92 −42 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala

Lines changed: 2 additions & 2 deletions
@@ -72,8 +72,8 @@ object OffsetSeq {
  * @param conf: Additional conf_s to be persisted across batches, e.g. number of shuffle partitions.
  */
 case class OffsetSeqMetadata(
-    var batchWatermarkMs: Long = 0,
-    var batchTimestampMs: Long = 0,
+    batchWatermarkMs: Long = 0,
+    batchTimestampMs: Long = 0,
     conf: Map[String, String] = Map.empty) {
   def json: String = Serialization.write(this)(OffsetSeqMetadata.format)
 }
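Note on the change: with the vars removed, OffsetSeqMetadata is immutable, so callers derive a new value instead of assigning to its fields. A minimal sketch of the replacement pattern (illustrative only; newWatermarkMs and clockMs are placeholder values, not identifiers from this commit):

  // Instead of: offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
  // build a fresh instance; copy() carries the conf map over unchanged.
  val updated = offsetSeqMetadata.copy(
    batchWatermarkMs = newWatermarkMs,
    batchTimestampMs = clockMs)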

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 8 additions & 5 deletions
@@ -464,25 +464,28 @@ class StreamExecution(
       }
     }
     if (hasNewData) {
-      // Current batch timestamp in milliseconds
-      offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis()
+      var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
       // Update the eventTime watermark if we find one in the plan.
       if (lastExecution != null) {
         lastExecution.executedPlan.collect {
           case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
             e.eventTimeStats.value.max - e.delayMs
         }.headOption.foreach { newWatermarkMs =>
-          if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
+          if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
-            offsetSeqMetadata.batchWatermarkMs = newWatermarkMs
+            batchWatermarkMs = newWatermarkMs
           } else {
             logDebug(
               s"Event time didn't move: $newWatermarkMs < " +
-                s"${offsetSeqMetadata.batchWatermarkMs}")
+                s"$batchWatermarkMs")
           }
         }
       }
+      offsetSeqMetadata = OffsetSeqMetadata(
+        batchWatermarkMs,
+        triggerClock.getTimeMillis(), // Current batch timestamp in milliseconds
+        offsetSeqMetadata.conf) // Keep the same conf
 
       updateStatusMessage("Writing offsets to log")
       reportTimeTaken("walCommit") {
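The rewritten block accumulates the watermark in a local var and constructs the per-batch metadata exactly once. A standalone sketch of the same idea, assuming the OffsetSeqMetadata shape shown above (nextMetadata is a hypothetical helper, not part of this commit):

  // Fold observed watermarks into the current one (the watermark never moves backwards),
  // then build the next immutable metadata in a single step.
  def nextMetadata(
      current: OffsetSeqMetadata,
      observedWatermarksMs: Seq[Long],
      batchTimestampMs: Long): OffsetSeqMetadata = {
    val batchWatermarkMs = (current.batchWatermarkMs +: observedWatermarksMs).max
    OffsetSeqMetadata(batchWatermarkMs, batchTimestampMs, current.conf)
  }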

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 80 additions & 31 deletions
@@ -23,6 +23,8 @@ import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import scala.reflect.ClassTag
 import scala.util.control.ControlThrowable
 
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -409,41 +411,88 @@ class StreamSuite extends StreamTest {
       CheckAnswer((1, 2), (2, 2), (3, 2)))
   }
 
-  test("SPARK-19873: backward compat with checkpoints that do not record shuffle partitions") {
-    val inputData = MemoryStream[Int]
-    inputData.addData(1, 2, 3, 4)
-    inputData.addData(3, 4, 5, 6)
-    inputData.addData(5, 6, 7, 8)
+  test("SPARK-19873: backward compatibility - recover from a Spark v2.1 checkpoint") {
+    var inputData: MemoryStream[Int] = null
+    var query: DataStreamWriter[Row] = null
 
-    val resourceUri =
-      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
-    val checkpointDir = new File(resourceUri).getCanonicalPath
-    val query = inputData
-      .toDF()
-      .groupBy($"value")
-      .agg(count("*"))
-      .writeStream
-      .queryName("counts")
-      .outputMode("complete")
-      .option("checkpointLocation", checkpointDir)
-      .format("memory")
-
-    // Checkpoint data was generated by a query with 10 shuffle partitions.
-    // Test if recovery from checkpoint is successful.
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
-      query.start().processAllAvailable()
-
-      QueryTest.checkAnswer(spark.table("counts").toDF(),
-        Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
-        Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Nil)
+    def init(): Unit = {
+      inputData = MemoryStream[Int]
+      inputData.addData(1, 2, 3, 4)
+      inputData.addData(3, 4, 5, 6)
+      inputData.addData(5, 6, 7, 8)
+
+      query = inputData
+        .toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .writeStream
+        .outputMode("complete")
+        .format("memory")
     }
 
-    // If the number of partitions is greater, should throw exception.
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
-      intercept[IllegalArgumentException] {
-        query.start().processAllAvailable()
+    // Get an existing checkpoint generated by Spark v2.1.
+    // v2.1 does not record # shuffle partitions in the offset metadata.
+    val resourceUri =
+      this.getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI
+    val checkpointDir = new File(resourceUri)
+
+    // 1 - Test if recovery from the checkpoint is successful.
+    init()
+    withTempDir(dir => {
+      // Copy the checkpoint to a temp dir to prevent changes to the original.
+      // Not doing this will lead to the test passing on the first run, but fail subsequent runs.
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Checkpoint data was generated by a query with 10 shuffle partitions.
+      // In order to test reading from the checkpoint, the checkpoint must have two or more batches,
+      // since the last batch may be rerun.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          streamingQuery =
+            query
+              .queryName("counts")
+              .option("checkpointLocation", dir.getCanonicalPath)
+              .start()
+          streamingQuery.processAllAvailable()
+          inputData.addData(9)
+          streamingQuery.processAllAvailable()
+
+          QueryTest.checkAnswer(spark.table("counts").toDF(),
+            Row("1", 1) :: Row("2", 1) :: Row("3", 2) :: Row("4", 2) ::
+            Row("5", 2) :: Row("6", 2) :: Row("7", 1) :: Row("8", 1) :: Row("9", 1) :: Nil)
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
       }
-    }
+    })
+
+    // 2 - Check recovery with wrong num shuffle partitions
+    init()
+    withTempDir(dir => {
+      FileUtils.copyDirectory(checkpointDir, dir)
+
+      // Since the number of partitions is greater than 10, should throw exception.
+      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "15") {
+        var streamingQuery: StreamingQuery = null
+        try {
+          intercept[StreamingQueryException] {
+            streamingQuery =
+              query
+                .queryName("badQuery")
+                .option("checkpointLocation", dir.getCanonicalPath)
+                .start()
+            streamingQuery.processAllAvailable()
+          }
+        } finally {
+          if (streamingQuery ne null) {
+            streamingQuery.stop()
+          }
+        }
+      }
+    })
   }
 }
 
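The updated test copies the bundled v2.1 checkpoint into a temporary directory before starting the query, so the committed resource is never modified and reruns always start from the same state. A condensed sketch of that copy-then-recover pattern, assuming the suite's withTempDir helper and some streaming DataFrame df (names are illustrative, not part of this commit):

  // Copy the read-only checkpoint resource, then resume the query from the copy.
  val original = new File(
    getClass.getResource("/structured-streaming/checkpoint-version-2.1.0").toURI)
  withTempDir { dir =>
    FileUtils.copyDirectory(original, dir)
    val restored = df.writeStream
      .queryName("restored")
      .outputMode("complete")
      .format("memory")
      .option("checkpointLocation", dir.getCanonicalPath)
      .start()
    try restored.processAllAvailable() finally restored.stop()
  }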

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 2 additions & 4 deletions
@@ -152,12 +152,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       AssertOnQuery(q => {
         q.exception.get.startOffset ===
           q.committedOffsets.toOffsetSeq(
-            Seq(inputData),
-            OffsetSeqMetadata(0, 0)).toString &&
+            Seq(inputData), OffsetSeqMetadata()).toString &&
           q.exception.get.endOffset ===
           q.availableOffsets.toOffsetSeq(
-            Seq(inputData),
-            OffsetSeqMetadata(0, 0)).toString
+            Seq(inputData), OffsetSeqMetadata()).toString
       }, "incorrect start offset or end offset on exception")
     )
   }
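The explicit zeros are dropped because they match the case-class defaults; if the defaults shown in OffsetSeq.scala hold, this is the equality the shortened call relies on:

  // Defaults: batchWatermarkMs = 0, batchTimestampMs = 0, conf = Map.empty
  assert(OffsetSeqMetadata() == OffsetSeqMetadata(0, 0, Map.empty))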
