diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 5694b2c9d0f9..793b2b94dd52 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -156,6 +156,11 @@
     ],
     "sqlState" : "22008"
   },
+  "DEFAULT_DATABASE_NOT_EXISTS" : {
+    "message" : [
+      "Default database <defaultDatabase> does not exist. Please create it first or change the default database to 'default'."
+    ]
+  },
   "DIVIDE_BY_ZERO" : {
     "message" : [
       "Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set <config> to \"false\" to bypass this error."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 1ada2ffa4fc1..cc2ba8ac7e4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -68,7 +68,8 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionExpressionBuilder: FunctionExpressionBuilder,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
-    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
+    cacheTTL: Long = SQLConf.get.metadataCacheTTL,
+    defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging {
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec
 
@@ -88,7 +89,8 @@ class SessionCatalog(
       DummyFunctionResourceLoader,
       DummyFunctionExpressionBuilder,
       conf.tableRelationCacheSize,
-      conf.metadataCacheTTL)
+      conf.metadataCacheTTL,
+      conf.defaultDatabase)
   }
 
   // For testing only.
@@ -129,7 +131,7 @@ class SessionCatalog(
   // check whether the temporary view or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb: String = format(DEFAULT_DATABASE)
+  protected var currentDb: String = format(defaultDatabase)
 
   private val validNameFormat = "([\\w_]+)".r
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 62c0772f8658..cf9dd7fdf476 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -130,7 +130,7 @@ class CatalogManager(
       _currentNamespace = None
       // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
-      // when we switch back to session catalog, the current namespace definitely is ["default"].
-      v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
+      // when we switch back, the current namespace is the configured default database.
+      v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
     }
   }
@@ -144,7 +144,7 @@
     catalogs.clear()
     _currentNamespace = None
     _currentCatalogName = None
-    v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
+    v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
   }
 }
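A side note on the two `setCurrentDatabase` call sites above: switching away from and back to the session catalog now resets the current namespace to the configured default database rather than the hard-coded `default`. A rough sketch of that behavior, assuming a session started with the new static conf introduced later in this patch, an already-existing database `my_db`, and a second catalog registered under the hypothetical name `my_cat`:

    // Assumes spark.sql.catalog.spark_catalog.defaultDatabase=my_db and
    // spark.sql.catalog.my_cat=<some TableCatalog implementation>.
    spark.sql("USE my_cat")           // switch away from the session catalog
    spark.sql("USE spark_catalog")    // switch back
    // CatalogManager has reset the v1 current database to the configured default:
    spark.sql("SELECT current_database()").show()  // my_db, not "default"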
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1a93014ecff3..7c7561b3a71c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1932,6 +1932,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
     new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
   }
 
+  def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = {
+    new SparkException(
+      errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
+      messageParameters = Map("defaultDatabase" -> defaultDatabase),
+      cause = null
+    )
+  }
+
   def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = {
     new SparkException(
       s"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bc6700a3b561..626c30cb9c2b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4705,6 +4705,8 @@ class SQLConf extends Serializable with Logging {
   def errorMessageFormat: ErrorMessageFormat.Value =
     ErrorMessageFormat.withName(getConf(SQLConf.ERROR_MESSAGE_FORMAT))
 
+  def defaultDatabase: String = getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
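A quick illustrative check of what the new helper builds; the database name `my_db` is made up, and the construction simply mirrors the `SparkException` call in the hunk above:

    import org.apache.spark.SparkException

    // Illustrative only: the exception defaultDatabaseNotExistsError("my_db") returns.
    val e = new SparkException(
      errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
      messageParameters = Map("defaultDatabase" -> "my_db"),
      cause = null)
    assert(e.getErrorClass == "DEFAULT_DATABASE_NOT_EXISTS")
    // e.getMessage renders the error-classes.json template with "my_db"
    // substituted for the <defaultDatabase> placeholder.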
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 3be02f69f232..aaeac8ce6fce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.util.Utils
 
@@ -37,6 +38,13 @@ object StaticSQLConf {
     .stringConf
     .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
+  val CATALOG_DEFAULT_DATABASE =
+    buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase")
+      .doc("The default database for the session catalog.")
+      .version("3.4.0")
+      .stringConf
+      .createWithDefault("default")
+
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .version("2.0.0")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index efbc9dd75589..b9afe71d243e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -43,7 +44,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
   extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
   import V2SessionCatalog._
 
-  override val defaultNamespace: Array[String] = Array("default")
+  override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)
 
   override def name: String = CatalogManager.SESSION_CATALOG_NAME
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index f6b748d24245..92c3ec888d6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -148,13 +148,19 @@ private[sql] class SharedState(
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
       SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
-    val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE,
-      "default database",
-      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
-      Map())
     // Create default database if it doesn't exist
-    if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
+    // If the configured default database is not 'default' and doesn't exist, fail fast
+    if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
+      if (!SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
+        throw QueryExecutionErrors.defaultDatabaseNotExistsError(
+          SQLConf.get.defaultDatabase
+        )
+      }
+      val defaultDbDefinition = CatalogDatabase(
+        SQLConf.get.defaultDatabase,
+        "default database",
+        CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
+        Map())
       // There may be another Spark application creating default database at the same time, here we
       // set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
       externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
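Putting the conf pieces together: `CatalogManager.SESSION_CATALOG_NAME` is `spark_catalog`, so the new key resolves to `spark.sql.catalog.spark_catalog.defaultDatabase`. A minimal usage sketch; the database name `my_db` is hypothetical and, per the `SharedState` change above, must already exist because it is not literally `default`:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      // Static conf: must be set before the SparkSession is created; it
      // cannot be changed later with SET.
      .config("spark.sql.catalog.spark_catalog.defaultDatabase", "my_db")
      .enableHiveSupport()
      .getOrCreate()

    // New sessions start in my_db rather than default.
    spark.sql("SELECT current_database()").show()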
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 6bbc26bc8caa..11a602e326cf 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -772,4 +772,19 @@ class CliSuite extends SparkFunSuite {
     }
   }
   // scalastyle:on line.size.limit
+
+  test("SPARK-35242: Support changing the session catalog's default database") {
+    // Create the database and a table in it first
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))(
+      "create database spark_35242;" -> "",
+      "use spark_35242;" -> "",
+      "CREATE TABLE spark_test(key INT, val STRING);" -> "")
+
+    // Then start a new session with spark_35242 as the default database
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}",
+        "--conf", s"${StaticSQLConf.CATALOG_DEFAULT_DATABASE.key}=spark_35242"))(
+      "show tables;" -> "spark_test")
+  }
 }
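The test above covers only the happy path. A sketch of the failure mode guarded by the `SharedState` change, under the same assumptions as the earlier snippets (`no_such_db` is hypothetical and never created; the exact wrapping of the thrown exception may differ):

    import org.apache.spark.SparkException
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.catalog.spark_catalog.defaultDatabase", "no_such_db")
      .enableHiveSupport()
      .getOrCreate()

    // The first catalog access initializes SharedState, which should fail
    // because no_such_db is absent and is not named "default".
    try {
      spark.sql("SHOW TABLES").show()
    } catch {
      case e: SparkException => assert(e.getErrorClass == "DEFAULT_DATABASE_NOT_EXISTS")
    }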