Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,12 @@
],
"sqlState" : "42K01"
},
"DATA_SOURCE_ALREADY_EXISTS" : {
"message" : [
"Data source '<provider>' already exists. Please choose a different name for the new data source."
],
"sqlState" : "42710"
},
"DATA_SOURCE_NOT_EXIST" : {
"message" : [
"Data source '<provider>' not found. Please make sure the data source is registered."
Expand Down
6 changes: 6 additions & 0 deletions docs/sql-error-conditions.md
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,12 @@ For more details see [DATATYPE_MISMATCH](sql-error-conditions-datatype-mismatch-

DataType `<type>` requires a length parameter, for example `<type>`(10). Please specify the length.

### DATA_SOURCE_ALREADY_EXISTS

[SQLSTATE: 42710](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)

Data source '`<provider>`' already exists. Please choose a different name for the new data source.

### DATA_SOURCE_NOT_EXIST

[SQLSTATE: 42704](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3856,6 +3856,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"reason" -> reason))
}

def dataSourceAlreadyExists(name: String): Throwable = {
new AnalysisException(
errorClass = "DATA_SOURCE_ALREADY_EXISTS",
messageParameters = Map("provider" -> name))
}

def dataSourceDoesNotExist(name: String): Throwable = {
new AnalysisException(
errorClass = "DATA_SOURCE_NOT_EXIST",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.sql

import org.apache.spark.SparkClassNotFoundException
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.DataSourceManager
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceManager}
import org.apache.spark.sql.execution.python.UserDefinedPythonDataSource
import org.apache.spark.sql.internal.SQLConf

/**
* Functions for registering user-defined data sources.
Expand All @@ -43,6 +46,34 @@ private[sql] class DataSourceRegistration private[sql] (dataSourceManager: DataS
| pythonExec: ${dataSource.dataSourceCls.pythonExec}
""".stripMargin)

checkDataSourceExists(name)

dataSourceManager.registerDataSource(name, dataSource)
}

/**
* Checks if the specified data source exists.
*
* This method allows for user-defined data sources to be registered even if they
* have the same name as an existing data source in the registry. However, if the
* data source can be successfully loaded and is not a user-defined one, an error
* is thrown to prevent lookup errors with built-in or Scala/Java data sources.
*/
private def checkDataSourceExists(name: String): Unit = {
// Allow re-registration of user-defined data sources.
// TODO(SPARK-46616): disallow re-registration of statically registered data sources.
if (dataSourceManager.dataSourceExists(name)) return

try {
DataSource.lookupDataSource(name, SQLConf.get)
throw QueryCompilationErrors.dataSourceAlreadyExists(name)
} catch {
case e: SparkClassNotFoundException if e.getErrorClass == "DATA_SOURCE_NOT_FOUND" => // OK
case _: Throwable =>
// If there are other errors when resolving the data source, it's unclear whether
// it's safe to proceed. To prevent potential lookup errors, treat it as an existing
// data source and prevent re-registration.
throw QueryCompilationErrors.dataSourceAlreadyExists(name)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,33 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession {
}
}

test("SPARK-46522: data source name conflicts") {
assume(shouldTestPandasUDFs)
val dataSourceScript =
s"""
|from pyspark.sql.datasource import DataSource
|
|class $dataSourceName(DataSource):
| ...
|""".stripMargin
val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript)
Seq(
"text", "json", "csv", "avro", "orc", "parquet", "jdbc",
"binaryFile", "xml", "kafka", "noop",
"org.apache.spark.sql.test",
"org.apache.spark.sql.hive.orc"
).foreach { provider =>
withClue(s"Data source: $provider") {
checkError(
exception = intercept[AnalysisException] {
spark.dataSource.registerPython(provider, dataSource)
},
errorClass = "DATA_SOURCE_ALREADY_EXISTS",
parameters = Map("provider" -> provider))
}
}
}

test("reader not implemented") {
assume(shouldTestPandasUDFs)
val dataSourceScript =
Expand Down