diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index fe6362d878d5..26f42b6e3f47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -327,6 +327,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     import Options.Rename._
     fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+    // TODO: this is a workaround for HADOOP-16255 - remove this when HADOOP-16255 is resolved
+    mayRemoveCrcFile(srcPath)
   }
 
@@ -343,5 +345,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
     case _ => false
   }
+
+  private def mayRemoveCrcFile(path: Path): Unit = {
+    try {
+      val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
+      if (exists(checksumFile)) {
+        // checksum file exists, so delete it
+        delete(checksumFile)
+      }
+    } catch {
+      case NonFatal(_) => // ignore; the crc file is removed as "best-effort"
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index c57b40c977e4..79bcd490a245 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
     assert(fm.exists(path))
     fm.createAtomic(path, overwriteIfPossible = true).close() // should not throw exception
 
+    // The crc file should not be leaked when the origin file doesn't exist.
+    // The Hadoop filesystem implementation may filter out checksum files, so
+    // list the files via the local filesystem instead.
+    val fileNames = new File(path.getParent.toString).listFiles().toSeq
+      .filter(p => p.isFile).map(p => p.getName)
+    val crcFiles = fileNames.filter(n => n.startsWith(".") && n.endsWith(".crc"))
+    val originFileNamesForExistingCrcFiles = crcFiles.map { name =>
+      // remove the leading "." and the trailing ".crc"
+      name.substring(1, name.length - 4)
+    }
+
+    // Check that an origin file exists for every crc file.
+    assert(originFileNamesForExistingCrcFiles.toSet.subsetOf(fileNames.toSet),
+      s"Some origin files for crc files don't exist - crc files: $crcFiles / " +
+      s"expected origin files: $originFileNamesForExistingCrcFiles / actual files: $fileNames")
+
     // Open and delete
     fm.open(path).close()
     fm.delete(path)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index c09756cd1b24..67dd88cbab63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -25,6 +25,7 @@ import scala.language.implicitConversions
 import org.scalatest.concurrent.Waiters._
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.UninterruptibleThread
 
@@ -58,6 +59,21 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
   }
 
   test("HDFSMetadataLog: purge") {
+    testPurge()
+  }
+
+  Seq(
+    classOf[FileSystemBasedCheckpointFileManager],
+    classOf[FileContextBasedCheckpointFileManager]
+  ).map(_.getCanonicalName).foreach { cls =>
+    test(s"HDFSMetadataLog: purge - explicit file manager - $cls") {
+      withSQLConf(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key -> cls) {
+        testPurge()
+      }
+    }
+  }
+
+  private def testPurge(): Unit = {
     withTempDir { temp =>
       val metadataLog = new HDFSMetadataLog[String](spark, temp.getAbsolutePath)
       assert(metadataLog.add(0, "batch0"))
@@ -74,12 +90,16 @@ class HDFSMetadataLogSuite extends SharedSparkSession {
       assert(metadataLog.get(2).isDefined)
       assert(metadataLog.getLatest().get._1 == 2)
 
-      // There should be exactly one file, called "2", in the metadata directory.
+      // There should be at most two files in the metadata directory: "2" and,
+      // optionally, its crc file.
       // This check also tests for regressions of SPARK-17475
-      val allFiles = new File(metadataLog.metadataPath.toString).listFiles()
-        .filter(!_.getName.startsWith(".")).toSeq
-      assert(allFiles.size == 1)
-      assert(allFiles(0).getName() == "2")
+      val allFiles = new File(metadataLog.metadataPath.toString).listFiles().toSeq
+      assert(allFiles.size <= 2)
+      assert(allFiles.exists(_.getName == "2"))
+      if (allFiles.size == 2) {
+        // a crc file may be left behind as well
+        assert(allFiles.exists(_.getName == ".2.crc"))
+      }
     }
   }
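
Note for context: the leak being worked around comes from Hadoop's checksum-backed filesystems, which store the checksum for a file "name" as a hidden sibling ".name.crc" in the same directory; a rename through FileContext moves the data file but not that sibling (HADOOP-16255). The standalone sketch below illustrates the same naming convention and best-effort cleanup idiom as mayRemoveCrcFile, but through the plain FileSystem API; the object name and the path are illustrative assumptions, not part of the patch.

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CrcCleanupSketch {
  def main(args: Array[String]): Unit = {
    // The local filesystem is checksum-backed, so a file "name" gets a
    // hidden ".name.crc" sibling in the same directory.
    val fs = FileSystem.getLocal(new Configuration())
    val src = new Path("/tmp/checkpoint/.batch-1.tmp") // illustrative path

    // Same derivation of the checksum sibling as mayRemoveCrcFile above.
    val crc = new Path(src.getParent, s".${src.getName}.crc")
    try {
      // Best-effort cleanup: delete the leaked checksum file if present.
      if (fs.exists(crc)) fs.delete(crc, false)
    } catch {
      case NonFatal(_) => // ignore; failing to remove the crc file is acceptable
    }
  }
}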