Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
}

/**
* Maximum age of a file that can be found in this directory, before it is ignored. For the
* first batch all files will be considered valid. If `latestFirst` is set to `true` and
* `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are
* valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest file, and not the
* timestamp of the current system. This means that if the last file has timestamp 1000, and the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,29 @@ class FileStreamSource(

// Ordering applied to file modification timestamps when selecting files for a batch:
// reversed (newest first) when 'latestFirst' is enabled, ascending otherwise.
// NOTE(review): this span is a unified-diff capture — both the OLD and the NEW logWarning
// message bodies appear below; the real source keeps only one stripMargin string.
private val fileSortOrder = if (sourceOptions.latestFirst) {
logWarning(
"""'latestFirst' is true. New files will be processed first.
|It may affect the watermark value""".stripMargin)
"""'latestFirst' is true. New files will be processed first, which may affect the watermark
|value. In addition, 'maxFileAge' will be ignored.""".stripMargin)
implicitly[Ordering[Long]].reverse
} else {
implicitly[Ordering[Long]]
}

/**
 * Effective maximum file age in milliseconds. Per SPARK-19813, when 'latestFirst' is enabled
 * together with 'maxFilesPerTrigger', age-based filtering must be disabled — old files that are
 * still valid and pending could otherwise be skipped — so the limit becomes unbounded.
 * Otherwise the user-configured value is used as-is.
 */
private val maxFileAgeMs: Long =
  if (sourceOptions.latestFirst && maxFilesPerBatch.isDefined) Long.MaxValue
  else sourceOptions.maxFileAgeMs

/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing and debugging in production.
// NOTE(review): diff artifact — the next two lines are the OLD and NEW versions of the same
// declaration; the real source keeps only the one using the derived `maxFileAgeMs`.
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
val seenFiles = new SeenFilesMap(maxFileAgeMs)

// Re-populate the seen-files map from the metadata log on (re)start, then drop entries that
// are older than the retention window.
metadataLog.allFiles().foreach { entry =>
seenFiles.add(entry.path, entry.timestamp)
}
seenFiles.purge()

// NOTE(review): diff artifact — OLD and NEW versions of the same logInfo line appear below.
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = $maxFileAgeMs")

/**
* Returns the maximum offset that can be retrieved from the source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,49 +1173,65 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
SerializedOffset(str.trim)
}

/**
 * Starts a file stream over `src` with `maxFilesPerTrigger = 1`, runs exactly two micro-batches
 * under a manual clock, and verifies the content of each batch.
 *
 * @param src         source directory containing the input text files
 * @param latestFirst whether newest files should be processed first
 * @param firstBatch  expected content of the first batch
 * @param secondBatch expected content of the second batch
 * @param maxFileAge  optional value for the 'maxFileAge' source option (e.g. "1m")
 */
private def runTwoBatchesAndVerifyResults(
    src: File,
    latestFirst: Boolean,
    firstBatch: String,
    secondBatch: String,
    maxFileAge: Option[String] = None): Unit = {
  val baseOptions = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1")
  val srcOptions = baseOptions ++ maxFileAge.map("maxFileAge" -> _)
  val fileStream = createFileStream("text", src.getCanonicalPath, options = srcOptions)
  val manualClock = new StreamManualClock()

  // Blocks until the stream is idle at the given manual-clock time, i.e. the batch triggered
  // at that time has finished.
  def batchFinishedAt(timeMs: Long): Boolean = {
    eventually(timeout(streamingTimeout)) {
      assert(manualClock.isStreamWaitingAt(timeMs))
    }
    true
  }

  testStream(fileStream)(
    StartStream(trigger = ProcessingTime(10), triggerClock = manualClock),
    AssertOnQuery { _ => batchFinishedAt(0) },
    CheckLastBatch(firstBatch),
    AdvanceManualClock(10),
    AssertOnQuery { _ => batchFinishedAt(10) },
    CheckLastBatch(secondBatch)
  )
}

test("FileStreamSource - latestFirst") {
withTempDir { src =>
// Prepare two files: 1.txt, 2.txt, and make sure they have different modified time.
val f1 = stringToFile(new File(src, "1.txt"), "1")
val f2 = stringToFile(new File(src, "2.txt"), "2")
f2.setLastModified(f1.lastModified + 1000)

// NOTE(review): diff artifact — this inner def is the OLD (removed) copy of the helper; the
// change hoists it to a private method that also takes `src` and an optional maxFileAge.
def runTwoBatchesAndVerifyResults(
latestFirst: Boolean,
firstBatch: String,
secondBatch: String): Unit = {
val fileStream = createFileStream(
"text",
src.getCanonicalPath,
options = Map("latestFirst" -> latestFirst.toString, "maxFilesPerTrigger" -> "1"))
val clock = new StreamManualClock()
testStream(fileStream)(
StartStream(trigger = ProcessingTime(10), triggerClock = clock),
AssertOnQuery { _ =>
// Block until the first batch finishes.
eventually(timeout(streamingTimeout)) {
assert(clock.isStreamWaitingAt(0))
}
true
},
CheckLastBatch(firstBatch),
AdvanceManualClock(10),
AssertOnQuery { _ =>
// Block until the second batch finishes.
eventually(timeout(streamingTimeout)) {
assert(clock.isStreamWaitingAt(10))
}
true
},
CheckLastBatch(secondBatch)
)
}

// Read oldest files first, so the first batch is "1", and the second batch is "2".
// NOTE(review): diff artifact — OLD and NEW versions of the same call appear below.
runTwoBatchesAndVerifyResults(latestFirst = false, firstBatch = "1", secondBatch = "2")
runTwoBatchesAndVerifyResults(src, latestFirst = false, firstBatch = "1", secondBatch = "2")

// Read latest files first, so the first batch is "2", and the second batch is "1".
// NOTE(review): diff artifact — OLD and NEW versions of the same call appear below.
runTwoBatchesAndVerifyResults(latestFirst = true, firstBatch = "2", secondBatch = "1")
runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1")
}
}

test("SPARK-19813: Ignore maxFileAge when maxFilesPerTrigger and latestFirst is used") {
  withTempDir { src =>
    // Create two files whose modification times are a full hour apart, so that with a
    // 'maxFileAge' of one minute the older file would normally be filtered out.
    val oneHourMs = 3600 * 1000
    val f1 = stringToFile(new File(src, "1.txt"), "1")
    val f2 = stringToFile(new File(src, "2.txt"), "2")
    f2.setLastModified(f1.lastModified + oneHourMs)

    // Despite 'maxFileAge = 1m', both files must be processed: latestFirst together with
    // maxFilesPerTrigger disables age-based filtering (SPARK-19813).
    runTwoBatchesAndVerifyResults(src, latestFirst = true, firstBatch = "2", secondBatch = "1",
      maxFileAge = Some("1m"))
  }
}

Expand Down