@@ -609,7 +609,13 @@ object DataSource extends Logging {
 
   /** Given a provider name, look up the data source class definition. */
   def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
-    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
+    val customBackwardCompatibilityMap =
+      conf.getAllConfs
+        .filter(_._1.startsWith("spark.sql.datasource.map"))
+        .map{ case (k, v) => (k.replaceFirst("^spark.sql.datasource.map.", ""), v) }
+    val compatibilityMap = backwardCompatibilityMap ++ customBackwardCompatibilityMap

Contributor:

So at this point we are leaving com.databricks.spark.avro -> internal Avro as the default, and users have to map it back to com.databricks.spark.avro, or do they set it to empty? If it is set to empty, I think the lookup below will return an empty string, which will cause an issue.

We should have a test case for the empty value, and perhaps add a check for it below to handle that case.

What about documentation? Is there a JIRA for documenting all the Avro stuff? If we do leave this as the default, we definitely want a release note calling out the change in behavior.
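
To make the empty-value concern concrete, here is a small sketch of the failure mode using the names from the diff above; this is illustrative only and not part of the patch:

// If a user sets spark.sql.datasource.map.com.databricks.spark.avro to "",
// the merged compatibility map resolves the provider to an empty string,
// and the class lookup further down has nothing sensible to load.
val compatibilityMap = Map("com.databricks.spark.avro" -> "")
val provider1 = compatibilityMap.getOrElse("com.databricks.spark.avro", "com.databricks.spark.avro")
assert(provider1.isEmpty)  // downstream, looking up a data source named "" fails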

Contributor:

Answering one of my own questions: I found the Avro docs here: #22121.

Member Author:

If this is merged, we can remove the internal Avro mapping code and document that before the branch cut.

> So at this point we are leaving com.databricks.spark.avro -> internal Avro as the default, and users have to map it back to com.databricks.spark.avro, or do they set it to empty?

This would be better than an alternative such as the following Avro-specific option:

val ENABLE_AVRO_BACKWARD_COMPATIBILITY = 
  buildConf("spark.sql.avro.backwardCompatibility")

Member:

I have the same concern as @tgravescs. It seems tricky to unset the default mapping.

For example, if by default we map com.databricks.spark.avro to the internal Avro source, then to unset it we have to set spark.sql.datasource.map.com.databricks.spark.avro -> com.databricks.spark.avro.

Currently we only have to deal with Avro and CSV, so I think it is OK to have a single, straightforward configuration like the one proposed in #22133.
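
For concreteness, unsetting the default under this proposal would look roughly like the snippet below in a user session; the spark.sql.datasource.map.* prefix is taken from the diff above, an active SparkSession named spark is assumed, and whether the external spark-avro package is actually on the classpath is a separate concern:

// Sketch only: with this patch, restoring the old resolution for
// "com.databricks.spark.avro" means mapping the name back to itself.
spark.conf.set(
  "spark.sql.datasource.map.com.databricks.spark.avro",
  "com.databricks.spark.avro")

// After this, format("com.databricks.spark.avro") resolves to the external package
// again (assuming its jar is on the classpath) rather than the built-in Avro source.
val df = spark.read.format("com.databricks.spark.avro").load("/path/to/events.avro")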


+    val provider1 = compatibilityMap.getOrElse(provider, provider) match {
       case name if name.equalsIgnoreCase("orc") &&
           conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
         classOf[OrcFileFormat].getCanonicalName
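
For reference, the prefix-stripping step above can be exercised on its own against a plain Map standing in for SQLConf.getAllConfs; the sample entries below are made up:

object DatasourceMapSketch extends App {
  // Stand-in for conf.getAllConfs; only the first entry carries the mapping prefix.
  val allConfs = Map(
    "spark.sql.datasource.map.com.databricks.spark.avro" -> "org.apache.spark.sql.avro.AvroFileFormat",
    "spark.sql.shuffle.partitions" -> "200")

  // Same filter/strip logic as in lookupDataSource above.
  val customBackwardCompatibilityMap = allConfs
    .filter(_._1.startsWith("spark.sql.datasource.map"))
    .map { case (k, v) => (k.replaceFirst("^spark.sql.datasource.map.", ""), v) }

  println(customBackwardCompatibilityMap)
  // Map(com.databricks.spark.avro -> org.apache.spark.sql.avro.AvroFileFormat)
}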

@@ -82,4 +82,28 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
     }
     assert(error.getMessage.contains("Failed to find data source: asfdwefasdfasdf."))
   }
+
+  test("support custom mapping for data source names") {
+    val csv = classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat]
+
+    // Map a new data source name to a built-in data source
+    withSQLConf("spark.sql.datasource.map.myDatasource" -> csv.getCanonicalName) {
+      assert(getProvidingClass("myDatasource") === csv)
+    }
+
+    // Map an existing built-in data source name to a new data source
+    val testDataSource = classOf[TestDataSource]
+    withSQLConf(
+      "spark.sql.datasource.map.org.apache.spark.sql.avro" -> testDataSource.getCanonicalName,
+      "spark.sql.datasource.map.com.databricks.spark.csv" -> testDataSource.getCanonicalName,
+      "spark.sql.datasource.map.com.databricks.spark.avro" -> testDataSource.getCanonicalName) {
+      assert(getProvidingClass("org.apache.spark.sql.avro") === testDataSource)
+      assert(getProvidingClass("com.databricks.spark.csv") === testDataSource)
+      assert(getProvidingClass("com.databricks.spark.avro") === testDataSource)
+    }
+  }
 }
+
+class TestDataSource extends DataSourceRegister {
+  override def shortName(): String = "test"
+}
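
If this patch lands, end-user usage of the new mapping would presumably look like the snippet below; com.example.MyFileFormat is a placeholder for any class that implements DataSourceRegister, and an active SparkSession named spark is assumed:

// Hypothetical usage: redirect an existing short name to a custom implementation
// without touching application code that still calls format("com.databricks.spark.csv").
spark.conf.set(
  "spark.sql.datasource.map.com.databricks.spark.csv",
  "com.example.MyFileFormat")

val df = spark.read
  .format("com.databricks.spark.csv")   // now resolves via the custom compatibility map
  .load("/path/to/input.csv")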