@@ -483,35 +483,42 @@ case class DataSource(

object DataSource {

+  private val jdbc = classOf[JdbcRelationProvider].getCanonicalName
+  private val json = classOf[JsonFileFormat].getCanonicalName
+  private val parquet = classOf[ParquetFileFormat].getCanonicalName
+  private val csv = classOf[CSVFileFormat].getCanonicalName
+  private val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
+  private val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+
  /** A map to maintain backward compatibility in case we move data sources around. */
-  private val backwardCompatibilityMap: Map[String, String] = {
-    val jdbc = classOf[JdbcRelationProvider].getCanonicalName
-    val json = classOf[JsonFileFormat].getCanonicalName
-    val parquet = classOf[ParquetFileFormat].getCanonicalName
-    val csv = classOf[CSVFileFormat].getCanonicalName
-    val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
-    val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
-
-    Map(
-      "org.apache.spark.sql.jdbc" -> jdbc,
-      "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
-      "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
-      "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
-      "org.apache.spark.sql.json" -> json,
-      "org.apache.spark.sql.json.DefaultSource" -> json,
-      "org.apache.spark.sql.execution.datasources.json" -> json,
-      "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
-      "org.apache.spark.sql.parquet" -> parquet,
-      "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
-      "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
-      "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
-      "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
-      "org.apache.spark.sql.hive.orc" -> orc,
-      "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
-      "org.apache.spark.ml.source.libsvm" -> libsvm,
-      "com.databricks.spark.csv" -> csv
-    )
-  }
+  private val backwardCompatibilityMap: Map[String, String] = Map(
+    "org.apache.spark.sql.jdbc" -> jdbc,
+    "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
+    "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
+    "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
+    "org.apache.spark.sql.json" -> json,
+    "org.apache.spark.sql.json.DefaultSource" -> json,
+    "org.apache.spark.sql.execution.datasources.json" -> json,
+    "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
+    "org.apache.spark.sql.parquet" -> parquet,
+    "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
+    "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
+    "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
+    "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
+    "org.apache.spark.sql.hive.orc" -> orc,
+    "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
+    "org.apache.spark.ml.source.libsvm" -> libsvm,
+    "com.databricks.spark.csv" -> csv
+  )

+  private val builtinShortNamesMap: Map[String, String] = Map(

Member commented:

Probably it would be nicer to explain why this one is needed, with a small comment on why the short names of internal data sources should be mapped to fully qualified names. (An illustrative sketch of such a comment follows the map below.)

+    "jdbc" -> jdbc,
+    "json" -> json,
+    "parquet" -> parquet,
+    "csv" -> csv,
+    "libsvm" -> libsvm,
+    "orc" -> orc
+  )
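
A minimal sketch of what that requested explanation could look like; the wording and the object name BuiltinShortNames are assumptions for illustration, not text from the PR:

// Illustrative only: one possible wording for the comment the reviewer asks for.
object BuiltinShortNames {
  /**
   * Short names of built-in data sources, mapped to their fully qualified
   * class names so that the built-in implementation is always chosen even if
   * a third-party package (e.g. spark-csv) registers the same short name.
   */
  private val builtinShortNamesMap: Map[String, String] = Map(
    "csv" -> "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
}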

  /**
   * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.

@@ -523,7 +530,8 @@ object DataSource {

  /** Given a provider name, look up the data source class definition. */
  def lookupDataSource(provider: String): Class[_] = {
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+    val provider1 = builtinShortNamesMap.getOrElse(provider,
+      backwardCompatibilityMap.getOrElse(provider, provider))
Member commented:

Should we maybe combine builtinShortNamesMap and backwardCompatibilityMap and use a single getOrElse? It seems a bit confusing to read. (A sketch of the merged-map idea follows below.)
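
A sketch of that suggestion, assuming the two tables can be folded together without changing lookup behavior; ProviderAliases and resolve are hypothetical names, and the maps are trimmed to one entry each:

object ProviderAliases {
  // Stand-ins for the two tables in this diff.
  private val backwardCompatibilityMap = Map(
    "org.apache.spark.sql.jdbc" ->
      "org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider")
  private val builtinShortNamesMap = Map(
    "jdbc" ->
      "org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider")

  // Merging the maps up front leaves a single getOrElse on the lookup path.
  private val aliasMap = backwardCompatibilityMap ++ builtinShortNamesMap

  def resolve(provider: String): String = aliasMap.getOrElse(provider, provider)
}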

@HyukjinKwon (Member) commented on May 4, 2017:

And I guess these should be case-insensitive for short names:
 ./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.5.0
scala> spark.range(1).write.format("Csv").save("/tmp/abc")
java.lang.RuntimeException: Multiple sources found for Csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15), please specify the fully qualified class name.
  at scala.sys.package$.error(package.scala:27)
  ...
 ./bin/spark-shell
scala> spark.range(1).write.format("Csv").save("/tmp/abc1")

Contributor commented:

yea, short names should be case-insensitive
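
A sketch of case-insensitive short-name resolution along the lines both reviewers suggest; CaseInsensitiveShortNames and resolve are hypothetical names, and lower-casing with Locale.ROOT avoids locale-dependent surprises:

import java.util.Locale

object CaseInsensitiveShortNames {
  private val builtinShortNamesMap = Map(
    "csv" -> "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")

  // "Csv", "CSV" and "csv" all resolve to the built-in CSV file format.
  def resolve(provider: String): String =
    builtinShortNamesMap.getOrElse(provider.toLowerCase(Locale.ROOT), provider)
}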

    val provider2 = s"$provider1.DefaultSource"
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)