Commit 0c58deb

[SPARK-28098][SQL] Support reading partitioned Hive tables with subdirectories
1 parent: dd67777

File tree: 3 files changed (+43 -1 lines)
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (10 additions, 0 deletions)

@@ -3210,6 +3210,13 @@ object SQLConf {
     .intConf
     .createWithDefault(0)
 
+  val READ_PARTITION_WITH_SUBDIRECTORY_ENABLED =
+    buildConf("spark.sql.sources.readPartitionWithSubdirectory.enabled")
+      .doc("When set to true, Spark SQL can read the files of a " +
+        "partitioned Hive table from subdirectories under the table's root path.")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3908,6 +3915,9 @@ class SQLConf extends Serializable with Logging {
 
   def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
 
+  def readPartitionWithSubdirectoryEnabled: Boolean =
+    getConf(READ_PARTITION_WITH_SUBDIRECTORY_ENABLED)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
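
For context, the new flag can be toggled per session; a minimal usage sketch (not part of the commit, though the key string comes from the buildConf call above):

  // Enable subdirectory-aware file listing for the current session.
  spark.conf.set("spark.sql.sources.readPartitionWithSubdirectory.enabled", "true")

  // Equivalent SQL form:
  spark.sql("SET spark.sql.sources.readPartitionWithSubdirectory.enabled=true")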

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala (17 additions, 1 deletion)

@@ -59,6 +59,9 @@ class InMemoryFileIndex(
   override val rootPaths =
     rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))
 
+  val readPartitionWithSubdirectoryEnabled =
+    sparkSession.sessionState.conf.readPartitionWithSubdirectoryEnabled
+
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
   @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
   @volatile private var cachedPartitionSpec: PartitionSpec = _
@@ -94,10 +97,23 @@ class InMemoryFileIndex(
     val files = listLeafFiles(rootPaths)
     cachedLeafFiles =
       new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
-    cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+    cachedLeafDirToChildrenFiles =
+      if (readPartitionWithSubdirectoryEnabled) {
+        files.toArray.groupBy(file => getRootPathsLeafDir(file.getPath.getParent))
+      } else {
+        files.toArray.groupBy(_.getPath.getParent)
+      }
     cachedPartitionSpec = null
   }
 
+  private def getRootPathsLeafDir(path: Path): Path = {
+    if (rootPaths.contains(path)) {
+      path
+    } else {
+      getRootPathsLeafDir(path.getParent)
+    }
+  }
+
   override def equals(other: Any): Boolean = other match {
     case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
     case _ => false
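
To make the grouping change concrete, here is a small standalone sketch (paths illustrative, not from the commit) of what getRootPathsLeafDir does: it climbs a leaf file's parent chain until it reaches a registered root path, so with the flag on, files nested in subdirectories are grouped under the table root rather than under their immediate parent directory.

  import org.apache.hadoop.fs.Path

  // Illustrative root paths; in InMemoryFileIndex these come from rootPaths.
  val rootPaths = Seq(new Path("/warehouse/tbl"))

  // Mirrors the private helper added above. Note it assumes the input lies
  // below one of the root paths; otherwise the recursion would run off the
  // top of the path hierarchy.
  def getRootPathsLeafDir(path: Path): Path =
    if (rootPaths.contains(path)) path
    else getRootPathsLeafDir(path.getParent)

  // A file at /warehouse/tbl/sub1/sub2/part-0.orc has parent .../sub1/sub2,
  // which collapses to the table root:
  assert(getRootPathsLeafDir(new Path("/warehouse/tbl/sub1/sub2")) ==
    new Path("/warehouse/tbl"))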

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala (16 additions, 0 deletions)

@@ -520,6 +520,22 @@ class FileIndexSuite extends SharedSparkSession {
       SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
     }
   }
+
+  test("SPARK-28098 - supporting read partitioned Hive tables with subdirectories") {
+    withTempPath { dir =>
+      spark
+        .range(2)
+        .select(col("id").as("p"), col("id"))
+        .write
+        .partitionBy("p")
+        .orc(s"${dir.getAbsolutePath}/sub1/sub2")
+      val path = new Path(dir.getAbsolutePath)
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), Map.empty, None)
+      val partitionValues = fileIndex.partitionSpec().partitions.map(_.values)
+      assert(partitionValues.length == 2 && partitionValues(0).numFields == 1 &&
+        partitionValues(1).numFields == 1)
+    }
+  }
 }
 
 object DeletionRaceFileSystem {
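
The test above exercises the default listing path; as a complement, a hedged sketch (not part of the commit) of the same scenario with the new flag explicitly enabled, using the withSQLConf helper available to Spark SQL test suites:

  test("SPARK-28098 - subdirectory listing with the flag explicitly enabled") {
    withSQLConf(SQLConf.READ_PARTITION_WITH_SUBDIRECTORY_ENABLED.key -> "true") {
      withTempPath { dir =>
        // Write a partitioned dataset two directory levels below the root.
        spark.range(2).select(col("id").as("p"), col("id"))
          .write.partitionBy("p").orc(s"${dir.getAbsolutePath}/sub1/sub2")
        // Index from the root and check that both partitions are discovered.
        val fileIndex =
          new InMemoryFileIndex(spark, Seq(new Path(dir.getAbsolutePath)), Map.empty, None)
        assert(fileIndex.partitionSpec().partitions.length == 2)
      }
    }
  }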
