From 47854e8681564750af4efbe93722a218c5638729 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Fri, 16 Aug 2019 16:28:13 +0800
Subject: [PATCH] fix

---
 .../datasources/v2/FileDataSourceV2.scala     |  7 +++---
 .../spark/sql/FileBasedDataSourceSuite.scala  | 24 +++++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index bcb10ae5999fc..ac786bbaac6d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -43,11 +43,10 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
 
   protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
     val objectMapper = new ObjectMapper()
-    Option(map.get("paths")).map { pathStr =>
+    val paths = Option(map.get("paths")).map { pathStr =>
       objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
-    }.getOrElse {
-      Option(map.get("path")).toSeq
-    }
+    }.getOrElse(Seq.empty)
+    paths ++ Option(map.get("path")).toSeq
   }
 
   protected def getTableName(paths: Seq[String]): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 51e26d42812ce..b1bde9098e096 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -29,6 +29,9 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -707,6 +710,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("File table location should include both values of option `path` and `paths`") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+      withTempPaths(3) { paths =>
+        paths.zipWithIndex.foreach { case (path, index) =>
+          Seq(index).toDF("a").write.mode("overwrite").parquet(path.getCanonicalPath)
+        }
+        val df = spark
+          .read
+          .option("path", paths.head.getCanonicalPath)
+          .parquet(paths(1).getCanonicalPath, paths(2).getCanonicalPath)
+        df.queryExecution.optimizedPlan match {
+          case PhysicalOperation(_, _, DataSourceV2Relation(table: ParquetTable, _, _)) =>
+            assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet)
+          case _ =>
+            throw new AnalysisException("Can not match ParquetTable in the query.")
+        }
+        checkAnswer(df, Seq(0, 1, 2).map(Row(_)))
+      }
+    }
+  }
 }
 
 object TestingUDT {