diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index deaf262c5f57..bdcbe8e14d50 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1830,7 +1830,10 @@ Here are the details of all the sinks in Spark.
File Sink |
Append |
- path: path to the output directory, must be specified.
+ path: path to the output directory, must be specified.
+ outputRetentionMs: time to live (TTL) for output files. Output files whose batches were
+ committed longer ago than the TTL will eventually be excluded from the metadata log, so reader queries
+ that read the sink's output directory may not process them.
For file-format-specific options, see the related methods in DataFrameWriter
(Scala/Java/Python/
+ log.action == FileStreamSinkLog.DELETE_ACTION || (curTime - log.commitTime) > ttlMs
+ }.map(_.path).toSet
if (deletedFiles.isEmpty) {
logs
} else {
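The `FileStreamSinkLog` hunk above is shown only in part, so the surrounding lines of the modified `compactLogs` are not quoted here. The sketch below shows how the method could plausibly read in full; the `retentionMs` name for the `Option[Long]` TTL (which the test suite passes to the `FileStreamSinkLog` constructor) and the `Long.MaxValue` fallback are assumptions, not lines from the patch.

```scala
// Sketch only: just the filter predicate and the `.map(_.path).toSet` line are quoted
// from the diff above; everything else is reconstructed from context.
override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
  val curTime = System.currentTimeMillis()
  // `retentionMs` is an assumed name for the Option[Long] TTL handed to the constructor;
  // when no TTL is configured, Long.MaxValue keeps every entry.
  val ttlMs = retentionMs.getOrElse(Long.MaxValue)
  // An entry drops out of the compacted log if it was explicitly deleted or if its
  // batch was committed longer ago than the TTL.
  val deletedFiles = logs.filter { log =>
    log.action == FileStreamSinkLog.DELETE_ACTION || (curTime - log.commitTime) > ttlMs
  }.map(_.path).toSet
  if (deletedFiles.isEmpty) {
    logs
  } else {
    logs.filter(f => !deletedFiles.contains(f.path))
  }
}
```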
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
index 916bd2ddbc81..e07baa8e7a1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala
@@ -59,7 +59,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
require(fileLog != null, "setupManifestOptions must be called before this function")
- val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
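+ // Stamp every file reported by this batch's tasks with a single commit timestamp,
+ // taken once at job-commit time, so a retention TTL can later be evaluated against it.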
+ val commitTimestamp = System.currentTimeMillis()
+ val fileStatuses = taskCommits.flatMap { taskCommit =>
+ taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]].map(_.copy(commitTime = commitTimestamp))
+ }.toArray
if (fileLog.add(batchId, fileStatuses)) {
logInfo(s"Committed batch $batchId")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index f95daafdfe19..a2200b54a2f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
+import org.apache.hadoop.fs.FileSystem
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -55,7 +57,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 1000L,
blockReplication = 1,
blockSize = 10000L,
- action = FileStreamSinkLog.ADD_ACTION),
+ action = FileStreamSinkLog.ADD_ACTION,
+ commitTime = 1000L),
SinkFileStatus(
path = "/a/b/y",
size = 200L,
@@ -63,7 +66,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 2000L,
blockReplication = 2,
blockSize = 20000L,
- action = FileStreamSinkLog.DELETE_ACTION),
+ action = FileStreamSinkLog.DELETE_ACTION,
+ commitTime = 2000L),
SinkFileStatus(
path = "/a/b/z",
size = 300L,
@@ -71,13 +75,14 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 3000L,
blockReplication = 3,
blockSize = 30000L,
- action = FileStreamSinkLog.ADD_ACTION))
+ action = FileStreamSinkLog.ADD_ACTION,
+ commitTime = 3000L))
// scalastyle:off
val expected = s"""v$VERSION
- |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
- |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
- |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
+ |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add","commitTime":1000}
+ |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete","commitTime":2000}
+ |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add","commitTime":3000}""".stripMargin
// scalastyle:on
val baos = new ByteArrayOutputStream()
sinkLog.serialize(logs, baos)
@@ -92,9 +97,9 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
withFileStreamSinkLog { sinkLog =>
// scalastyle:off
val logs = s"""v$VERSION
- |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
- |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
- |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
+ |{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add","commitTime":1000}
+ |{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete","commitTime":2000}
+ |{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add","commitTime":3000}""".stripMargin
// scalastyle:on
val expected = Seq(
@@ -105,7 +110,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 1000L,
blockReplication = 1,
blockSize = 10000L,
- action = FileStreamSinkLog.ADD_ACTION),
+ action = FileStreamSinkLog.ADD_ACTION,
+ commitTime = 1000L),
SinkFileStatus(
path = "/a/b/y",
size = 200L,
@@ -113,7 +119,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 2000L,
blockReplication = 2,
blockSize = 20000L,
- action = FileStreamSinkLog.DELETE_ACTION),
+ action = FileStreamSinkLog.DELETE_ACTION,
+ commitTime = 2000L),
SinkFileStatus(
path = "/a/b/z",
size = 300L,
@@ -121,7 +128,8 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 3000L,
blockReplication = 3,
blockSize = 30000L,
- action = FileStreamSinkLog.ADD_ACTION))
+ action = FileStreamSinkLog.ADD_ACTION,
+ commitTime = 3000L))
assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))
@@ -149,6 +157,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}
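+ /** Names of files under the sink's metadata path whose names parse as batch ids. */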
+ private def listBatchFiles(fs: FileSystem, sinkLog: FileStreamSinkLog): Set[String] = {
+ fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+ try {
+ getBatchIdFromFileName(fileName)
+ true
+ } catch {
+ case _: NumberFormatException => false
+ }
+ }.toSet
+ }
+
test("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically and one min batches to retain
@@ -158,18 +177,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
-
- def listBatchFiles(): Set[String] = {
- fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
- try {
- getBatchIdFromFileName(fileName)
- true
- } catch {
- case _: NumberFormatException => false
- }
- }.toSet
- }
-
+ def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -193,18 +201,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
-
- def listBatchFiles(): Set[String] = {
- fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
- try {
- getBatchIdFromFileName(fileName)
- true
- } catch {
- case _: NumberFormatException => false
- }
- }.toSet
- }
-
+ def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -225,18 +222,35 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}
+ test("filter out outdated entries when compacting") {
+ val curTime = System.currentTimeMillis()
+ withFileStreamSinkLog(Some(60000), sinkLog => {
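+ // The sink log is created with a 60 s TTL: entries committed within the TTL survive
+ // compaction, while the entries added below, 80 s and 120 s old, are filtered out.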
+ val logs = Seq(
+ newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime),
+ newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION, curTime),
+ newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime))
+ assert(logs === sinkLog.compactLogs(logs))
+
+ val logs2 = Seq(
+ newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000),
+ newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000))
+ assert(logs === sinkLog.compactLogs(logs ++ logs2))
+ })
+ }
+
test("read Spark 2.1.0 log format") {
+ val maxLong = Long.MaxValue
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
// SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
- SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
- SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
- SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
- SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
- SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
- SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
- SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
- SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
- SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
+ SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+ SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+ SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+ SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+ SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+ SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+ SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+ SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
+ SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION, maxLong)
))
}
@@ -244,7 +258,16 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
* Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
* in SinkFileStatus.
*/
- private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = {
+ private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus =
+ newFakeSinkFileStatus(path, action, Long.MaxValue)
+
+ /**
+ * Create a fake SinkFileStatus using the given path, action, and commit time.
+ */
+ private def newFakeSinkFileStatus(
+ path: String,
+ action: String,
+ commitTime: Long): SinkFileStatus = {
SinkFileStatus(
path = path,
size = 100L,
@@ -252,12 +275,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 100L,
blockReplication = 1,
blockSize = 100L,
- action = action)
+ action = action,
+ commitTime = commitTime)
}
- private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
+ private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit =
+ withFileStreamSinkLog(None, f)
+
+ private def withFileStreamSinkLog(ttl: Option[Long], f: FileStreamSinkLog => Unit): Unit = {
withTempDir { file =>
- val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath)
+ val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath,
+ ttl)
f(sinkLog)
}
}
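Finally, a hypothetical end-to-end use of the new sink option documented above. The option key follows the docs change (`outputRetentionMs`); the format, paths, the seven-day value, and `df` (an existing streaming DataFrame) are placeholders, under the assumption that the option is set on the file sink the same way as `path`.

```scala
// Hypothetical usage; paths and the seven-day retention value are placeholders.
val query = df.writeStream
  .format("parquet")
  .option("path", "/data/events/output")
  .option("checkpointLocation", "/data/events/_checkpoint")
  // Entries for files committed more than ~7 days ago are eventually dropped from the
  // sink metadata log, so downstream readers of this directory may no longer see them.
  .option("outputRetentionMs", (7L * 24 * 60 * 60 * 1000).toString)
  .start()
```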