Commit 644b8ab

[SPARK-24295][SS] Add option to retain only last batch in file stream sink metadata
1 parent 6b3c832 commit 644b8ab

8 files changed: +156, -5 lines changed

docs/structured-streaming-programming-guide.md

Lines changed: 11 additions & 0 deletions
@@ -536,6 +536,11 @@ Here are the details of all the sources in Spark.
     href="api/R/read.stream.html">R</a>).
     E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code>.
     <br/><br/>
+    <code>ignoreFileStreamSinkMetadata</code>: whether to ignore the metadata left behind by a file stream sink, which makes the source always use an in-memory file index. (default: false)
+    <br/>
+    This option is useful when the metadata grows so large that reading it becomes slower than listing the files from the file system.<br/>
+    NOTE: This option must be set to "true" if the file source reads output files written by a file stream sink whose "retainOnlyLastBatchInMetadata" option is set to "true".
+    <br/><br/>
     In addition, there are session configurations that affect certain file-formats. See the <a href="sql-programming-guide.html">SQL Programming Guide</a> for more details. E.g., for "parquet", see <a href="sql-data-sources-parquet.html#configuration">Parquet configuration</a> section.
     </td>
     <td>Yes</td>

@@ -1812,6 +1817,12 @@ Here are the details of all the sinks in Spark.
     (<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a
     href="api/R/write.stream.html">R</a>).
     E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
+    <br/>
+    <code>retainOnlyLastBatchInMetadata</code>: whether to retain metadata only for the last successful batch.
+    <br/><br/>
+    This option greatly reduces the overhead of compacting metadata files, which can be non-trivial when the query processes many files in each batch.<br/>
+    NOTE: Since only the last batch is retained, the metadata is no longer readable by the file source: you must set the "ignoreFileStreamSinkMetadata" option
+    to "true" when reading the sink's output files from another query, whether from a batch or a streaming source.
     </td>
     <td>Yes (exactly-once)</td>
     <td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
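
Not part of the commit: a minimal usage sketch of the sink option documented above, assuming an existing streaming DataFrame `events` and hypothetical paths.

// Streaming write that keeps only the last successful batch in the sink's metadata log.
val query = events.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/ckpt")          // hypothetical path
  .option("retainOnlyLastBatchInMetadata", "true")    // option added by this commit
  .start("/tmp/sink-out")                             // hypothetical path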

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 3 additions & 2 deletions
@@ -344,8 +344,9 @@ case class DataSource(
       // We are reading from the results of a streaming query. Load files from the metadata log
       // instead of listing them using HDFS APIs.
       case (format: FileFormat, _)
-          if FileStreamSink.hasMetadata(
-            caseInsensitiveOptions.get("path").toSeq ++ paths,
+          if !caseInsensitiveOptions.getOrElse(
+            "ignoreFileStreamSinkMetadata", "false").toBoolean &&
+            FileStreamSink.hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths,
             sparkSession.sessionState.newHadoopConf()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
         val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
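
With the guard above, a batch read can opt out of the sink's metadata log and fall back to plain file listing. A minimal sketch (not part of the commit), assuming a `spark` session and a hypothetical output path:

// Batch read of a file stream sink's output, bypassing the _spark_metadata log. This is
// required once the sink was run with "retainOnlyLastBatchInMetadata", since the log is
// then incomplete.
val df = spark.read
  .option("ignoreFileStreamSinkMetadata", "true")
  .parquet("/tmp/sink-out")                           // hypothetical path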

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala

Lines changed: 6 additions & 0 deletions
@@ -74,6 +74,12 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    */
   val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
 
+  /**
+   * Whether to ignore FileStreamSink metadata in the source, which makes it use an in-memory file index.
+   */
+  val ignoreFileStreamSinkMetadata: Boolean = withBooleanParameter("ignoreFileStreamSinkMetadata",
+    default = false)
+
   private def withBooleanParameter(name: String, default: Boolean) = {
     parameters.get(name).map { str =>
       try {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 3 additions & 1 deletion
@@ -97,6 +97,8 @@ class FileStreamSink(
   private val fileLog =
     new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
   private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+  private val retainOnlyLastBatchInMetadata: Boolean =
+    options.getOrElse("retainOnlyLastBatchInMetadata", "false").toBoolean
 
   private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
     val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
@@ -114,7 +116,7 @@ class FileStreamSink(
 
     committer match {
       case manifestCommitter: ManifestFileCommitProtocol =>
-        manifestCommitter.setupManifestOptions(fileLog, batchId)
+        manifestCommitter.setupManifestOptions(fileLog, batchId, retainOnlyLastBatchInMetadata)
       case _ => // Do nothing
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 4 additions & 1 deletion
@@ -208,7 +208,10 @@ class FileStreamSource(
     var allFiles: Seq[FileStatus] = null
     sourceHasMetadata match {
       case None =>
-        if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
+        if (sourceOptions.ignoreFileStreamSinkMetadata) {
+          sourceHasMetadata = Some(false)
+          allFiles = allFilesUsingInMemoryFileIndex()
+        } else if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
           sourceHasMetadata = Some(true)
           allFiles = allFilesUsingMetadataLogFileIndex()
         } else {
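
The same option applies to a streaming read of another query's output. A minimal sketch (not part of the commit), assuming a `spark` session, a known `outputSchema`, and a hypothetical path:

// Streaming read that ignores the upstream sink's metadata; the source sets
// sourceHasMetadata = Some(false) and uses the in-memory file index instead.
val stream = spark.readStream
  .format("parquet")
  .schema(outputSchema)                               // schema of the sink's output (assumed)
  .option("ignoreFileStreamSinkMetadata", "true")
  .load("/tmp/sink-out")                              // hypothetical path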

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala

Lines changed: 10 additions & 1 deletion
@@ -42,14 +42,19 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
   @transient private var fileLog: FileStreamSinkLog = _
   private var batchId: Long = _
+  private var retainOnlyLastBatch: Boolean = _
 
   /**
    * Sets up the manifest log output and the batch id for this job.
    * Must be called before any other function.
    */
-  def setupManifestOptions(fileLog: FileStreamSinkLog, batchId: Long): Unit = {
+  def setupManifestOptions(
+      fileLog: FileStreamSinkLog,
+      batchId: Long,
+      retainOnlyLastBatch: Boolean): Unit = {
     this.fileLog = fileLog
     this.batchId = batchId
+    this.retainOnlyLastBatch = retainOnlyLastBatch
   }
 
   override def setupJob(jobContext: JobContext): Unit = {
@@ -63,6 +68,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
 
     if (fileLog.add(batchId, fileStatuses)) {
       logInfo(s"Committed batch $batchId")
+      if (retainOnlyLastBatch) {
+        // Purge entries older than batchId, so only the latest batch is kept in the file log.
+        fileLog.purge(batchId)
+      }
     } else {
       throw new IllegalStateException(s"Race while writing batch $batchId")
     }
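
For reference, a small helper sketch (not part of the commit; the name retainedBatchIds is hypothetical) that mirrors the new test's assertions and shows how the purge above is observable from the sink's metadata log:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.{FileStreamSink, FileStreamSinkLog}

// Returns the batch ids still present in a sink's metadata log. With
// "retainOnlyLastBatchInMetadata" enabled, this should contain a single id after each commit.
def retainedBatchIds(spark: SparkSession, outputDir: String): Seq[Long] = {
  val logPath = new Path(outputDir, FileStreamSink.metadataDir)
  val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toUri.toString)
  fileLog.get(None, None).map(_._1).toSeq
}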

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 69 additions & 0 deletions
@@ -190,6 +190,75 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  test("SPARK-24295 retain only last batch for file log metadata") {
+    val inputData = MemoryStream[Long]
+    val inputDF = inputData.toDF.toDF("time")
+    val outputDf = inputDF
+      .selectExpr("CAST(time AS timestamp) AS timestamp")
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    var query: StreamingQuery = null
+
+    try {
+      query =
+        outputDf.writeStream
+          .option("checkpointLocation", checkpointDir)
+          .option("retainOnlyLastBatchInMetadata", true)
+          .format("parquet")
+          .start(outputDir)
+
+      def addTimestamp(timestampInSecs: Int*): Unit = {
+        inputData.addData(timestampInSecs.map(_ * 1L): _*)
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+      }
+
+      def check(expectedResult: Long*): Unit = {
+        val outputDf = spark.read
+          // This option must be provided when 'retainOnlyLastBatchInMetadata' is enabled,
+          // since it purges metadata from FileStreamSink; otherwise the query fails while
+          // loading because the metadata is incomplete.
+          .option("ignoreFileStreamSinkMetadata", "true")
+          .parquet(outputDir)
+          .selectExpr("timestamp")
+          .sort("timestamp")
+        checkDataset(outputDf.as[Long], expectedResult: _*)
+      }
+
+      val logPath = new Path(outputDir, FileStreamSink.metadataDir)
+      val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toUri.toString)
+
+      addTimestamp(100)
+      check(100)
+
+      // Only the latest batch is retained, hence length should be 1.
+      assert(fileLog.get(None, None).length === 1)
+      assert(fileLog.get(None, None).head._1 === 0)
+
+      addTimestamp(104, 123)
+      check(100, 104, 123)
+
+      // Only the latest batch is retained, hence length should be 1.
+      assert(fileLog.get(None, None).length === 1)
+      assert(fileLog.get(None, None).head._1 === 1)
+
+      addTimestamp(140)
+      check(100, 104, 123, 140)
+
+      // Only the latest batch is retained, hence length should be 1.
+      assert(fileLog.get(None, None).length === 1)
+      assert(fileLog.get(None, None).head._1 === 2)
+
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
+
   test("partitioned writing and batch reading with 'basePath'") {
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 50 additions & 0 deletions
@@ -908,6 +908,56 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("ignore metadata when reading data from outputs of another streaming query") {
+    withTempDirs { case (outputDir, checkpointDir) =>
+      // q1 is a streaming query that reads from memory and writes to text files
+      val q1Source = MemoryStream[String]
+      val q1 =
+        q1Source
+          .toDF()
+          .writeStream
+          .option("checkpointLocation", checkpointDir.getCanonicalPath)
+          .format("text")
+          .start(outputDir.getCanonicalPath)
+
+      // q2 is a streaming query that reads q1's text outputs.
+      // Even though q1 stores metadata in the output location, we intend to ignore it.
+      val q2 =
+        createFileStream("text", outputDir.getCanonicalPath,
+          options = Map("ignoreFileStreamSinkMetadata" -> "true"))
+          .filter($"value" contains "keep")
+
+      def q1AddData(data: String*): StreamAction =
+        Execute { _ =>
+          q1Source.addData(data)
+          q1.processAllAvailable()
+        }
+      def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
+
+      testStream(q2)(
+        // batch 0
+        q1AddData("drop1", "keep2"),
+        q2ProcessAllAvailable(),
+        CheckAnswer("keep2"),
+
+        // batch 1
+        Assert {
+          // Create a text file that is not in q1's sink log.
+          // Since we are ignoring the sink metadata, its content should appear in q2's answer.
+          val fileOutsideSinkLog = new File(outputDir, "keep.txt")
+          stringToFile(fileOutsideSinkLog, "keep")
+          fileOutsideSinkLog.exists()
+        },
+        q1AddData("keep3"),
+        q2ProcessAllAvailable(),
+        // Here we should see "keep"; with the metadata log file index it would not appear.
+        CheckAnswer("keep", "keep2", "keep3"),
+
+        Execute { _ => q1.stop() }
+      )
+    }
+  }
+
   test("start before another streaming query, and read its output") {
     withTempDirs { case (outputDir, checkpointDir) =>
       // q1 is a streaming query that reads from memory and writes to text files
