2 changes: 1 addition & 1 deletion docs/structured-streaming-programming-guide.md
@@ -551,7 +551,7 @@ Here are the details of all the sources in Spark.
When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must have 2 subdirectories (so depth of directory is greater than 2). e.g. <code>/archived/here</code>. This will ensure archived files are never included as new source files.<br/>
Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option.<br/>
NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match any files in the output directory of a file stream sink.<br/>
NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query.
<br/><br/>
For file-format-specific options, see the related methods in <code>DataStreamReader</code>
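
To make the option combination above concrete, here is a minimal sketch of a file source configured with "archive" cleanup; the input path `/data/in` is hypothetical, and only the `cleanSource`/`sourceArchiveDir` options come from the guide text above:

```scala
// Sketch only: /data/in is an illustrative input path.
val cleaned = spark.readStream
  .format("text")
  .option("cleanSource", "archive")             // move each completed source file
  .option("sourceArchiveDir", "/archived/here") // archive root, at least 2 directories deep
  .load("/data/in")
```

With this configuration, a completed file such as `/data/in/2020/dataset.txt` would end up at `/archived/here/data/in/2020/dataset.txt`, mirroring its original path under the archive directory.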
FileStreamSource.scala
@@ -206,6 +206,17 @@ class FileStreamSource(
CaseInsensitiveMap(options), None).allFiles()
}

  private def setSourceHasMetadata(newValue: Option[Boolean]): Unit = newValue match {
    case Some(true) =>
      if (sourceCleaner.isDefined) {
        throw new UnsupportedOperationException("Clean up source files is not supported when" +
          " reading from the output directory of FileStreamSink.")
      }
      sourceHasMetadata = Some(true)
    case _ =>
      sourceHasMetadata = newValue
  }

/**
* Returns a list of files found, sorted by their timestamp.
*/
@@ -216,7 +227,7 @@ class FileStreamSource(
sourceHasMetadata match {
case None =>
if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
sourceHasMetadata = Some(true)
setSourceHasMetadata(Some(true))
allFiles = allFilesUsingMetadataLogFileIndex()
} else {
allFiles = allFilesUsingInMemoryFileIndex()
@@ -228,10 +239,10 @@ class FileStreamSource(
// metadata log and data files are only generated after the previous
// `FileStreamSink.hasMetadata` check
if (FileStreamSink.hasMetadata(Seq(path), hadoopConf, sparkSession.sessionState.conf)) {
sourceHasMetadata = Some(true)
setSourceHasMetadata(Some(true))
allFiles = allFilesUsingMetadataLogFileIndex()
} else {
sourceHasMetadata = Some(false)
setSourceHasMetadata(Some(false))
// `allFiles` have already been fetched using InMemoryFileIndex in this round
}
}
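
The diff above routes every update of `sourceHasMetadata` through `setSourceHasMetadata`, so the cleaner check runs no matter where the sink metadata is first detected. A minimal, hypothetical sketch of the scenario this guard rejects (paths, formats, and query shapes are illustrative, not part of the change):

```scala
// Sketch only: paths are illustrative. The first query uses a file sink, which maintains
// a _spark_metadata log under /data/out.
val upstream = spark.readStream.format("text").load("/data/in")
upstream.writeStream
  .format("text")
  .option("checkpointLocation", "/data/chk")
  .start("/data/out")

// A second query reads the sink's output directory back with cleanSource enabled.
// Once the source detects the _spark_metadata log, setSourceHasMetadata(Some(true))
// sees sourceCleaner.isDefined and the query fails with UnsupportedOperationException,
// instead of deleting/archiving files the sink's metadata log still references.
val downstream = spark.readStream
  .format("text")
  .option("cleanSource", "delete")
  .load("/data/out")
downstream.writeStream
  .format("console")
  .option("checkpointLocation", "/data/chk2")
  .start()
```

The new test added below simulates the same situation more directly by writing a `FileStreamSinkLog` under the source directory and expecting this exception.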
FileStreamSourceSuite.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.util.Progressable
import org.scalatest.PrivateMethodTester
import org.scalatest.time.SpanSugar._

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
@@ -149,6 +150,20 @@ abstract class FileStreamSourceTest
}
}

  case class AddFilesToFileStreamSinkLog(
      fs: FileSystem,
      srcDir: Path,
      sinkLog: FileStreamSinkLog,
      batchId: Int)(
      pathFilter: Path => Boolean) extends ExternalAction {
    override def runAction(): Unit = {
      val statuses = fs.listStatus(srcDir, new PathFilter {
        override def accept(path: Path): Boolean = pathFilter(path)
      })
      sinkLog.add(batchId, statuses.map(SinkFileStatus(_)))
    }
  }

/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
def createFileStream(
format: String,
@@ -1617,14 +1632,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}

test("remove completed files when remove option is enabled") {
def assertFileIsRemoved(files: Array[String], fileName: String): Unit = {
assert(!files.exists(_.startsWith(fileName)))
}

def assertFileIsNotRemoved(files: Array[String], fileName: String): Unit = {
assert(files.exists(_.startsWith(fileName)))
}

withTempDirs { case (src, tmp) =>
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
@@ -1642,28 +1649,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
CheckAnswer("keep1"),
AssertOnQuery("input file removed") { _: StreamExecution =>
// it doesn't rename any file yet
assertFileIsNotRemoved(src.list(), "keep1")
assertFileIsNotRemoved(src, "keep1")
true
},
AddTextFileData("keep2", src, tmp, tmpFilePrefix = "ke ep2 %"),
CheckAnswer("keep1", "keep2"),
AssertOnQuery("input file removed") { _: StreamExecution =>
val files = src.list()

// it renames input file for first batch, but not for second batch yet
assertFileIsRemoved(files, "keep1")
assertFileIsNotRemoved(files, "ke ep2 %")
assertFileIsRemoved(src, "keep1")
assertFileIsNotRemoved(src, "ke ep2 %")

true
},
AddTextFileData("keep3", src, tmp, tmpFilePrefix = "keep3"),
CheckAnswer("keep1", "keep2", "keep3"),
AssertOnQuery("input file renamed") { _: StreamExecution =>
val files = src.list()

// it renames input file for second batch, but not third batch yet
assertFileIsRemoved(files, "ke ep2 %")
assertFileIsNotRemoved(files, "keep3")
assertFileIsRemoved(src, "ke ep2 %")
assertFileIsNotRemoved(src, "keep3")

true
}
@@ -1739,6 +1742,44 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

Seq("delete", "archive").foreach { cleanOption =>
test(s"Throw UnsupportedOperationException on configuring $cleanOption when source path" +
" refers the output dir of FileStreamSink") {
withThreeTempDirs { case (src, tmp, archiveDir) =>
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
// Force deleting the old logs
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
) {
val option = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "1",
"cleanSource" -> cleanOption, "sourceArchiveDir" -> archiveDir.getAbsolutePath)

val fileStream = createFileStream("text", src.getCanonicalPath, options = option)
val filtered = fileStream.filter($"value" contains "keep")

// create FileStreamSinkLog under source directory
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
new File(src, FileStreamSink.metadataDir).getCanonicalPath)
val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
val srcPath = new Path(src.getCanonicalPath)
val fileSystem = srcPath.getFileSystem(hadoopConf)

// Here we will just check whether the source file is removed or not, as we cover
// functionality test of "archive" in other UT.
testStream(filtered)(
AddTextFileData("keep1", src, tmp, tmpFilePrefix = "keep1"),
AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 0) { path =>
path.getName.startsWith("keep1")
},
ExpectFailure[UnsupportedOperationException](
t => assert(t.getMessage.startsWith("Clean up source files is not supported")),
isFatalError = false)
)
}
}
}
}

class FakeFileSystem(scheme: String) extends FileSystem {
override def exists(f: Path): Boolean = true

@@ -1797,6 +1838,14 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

  private def assertFileIsRemoved(sourceDir: File, fileName: String): Unit = {
    assert(!sourceDir.list().exists(_.startsWith(fileName)))
  }

  private def assertFileIsNotRemoved(sourceDir: File, fileName: String): Unit = {
    assert(sourceDir.list().exists(_.startsWith(fileName)))
  }

private def assertFileIsNotMoved(sourceDir: File, expectedDir: File, filePrefix: String): Unit = {
assert(sourceDir.exists())
assert(sourceDir.list().exists(_.startsWith(filePrefix)))