@@ -525,6 +525,54 @@ abstract class FileStreamSinkSuite extends StreamTest {
       }
     }
   }
+
+  test("Handle FileStreamSink metadata correctly for empty partition") {
+    Seq("parquet", "orc", "text", "json").foreach { format =>
+      val inputData = MemoryStream[String]
+      val df = inputData.toDF()
+
+      withTempDir { outputDir =>
+        withTempDir { checkpointDir =>
+          var query: StreamingQuery = null
+          try {
+            // repartition to more partitions than input rows to leave empty partitions
+            query =
+              df.repartition(10)
+                .writeStream
+                .option("checkpointLocation", checkpointDir.getCanonicalPath)
+                .format(format)
+                .start(outputDir.getCanonicalPath)
+
+            inputData.addData("1", "2", "3")
+            inputData.addData("4", "5")
+
+            failAfter(streamingTimeout) {
+              query.processAllAvailable()
+            }
+          } finally {
+            if (query != null) {
+              query.stop()
+            }
+          }
+
+          val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
+            spark.sessionState.newHadoopConf())
+          val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
+            outputDir.getCanonicalPath)
+
+          val allFiles = sinkLog.allFiles()
+          // only files from non-empty partitions should be logged
+          assert(allFiles.length < 10)
+          assert(allFiles.forall(file => fs.exists(new Path(file.path))))
+
+          // the query should be able to read all rows correctly via the metadata log
+          val outputDf = spark.read.format(format).load(outputDir.getCanonicalPath)
+            .selectExpr("CAST(value AS INT)").as[Int]
+          checkDatasetUnorderly(outputDf, 1, 2, 3, 4, 5)
+        }
+      }
+    }
+  }
 }
 
 object PendingCommitFilesTrackingManifestFileCommitProtocol {