From 23940cea4fe2d7eb424258011bd4e768a93d0836 Mon Sep 17 00:00:00 2001
From: hongdongdong
Date: Thu, 6 May 2021 17:53:08 +0800
Subject: [PATCH 01/20] [SPARK-35242][SQL] Support changing the catalog default database for Spark

(cherry picked from commit 1fe6c00e5a82298e5bd9c1798c1158f530cb7c9c)
---
 .../sql/catalyst/catalog/SessionCatalog.scala | 27 ++++++++++---------
 .../apache/spark/sql/internal/SQLConf.scala   |  2 ++
 .../spark/sql/internal/StaticSQLConf.scala    |  8 ++++++
 .../spark/sql/internal/SharedState.scala      | 15 +++++++----
 .../sql/hive/thriftserver/CliSuite.scala      | 16 ++++++++++-
 5 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 1ada2ffa4fc1..32fc4c7cefa6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -48,7 +48,7 @@ import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
 import org.apache.spark.util.Utils
 
 object SessionCatalog {
-  val DEFAULT_DATABASE = "default"
+  val DEFAULT_DATABASE = SQLConf.get.defaultDatabase
 }
 
 /**
@@ -68,8 +68,8 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionExpressionBuilder: FunctionExpressionBuilder,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
-    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
-  import SessionCatalog._
+    cacheTTL: Long = SQLConf.get.metadataCacheTTL,
+    defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging {
   import CatalogTypes.TablePartitionSpec
 
   // For testing only.
@@ -88,7 +88,8 @@ class SessionCatalog(
       DummyFunctionResourceLoader,
       DummyFunctionExpressionBuilder,
       conf.tableRelationCacheSize,
-      conf.metadataCacheTTL)
+      conf.metadataCacheTTL,
+      conf.defaultDatabase)
   }
 
   // For testing only.
@@ -129,7 +130,7 @@ class SessionCatalog(
   // check whether the temporary view or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb: String = format(DEFAULT_DATABASE)
+  protected var currentDb: String = formatDatabaseName(defaultDatabase)
 
   private val validNameFormat = "([\\w_]+)".r
 
@@ -284,8 +285,8 @@ class SessionCatalog(
   }
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
-    val dbName = format(db)
-    if (dbName == DEFAULT_DATABASE) {
+    val dbName = formatDatabaseName(db)
+    if (dbName == defaultDatabase) {
       throw QueryCompilationErrors.cannotDropDefaultDatabaseError
     }
     if (!ignoreIfNotExists) {
@@ -1846,17 +1847,17 @@ class SessionCatalog(
    * This is mainly used for tests.
    */
   def reset(): Unit = synchronized {
-    setCurrentDatabase(DEFAULT_DATABASE)
-    externalCatalog.setCurrentDatabase(DEFAULT_DATABASE)
-    listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db =>
+    setCurrentDatabase(defaultDatabase)
+    externalCatalog.setCurrentDatabase(defaultDatabase)
+    listDatabases().filter(_ != defaultDatabase).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
-    listTables(DEFAULT_DATABASE).foreach { table =>
+    listTables(defaultDatabase).foreach { table =>
       dropTable(table, ignoreIfNotExists = false, purge = false)
     }
     // Temp functions are dropped below, we only need to drop permanent functions here.
- externalCatalog.listFunctions(DEFAULT_DATABASE, "*").map { f => - FunctionIdentifier(f, Some(DEFAULT_DATABASE)) + externalCatalog.listFunctions(defaultDatabase, "*").map { f => + FunctionIdentifier(f, Some(defaultDatabase)) }.foreach(dropFunction(_, ignoreIfNotExists = false)) clearTempTables() globalTempViewManager.clear() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bc6700a3b561..626c30cb9c2b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4705,6 +4705,8 @@ class SQLConf extends Serializable with Logging { def errorMessageFormat: ErrorMessageFormat.Value = ErrorMessageFormat.withName(getConf(SQLConf.ERROR_MESSAGE_FORMAT)) + def defaultDatabase: String = getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index 3be02f69f232..10191234cc8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.internal import java.util.Locale import java.util.concurrent.TimeUnit +import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME import org.apache.spark.util.Utils @@ -37,6 +38,13 @@ object StaticSQLConf { .stringConf .createWithDefault(Utils.resolveURI("spark-warehouse").toString) + val CATALOG_DEFAULT_DATABASE = + buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase") + .doc("The default database for session catalog.") + .version("3.2.0") + .stringConf + .createWithDefault("default") + val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation") .internal() .version("2.0.0") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index f6b748d24245..1d567fcf228c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -148,13 +148,18 @@ private[sql] class SharedState( val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration]( SharedState.externalCatalogClassName(conf), conf, hadoopConf) - val defaultDbDefinition = CatalogDatabase( - SessionCatalog.DEFAULT_DATABASE, - "default database", - CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)), - Map()) // Create default database if it doesn't exist + // If database name not equals 'default', throw exception if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) { + if ("default" != SessionCatalog.DEFAULT_DATABASE) { + throw new SparkException(s"Default catalog database '${SessionCatalog.DEFAULT_DATABASE}' " + + s"not exist, please change default database to 'default' and create it first.") + } + val defaultDbDefinition = CatalogDatabase( + SessionCatalog.DEFAULT_DATABASE, + "default database", + CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)), + Map()) // There may be another Spark application creating default database at the same time, here we // set 
`ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception. externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 6bbc26bc8caa..1c21c748bfb0 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -169,7 +169,6 @@ class CliSuite extends SparkFunSuite { } } } - val process = new ProcessBuilder(command: _*).start() val stdinWriter = new OutputStreamWriter(process.getOutputStream, StandardCharsets.UTF_8) @@ -772,4 +771,19 @@ class CliSuite extends SparkFunSuite { } } // scalastyle:on line.size.limit + + test("SPARK-35242: Support change catalog default database for spark") { + // Create db and table first + runCliWithin(2.minute, + Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))( + "create database spark_35242;" -> "", + "use spark_35242;" -> "", + "CREATE TABLE spark_test(key INT, val STRING);" -> "") + + // Set default db + runCliWithin(2.minute, + Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}", + "--conf", s"${StaticSQLConf.CATALOG_DEFAULT_DATABASE.key}=spark_35242"))( + "show tables;" -> "spark_test") + } } From dda39f349c21539cae73ef310d84ec36c9f060fb Mon Sep 17 00:00:00 2001 From: hongdd Date: Fri, 7 May 2021 09:26:21 +0800 Subject: [PATCH 02/20] update format update format (cherry picked from commit 8dbd8bbde58a72f9f66b0870e26e3bd32b520077) --- .../org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 1c21c748bfb0..11a602e326cf 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -169,6 +169,7 @@ class CliSuite extends SparkFunSuite { } } } + val process = new ProcessBuilder(command: _*).start() val stdinWriter = new OutputStreamWriter(process.getOutputStream, StandardCharsets.UTF_8) @@ -777,8 +778,8 @@ class CliSuite extends SparkFunSuite { runCliWithin(2.minute, Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))( "create database spark_35242;" -> "", - "use spark_35242;" -> "", - "CREATE TABLE spark_test(key INT, val STRING);" -> "") + "use spark_35242;" -> "", + "CREATE TABLE spark_test(key INT, val STRING);" -> "") // Set default db runCliWithin(2.minute, From b25e82770dc675bbf6a2e79ba323d284868e72a3 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Fri, 26 Aug 2022 13:38:10 +0200 Subject: [PATCH 03/20] Remove DEFAULT_DATABASE --- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 3 --- .../spark/sql/connector/catalog/CatalogManager.scala | 6 +++--- .../org/apache/spark/sql/internal/StaticSQLConf.scala | 2 +- .../spark/sql/connector/catalog/CatalogManagerSuite.scala | 2 +- .../scala/org/apache/spark/sql/internal/SharedState.scala | 8 ++++---- .../scala/org/apache/spark/sql/test/SQLTestUtils.scala | 5 ++--- 6 files changed, 11 insertions(+), 15 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 32fc4c7cefa6..ff9958b0cbf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -47,9 +47,6 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils} import org.apache.spark.util.Utils -object SessionCatalog { - val DEFAULT_DATABASE = SQLConf.get.defaultDatabase -} /** * An internal catalog that is used by a Spark Session. This internal catalog serves as a diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 62c0772f8658..1cfc4734cbb6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} /** * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow @@ -130,7 +130,7 @@ class CatalogManager( _currentNamespace = None // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that // when we switch back to session catalog, the current namespace definitely is ["default"]. 
- v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE) + v1SessionCatalog.setCurrentDatabase(conf.getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)) } } @@ -144,7 +144,7 @@ class CatalogManager( catalogs.clear() _currentNamespace = None _currentCatalogName = None - v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE) + v1SessionCatalog.setCurrentDatabase(conf.getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index 10191234cc8b..aaeac8ce6fce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -41,7 +41,7 @@ object StaticSQLConf { val CATALOG_DEFAULT_DATABASE = buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase") .doc("The default database for session catalog.") - .version("3.2.0") + .version("3.4.0") .stringConf .createWithDefault("default") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala index bfff3ee855e6..e7bc1eb86b1c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala @@ -33,7 +33,7 @@ class CatalogManagerSuite extends SparkFunSuite with SQLHelper { private def createSessionCatalog(): SessionCatalog = { val catalog = new V1InMemoryCatalog() catalog.createDatabase( - CatalogDatabase(SessionCatalog.DEFAULT_DATABASE, "", new URI("fake"), Map.empty), + CatalogDatabase(SQLConf.get.defaultDatabase, "", new URI("fake"), Map.empty), ignoreIfExists = true) new SessionCatalog(catalog, EmptyFunctionRegistry) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 1d567fcf228c..f60cd67fcfba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -150,13 +150,13 @@ private[sql] class SharedState( // Create default database if it doesn't exist // If database name not equals 'default', throw exception - if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) { - if ("default" != SessionCatalog.DEFAULT_DATABASE) { - throw new SparkException(s"Default catalog database '${SessionCatalog.DEFAULT_DATABASE}' " + + if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) { + if ("default" != SQLConf.get.defaultDatabase) { + throw new SparkException(s"Default catalog database '${SQLConf.get.defaultDatabase}' " + s"not exist, please change default database to 'default' and create it first.") } val defaultDbDefinition = CatalogDatabase( - SessionCatalog.DEFAULT_DATABASE, + SQLConf.get.defaultDatabase, "default database", CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)), Map()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index ae425419c540..a70d9367ee84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -36,7 
+36,6 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.PlanTestBase import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -361,7 +360,7 @@ private[sql] trait SQLTestUtilsBase try f(dbName) finally { if (spark.catalog.currentDatabase == dbName) { - spark.sql(s"USE $DEFAULT_DATABASE") + spark.sql(s"USE ${SQLConf.get.defaultDatabase}") } spark.sql(s"DROP DATABASE $dbName CASCADE") } @@ -375,7 +374,7 @@ private[sql] trait SQLTestUtilsBase dbNames.foreach { name => spark.sql(s"DROP DATABASE IF EXISTS $name CASCADE") } - spark.sql(s"USE $DEFAULT_DATABASE") + spark.sql(s"USE ${SQLConf.get.defaultDatabase}") } } From 51dd8192fa492895de1743555c06ef97c48d8f3f Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Fri, 26 Aug 2022 17:37:34 +0200 Subject: [PATCH 04/20] Fixed the compilation errors --- .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 4 ++-- .../scala/org/apache/spark/sql/internal/SharedState.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index ff9958b0cbf8..0ce681622b3d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -127,7 +127,7 @@ class SessionCatalog( // check whether the temporary view or function exists, then, if not, operate on // the corresponding item in the current database. 
@GuardedBy("this") - protected var currentDb: String = formatDatabaseName(defaultDatabase) + protected var currentDb: String = format(defaultDatabase) private val validNameFormat = "([\\w_]+)".r @@ -282,7 +282,7 @@ class SessionCatalog( } def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = { - val dbName = formatDatabaseName(db) + val dbName = format(db) if (dbName == defaultDatabase) { throw QueryCompilationErrors.cannotDropDefaultDatabaseError } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index f60cd67fcfba..5e48bc75991d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -28,7 +28,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path} -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.diagnostic.DiagnosticListener From c77ac761d9690a315ca67bf92c152ca5382d6bdc Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Sun, 18 Sep 2022 21:48:29 +0200 Subject: [PATCH 05/20] Set defaultNamespace to SQLConf.get.defaultDatabase --- .../spark/sql/execution/datasources/v2/V2SessionCatalog.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index efbc9dd75589..23775c3ae0db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.connector.V1Function +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -43,7 +44,7 @@ class V2SessionCatalog(catalog: SessionCatalog) extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper { import V2SessionCatalog._ - override val defaultNamespace: Array[String] = Array("default") + override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase) override def name: String = CatalogManager.SESSION_CATALOG_NAME From 0b73b026a55efe051ccef9400c51617a0886bba7 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Sun, 18 Sep 2022 23:26:20 +0200 Subject: [PATCH 06/20] Fix the "org.apache.spark.sql.internal.SQLConf is in wrong order relative to org.apache.spark.sql.internal.connector.V1Function" error --- .../spark/sql/execution/datasources/v2/V2SessionCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 23775c3ae0db..b9afe71d243e 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -32,8 +32,8 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.internal.connector.V1Function import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.connector.V1Function import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap From e939da0b630b743bbee314b328e8a26db6c40e5c Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Mon, 19 Sep 2022 15:59:27 +0200 Subject: [PATCH 07/20] Deny the database drop for "default" and allow for the value of spark.sql.catalog.spark_catalog.defaultDatabase --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 0ce681622b3d..c998d3c9656c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -283,7 +283,7 @@ class SessionCatalog( def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = { val dbName = format(db) - if (dbName == defaultDatabase) { + if (dbName == "default") { throw QueryCompilationErrors.cannotDropDefaultDatabaseError } if (!ignoreIfNotExists) { From 8afd89a7b88048900aa4c30c0e27c2a4bb14de99 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Tue, 20 Sep 2022 09:19:29 +0200 Subject: [PATCH 08/20] Add val DEFAULT_DATABASE = "default" --- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index c998d3c9656c..1560c88580fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -47,6 +47,9 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils} import org.apache.spark.util.Utils +object SessionCatalog { + val DEFAULT_DATABASE = "default" +} /** * An internal catalog that is used by a Spark Session. This internal catalog serves as a @@ -67,6 +70,7 @@ class SessionCatalog( cacheSize: Int = SQLConf.get.tableRelationCacheSize, cacheTTL: Long = SQLConf.get.metadataCacheTTL, defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging { + import SessionCatalog._ import CatalogTypes.TablePartitionSpec // For testing only. 
@@ -283,7 +287,7 @@ class SessionCatalog( def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = { val dbName = format(db) - if (dbName == "default") { + if (dbName == DEFAULT_DATABASE) { throw QueryCompilationErrors.cannotDropDefaultDatabaseError } if (!ignoreIfNotExists) { @@ -1838,8 +1842,8 @@ class SessionCatalog( // ----------------- /** - * Drop all existing databases (except "default"), tables, partitions and functions, - * and set the current database to "default". + * Drop all existing databases (except defaultDatabase), tables, partitions and functions, + * and set the current database to defaultDatabase. * * This is mainly used for tests. */ From 270b60c8e4a802b9562d297a8dc71195ae8b5c54 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Wed, 21 Sep 2022 13:33:30 +0200 Subject: [PATCH 09/20] Implement the latest feedbacks --- .../sql/catalyst/catalog/SessionCatalog.scala | 16 ++++++++-------- .../sql/connector/catalog/CatalogManager.scala | 6 +++--- .../connector/catalog/CatalogManagerSuite.scala | 2 +- .../apache/spark/sql/internal/SharedState.scala | 4 ++-- .../org/apache/spark/sql/test/SQLTestUtils.scala | 5 +++-- 5 files changed, 17 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 1560c88580fe..cc2ba8ac7e4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -1842,23 +1842,23 @@ class SessionCatalog( // ----------------- /** - * Drop all existing databases (except defaultDatabase), tables, partitions and functions, - * and set the current database to defaultDatabase. + * Drop all existing databases (except "default"), tables, partitions and functions, + * and set the current database to "default". * * This is mainly used for tests. */ def reset(): Unit = synchronized { - setCurrentDatabase(defaultDatabase) - externalCatalog.setCurrentDatabase(defaultDatabase) - listDatabases().filter(_ != defaultDatabase).foreach { db => + setCurrentDatabase(DEFAULT_DATABASE) + externalCatalog.setCurrentDatabase(DEFAULT_DATABASE) + listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db => dropDatabase(db, ignoreIfNotExists = false, cascade = true) } - listTables(defaultDatabase).foreach { table => + listTables(DEFAULT_DATABASE).foreach { table => dropTable(table, ignoreIfNotExists = false, purge = false) } // Temp functions are dropped below, we only need to drop permanent functions here. 
- externalCatalog.listFunctions(defaultDatabase, "*").map { f => - FunctionIdentifier(f, Some(defaultDatabase)) + externalCatalog.listFunctions(DEFAULT_DATABASE, "*").map { f => + FunctionIdentifier(f, Some(DEFAULT_DATABASE)) }.foreach(dropFunction(_, ignoreIfNotExists = false)) clearTempTables() globalTempViewManager.clear() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 1cfc4734cbb6..cf9dd7fdf476 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.internal.SQLConf /** * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow @@ -130,7 +130,7 @@ class CatalogManager( _currentNamespace = None // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that // when we switch back to session catalog, the current namespace definitely is ["default"]. - v1SessionCatalog.setCurrentDatabase(conf.getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)) + v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase) } } @@ -144,7 +144,7 @@ class CatalogManager( catalogs.clear() _currentNamespace = None _currentCatalogName = None - v1SessionCatalog.setCurrentDatabase(conf.getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)) + v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala index e7bc1eb86b1c..bfff3ee855e6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala @@ -33,7 +33,7 @@ class CatalogManagerSuite extends SparkFunSuite with SQLHelper { private def createSessionCatalog(): SessionCatalog = { val catalog = new V1InMemoryCatalog() catalog.createDatabase( - CatalogDatabase(SQLConf.get.defaultDatabase, "", new URI("fake"), Map.empty), + CatalogDatabase(SessionCatalog.DEFAULT_DATABASE, "", new URI("fake"), Map.empty), ignoreIfExists = true) new SessionCatalog(catalog, EmptyFunctionRegistry) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 5e48bc75991d..acb20f578b36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -151,9 +151,9 @@ private[sql] class SharedState( // Create default database if it doesn't exist // If database name not equals 'default', throw exception if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) { - if ("default" != SQLConf.get.defaultDatabase) { + if (SessionCatalog.DEFAULT_DATABASE != SQLConf.get.defaultDatabase) { throw new SparkException(s"Default catalog database '${SQLConf.get.defaultDatabase}' " + - s"not 
exist, please change default database to 'default' and create it first.") + s"not exist, please create it first or change default database to 'default'.") } val defaultDbDefinition = CatalogDatabase( SQLConf.get.defaultDatabase, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index a70d9367ee84..d9d572d221ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -36,6 +36,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.PlanTestBase import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -360,7 +361,7 @@ private[sql] trait SQLTestUtilsBase try f(dbName) finally { if (spark.catalog.currentDatabase == dbName) { - spark.sql(s"USE ${SQLConf.get.defaultDatabase}") + spark.sql(s"USE ${SessionCatalog.DEFAULT_DATABASE}") } spark.sql(s"DROP DATABASE $dbName CASCADE") } @@ -374,7 +375,7 @@ private[sql] trait SQLTestUtilsBase dbNames.foreach { name => spark.sql(s"DROP DATABASE IF EXISTS $name CASCADE") } - spark.sql(s"USE ${SQLConf.get.defaultDatabase}") + spark.sql(s"USE ${SessionCatalog.DEFAULT_DATABASE}") } } From 0d294e1964a18dcd08c4cb2bcd83212a4497e291 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Fri, 23 Sep 2022 02:42:53 +0200 Subject: [PATCH 10/20] Add new DEFAULT_CATALOG_DATABASE_NOT_EXISTS type and implemented the latest feedbacks --- .../src/main/resources/error/error-classes.json | 5 +++++ .../scala/org/apache/spark/SparkException.scala | 17 +++++++++++++++++ .../sql/errors/QueryCompilationErrors.scala | 8 ++++++++ .../apache/spark/sql/internal/SharedState.scala | 12 +++++------- .../apache/spark/sql/test/SQLTestUtils.scala | 6 +++--- 5 files changed, 38 insertions(+), 10 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 5694b2c9d0f9..9f650e34e9d5 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -156,6 +156,11 @@ ], "sqlState" : "22008" }, + "DEFAULT_CATALOG_DATABASE_NOT_EXISTS" : { + "message" : [ + "Default catalog database not exist, please create it first or change default database to 'default'. " + ] + }, "DIVIDE_BY_ZERO" : { "message" : [ "Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set to \"false\" to bypass this error." diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index ff59cfb3455f..5af219b862ae 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -195,6 +195,23 @@ private[spark] class SparkUnsupportedOperationException( override def getErrorSubClass: String = errorSubClass.orNull } +/** + * Default catalog database not exists exception thrown from Spark with an error class. 
+ */ +private[spark] class SparkDefaultCatalogDatabaseNotExistsException( + errorClass: String, + errorSubClass: Option[String] = None, + messageParameters: Array[String], + cause: Throwable = null) + extends IOException( + SparkThrowableHelper.getMessage(errorClass, errorSubClass.orNull, messageParameters)) + with SparkThrowable { + + override def getMessageParameters: Array[String] = messageParameters + override def getErrorClass: String = errorClass + override def getErrorSubClass: String = errorSubClass.orNull +} + /** * Class not found exception thrown from Spark with an error class. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index af39d8860e59..780d80e2c7c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -21,6 +21,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path +import org.apache.spark.SparkDefaultCatalogDatabaseNotExistsException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex} @@ -685,6 +686,13 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { new AnalysisException("Can not drop default database") } + def defaultCatalogDatabaseNotExistsError(defaultDatabase: String): Throwable = { + new SparkDefaultCatalogDatabaseNotExistsException( + errorClass = "DEFAULT_CATALOG_DATABASE_NOT_EXISTS", + messageParameters = Array(defaultDatabase), + ) + } + def cannotUsePreservedDatabaseAsCurrentDatabaseError(database: String): Throwable = { new AnalysisException(s"$database is a system preserved database, you cannot use it as " + "current database. 
To access global temporary views, you should use qualified name with " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index acb20f578b36..633db2e170c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -21,18 +21,15 @@ import java.net.URL import java.util.UUID import java.util.concurrent.ConcurrentHashMap import javax.annotation.concurrent.GuardedBy - import scala.reflect.ClassTag import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path} - -import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.diagnostic.DiagnosticListener -import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab, StreamingQueryStatusStore} @@ -152,8 +149,9 @@ private[sql] class SharedState( // If database name not equals 'default', throw exception if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) { if (SessionCatalog.DEFAULT_DATABASE != SQLConf.get.defaultDatabase) { - throw new SparkException(s"Default catalog database '${SQLConf.get.defaultDatabase}' " + - s"not exist, please create it first or change default database to 'default'.") + throw QueryCompilationErrors.defaultCatalogDatabaseNotExistsError( + SQLConf.get.defaultDatabase + ) } val defaultDbDefinition = CatalogDatabase( SQLConf.get.defaultDatabase, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index d9d572d221ce..ae425419c540 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -36,7 +36,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.PlanTestBase import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -361,7 +361,7 @@ private[sql] trait SQLTestUtilsBase try f(dbName) finally { if (spark.catalog.currentDatabase == dbName) { - spark.sql(s"USE ${SessionCatalog.DEFAULT_DATABASE}") + spark.sql(s"USE $DEFAULT_DATABASE") } spark.sql(s"DROP DATABASE $dbName CASCADE") } @@ -375,7 +375,7 @@ private[sql] trait SQLTestUtilsBase dbNames.foreach { name => spark.sql(s"DROP DATABASE IF EXISTS $name CASCADE") } - spark.sql(s"USE ${SessionCatalog.DEFAULT_DATABASE}") + spark.sql(s"USE $DEFAULT_DATABASE") } } From 4cf1ea2795dddc139bb16cb517c5a310af02d79e Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Fri, 23 Sep 2022 03:51:15 +0200 Subject: [PATCH 11/20] Attempt to fix the build issues --- 
core/src/main/scala/org/apache/spark/SparkException.scala | 7 +++---- .../apache/spark/sql/errors/QueryCompilationErrors.scala | 2 +- .../scala/org/apache/spark/sql/internal/SharedState.scala | 3 +++ 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 5af219b862ae..6824bbc20dcd 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -199,10 +199,9 @@ private[spark] class SparkUnsupportedOperationException( * Default catalog database not exists exception thrown from Spark with an error class. */ private[spark] class SparkDefaultCatalogDatabaseNotExistsException( - errorClass: String, - errorSubClass: Option[String] = None, - messageParameters: Array[String], - cause: Throwable = null) + errorClass: String, + errorSubClass: Option[String] = None, + messageParameters: Array[String]) extends IOException( SparkThrowableHelper.getMessage(errorClass, errorSubClass.orNull, messageParameters)) with SparkThrowable { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 780d80e2c7c3..a6505efa9fc6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -689,7 +689,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { def defaultCatalogDatabaseNotExistsError(defaultDatabase: String): Throwable = { new SparkDefaultCatalogDatabaseNotExistsException( errorClass = "DEFAULT_CATALOG_DATABASE_NOT_EXISTS", - messageParameters = Array(defaultDatabase), + messageParameters = Array(defaultDatabase) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 633db2e170c4..3b13d49e3cff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -21,10 +21,13 @@ import java.net.URL import java.util.UUID import java.util.concurrent.ConcurrentHashMap import javax.annotation.concurrent.GuardedBy + import scala.reflect.ClassTag import scala.util.control.NonFatal + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path} + import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog._ From 50b23af6ec64b8ae293d5861f5de4ae8309b863a Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Sat, 24 Sep 2022 11:38:30 +0200 Subject: [PATCH 12/20] Second attempt to fix the build issues and rename the exception --- core/src/main/resources/error/error-classes.json | 4 ++-- core/src/main/scala/org/apache/spark/SparkException.scala | 4 ++-- .../apache/spark/sql/errors/QueryCompilationErrors.scala | 8 ++++---- .../scala/org/apache/spark/sql/internal/SharedState.scala | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 9f650e34e9d5..2dedcc264a5b 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -156,9 +156,9 @@ ], 
"sqlState" : "22008" }, - "DEFAULT_CATALOG_DATABASE_NOT_EXISTS" : { + "DEFAULT_DATABASE_NOT_EXISTS" : { "message" : [ - "Default catalog database not exist, please create it first or change default database to 'default'. " + "Default database not exist, please create it first or change default database to 'default'." ] }, "DIVIDE_BY_ZERO" : { diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 6824bbc20dcd..2854528e0a4b 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -198,11 +198,11 @@ private[spark] class SparkUnsupportedOperationException( /** * Default catalog database not exists exception thrown from Spark with an error class. */ -private[spark] class SparkDefaultCatalogDatabaseNotExistsException( +private[spark] class SparkDefaultDatabaseNotExistsException( errorClass: String, errorSubClass: Option[String] = None, messageParameters: Array[String]) - extends IOException( + extends SparkException( SparkThrowableHelper.getMessage(errorClass, errorSubClass.orNull, messageParameters)) with SparkThrowable { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index a6505efa9fc6..a726f1d985ab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path -import org.apache.spark.SparkDefaultCatalogDatabaseNotExistsException +import org.apache.spark.SparkDefaultDatabaseNotExistsException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex} @@ -686,9 +686,9 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { new AnalysisException("Can not drop default database") } - def defaultCatalogDatabaseNotExistsError(defaultDatabase: String): Throwable = { - new SparkDefaultCatalogDatabaseNotExistsException( - errorClass = "DEFAULT_CATALOG_DATABASE_NOT_EXISTS", + def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = { + new SparkDefaultDatabaseNotExistsException( + errorClass = "DEFAULT_DATABASE_NOT_EXISTS", messageParameters = Array(defaultDatabase) ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 3b13d49e3cff..556331afe0bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -152,7 +152,7 @@ private[sql] class SharedState( // If database name not equals 'default', throw exception if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) { if (SessionCatalog.DEFAULT_DATABASE != SQLConf.get.defaultDatabase) { - throw QueryCompilationErrors.defaultCatalogDatabaseNotExistsError( + throw QueryCompilationErrors.defaultDatabaseNotExistsError( 
SQLConf.get.defaultDatabase ) } From 978f8afb9e6c220cbaeac7db61782b354a1b669b Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Sat, 24 Sep 2022 13:40:13 +0200 Subject: [PATCH 13/20] Third attempt to fix the build issues --- .../scala/org/apache/spark/SparkException.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 2854528e0a4b..dadf8b85c87a 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -200,16 +200,12 @@ private[spark] class SparkUnsupportedOperationException( */ private[spark] class SparkDefaultDatabaseNotExistsException( errorClass: String, - errorSubClass: Option[String] = None, - messageParameters: Array[String]) + messageParameters: Array[String], + cause: Throwable = null) extends SparkException( - SparkThrowableHelper.getMessage(errorClass, errorSubClass.orNull, messageParameters)) - with SparkThrowable { - - override def getMessageParameters: Array[String] = messageParameters - override def getErrorClass: String = errorClass - override def getErrorSubClass: String = errorSubClass.orNull -} + errorClass = errorClass, + messageParameters = messageParameters, + cause = cause) /** * Class not found exception thrown from Spark with an error class. From 357176260cf587e7961c8dc6408ae6edb9bdddb0 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Sat, 24 Sep 2022 19:29:31 +0200 Subject: [PATCH 14/20] Use "new SparkException" by defaultDatabaseNotExistsError --- .../main/scala/org/apache/spark/SparkException.scala | 12 ------------ .../spark/sql/errors/QueryCompilationErrors.scala | 8 -------- .../spark/sql/errors/QueryExecutionErrors.scala | 7 +++++++ .../org/apache/spark/sql/internal/SharedState.scala | 4 ++-- 4 files changed, 9 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index dadf8b85c87a..ff59cfb3455f 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -195,18 +195,6 @@ private[spark] class SparkUnsupportedOperationException( override def getErrorSubClass: String = errorSubClass.orNull } -/** - * Default catalog database not exists exception thrown from Spark with an error class. - */ -private[spark] class SparkDefaultDatabaseNotExistsException( - errorClass: String, - messageParameters: Array[String], - cause: Throwable = null) - extends SparkException( - errorClass = errorClass, - messageParameters = messageParameters, - cause = cause) - /** * Class not found exception thrown from Spark with an error class. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index a726f1d985ab..af39d8860e59 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -21,7 +21,6 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path -import org.apache.spark.SparkDefaultDatabaseNotExistsException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex} @@ -686,13 +685,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { new AnalysisException("Can not drop default database") } - def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = { - new SparkDefaultDatabaseNotExistsException( - errorClass = "DEFAULT_DATABASE_NOT_EXISTS", - messageParameters = Array(defaultDatabase) - ) - } - def cannotUsePreservedDatabaseAsCurrentDatabaseError(database: String): Throwable = { new AnalysisException(s"$database is a system preserved database, you cannot use it as " + "current database. To access global temporary views, you should use qualified name with " + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 1a93014ecff3..0a3514f74175 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1932,6 +1932,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast") } + def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = { + new SparkException( + errorClass = "DEFAULT_DATABASE_NOT_EXISTS", + messageParameters = Array(defaultDatabase), + cause = null) + } + def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = { new SparkException( s""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 556331afe0bc..8ea3512c1d30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -32,7 +32,7 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.diagnostic.DiagnosticListener -import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.streaming.StreamExecution import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab, StreamingQueryStatusStore} @@ -152,7 +152,7 @@ private[sql] class SharedState( // If database name not equals 
'default', throw exception if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) { if (SessionCatalog.DEFAULT_DATABASE != SQLConf.get.defaultDatabase) { - throw QueryCompilationErrors.defaultDatabaseNotExistsError( + throw QueryExecutionErrors.defaultDatabaseNotExistsError( SQLConf.get.defaultDatabase ) } From 6e94b863eb630910d645e2042ff75c4538a77067 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Sat, 24 Sep 2022 20:40:37 +0200 Subject: [PATCH 15/20] Use "new SparkRuntimeException" --- .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 0a3514f74175..8f0ea77f6ed6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1933,10 +1933,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { } def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = { - new SparkException( + new SparkRuntimeException( errorClass = "DEFAULT_DATABASE_NOT_EXISTS", - messageParameters = Array(defaultDatabase), - cause = null) + messageParameters = Array(defaultDatabase) + ) } def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = { From 1fe375b193a3dd2dad792dbe830aa20dcaa8e8a1 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Sun, 25 Sep 2022 00:30:54 +0200 Subject: [PATCH 16/20] Feature branch rebase to upstream master --- .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 8f0ea77f6ed6..7c7561b3a71c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1933,9 +1933,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { } def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = { - new SparkRuntimeException( + new SparkException( errorClass = "DEFAULT_DATABASE_NOT_EXISTS", - messageParameters = Array(defaultDatabase) + messageParameters = Map("defaultDatabase" -> defaultDatabase), + cause = null ) } From 90a2d75e64466eb51e3aaf03e07d042b67ed1e86 Mon Sep 17 00:00:00 2001 From: Gabor Roczei Date: Sun, 25 Sep 2022 08:56:43 +0200 Subject: [PATCH 17/20] Add class SparkDefaultDatabaseNotExistsException --- .../scala/org/apache/spark/SparkException.scala | 17 +++++++++++++++++ .../spark/sql/errors/QueryExecutionErrors.scala | 5 ++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index ff59cfb3455f..d440f51caf00 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -195,6 +195,23 @@ private[spark] class SparkUnsupportedOperationException( override def getErrorSubClass: String = errorSubClass.orNull } +/** + * Default database not exists exception thrown from Spark with an error class. 
+ */
+private[spark] class SparkDefaultDatabaseNotExistsException(
+    errorClass: String,
+    errorSubClass: Option[String] = None,
+    messageParameters: Map[String, String],
+    cause: Throwable = null)
+  extends SparkException(
+    SparkThrowableHelper.getMessage(errorClass, errorSubClass.orNull, messageParameters))
+  with SparkThrowable {
+
+  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
+  override def getErrorClass: String = errorClass
+  override def getErrorSubClass: String = errorSubClass.orNull
+}
+
 /**
  * Class not found exception thrown from Spark with an error class.
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 7c7561b3a71c..c52ba5cf8c33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1933,10 +1933,9 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
   }
 
   def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = {
-    new SparkException(
+    new SparkDefaultDatabaseNotExistsException(
       errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
-      messageParameters = Map("defaultDatabase" -> defaultDatabase),
-      cause = null
+      messageParameters = Map("defaultDatabase" -> defaultDatabase)
     )
   }

From aa2791c0fc0c439a6b876ecce876ff80058e6c9a Mon Sep 17 00:00:00 2001
From: Gabor Roczei
Date: Sun, 25 Sep 2022 14:58:49 +0200
Subject: [PATCH 18/20] Update the error message to: does not exist
---
 core/src/main/resources/error/error-classes.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 2dedcc264a5b..793b2b94dd52 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -158,7 +158,7 @@
   },
   "DEFAULT_DATABASE_NOT_EXISTS" : {
     "message" : [
-      "Default database not exist, please create it first or change default database to 'default'."
+      "Default database does not exist, please create it first or change default database to 'default'."
     ]
   },
   "DIVIDE_BY_ZERO" : {

From ead9bbc9162013a005b6841312047d493022c2ad Mon Sep 17 00:00:00 2001
From: Gabor Roczei
Date: Mon, 26 Sep 2022 11:13:47 +0200
Subject: [PATCH 19/20] Implement cloud-fan's latest feedbacks
---
 .../scala/org/apache/spark/SparkException.scala | 17 -----------------
 .../spark/sql/errors/QueryExecutionErrors.scala |  5 +++--
 .../apache/spark/sql/internal/SharedState.scala |  2 +-
 3 files changed, 4 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index d440f51caf00..ff59cfb3455f 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -195,23 +195,6 @@ private[spark] class SparkUnsupportedOperationException(
   override def getErrorSubClass: String = errorSubClass.orNull
 }
 
-/**
- * Default database not exists exception thrown from Spark with an error class.
- */
-private[spark] class SparkDefaultDatabaseNotExistsException(
-    errorClass: String,
-    errorSubClass: Option[String] = None,
-    messageParameters: Map[String, String],
-    cause: Throwable = null)
-  extends SparkException(
-    SparkThrowableHelper.getMessage(errorClass, errorSubClass.orNull, messageParameters))
-  with SparkThrowable {
-
-  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
-  override def getErrorClass: String = errorClass
-  override def getErrorSubClass: String = errorSubClass.orNull
-}
-
 /**
  * Class not found exception thrown from Spark with an error class.
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index c52ba5cf8c33..7c7561b3a71c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1933,9 +1933,10 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
   }
 
   def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = {
-    new SparkDefaultDatabaseNotExistsException(
+    new SparkException(
       errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
-      messageParameters = Map("defaultDatabase" -> defaultDatabase)
+      messageParameters = Map("defaultDatabase" -> defaultDatabase),
+      cause = null
     )
   }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 8ea3512c1d30..d8caff4073a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -151,7 +151,7 @@ private[sql] class SharedState(
     // Create default database if it doesn't exist
     // If database name not equals 'default', throw exception
     if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
-      if (SessionCatalog.DEFAULT_DATABASE != SQLConf.get.defaultDatabase) {
+      if (SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
         throw QueryExecutionErrors.defaultDatabaseNotExistsError(
           SQLConf.get.defaultDatabase
         )
       }

From 6b00a916c292d138414587fe122e7c5cdaf59610 Mon Sep 17 00:00:00 2001
From: Gabor Roczei
Date: Mon, 26 Sep 2022 15:11:03 +0200
Subject: [PATCH 20/20] Negate the SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase check
---
 .../main/scala/org/apache/spark/sql/internal/SharedState.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index d8caff4073a2..92c3ec888d6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -151,7 +151,7 @@ private[sql] class SharedState(
     // Create default database if it doesn't exist
     // If database name not equals 'default', throw exception
     if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
-      if (SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
+      if (!SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
        throw QueryExecutionErrors.defaultDatabaseNotExistsError(
          SQLConf.get.defaultDatabase
        )
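
Where the series lands: StaticSQLConf.CATALOG_DEFAULT_DATABASE (key spark.sql.catalog.spark_catalog.defaultDatabase, since SESSION_CATALOG_NAME resolves to spark_catalog) selects the database a new session starts in, and SharedState fails fast with DEFAULT_DATABASE_NOT_EXISTS when a non-"default" value names a database that is missing from the metastore. A minimal usage sketch follows; the builder settings, the database name spark_35242, and the assertion are illustrative, assuming the database was created beforehand, as in the CliSuite test from PATCH 01.

import org.apache.spark.sql.SparkSession

// CATALOG_DEFAULT_DATABASE is a static conf, so it has to be set before the
// SparkSession (and its SharedState) is created; setting it on a live
// session has no effect.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.spark_catalog.defaultDatabase", "spark_35242")
  .enableHiveSupport()
  .getOrCreate()

// Unqualified table names now resolve against spark_35242 rather than
// "default". If spark_35242 did not already exist, SharedState would throw
// DEFAULT_DATABASE_NOT_EXISTS instead of creating it implicitly.
assert(spark.catalog.currentDatabase == "spark_35242")
spark.sql("SHOW TABLES").show()

This mirrors what the CliSuite test added in PATCH 01 drives through two spark-sql sessions: the first creates spark_35242 and a table in it, and the second starts with the conf set and sees that table from SHOW TABLES without issuing a USE statement.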