diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 04e740039f005..19c5cb9cecc64 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3150,6 +3150,14 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ENABLE_SUBDIRECTORY_SUPPORT_WITH_NON_PARTITIONED_TABLE =
+    buildConf("spark.sql.nonPartitionedTable.subdirectory.read.enabled")
+      .doc("When set to true, Spark SQL can read the files of a non-partitioned " +
+        "Hive table from subdirectories under the root path of the table.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   /**
    * Holds information about keys that have been deprecated.
    *
@@ -3837,6 +3845,9 @@ class SQLConf extends Serializable with Logging {
 
   def legacyIntervalEnabled: Boolean = getConf(LEGACY_INTERVAL_ENABLED)
 
+  def nonPartitionedTableSubDirectoryReadSupport: Boolean =
+    getConf(ENABLE_SUBDIRECTORY_SUPPORT_WITH_NON_PARTITIONED_TABLE)
+
   def decorrelateInnerQueryEnabled: Boolean = getConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED)
 
   /** ********************** SQLConf functionality methods ************ */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e02589e5cad00..90793b7449742 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
@@ -241,7 +242,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         LogicalRelation(
           DataSource(
             sparkSession = sparkSession,
-            paths = rootPath.toString :: Nil,
+            paths = getDirectoryPathSeq(rootPath),
             userSpecifiedSchema = Option(updatedTable.dataSchema),
             bucketSpec = None,
             options = options,
@@ -277,6 +278,18 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     result.copy(output = newOutput)
   }
 
+  private def getDirectoryPathSeq(rootPath: Path): Seq[String] = {
+    val enableSupportSubDirectories =
+      sparkSession.sessionState.conf.nonPartitionedTableSubDirectoryReadSupport
+
+    if (enableSupportSubDirectories) {
+      val fs = rootPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      SparkHadoopUtil.get.listLeafDirStatuses(fs, rootPath).map(_.getPath.toString)
+    } else {
+      rootPath.toString :: Nil
+    }
+  }
+
   private def inferIfNeeded(
       relation: HiveTableRelation,
       options: Map[String, String],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 1a6f6843d3911..3440b073fd168 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -363,4 +363,30 @@ class DataSourceWithHiveMetastoreCatalogSuite
       }
     })
   }
+
+  test("SPARK-28098: allow the reader to read files from subdirectories") {
+    withTempPath(dir => {
+      withTable("dirTest") {
+        withSQLConf(
+          "spark.sql.nonPartitionedTable.subdirectory.read.enabled" -> "true") {
+          val testData = java.util.Arrays.asList(Row(1), Row(2), Row(3), Row(4), Row(5))
+
+          val dataFrame = spark.sqlContext
+            .createDataFrame(testData, StructType(Seq(StructField("val", IntegerType))))
+
+          dataFrame
+            .coalesce(1)
+            .write
+            .mode(SaveMode.Overwrite)
+            .format("orc")
+            .save(s"${dir.getCanonicalPath}/sub1/sub2")
+
+          spark.sql("CREATE EXTERNAL TABLE dirTest (val INT)" +
+            s" STORED AS ORC LOCATION '${dir.toURI}'")
+
+          checkAnswer(spark.sql("select * from dirTest"), dataFrame)
+        }
+      }
+    })
+  }
 }
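
For reference, a minimal usage sketch (not part of the patch) of how the new flag is expected to be used from a Hive-enabled session. The table name dirTest mirrors the test above and is assumed to already exist as a non-partitioned external ORC table whose data files sit in nested subdirectories.

import org.apache.spark.sql.SparkSession

// A Hive-enabled session is assumed to be available.
val spark = SparkSession.builder()
  .appName("subdirectory-read-example")
  .enableHiveSupport()
  .getOrCreate()

// Opt in to listing leaf subdirectories under the table's root path.
spark.conf.set("spark.sql.nonPartitionedTable.subdirectory.read.enabled", "true")

// With the flag enabled, a non-partitioned external table whose ORC files live in
// nested subdirectories (for example <root>/sub1/sub2/part-*.orc) reads as usual.
spark.sql("SELECT * FROM dirTest").show()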