27 changes: 18 additions & 9 deletions core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -67,9 +67,10 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
parallelismMax: Int,
recursiveFileLookup: Boolean = true): Seq[(Path, Seq[FileStatus])] = {
parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax, recursiveFileLookup)
}

/**
@@ -115,7 +116,8 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
parallelismMax: Int,
recursiveFileLookup: Boolean = true): Seq[(Path, Seq[FileStatus])] = {

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= parallelismThreshold) {
@@ -129,7 +131,8 @@
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax)
parallelismMax = parallelismMax,
recursiveFileLookup = recursiveFileLookup)
(path, leafFiles)
}
}
@@ -169,7 +172,8 @@ private[spark] object HadoopFSUtils extends Logging {
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
parallelismThreshold = Int.MaxValue,
parallelismMax = 0)
parallelismMax = 0,
recursiveFileLookup = recursiveFileLookup)
(path, leafFiles)
}
}.collect().toImmutableArraySeq
@@ -196,7 +200,8 @@
ignoreLocality: Boolean,
isRootPath: Boolean,
parallelismThreshold: Int,
parallelismMax: Int): Seq[FileStatus] = {
parallelismMax: Int,
recursiveFileLookup: Boolean = true): Seq[FileStatus] = {

logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)
@@ -255,7 +260,9 @@

val allLeafStatuses = {
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val filteredNestedFiles: Seq[FileStatus] = contextOpt match {
val filteredNestedFiles: Seq[FileStatus] = if (!recursiveFileLookup) {
Seq.empty
} else contextOpt match {
case Some(context) if dirs.length > parallelismThreshold =>
parallelListLeafFilesInternal(
context,
@@ -266,7 +273,8 @@
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax
parallelismMax = parallelismMax,
recursiveFileLookup = recursiveFileLookup
).flatMap(_._2)
case _ =>
dirs.flatMap { dir =>
@@ -279,7 +287,8 @@
ignoreLocality = ignoreLocality,
isRootPath = false,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax)
parallelismMax = parallelismMax,
recursiveFileLookup = recursiveFileLookup)
}.toImmutableArraySeq
}
val filteredTopLevelFiles = if (filter != null) {
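For context, the listing change above can be summarized with a small, self-contained sketch; the helper name listFilesSketch is illustrative and not part of HadoopFSUtils. When recursiveFileLookup is false, child directories are not descended into, so only the files directly under each listed path are returned.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

// Illustrative sketch of the short-circuit added to listLeafFiles above.
def listFilesSketch(
    path: Path,
    hadoopConf: Configuration,
    recursiveFileLookup: Boolean): Seq[FileStatus] = {
  val fs = path.getFileSystem(hadoopConf)
  val (dirs, topLevelFiles) = fs.listStatus(path).toSeq.partition(_.isDirectory)
  val nestedFiles =
    if (!recursiveFileLookup) Seq.empty[FileStatus]  // skip child directories entirely
    else dirs.flatMap(d => listFilesSketch(d.getPath, hadoopConf, recursiveFileLookup))
  topLevelFiles ++ nestedFiles
}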
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -156,6 +156,8 @@ object InMemoryFileIndex extends Logging {
sparkSession.sessionState.conf.useListFilesFileSystemList.split(",").map(_.trim)
val ignoreMissingFiles =
new FileSourceOptions(CaseInsensitiveMap(parameters)).ignoreMissingFiles
val recursiveFileLookup = parameters
.get(FileIndexOptions.RECURSIVE_FILE_LOOKUP).map(_.toBoolean).getOrElse(true)
val useListFiles = try {
val scheme = paths.head.getFileSystem(hadoopConf).getScheme
paths.size == 1 && fileSystemList.contains(scheme)
@@ -176,7 +178,8 @@
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism,
recursiveFileLookup = recursiveFileLookup)
}
}

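A note on the option resolution added above: it reads the parameters map directly, converts the value with String.toBoolean, and defaults to true. As a rough standalone sketch (the helper name is illustrative; FileIndexOptions.RECURSIVE_FILE_LOOKUP is the string key "recursiveFileLookup"):

// Rough, standalone equivalent of the parsing above; the helper name is illustrative.
def resolveRecursiveFileLookup(parameters: Map[String, String]): Boolean =
  parameters.get("recursiveFileLookup").map(_.toBoolean).getOrElse(true)

resolveRecursiveFileLookup(Map("recursiveFileLookup" -> "false"))  // false
resolveRecursiveFileLookup(Map.empty)                              // true (the default used here)
// A malformed value such as "yes" makes String.toBoolean throw IllegalArgumentException.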
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -67,7 +67,7 @@ abstract class PartitioningAwareFileIndex(
pathFilters.forall(_.accept(file))

protected lazy val recursiveFileLookup: Boolean = {
caseInsensitiveMap.getOrElse(FileIndexOptions.RECURSIVE_FILE_LOOKUP, "false").toBoolean
caseInsensitiveMap.getOrElse(FileIndexOptions.RECURSIVE_FILE_LOOKUP, "true").toBoolean
}

protected lazy val ignoreInvalidPartitionPaths: Boolean = {
@@ -117,36 +117,7 @@ abstract class PartitioningAwareFileIndex(
override def sizeInBytes: Long = allFiles().map(_.getLen).sum

def allFiles(): Seq[FileStatus] = {
val files = if (partitionSpec().partitionColumns.isEmpty && !recursiveFileLookup) {
// For each of the root input paths, get the list of files inside them
rootPaths.flatMap { path =>
// Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles).
val fs = path.getFileSystem(hadoopConf)
val qualifiedPathPre = fs.makeQualified(path)
val qualifiedPath: Path = if (qualifiedPathPre.isRoot && !qualifiedPathPre.isAbsolute) {
// SPARK-17613: Always append `Path.SEPARATOR` to the end of parent directories,
// because the `leafFile.getParent` would have returned an absolute path with the
// separator at the end.
new Path(qualifiedPathPre, Path.SEPARATOR)
} else {
qualifiedPathPre
}

// There are three cases possible with each path
// 1. The path is a directory and has children files in it. Then it must be present in
// leafDirToChildrenFiles as those children files will have been found as leaf files.
// Find its children files from leafDirToChildrenFiles and include them.
// 2. The path is a file, then it will be present in leafFiles. Include this path.
// 3. The path is a directory, but has no children files. Do not include this path.

leafDirToChildrenFiles.get(qualifiedPath)
.orElse { leafFiles.get(qualifiedPath).map(Array(_)) }
.getOrElse(Array.empty)
}
} else {
leafFiles.values.toSeq
}
files.filter(matchPathPattern)
leafFiles.values.toSeq.filter(matchPathPattern)
}

protected def inferPartitioning(): PartitionSpec = {
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -680,6 +680,45 @@ class FileIndexSuite extends SharedSparkSession {
assert(fileIndex2a != fileIndex2b)
}
}

test("InMemoryFileIndex: recursiveFileLookup=false should not list children directories") {
withTempDir { dir =>
// Create directory structure:
// dir/
// file1.txt
// subdir1/
// file2.txt
// subdir2/
// file3.txt

val file1 = new File(dir, "file1.txt")
stringToFile(file1, "content1")

val subdir1 = new File(dir, "subdir1")
subdir1.mkdir()
val file2 = new File(subdir1, "file2.txt")
stringToFile(file2, "content2")

val subdir2 = new File(subdir1, "subdir2")
subdir2.mkdir()
val file3 = new File(subdir2, "file3.txt")
stringToFile(file3, "content3")

val rootPath = new Path(dir.getCanonicalPath)

// Test with recursiveFileLookup=true (default behavior)
val recursiveFileIndex = new InMemoryFileIndex(
spark, Seq(rootPath), Map("recursiveFileLookup" -> "true"), None)
val recursiveFiles = recursiveFileIndex.allFiles().map(_.getPath.getName).sorted
assert(recursiveFiles === Seq("file1.txt", "file2.txt", "file3.txt"))

// Test with recursiveFileLookup=false (should only list top-level files)
val nonRecursiveFileIndex = new InMemoryFileIndex(
spark, Seq(rootPath), Map("recursiveFileLookup" -> "false"), None)
val nonRecursiveFiles = nonRecursiveFileIndex.allFiles().map(_.getPath.getName).sorted
assert(nonRecursiveFiles === Seq("file1.txt"))
}
}
}

object DeletionRaceFileSystem {
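For reference, the option exercised by the new test is the same one users pass through the DataFrame reader. A minimal usage sketch, assuming an existing SparkSession and a hypothetical input path:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("recursive-lookup-example").getOrCreate()

// Only files directly under /tmp/landing are listed; subdirectories are not descended into.
// (The path is hypothetical.)
val topLevelOnly = spark.read
  .option("recursiveFileLookup", "false")
  .text("/tmp/landing")

// With the default changed above, omitting the option (or setting it to "true")
// lists files from all nested directories as well.
val allNestedFiles = spark.read
  .option("recursiveFileLookup", "true")
  .text("/tmp/landing")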