diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index b3e4240c315b..285af65f78ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -329,6 +329,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     import Options.Rename._
     fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+    // TODO: this is a workaround for HADOOP-16255 - remove this once HADOOP-16255 is resolved
+    mayRemoveCrcFile(srcPath)
   }
 
@@ -345,5 +347,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
     case _ => false
   }
+
+  private def mayRemoveCrcFile(path: Path): Unit = {
+    try {
+      val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
+      if (exists(checksumFile)) {
+        // the checksum file exists, delete it
+        delete(checksumFile)
+      }
+    } catch {
+      case NonFatal(_) => // ignore, removing the crc file is "best-effort"
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index fe59cb25d500..ab6fbeda7768 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite {
     assert(fm.exists(path))
     fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
 
+    // A crc file should not be leaked when the origin file doesn't exist.
+    // The Hadoop filesystem implementation may filter out checksum files, so
+    // list files via the local filesystem instead.
+    val fileNames = new File(path.getParent.toString).listFiles().toSeq
+      .filter(p => p.isFile).map(p => p.getName)
+    val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
+    val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
+      // strip the leading "." and the trailing ".crc"
+      name.substring(1, name.length - 4)
+    }
+
+    // Check that an origin file exists for every crc file.
+    assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
+      s"Some origin files for crc files don't exist - crc files: $crcFiles / " +
+        s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+
     // Open and delete
     fm.open(path).close()
     fm.delete(path)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 0e36e7f5da12..e832422a93b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.concurrent.Waiters._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.UninterruptibleThread
 
@@ -59,6 +60,21 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("HDFSMetadataLog: purge") {
+    testPurge()
+  }
+
+  Seq(
+    classOf[FileSystemBasedCheckpointFileManager],
+    classOf[FileContextBasedCheckpointFileManager]
+  ).map(_.getCanonicalName).foreach { cls =>
+    test(s"HDFSMetadataLog: purge - explicit file manager - $cls") {
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> cls) {
+        testPurge()
+      }
+    }
+  }
+
+  private def testPurge(): Unit = {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -75,12 +91,16 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
       assert(metadataLog.get(2).isDefined)
       assert(metadataLog.getLatest().get._1 == 2)
 
-      // There should be exactly one file, called "2", in the metadata directory.
+      // There should be at most two files in the metadata directory: "2", and
+      // optionally its crc file.
       // This check also tests for regressions of SPARK-17475
-      val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
-        .filter(!_.getName.startsWith(".")).toSeq
-      assert(allFiles.size == 1)
-      assert(allFiles(0).getName() == "2")
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      assert(allFiles.size <= 2)
+      assert(allFiles.exists(_.getName == "2"))
+      if (allFiles.size == 2) {
+        // a crc file may have been left behind as well
+        assert(allFiles.exists(_.getName == ".2.crc"))
+      }
     }
   }
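
Note (not part of the patch): the workaround above amounts to "after FileContext.rename, best-effort delete the sibling .<name>.crc file that HADOOP-16255 leaves behind on checksummed local filesystems". A minimal standalone sketch of that pattern follows; the object name CrcCleanupSketch and the /tmp/crc-demo paths are hypothetical, and it assumes a local Hadoop FileContext with an existing source file.

  import scala.util.control.NonFatal

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileContext, Options, Path}

  object CrcCleanupSketch {
    def main(args: Array[String]): Unit = {
      val fc = FileContext.getFileContext(new Configuration())
      // hypothetical demo paths; the real code renames a temp file to its final name
      val src = new Path("/tmp/crc-demo/file.tmp")
      val dst = new Path("/tmp/crc-demo/file")
      fc.rename(src, dst, Options.Rename.OVERWRITE)
      // HADOOP-16255: rename() may leave ".file.tmp.crc" behind, so remove it
      // best-effort, mirroring mayRemoveCrcFile() in the patch above
      val crc = new Path(src.getParent, s".${src.getName}.crc")
      try {
        if (fc.util().exists(crc)) fc.delete(crc, false)
      } catch {
        case NonFatal(_) => // best-effort: ignore any failure
      }
    }
  }

The deletion is deliberately swallowed on failure: a leftover crc file is only cosmetic, whereas failing the rename path would break checkpoint commits.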