diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d259d6a706d7..f9f5911b7e86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -48,7 +48,7 @@ import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
 import org.apache.spark.util.Utils
 
 object SessionCatalog {
-  val DEFAULT_DATABASE = "default"
+  val DEFAULT_DATABASE = SQLConf.get.defaultDatabase
 }
 
 /**
@@ -67,8 +67,8 @@ class SessionCatalog(
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
-    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
-  import SessionCatalog._
+    cacheTTL: Long = SQLConf.get.metadataCacheTTL,
+    defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging {
   import CatalogTypes.TablePartitionSpec
 
   // For testing only.
@@ -86,7 +86,8 @@ class SessionCatalog(
       new CatalystSqlParser(),
       DummyFunctionResourceLoader,
       conf.tableRelationCacheSize,
-      conf.metadataCacheTTL)
+      conf.metadataCacheTTL,
+      conf.defaultDatabase)
   }
 
   // For testing only.
@@ -127,7 +128,7 @@ class SessionCatalog(
   // check whether the temporary view or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
+  protected var currentDb: String = formatDatabaseName(defaultDatabase)
 
   private val validNameFormat = "([\\w_]+)".r
 
@@ -262,7 +263,7 @@ class SessionCatalog(
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
     val dbName = formatDatabaseName(db)
-    if (dbName == DEFAULT_DATABASE) {
+    if (dbName == defaultDatabase) {
       throw QueryCompilationErrors.cannotDropDefaultDatabaseError
     }
     if (cascade && databaseExists(dbName)) {
@@ -1696,15 +1697,15 @@ class SessionCatalog(
   * This is mainly used for tests.
   */
  def reset(): Unit = synchronized {
-    setCurrentDatabase(DEFAULT_DATABASE)
-    externalCatalog.setCurrentDatabase(DEFAULT_DATABASE)
-    listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db =>
+    setCurrentDatabase(defaultDatabase)
+    externalCatalog.setCurrentDatabase(defaultDatabase)
+    listDatabases().filter(_ != defaultDatabase).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
-    listTables(DEFAULT_DATABASE).foreach { table =>
+    listTables(defaultDatabase).foreach { table =>
       dropTable(table, ignoreIfNotExists = false, purge = false)
     }
-    listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func =>
+    listFunctions(defaultDatabase).map(_._1).foreach { func =>
       if (func.database.isDefined) {
         dropFunction(func, ignoreIfNotExists = false)
       } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9d09715d2593..ae2900c7b452 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3849,6 +3849,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
 
+  def defaultDatabase: String = getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index bfefca4e2eba..9b0de0d67027 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.util.Utils
 
 
@@ -37,6 +38,13 @@ object StaticSQLConf {
     .stringConf
     .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
+  val CATALOG_DEFAULT_DATABASE =
+    buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase")
+      .doc("The default database for the session catalog.")
+      .version("3.2.0")
+      .stringConf
+      .createWithDefault("default")
+
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .version("2.0.0")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 3f676bec9ff0..1a39e0486851 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -134,13 +134,18 @@ private[sql] class SharedState(
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
       SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
-    val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE,
-      "default database",
-      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
-      Map())
     // Create default database if it doesn't exist
+    // If the configured default database is not 'default', fail instead of creating it
     if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
+      if ("default" != SessionCatalog.DEFAULT_DATABASE) {
+        throw new SparkException(s"Default catalog database '${SessionCatalog.DEFAULT_DATABASE}' " +
+          "does not exist, please create it first or change the default database to 'default'.")
+      }
+      val defaultDbDefinition = CatalogDatabase(
+        SessionCatalog.DEFAULT_DATABASE,
+        "default database",
+        CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
+        Map())
       // There may be another Spark application creating default database at the same time, here we
       // set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
       externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 7067b65a6232..3fa2ec4f59ee 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -611,4 +611,19 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       Seq("--conf", s"${BUILTIN_HIVE_VERSION.key}=$builtinHiveVersion"))(
       s"set ${BUILTIN_HIVE_VERSION.key};" -> builtinHiveVersion, "SET -v;" -> builtinHiveVersion)
   }
+
+  test("SPARK-35242: Support change catalog default database for spark") {
+    // Create db and table first
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))(
+      "create database spark_35242;" -> "",
+      "use spark_35242;" -> "",
+      "CREATE TABLE spark_test(key INT, val STRING);" -> "")
+
+    // Set default db
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}",
+        "--conf", s"${StaticSQLConf.CATALOG_DEFAULT_DATABASE.key}=spark_35242"))(
+      "show tables;" -> "spark_test")
+  }
 }