
Commit 3bb42bd

Addressed comments

1 parent b1f82ce · commit 3bb42bd

File tree

1 file changed (+23 -22 lines changed)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

Lines changed: 23 additions & 22 deletions
@@ -291,12 +291,8 @@ class HDFSFileCatalog(
   refresh()
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
-
     if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(
-        InternalRow.empty,
-        unpartitionedDataFiles().filterNot(_.getPath.getName startsWith "_")
-      ) :: Nil
+      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
@@ -341,9 +337,30 @@ class HDFSFileCatalog(
       }
     }
 
+  /**
+   * All the files to consider for processing. If there is a partitioning scheme, then
+   * consider all the leaf files in the input paths. Else consider only the input paths
+   * (if a path is file) or their immediate children (if a path is a directory).
+   */
   def allFiles(): Seq[FileStatus] = {
     if (partitionSpec().partitionColumns.isEmpty) {
-      unpartitionedDataFiles()
+      // For each of the input paths, get the list of files inside them
+      paths.flatMap { path =>
+        // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
+        val fs = path.getFileSystem(hadoopConf)
+        val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+
+        // There are three cases possible with each path
+        // 1. The path is a directory and has children files in it. Then it must be present in
+        //    leafDirToChildrenFiles as those children files will have been found as leaf files.
+        //    Find its children files from leafDirToChildrenFiles and include them.
+        // 2. The path is a file, then it will be present in leafFiles. Include this path.
+        // 3. The path is a directory, but has no children files. Do not include this path.
+
+        leafDirToChildrenFiles.get(qualifiedPath)
+          .orElse { leafFiles.get(path).map(Array(_)) }
+          .getOrElse(Array.empty)
+      }
     } else {
       leafFiles.values.toSeq
     }
@@ -453,22 +470,6 @@ class HDFSFileCatalog(
     }
   }
 
-  /** List of files to consider when there is not inferred partitioning scheme */
-  private def unpartitionedDataFiles(): Seq[FileStatus] = {
-    // For each of the input paths, get the list of files inside them
-    paths.flatMap { path =>
-      // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
-      val fs = path.getFileSystem(hadoopConf)
-      val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
-      // If it is a directory (i.e. exists in leafDirToChildrenFiles), return its children files
-      // Or if it is a file (i.e. exists in leafFiles), return the path itself
-      leafDirToChildrenFiles.get(qualifiedPath).orElse {
-        leafFiles.get(path).map(Array(_))
-      }.getOrElse(Array.empty)
-    }
-  }
-
   def refresh(): Unit = {
     val files = listLeafFiles(paths)
 