
Commit 47854e8

fix
gengliangwang committed Aug 16, 2019
1 parent 0ea8db9 commit 47854e8
Showing 2 changed files with 27 additions and 4 deletions.
FileDataSourceV2.scala

@@ -43,11 +43,10 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {

   protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
     val objectMapper = new ObjectMapper()
-    Option(map.get("paths")).map { pathStr =>
+    val paths = Option(map.get("paths")).map { pathStr =>
       objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
-    }.getOrElse {
-      Option(map.get("path")).toSeq
-    }
+    }.getOrElse(Seq.empty)
+    paths ++ Option(map.get("path")).toSeq
   }
 
   protected def getTableName(paths: Seq[String]): String = {
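The change above makes `path` additive rather than a fallback: previously the single `path` option was consulted only when the JSON-encoded `paths` option was absent, so setting both silently dropped `path`. A minimal standalone sketch of the fixed logic, using a plain java.util.HashMap in place of Spark's CaseInsensitiveStringMap (Jackson's ObjectMapper is the only dependency assumed):

import com.fasterxml.jackson.databind.ObjectMapper

object GetPathsSketch {
  // Mirrors the fixed getPaths: "paths" holds a JSON string array,
  // "path" holds a single location; the result now contains both.
  def getPaths(map: java.util.Map[String, String]): Seq[String] = {
    val objectMapper = new ObjectMapper()
    val paths = Option(map.get("paths")).map { pathStr =>
      objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
    }.getOrElse(Seq.empty)
    paths ++ Option(map.get("path")).toSeq
  }

  def main(args: Array[String]): Unit = {
    val options = new java.util.HashMap[String, String]()
    options.put("paths", """["/data/a", "/data/b"]""")
    options.put("path", "/data/c")
    println(getPaths(options)) // List(/data/a, /data/b, /data/c)
  }
}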
FileBasedDataSourceSuite.scala

@@ -29,6 +29,9 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalData, IntervalUDT, NullData, NullUDT}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -707,6 +710,27 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
       }
     }
   }
+
+  test("File table location should include both values of option `path` and `paths`") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+      withTempPaths(3) { paths =>
+        paths.zipWithIndex.foreach { case (path, index) =>
+          Seq(index).toDF("a").write.mode("overwrite").parquet(path.getCanonicalPath)
+        }
+        val df = spark
+          .read
+          .option("path", paths.head.getCanonicalPath)
+          .parquet(paths(1).getCanonicalPath, paths(2).getCanonicalPath)
+        df.queryExecution.optimizedPlan match {
+          case PhysicalOperation(_, _, DataSourceV2Relation(table: ParquetTable, _, _)) =>
+            assert(table.paths.toSet == paths.map(_.getCanonicalPath).toSet)
+          case _ =>
+            throw new AnalysisException("Can not match ParquetTable in the query.")
+        }
+        checkAnswer(df, Seq(0, 1, 2).map(Row(_)))
+      }
+    }
+  }
 }
 
 object TestingUDT {
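For context, the user-facing pattern this test pins down: one location supplied through the `path` option and two more as arguments to the reader, which together populate both options the fixed getPaths now merges. A minimal sketch of the same scenario outside the test harness, assuming a local SparkSession and hypothetical /tmp directories:

import org.apache.spark.sql.SparkSession

object PathPlusPathsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("path-plus-paths").getOrCreate()
    import spark.implicits._

    // Write one small Parquet dataset per hypothetical directory.
    val dirs = Seq("/tmp/p0", "/tmp/p1", "/tmp/p2")
    dirs.zipWithIndex.foreach { case (dir, i) =>
      Seq(i).toDF("a").write.mode("overwrite").parquet(dir)
    }

    // One location via the `path` option, two more as load arguments.
    // With the fix the scan covers all three directories; before it,
    // the `path` option was dropped whenever `paths` was also set.
    val df = spark.read.option("path", dirs.head).parquet(dirs(1), dirs(2))
    df.orderBy("a").show() // expect rows 0, 1, 2

    spark.stop()
  }
}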
