@@ -375,16 +375,13 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio

     var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]

-    var leafDirs = mutable.Map.empty[Path, FileStatus]
-
     def refresh(): Unit = {
       def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
         val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
         val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
         files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
       }

-      leafDirs.clear()
       leafFiles.clear()

       // We don't filter files/directories like _temporary/_SUCCESS here, as specific data sources
@@ -397,7 +394,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
       }

       val (dirs, files) = statuses.partition(_.isDir)
-      leafDirs ++= dirs.map(d => d.getPath -> d).toMap
       leafFiles ++= files.map(f => f.getPath -> f).toMap
       leafDirToChildrenFiles ++= files.groupBy(_.getPath.getParent)
     }
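
A minimal standalone sketch of what the listLeafFilesAndDirs recursion above returns, run against the local file system. The object name ListLeafSketch and the /tmp/table path are hypothetical; it assumes hadoop-common on the classpath:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object ListLeafSketch extends App {
  val fs = FileSystem.getLocal(new Configuration())

  // Same shape as the helper inside refresh() above.
  def listLeafFilesAndDirs(fs: FileSystem, status: FileStatus): Set[FileStatus] = {
    val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDir)
    val leafDirs = if (dirs.isEmpty) Set(status) else Set.empty[FileStatus]
    files.toSet ++ leafDirs ++ dirs.flatMap(dir => listLeafFilesAndDirs(fs, dir))
  }

  // For a tree like
  //   /tmp/table/pi=1/part-00000.parquet
  //   /tmp/table/_temporary/          (empty)
  // the result holds the data file, /tmp/table/pi=1, and also the empty
  // /tmp/table/_temporary: leaf dirs alone cannot tell the two dirs apart,
  // which is why partition discovery below keys off leafDirToChildrenFiles.
  listLeafFilesAndDirs(fs, fs.getFileStatus(new Path("/tmp/table")))
    .foreach(s => println(s.getPath))
}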
@@ -461,7 +457,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }

   private def discoverPartitions(): PartitionSpec = {
-    val leafDirs = fileStatusCache.leafDirs.keys.toSeq
+    // We use leaf dirs containing data files to discover the schema.
+    val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
     PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
   }

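Why the leafDirToChildrenFiles keys are safe to feed into parsePartitions: a groupBy over file parents can only yield directories that directly contain at least one data file, so empty dirs such as _temporary never appear among them. A REPL-style sketch with hypothetical paths (the real map groups FileStatus values via _.getPath.getParent; bare Paths are used here for brevity):

import org.apache.hadoop.fs.Path

val files = Seq(
  new Path("/table/pi=1/ps=foo/part-00000.parquet"),
  new Path("/table/pi=2/ps=bar/part-00000.parquet"))

// Only parents of actual files become keys; /table/_temporary cannot show up.
val leafDirToChildrenFiles = files.groupBy(_.getParent)
leafDirToChildrenFiles.keys.foreach(println)
// /table/pi=1/ps=foo
// /table/pi=2/ps=bar
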
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.parquet

+import java.io.File
+
 import scala.collection.mutable.ArrayBuffer

 import org.apache.hadoop.fs.Path
@@ -175,11 +177,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       pi <- Seq(1, 2)
       ps <- Seq("foo", "bar")
     } {
+      val dir = makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)
       makeParquetFile(
         (1 to 10).map(i => ParquetData(i, i.toString)),
-        makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
+        dir)
+      // Introduce a _temporary dir to test the robustness of the schema discovery process.
+      new File(dir.toString, "_temporary").mkdir()
     }
+    // Introduce a _temporary dir into the base dir to test the robustness of the schema discovery process.
+    new File(base.getCanonicalPath, "_temporary").mkdir()
+
     println("load the partitioned table")
     read.parquet(base.getCanonicalPath).registerTempTable("t")

     withTempTable("t") {
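
For reference, a sketch of the directory tree the test builds (data file names are hypothetical); the empty _temporary dirs are exactly what the leafDirToChildrenFiles-based discovery must skip:

base/
  _temporary/                 (empty)
  pi=1/ps=foo/
    part-XXXX.parquet         (10 rows of ParquetData)
    _temporary/               (empty)
  pi=1/ps=bar/                ... same shape
  pi=2/ps=foo/                ... same shape
  pi=2/ps=bar/                ... same shape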