From 48c19c8f50db52d91c1b59ca853fb3083ee850ce Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 9 Jul 2016 03:45:16 -0700
Subject: [PATCH 1/6] [SPARK-16459][SQL] Current database should become
 `default` when current database is dropped

---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala       | 8 +++++---
 .../org/apache/spark/sql/execution/command/ddl.scala      | 6 +++++-
 .../org/apache/spark/sql/execution/command/DDLSuite.scala | 2 ++
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ffaefeb09aed..ab2650f7fc6d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -49,6 +49,8 @@ class SessionCatalog(
     hadoopConf: Configuration) extends Logging {
   import CatalogTypes.TablePartitionSpec
 
+  val DEFAULT_DATABASE = "default"
+
   // For testing only.
   def this(
       externalCatalog: ExternalCatalog,
@@ -77,7 +79,7 @@ class SessionCatalog(
   // the corresponding item in the current database.
   @GuardedBy("this")
   protected var currentDb = {
-    val defaultName = "default"
+    val defaultName = DEFAULT_DATABASE
     val defaultDbDefinition =
       CatalogDatabase(defaultName, "default database", conf.warehousePath, Map())
     // Initialize default database if it doesn't already exist
@@ -146,7 +148,7 @@ class SessionCatalog(
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
     val dbName = formatDatabaseName(db)
-    if (dbName == "default") {
+    if (dbName == DEFAULT_DATABASE) {
       throw new AnalysisException(s"Can not drop default database")
     }
     externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
@@ -878,7 +880,7 @@ class SessionCatalog(
    * This is mainly used for tests.
    */
   private[sql] def reset(): Unit = synchronized {
-    val default = "default"
+    val default = DEFAULT_DATABASE
     listDatabases().filter(_ != default).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 226f61ef404a..c0369eaaf470 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -93,7 +93,11 @@ case class DropDatabaseCommand(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
+    val catalog = sparkSession.sessionState.catalog
+    catalog.dropDatabase(databaseName, ifExists, cascade)
+    if (catalog.getCurrentDatabase == databaseName) {
+      catalog.setCurrentDatabase(catalog.DEFAULT_DATABASE)
+    }
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 7d1f1d1e62fc..3d7c2270887e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -154,6 +154,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
       sql(s"CREATE DATABASE $dbName")
       val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
+      catalog.setCurrentDatabase(dbNameWithoutBackTicks)
       val expectedLocation =
         "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
       assert(db1 == CatalogDatabase(
@@ -162,6 +163,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         expectedLocation,
         Map.empty))
       sql(s"DROP DATABASE $dbName CASCADE")
+      assert(catalog.getCurrentDatabase != dbNameWithoutBackTicks)
       assert(!catalog.databaseExists(dbNameWithoutBackTicks))
     } finally {
       catalog.reset()

From ac5f3ea8397f6ebeed42905c0fed133e45c6b5dd Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 9 Jul 2016 14:07:42 -0700
Subject: [PATCH 2/6] Prevent dropping the current database instead.
---
 .../sql/catalyst/catalog/SessionCatalog.scala     |  6 +++---
 .../apache/spark/sql/execution/command/ddl.scala  |  6 +-----
 .../spark/sql/execution/command/DDLSuite.scala    | 15 ++++++++++-----
 .../spark/sql/hive/execution/HiveDDLSuite.scala   | 10 ++++++++--
 4 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ab2650f7fc6d..42e4b4501d89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -148,8 +148,8 @@ class SessionCatalog(
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
     val dbName = formatDatabaseName(db)
-    if (dbName == DEFAULT_DATABASE) {
-      throw new AnalysisException(s"Can not drop default database")
+    if (dbName == DEFAULT_DATABASE || dbName == getCurrentDatabase) {
+      throw new AnalysisException(s"Can not drop `${DEFAULT_DATABASE}` or current database")
     }
     externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
   }
@@ -881,6 +881,7 @@ class SessionCatalog(
    */
   private[sql] def reset(): Unit = synchronized {
     val default = DEFAULT_DATABASE
+    setCurrentDatabase(default)
     listDatabases().filter(_ != default).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
@@ -904,7 +905,6 @@ class SessionCatalog(
       require(functionBuilder.isDefined, s"built-in function '$f' is missing function builder")
       functionRegistry.registerFunction(f, expressionInfo.get, functionBuilder.get)
     }
-    setCurrentDatabase(default)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index c0369eaaf470..226f61ef404a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -93,11 +93,7 @@ case class DropDatabaseCommand(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val catalog = sparkSession.sessionState.catalog
-    catalog.dropDatabase(databaseName, ifExists, cascade)
-    if (catalog.getCurrentDatabase == databaseName) {
-      catalog.setCurrentDatabase(catalog.DEFAULT_DATABASE)
-    }
+    sparkSession.sessionState.catalog.dropDatabase(databaseName, ifExists, cascade)
     Seq.empty[Row]
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 3d7c2270887e..ada6e71379c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -154,7 +154,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
       sql(s"CREATE DATABASE $dbName")
       val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-      catalog.setCurrentDatabase(dbNameWithoutBackTicks)
       val expectedLocation =
         "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db"
       assert(db1 == CatalogDatabase(
@@ -163,7 +162,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         expectedLocation,
         Map.empty))
       sql(s"DROP DATABASE $dbName CASCADE")
-      assert(catalog.getCurrentDatabase != dbNameWithoutBackTicks)
       assert(!catalog.databaseExists(dbNameWithoutBackTicks))
     } finally {
       catalog.reset()
@@ -1272,13 +1270,20 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     "WITH SERDEPROPERTIES ('spark.sql.sources.me'='anything')")
   }
 
-  test("drop default database") {
+  test("drop default or current database") {
+    sql("CREATE DATABASE temp")
+    sql("USE temp")
+    val m = intercept[AnalysisException] {
+      sql("DROP DATABASE temp")
+    }.getMessage
+    assert(m.contains("Can not drop `default` or current database"))
+
     Seq("true", "false").foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
         var message = intercept[AnalysisException] {
           sql("DROP DATABASE default")
         }.getMessage
-        assert(message.contains("Can not drop default database"))
+        assert(message.contains("Can not drop `default` or current database"))
 
         message = intercept[AnalysisException] {
           sql("DROP DATABASE DeFault")
@@ -1286,7 +1291,7 @@
         if (caseSensitive == "true") {
           assert(message.contains("Database 'DeFault' not found"))
         } else {
-          assert(message.contains("Can not drop default database"))
+          assert(message.contains("Can not drop `default` or current database"))
         }
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 93e50f4ee907..3917f41bc231 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -472,6 +472,7 @@ class HiveDDLSuite
 
       sql(s"DROP TABLE $tabName")
       assert(tmpDir.listFiles.isEmpty)
+      sql("USE default")
       sql(s"DROP DATABASE $dbName")
       assert(!fs.exists(new Path(tmpDir.toString)))
     }
@@ -526,6 +527,7 @@ class HiveDDLSuite
         assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation)))
       }
 
+      sql(s"USE default")
       val sqlDropDatabase = s"DROP DATABASE $dbName ${if (cascade) "CASCADE" else "RESTRICT"}"
       if (tableExists && !cascade) {
         val message = intercept[AnalysisException] {
@@ -565,14 +567,18 @@ class HiveDDLSuite
       var message = intercept[AnalysisException] {
         sql("DROP DATABASE default")
       }.getMessage
-      assert(message.contains("Can not drop default database"))
+      assert(message.contains("Can not drop `default` or current database"))
 
       // SQLConf.CASE_SENSITIVE does not affect the result
       // because the Hive metastore is not case sensitive.
       message = intercept[AnalysisException] {
         sql("DROP DATABASE DeFault")
       }.getMessage
-      assert(message.contains("Can not drop default database"))
+      if (caseSensitive == "true") {
+        assert(message.contains("Can not drop default database"))
+      } else {
+        assert(message.contains("Can not drop `default` or current database"))
+      }
     }
   }
 }

From 8aa1c2d8589b62657d004cf97cceaec2ae60a8cd Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 9 Jul 2016 14:58:35 -0700
Subject: [PATCH 3/6] Keep the current behavior and add a new error message
 for dropping the current db.
---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala    |  6 ++++--
 .../apache/spark/sql/execution/command/DDLSuite.scala  | 10 ++++++----
 .../apache/spark/sql/hive/execution/HiveDDLSuite.scala |  8 ++------
 3 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 42e4b4501d89..9c0c4c2b4b2c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -148,8 +148,10 @@ class SessionCatalog(
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
     val dbName = formatDatabaseName(db)
-    if (dbName == DEFAULT_DATABASE || dbName == getCurrentDatabase) {
-      throw new AnalysisException(s"Can not drop `${DEFAULT_DATABASE}` or current database")
+    if (dbName == DEFAULT_DATABASE) {
+      throw new AnalysisException(s"Can not drop default database")
+    } else if (dbName == getCurrentDatabase) {
+      throw new AnalysisException(s"Can not drop current database `${dbName}`")
     }
     externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ada6e71379c2..b4294ed7ff1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1270,20 +1270,22 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     "WITH SERDEPROPERTIES ('spark.sql.sources.me'='anything')")
   }
 
-  test("drop default or current database") {
+  test("drop current database") {
     sql("CREATE DATABASE temp")
     sql("USE temp")
     val m = intercept[AnalysisException] {
       sql("DROP DATABASE temp")
     }.getMessage
-    assert(m.contains("Can not drop `default` or current database"))
+    assert(m.contains("Can not drop current database `temp`"))
+  }
 
+  test("drop default database") {
     Seq("true", "false").foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
         var message = intercept[AnalysisException] {
           sql("DROP DATABASE default")
         }.getMessage
-        assert(message.contains("Can not drop `default` or current database"))
+        assert(message.contains("Can not drop default database"))
 
         message = intercept[AnalysisException] {
           sql("DROP DATABASE DeFault")
@@ -1291,7 +1293,7 @@
         if (caseSensitive == "true") {
           assert(message.contains("Database 'DeFault' not found"))
         } else {
-          assert(message.contains("Can not drop `default` or current database"))
+          assert(message.contains("Can not drop default database"))
         }
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3917f41bc231..343d7bae98bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -567,18 +567,14 @@ class HiveDDLSuite
       var message = intercept[AnalysisException] {
         sql("DROP DATABASE default")
       }.getMessage
-      assert(message.contains("Can not drop `default` or current database"))
+      assert(message.contains("Can not drop default database"))
 
       // SQLConf.CASE_SENSITIVE does not affect the result
       // because the Hive metastore is not case sensitive.
      message = intercept[AnalysisException] {
         sql("DROP DATABASE DeFault")
       }.getMessage
-      if (caseSensitive == "true") {
-        assert(message.contains("Can not drop default database"))
-      } else {
-        assert(message.contains("Can not drop `default` or current database"))
-      }
+      assert(message.contains("Can not drop default database"))
     }
   }
 }

From 805b2f7f99ddaf15b18887c1ef2d5940442bc167 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sat, 9 Jul 2016 15:59:38 -0700
Subject: [PATCH 4/6] Fix HiveContextCompatibilitySuite not to try to drop
 the current db.

---
 .../apache/spark/sql/hive/HiveContextCompatibilitySuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
index 3aa817470251..57363b7259c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
@@ -93,6 +93,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach {
     hc.sql("DROP TABLE mee_table")
     val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0))
     assert(tables2.isEmpty)
+    hc.sql("USE default")
     hc.sql("DROP DATABASE mee_db CASCADE")
     val databases3 = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
     assert(databases3.toSeq == Seq("default"))

From 1702c7e37f083cf765c7a81a298db582b9790a91 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Sun, 10 Jul 2016 11:45:50 -0700
Subject: [PATCH 5/6] Make SessionCatalog object.

---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9c0c4c2b4b2c..3e4f3bf3e1a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -34,6 +34,10 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.util.StringUtils
 
+object SessionCatalog {
+  val DEFAULT_DATABASE = "default"
+}
+
 /**
  * An internal catalog that is used by a Spark Session. This internal catalog serves as a
  * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
@@ -47,10 +51,9 @@ class SessionCatalog(
     functionRegistry: FunctionRegistry,
     conf: CatalystConf,
     hadoopConf: Configuration) extends Logging {
+  import SessionCatalog._
   import CatalogTypes.TablePartitionSpec
 
-  val DEFAULT_DATABASE = "default"
-
   // For testing only.
   def this(
       externalCatalog: ExternalCatalog,

From 19a31601c248d239aafcdcba1123c7a7c585c924 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 11 Jul 2016 03:13:36 -0700
Subject: [PATCH 6/6] Remove useless local variable.
---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 3e4f3bf3e1a9..d88b5ffc0511 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -885,15 +885,14 @@ class SessionCatalog(
    * This is mainly used for tests.
    */
   private[sql] def reset(): Unit = synchronized {
-    val default = DEFAULT_DATABASE
-    setCurrentDatabase(default)
-    listDatabases().filter(_ != default).foreach { db =>
+    setCurrentDatabase(DEFAULT_DATABASE)
+    listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db =>
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
-    listTables(default).foreach { table =>
+    listTables(DEFAULT_DATABASE).foreach { table =>
       dropTable(table, ignoreIfNotExists = false)
     }
-    listFunctions(default).map(_._1).foreach { func =>
+    listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func =>
       if (func.database.isDefined) {
         dropFunction(func, ignoreIfNotExists = false)
       } else {
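Behavior sketch (illustrative, not part of the patch series): a minimal
spark-shell session showing the semantics this series settles on after
PATCH 3/6. Dropping the `default` database or the current database is
rejected, so callers must switch away first, which is exactly what the
updated test suites do with `USE default`. This assumes a Spark 2.0-era
SparkSession bound to `spark`, as in spark-shell; the database name `temp`
mirrors the one used in the new DDLSuite test.

    spark.sql("CREATE DATABASE temp")
    spark.sql("USE temp")

    // Rejected: `temp` is now the current database.
    try {
      spark.sql("DROP DATABASE temp")
    } catch {
      case e: org.apache.spark.sql.AnalysisException =>
        println(e.getMessage) // "Can not drop current database `temp`"
    }

    spark.sql("USE default")        // switch away first, as the tests do
    spark.sql("DROP DATABASE temp") // now succeeds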