SessionCatalog.scala
@@ -48,7 +48,7 @@
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
 import org.apache.spark.util.Utils
 
 object SessionCatalog {
-  val DEFAULT_DATABASE = "default"
+  val DEFAULT_DATABASE = SQLConf.get.defaultDatabase
Contributor: This makes it look like it's a runtime config. Let's write getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE).
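A minimal sketch of the change this reviewer seems to be suggesting, assuming the active conf is still reached through SQLConf.get:

object SessionCatalog {
  // Read the static conf entry explicitly instead of a runtime-style accessor.
  val DEFAULT_DATABASE: String = SQLConf.get.getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)
}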

Contributor: BTW, it's a bit tricky to access the active SQLConf in a Scala object. Can we read the conf in BaseSessionStateBuilder and pass it to SessionCatalog?
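A rough sketch of that alternative: the builder resolves the static conf and passes it to the new defaultDatabase constructor parameter. The surrounding constructor arguments below are abbreviated assumptions, not copied from BaseSessionStateBuilder:

// In BaseSessionStateBuilder (sketch only):
protected lazy val catalog: SessionCatalog = {
  // Resolve the static conf once, outside SessionCatalog's companion object.
  val defaultDb = conf.getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)
  new SessionCatalog(
    () => session.sharedState.externalCatalog,       // assumed builder wiring
    () => session.sharedState.globalTempViewManager, // assumed builder wiring
    functionRegistry,
    hadoopConf,
    sqlParser,
    resourceLoader,
    defaultDatabase = defaultDb)                     // new parameter from this PR
}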

 }
 
 /**
@@ -67,8 +67,8 @@ class SessionCatalog(
     parser: ParserInterface,
     functionResourceLoader: FunctionResourceLoader,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
-    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
-  import SessionCatalog._
+    cacheTTL: Long = SQLConf.get.metadataCacheTTL,
+    defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging {
   import CatalogTypes.TablePartitionSpec

   // For testing only.
@@ -86,7 +86,8 @@
       new CatalystSqlParser(),
       DummyFunctionResourceLoader,
       conf.tableRelationCacheSize,
-      conf.metadataCacheTTL)
+      conf.metadataCacheTTL,
+      conf.defaultDatabase)
   }

   // For testing only.
@@ -127,7 +128,7 @@
   // check whether the temporary view or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
+  protected var currentDb: String = formatDatabaseName(defaultDatabase)
 
   private val validNameFormat = "([\\w_]+)".r

@@ -262,7 +263,7 @@
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
     val dbName = formatDatabaseName(db)
-    if (dbName == DEFAULT_DATABASE) {
+    if (dbName == defaultDatabase) {
       throw QueryCompilationErrors.cannotDropDefaultDatabaseError
     }
     if (cascade && databaseExists(dbName)) {
@@ -1696,15 +1697,15 @@
    * This is mainly used for tests.
    */
   def reset(): Unit = synchronized {
-    setCurrentDatabase(DEFAULT_DATABASE)
-    externalCatalog.setCurrentDatabase(DEFAULT_DATABASE)
-    listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db =>
+    setCurrentDatabase(defaultDatabase)
+    externalCatalog.setCurrentDatabase(defaultDatabase)
+    listDatabases().filter(_ != defaultDatabase).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
-    listTables(DEFAULT_DATABASE).foreach { table =>
+    listTables(defaultDatabase).foreach { table =>
       dropTable(table, ignoreIfNotExists = false, purge = false)
     }
-    listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func =>
+    listFunctions(defaultDatabase).map(_._1).foreach { func =>
       if (func.database.isDefined) {
         dropFunction(func, ignoreIfNotExists = false)
       } else {
SQLConf.scala
@@ -3849,6 +3849,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
 
+  def defaultDatabase: String = getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
StaticSQLConf.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.util.Utils
 
 
@@ -37,6 +38,13 @@ object StaticSQLConf {
     .stringConf
     .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
+  val CATALOG_DEFAULT_DATABASE =
+    buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase")
+      .doc("The default database for session catalog.")
+      .version("3.2.0")
+      .stringConf
+      .createWithDefault("default")
+
Member: Can we test this in CliSuite? AFAIK, Spark never actually gets a chance to create the default database if it does not exist; that is done during Hive metastore client initialization. If it is configured to default2, for example, Spark will now get the opportunity to create it, and there might then be two default databases.

Contributor Author @hddong (May 7, 2021): IMO, the database needs to exist when we are not connecting to default. Right now, spark-shell (and spark-submit) always needs read permission on default during init.


   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .version("2.0.0")
SharedState.scala
@@ -134,13 +134,18 @@ private[sql] class SharedState(
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
       SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
-    val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE,
-      "default database",
-      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
-      Map())
     // Create default database if it doesn't exist
+    // If the database name does not equal 'default', throw an exception
     if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
+      if ("default" != SessionCatalog.DEFAULT_DATABASE) {
+        throw new SparkException(s"Default catalog database '${SessionCatalog.DEFAULT_DATABASE}' " +
+          s"does not exist, please change the default database to 'default' and create it first.")
+      }
+      val defaultDbDefinition = CatalogDatabase(
+        SessionCatalog.DEFAULT_DATABASE,
+        "default database",
+        CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
+        Map())
       // There may be another Spark application creating default database at the same time, here we
       // set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
       externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
CliSuite.scala
@@ -611,4 +611,19 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       Seq("--conf", s"${BUILTIN_HIVE_VERSION.key}=$builtinHiveVersion"))(
       s"set ${BUILTIN_HIVE_VERSION.key};" -> builtinHiveVersion, "SET -v;" -> builtinHiveVersion)
   }
+
+  test("SPARK-35242: Support change catalog default database for spark") {
+    // Create the database and table first
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))(
+      "create database spark_35242;" -> "",
+      "use spark_35242;" -> "",
+      "CREATE TABLE spark_test(key INT, val STRING);" -> "")
+
+    // Set the default database
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}",
+        "--conf", s"${StaticSQLConf.CATALOG_DEFAULT_DATABASE.key}=spark_35242"))(
+      "show tables;" -> "spark_test")
+  }
 }
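For reference, a minimal sketch of setting the new static conf outside the CLI, at session build time. The app name and database name are illustrative, and the literal key assumes SESSION_CATALOG_NAME resolves to spark_catalog:

import org.apache.spark.sql.SparkSession

// Static SQL confs must be set before the shared state is initialized,
// so the conf goes on the builder, not on an already-running session.
val spark = SparkSession.builder()
  .appName("default-db-example") // illustrative
  .config("spark.sql.catalog.spark_catalog.defaultDatabase", "spark_35242")
  .enableHiveSupport()
  .getOrCreate()

// With the conf above, unqualified names resolve against spark_35242.
spark.sql("show tables").show()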