Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/resources/error/error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@
],
"sqlState" : "22008"
},
"DEFAULT_DATABASE_NOT_EXISTS" : {
"message" : [
"Default database <defaultDatabase> does not exist, please create it first or change default database to 'default'."
]
},
"DIVIDE_BY_ZERO" : {
"message" : [
"Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set <config> to \"false\" to bypass this error."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class SessionCatalog(
functionResourceLoader: FunctionResourceLoader,
functionExpressionBuilder: FunctionExpressionBuilder,
cacheSize: Int = SQLConf.get.tableRelationCacheSize,
cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
cacheTTL: Long = SQLConf.get.metadataCacheTTL,
defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging {
import SessionCatalog._
import CatalogTypes.TablePartitionSpec

Expand All @@ -88,7 +89,8 @@ class SessionCatalog(
DummyFunctionResourceLoader,
DummyFunctionExpressionBuilder,
conf.tableRelationCacheSize,
conf.metadataCacheTTL)
conf.metadataCacheTTL,
conf.defaultDatabase)
}

// For testing only.
Expand Down Expand Up @@ -129,7 +131,7 @@ class SessionCatalog(
// check whether the temporary view or function exists, then, if not, operate on
// the corresponding item in the current database.
@GuardedBy("this")
protected var currentDb: String = format(DEFAULT_DATABASE)
protected var currentDb: String = format(defaultDatabase)

private val validNameFormat = "([\\w_]+)".r

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class CatalogManager(
_currentNamespace = None
// Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
// when we switch back to session catalog, the current namespace definitely is ["default"].
v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
}
}

Expand All @@ -144,7 +144,7 @@ class CatalogManager(
catalogs.clear()
_currentNamespace = None
_currentCatalogName = None
v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1932,6 +1932,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
}

def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = {
new SparkException(
errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
messageParameters = Map("defaultDatabase" -> defaultDatabase),
cause = null
)
}

def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = {
new SparkException(
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4705,6 +4705,8 @@ class SQLConf extends Serializable with Logging {
def errorMessageFormat: ErrorMessageFormat.Value =
ErrorMessageFormat.withName(getConf(SQLConf.ERROR_MESSAGE_FORMAT))

def defaultDatabase: String = getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
import java.util.Locale
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.util.Utils


Expand All @@ -37,6 +38,13 @@ object StaticSQLConf {
.stringConf
.createWithDefault(Utils.resolveURI("spark-warehouse").toString)

val CATALOG_DEFAULT_DATABASE =
buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase")
.doc("The default database for session catalog.")
.version("3.4.0")
.stringConf
.createWithDefault("default")

val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
.internal()
.version("2.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand All @@ -43,7 +44,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
import V2SessionCatalog._

override val defaultNamespace: Array[String] = Array("default")
override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)

override def name: String = CatalogManager.SESSION_CATALOG_NAME

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,19 @@ private[sql] class SharedState(
val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(conf), conf, hadoopConf)

val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
"default database",
CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
Map())
// Create default database if it doesn't exist
if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
// If database name not equals 'default', throw exception
if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
if (!SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
throw QueryExecutionErrors.defaultDatabaseNotExistsError(
SQLConf.get.defaultDatabase
)
}
val defaultDbDefinition = CatalogDatabase(
SQLConf.get.defaultDatabase,
"default database",
CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
Map())
// There may be another Spark application creating default database at the same time, here we
// set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,4 +772,19 @@ class CliSuite extends SparkFunSuite {
}
}
// scalastyle:on line.size.limit

test("SPARK-35242: Support change catalog default database for spark") {
// Create db and table first
runCliWithin(2.minute,
Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))(
"create database spark_35242;" -> "",
"use spark_35242;" -> "",
"CREATE TABLE spark_test(key INT, val STRING);" -> "")

// Set default db
runCliWithin(2.minute,
Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}",
"--conf", s"${StaticSQLConf.CATALOG_DEFAULT_DATABASE.key}=spark_35242"))(
"show tables;" -> "spark_test")
}
}