Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming
* - Allow the user to query the latest batch id.
* - Allow the user to query the metadata object of a specified batch id.
* - Allow the user to query metadata objects in a range of batch ids.
* - Allow the user to remove obsolete metdata
*/
trait MetadataLog[T] {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ class StreamExecution(
assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId.")

// Now that we have logged the new batch, no further processing will happen for
// the previous batch, and it is safe to discard the old metadata.
// NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
// flight at the same time), this cleanup logic will need to change.
offsetLog.purge(currentBatchId)
} else {
awaitBatchLock.lock()
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
)
}

testQuietly("StreamExecution metadata garbarge collection") {
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map(6 / _)

// Run a few batches through the application
testStream(mapped)(
AddData(inputData, 1, 2),
CheckAnswer(6, 3),
AddData(inputData, 1, 2),
CheckAnswer(6, 3, 6, 3),
AddData(inputData, 4, 6),
CheckAnswer(6, 3, 6, 3, 1, 1),

// Three batches have run, but only one set of metadata should be present
AssertOnQuery(
q => {
val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475
toTest.size == 1 && toTest.head == "2"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually this test case will always pass, even without the change here. I will submit a patch based on yours that fixes the issues here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, didn't notice I had left that "true" at the end of that block of code. @petermaxlee , let me know if you need help preparing the final version for merge.

true
}
)
)
}

/**
* A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
*
Expand Down