diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 87e43fe0e38c7..738b13df9a3be 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -875,6 +875,12 @@ ], "sqlState" : "42K01" }, + "DATA_SOURCE_ALREADY_EXISTS" : { + "message" : [ + "Data source '<provider>' already exists. Please choose a different name for the new data source." + ], + "sqlState" : "42710" + }, "DATA_SOURCE_NOT_EXIST" : { "message" : [ "Data source '<provider>' not found. Please make sure the data source is registered." diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 3f4074af9b780..fcb39529653e2 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -474,6 +474,12 @@ For more details see [DATATYPE_MISMATCH](sql-error-conditions-datatype-mismatch- DataType `<type>` requires a length parameter, for example `<type>`(10). Please specify the length. +### DATA_SOURCE_ALREADY_EXISTS + +[SQLSTATE: 42710](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) + +Data source '`<provider>`' already exists. Please choose a different name for the new data source. 
+ ### DATA_SOURCE_NOT_EXIST [SQLSTATE: 42704](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation) Data source '`<provider>`' not found. Please make sure the data source is registered. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index bc847d1c00699..87be47c0b01b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -3856,6 +3856,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat "reason" -> reason)) } + def dataSourceAlreadyExists(name: String): Throwable = { + new AnalysisException( + errorClass = "DATA_SOURCE_ALREADY_EXISTS", + messageParameters = Map("provider" -> name)) + } + def dataSourceDoesNotExist(name: String): Throwable = { new AnalysisException( errorClass = "DATA_SOURCE_NOT_EXIST", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataSourceRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataSourceRegistration.scala index 936286eb0da51..2434103f4b806 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataSourceRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataSourceRegistration.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql +import org.apache.spark.SparkClassNotFoundException import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.datasources.DataSourceManager +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceManager} import org.apache.spark.sql.execution.python.UserDefinedPythonDataSource +import org.apache.spark.sql.internal.SQLConf /** * Functions for registering user-defined data sources. 
@@ -43,6 +46,34 @@ private[sql] class DataSourceRegistration private[sql] (dataSourceManager: DataS | pythonExec: ${dataSource.dataSourceCls.pythonExec} """.stripMargin) + checkDataSourceExists(name) + dataSourceManager.registerDataSource(name, dataSource) } + + /** + * Checks if the specified data source exists. + * + * This method allows for user-defined data sources to be registered even if they + * have the same name as an existing data source in the registry. However, if the + * data source can be successfully loaded and is not a user-defined one, an error + * is thrown to prevent lookup errors with built-in or Scala/Java data sources. + */ + private def checkDataSourceExists(name: String): Unit = { + // Allow re-registration of user-defined data sources. + // TODO(SPARK-46616): disallow re-registration of statically registered data sources. + if (dataSourceManager.dataSourceExists(name)) return + + try { + DataSource.lookupDataSource(name, SQLConf.get) + throw QueryCompilationErrors.dataSourceAlreadyExists(name) + } catch { + case e: SparkClassNotFoundException if e.getErrorClass == "DATA_SOURCE_NOT_FOUND" => // OK + case _: Throwable => + // If there are other errors when resolving the data source, it's unclear whether + // it's safe to proceed. To prevent potential lookup errors, treat it as an existing + // data source and prevent re-registration. 
+ throw QueryCompilationErrors.dataSourceAlreadyExists(name) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala index 3e7cd82db8d72..02bccc363e37e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonDataSourceSuite.scala @@ -284,6 +284,33 @@ class PythonDataSourceSuite extends QueryTest with SharedSparkSession { } } + test("SPARK-46522: data source name conflicts") { + assume(shouldTestPandasUDFs) + val dataSourceScript = + s""" + |from pyspark.sql.datasource import DataSource + | + |class $dataSourceName(DataSource): + | ... + |""".stripMargin + val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript) + Seq( + "text", "json", "csv", "avro", "orc", "parquet", "jdbc", + "binaryFile", "xml", "kafka", "noop", + "org.apache.spark.sql.test", + "org.apache.spark.sql.hive.orc" + ).foreach { provider => + withClue(s"Data source: $provider") { + checkError( + exception = intercept[AnalysisException] { + spark.dataSource.registerPython(provider, dataSource) + }, + errorClass = "DATA_SOURCE_ALREADY_EXISTS", + parameters = Map("provider" -> provider)) + } + } + } + test("reader not implemented") { assume(shouldTestPandasUDFs) val dataSourceScript =