From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 Nov 2015 18:30:26 -0800
Subject: [PATCH 01/33] SPARK-11633

---
 .../spark/sql/catalyst/analysis/Analyzer.scala    |  3 ++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala  | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f4670b55bdb..5a5b71e52dd7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -425,7 +425,8 @@ class Analyzer(
            */
           j
         case Some((oldRelation, newRelation)) =>
-          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+          val attributeRewrites =
+            AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
           val newRight = right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3427152b2da0..5e00546a74c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,6 +51,8 @@ case class Order(
     state: String,
     month: Int)
 
+case class Individual(F1: Integer, F2: Integer)
+
 case class WindowData(
     month: Int,
     area: String,
@@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
+
+  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
+    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
+    val df = hiveContext.createDataFrame(rdd1)
+    df.registerTempTable("foo")
+    val df2 = sql("select f1, F2 as F2 from foo")
+    df2.registerTempTable("foo2")
+    df2.registerTempTable("foo3")
+
+    checkAnswer(sql(
+      """
+        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
+      """.stripMargin), Row(2) :: Row(1) :: Nil)
+  }
 }

From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:56:45 -0800
Subject: [PATCH 02/33] converge

---
 .../spark/sql/catalyst/analysis/Analyzer.scala    |  3 +--
 .../spark/sql/hive/execution/SQLQuerySuite.scala  | 15 ---------------
 2 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7c9512fbd00a..47962ebe6ef8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -417,8 +417,7 @@ class Analyzer(
            */
           j
         case Some((oldRelation, newRelation)) =>
-          val attributeRewrites =
-            AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
+          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
           val newRight = right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e00546a74c0..61d9dcd37572 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,8 +51,6 @@ case class Order(
     state: String,
     month: Int)
 
-case class Individual(F1: Integer, F2: Integer)
-
 case class WindowData(
     month: Int,
     area: String,
@@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-
-  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
-    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
-    val df = hiveContext.createDataFrame(rdd1)
-    df.registerTempTable("foo")
-    val df2 = sql("select f1, F2 as F2 from foo")
-    df2.registerTempTable("foo2")
-    df2.registerTempTable("foo3")
-
-    checkAnswer(sql(
-      """
-        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
-      """.stripMargin), Row(2) :: Row(1) :: Nil)
   }
 }

From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:58:37 -0800
Subject: [PATCH 03/33] converge

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 61d9dcd37572..3427152b2da0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-  }
 }

From 7cd839e81f8c5b157c832a16ef5355378cc7b7c0 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 30 Mar 2016 23:52:31 -0700
Subject: [PATCH 04/33] correct the database path.
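Building the database location with java.io.File.separator produces
OS-specific strings and is the wrong abstraction for a location that
ultimately lives in a Hadoop FileSystem; composing it with
org.apache.hadoop.fs.Path normalizes separators on every platform. A
minimal sketch of the difference (illustrative only, not part of this
patch):

    import org.apache.hadoop.fs.Path

    val tmp = System.getProperty("java.io.tmpdir")   // may end with "/"
    // string concatenation can yield "/tmp//db1.db", or "\" separators on Windows
    val concatenated = tmp + java.io.File.separator + "db1.db"
    // Path composition normalizes to "/tmp/db1.db" regardless of platform
    val composed = new Path(new Path(tmp), "db1.db").toString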
---
 .../sql/catalyst/catalog/SessionCatalog.scala |  6 ++--
 .../spark/sql/execution/command/ddl.scala     |  9 +++++-
 .../sql/execution/command/DDLSuite.scala      | 30 +++++++++++++++----
 3 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 569b99e414c3..4c27c4f69516 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
-import java.io.File
-
 import scala.collection.mutable
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -117,7 +117,7 @@ class SessionCatalog(
   }
 
   def getDefaultDBPath(db: String): String = {
-    System.getProperty("java.io.tmpdir") + File.separator + db + ".db"
+    new Path(new Path(System.getProperty("java.io.tmpdir")), db + ".db").toString
   }
 
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6c2a67f81c50..febd0a23f2bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -63,13 +65,18 @@ case class CreateDatabase(
     props: Map[String, String])
   extends RunnableCommand {
 
+  private def getDBPath(path: String, dbName: String) =
+    new Path(new Path(path), dbName + ".db").toString
+
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
+    val databasePath = path.map(getDBPath(_, databaseName))
+      .getOrElse(catalog.getDefaultDBPath(databaseName))
     catalog.createDatabase(
       CatalogDatabase(
         databaseName,
         comment.getOrElse(""),
-        path.getOrElse(catalog.getDefaultDBPath(databaseName)),
+        databasePath,
         props),
       ifNotExists)
     Seq.empty[Row]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 47c9a22acd44..76680ff6e80b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.io.File
-
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
@@ -51,7 +49,29 @@ class DDLSuite extends QueryTest with SharedSQLContext {
       assert(db1 == CatalogDatabase(
         dbNameWithoutBackTicks,
         "",
-        System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+        System.getProperty("java.io.tmpdir") + s"$dbNameWithoutBackTicks.db",
         Map.empty))
       sql(s"DROP DATABASE $dbName CASCADE")
+      assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+    }
+  }
+
+  test("Create/Drop Database - location") {
+    val catalog = sqlContext.sessionState.catalog
+
+    val databaseNames = Seq("db1", "`database`")
+
+    databaseNames.foreach { dbName =>
+      withDatabase(dbName) {
+        val dbNameWithoutBackTicks = cleanIdentifier(dbName)
+
+        sql(s"CREATE DATABASE $dbName Location '${System.getProperty("java.io.tmpdir")}'")
+        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        assert(db1 == CatalogDatabase(
+          dbNameWithoutBackTicks,
+          "",
+          System.getProperty("java.io.tmpdir") + s"$dbNameWithoutBackTicks.db",
           Map.empty))
         sql(s"DROP DATABASE $dbName CASCADE")
         assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -71,7 +91,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
       assert(db1 == CatalogDatabase(
         dbNameWithoutBackTicks,
         "",
-        System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
+        System.getProperty("java.io.tmpdir") + s"$dbNameWithoutBackTicks.db",
         Map.empty))
 
       val message = intercept[AnalysisException] {
@@ -90,7 +110,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     withDatabase(dbName) {
       val dbNameWithoutBackTicks = cleanIdentifier(dbName)
       val location =
-        System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
+        System.getProperty("java.io.tmpdir") + s"$dbNameWithoutBackTicks.db"
 
       sql(s"CREATE DATABASE $dbName")
       checkAnswer(

From a9ddedc6b735bd104ee45674e30486fd27a6f6f6 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 31 Mar 2016 01:22:20 -0700
Subject: [PATCH 05/33] fixed the test case.

---
 .../spark/sql/execution/command/DDLSuite.scala | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 76680ff6e80b..e74d8907eca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -35,6 +35,10 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  private def appendTrailingSlash(path: String): String = {
+    if (!path.endsWith("/")) path + "/" else path
+  }
+
   test("Create/Drop Database") {
     val catalog = sqlContext.sessionState.catalog
 
@@ -49,7 +53,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
       assert(db1 == CatalogDatabase(
         dbNameWithoutBackTicks,
         "",
-        System.getProperty("java.io.tmpdir") + s"$dbNameWithoutBackTicks.db",
+        appendTrailingSlash(System.getProperty("java.io.tmpdir")) + s"$dbNameWithoutBackTicks.db",
         Map.empty))
       sql(s"DROP DATABASE $dbName CASCADE")
       assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -71,7 +75,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
       assert(db1 == CatalogDatabase(
         dbNameWithoutBackTicks,
         "",
-        System.getProperty("java.io.tmpdir") + s"$dbNameWithoutBackTicks.db",
+        appendTrailingSlash(System.getProperty("java.io.tmpdir")) + s"$dbNameWithoutBackTicks.db",
         Map.empty))
       sql(s"DROP DATABASE $dbName CASCADE")
       assert(!catalog.databaseExists(dbNameWithoutBackTicks))
@@ -91,7 +95,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
       assert(db1 == CatalogDatabase(
         dbNameWithoutBackTicks,
         "",
-        System.getProperty("java.io.tmpdir") + s"$dbNameWithoutBackTicks.db",
+        appendTrailingSlash(System.getProperty("java.io.tmpdir")) + s"$dbNameWithoutBackTicks.db",
         Map.empty))
 
       val message = intercept[AnalysisException] {
@@ -110,7 +114,7 @@ class DDLSuite extends QueryTest with SharedSQLContext {
     withDatabase(dbName) {
       val dbNameWithoutBackTicks = cleanIdentifier(dbName)
       val location =
-        System.getProperty("java.io.tmpdir") + s"$dbNameWithoutBackTicks.db"
+        appendTrailingSlash(System.getProperty("java.io.tmpdir")) + s"$dbNameWithoutBackTicks.db"
 
       sql(s"CREATE DATABASE $dbName")
       checkAnswer(

From 522a49400945fcd1b30c25d044263552a12fdbed Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 31 Mar 2016 19:25:36 -0700
Subject: [PATCH 06/33] address comments.

---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 5 +++--
 .../scala/org/apache/spark/sql/execution/command/ddl.scala | 7 +------
 .../org/apache/spark/sql/hive/HiveSessionCatalog.scala     | 6 +++---
 3 files changed, 7 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4c27c4f69516..84ac71205316 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -116,8 +116,9 @@ class SessionCatalog(
     currentDb = db
   }
 
-  def getDefaultDBPath(db: String): String = {
-    new Path(new Path(System.getProperty("java.io.tmpdir")), db + ".db").toString
+  def getDatabasePath(dbName: String, path: Option[String]): String = {
+    val dbPath = path.getOrElse(System.getProperty("java.io.tmpdir"))
+    new Path(new Path(dbPath), dbName + ".db").toString
   }
 
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 244302561ca4..b4735b617c57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -65,18 +65,13 @@ case class CreateDatabase(
     props: Map[String, String])
   extends RunnableCommand {
 
-  private def getDBPath(path: String, dbName: String) =
-    new Path(new Path(path), dbName + ".db").toString
-
   override def run(sqlContext: SQLContext): Seq[Row] = {
     val catalog = sqlContext.sessionState.catalog
-    val databasePath = path.map(getDBPath(_, databaseName))
-      .getOrElse(catalog.getDefaultDBPath(databaseName))
     catalog.createDatabase(
       CatalogDatabase(
         databaseName,
         comment.getOrElse(""),
-        databasePath,
+        catalog.getDatabasePath(databaseName, path),
         props),
       ifNotExists)
     Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 1cd783e63a25..392659ec6017 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -62,9 +62,9 @@ class HiveSessionCatalog(
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
 
-  override def getDefaultDBPath(db: String): String = {
-    val defaultPath = context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
-    new Path(new Path(defaultPath), db + ".db").toString
+  override def getDatabasePath(dbName: String, path: Option[String] = None): String = {
+    val dbPath = path.getOrElse(context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE))
+    new Path(new Path(dbPath), dbName + ".db").toString
   }
 
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is

From 16ac0b1a548fedf7f602097ebb4aa1e7ed285515 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 1 Apr 2016 00:02:53 -0700
Subject: [PATCH 07/33] address comments.

---
 .../scala/org/apache/spark/sql/execution/command/ddl.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b4735b617c57..66b87f2cfead 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -54,7 +52,10 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand {
  * unless 'ifNotExists' is true.
  * The syntax of using this command in SQL is:
  * {{{
- *    CREATE DATABASE|SCHEMA [IF NOT EXISTS] database_name
+ *    CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name
+ *    [COMMENT database_comment]
+ *    [LOCATION file_path]
+ *    [WITH DBPROPERTIES (property_name=property_value, ...)];
  * }}}
  */
 case class CreateDatabase(

From 3c03dbfe0e5192b3a106154f2b322f74cf02d3c8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 5 Apr 2016 19:21:16 -0700
Subject: [PATCH 08/33] Merge branch 'mkdir' into mkdirNew

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

---
 .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b2b5681f8f2e..a02d0a197c21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -98,6 +98,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         Map.empty))
       sql(s"DROP DATABASE $dbName CASCADE")
       assert(!catalog.databaseExists(dbNameWithoutBackTicks))
+      } finally {
+        catalog.reset()
     }
   }
 }

From 8b6f86bbb059699c114c2b5815850c479d00a50c Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 5 Apr 2016 19:23:21 -0700
Subject: [PATCH 09/33] style fix.
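PATCH 08/33 above wrapped the Create/Drop Database tests in try/finally
with catalog.reset(), so a failing assertion can no longer leave stray
databases behind for later tests. A minimal sketch of the pattern (helper
names follow the suite; the body is illustrative, not part of this
patch):

    databaseNames.foreach { dbName =>
      try {
        sql(s"CREATE DATABASE $dbName")
        // ... assertions against the session catalog ...
      } finally {
        // reset() drops the databases, tables and functions created here,
        // so one failing case cannot poison shared catalog state
        catalog.reset()
      }
    }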
---
 .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index a02d0a197c21..bfba226415a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -106,13 +106,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
   test("Create/Drop Database - location") {
     val catalog = sqlContext.sessionState.catalog
-
     val databaseNames = Seq("db1", "`database`")
 
     databaseNames.foreach { dbName =>
       try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-
         sql(s"CREATE DATABASE $dbName Location '${System.getProperty("java.io.tmpdir")}'")
         val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(

From f666fa287e51d7746296a621091c975f907c6eca Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 7 Apr 2016 22:32:13 -0700
Subject: [PATCH 10/33] added test cases

---
 .../sql/execution/command/DDLSuite.scala      | 39 +++++++++++-
 .../sql/hive/execution/HiveDDLSuite.scala     | 62 +++++++++++++++++++
 2 files changed, 98 insertions(+), 3 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index bfba226415a4..28a97ccbaa37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -68,6 +68,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         schema = Seq()), ignoreIfExists = false)
   }
 
+  private def dropTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+    catalog.dropTable(name, ignoreIfNotExists = false)
+  }
+
   private def createTablePartition(
       catalog: SessionCatalog,
       spec: TablePartitionSpec,
@@ -218,7 +222,38 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  // TODO: test drop database in restrict mode
+  test("drop non-empty database in restrict mode") {
+    val catalog = sqlContext.sessionState.catalog
+    val dbName = "db1"
+    sql(s"CREATE DATABASE $dbName")
+
+    // create a table in database
+    val tableIdent1 = TableIdentifier("tab1", Some(dbName))
+    createTable(catalog, tableIdent1)
+
+    // drop a non-empty database in Restrict mode
+    val message = intercept[AnalysisException] {
+      sql(s"DROP DATABASE $dbName RESTRICT")
+    }.getMessage
+    assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+
+    dropTable(catalog, tableIdent1)
+    sql(s"DROP DATABASE $dbName RESTRICT")
+  }
+
+  test("drop non-empty database in cascade mode") {
+    val catalog = sqlContext.sessionState.catalog
+    val dbName = "db1"
+    sql(s"CREATE DATABASE $dbName")
+
+    // create a table in database
+    val tableIdent1 = TableIdentifier("tab1", Some(dbName))
+    createTable(catalog, tableIdent1)
+
+    // drop a non-empty database in CASCADE mode
+    dropTable(catalog, tableIdent1)
+    sql(s"DROP DATABASE $dbName CASCADE")
+  }
 
   test("alter table: rename") {
     val catalog = sqlContext.sessionState.catalog
@@ -343,8 +378,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
   }
 
-  // TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext
-
   test("show tables") {
     withTempTable("show1a", "show2b") {
       sql(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
new file mode 100644
index 000000000000..de7c9f20b5bb
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveDDLSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
+
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      sqlContext.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("create/drop database - checking directory") {
+    val catalog = sqlContext.sessionState.catalog
+    val dbName = "db1"
+    val path = catalog.getDatabasePath(dbName, None)
+    val dbPath = new Path(path)
+    val fs = dbPath.getFileSystem(hiveContext.hiveconf)
+    // the database directory does not exist
+    assert (!fs.exists(dbPath))
+
+    sql("CREATE DATABASE db1")
+    val db1 = catalog.getDatabase(dbName)
+    assert(db1 == CatalogDatabase(
+      dbName,
+      "",
+      path,
+      Map.empty))
+    // the database directory was created
+    assert (fs.exists(dbPath) && fs.isDirectory(dbPath))
+    sql("DROP DATABASE db1")
+    // the database directory was removed
+    assert (!fs.exists(dbPath))
+  }
+}
\ No newline at end of file

From b120b77608c426d293f16c9c5fe5643b145ec131 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 7 Apr 2016 22:33:45 -0700
Subject: [PATCH 11/33] style fix

---
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index de7c9f20b5bb..5d334cb9ba0f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -59,4 +59,4 @@ class HiveDDLSuite
     // the database directory was removed
     assert (!fs.exists(dbPath))
   }
-}
\ No newline at end of file
+}

From b2cb03daa614116bf01c38a590de2a5c63cf5e01 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 7 Apr 2016 23:46:04 -0700
Subject: [PATCH 12/33] compilation error fix

---
 .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +-
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 5314ec644fcb..167d782b1b22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -116,7 +116,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         sql(s"CREATE DATABASE $dbName Location '${System.getProperty("java.io.tmpdir")}'")
-        val db1 = catalog.getDatabase(dbNameWithoutBackTicks)
+        val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(
           dbNameWithoutBackTicks,
           "",
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 5d334cb9ba0f..70281d47f0ec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -47,7 +47,7 @@ class HiveDDLSuite
     assert (!fs.exists(dbPath))
 
     sql("CREATE DATABASE db1")
-    val db1 = catalog.getDatabase(dbName)
+    val db1 = catalog.getDatabaseMetadata(dbName)
     assert(db1 == CatalogDatabase(
       dbName,
       "",

From 9cf48e1adc070bbc0820efb13745b1538d8301b6 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 9 Apr 2016 17:01:11 -0700
Subject: [PATCH 13/33] unable to reproduce it in the local environment and
 print more info to debug.

---
 .../spark/sql/hive/HiveSessionCatalog.scala   |  6 +++++-
 .../sql/hive/execution/HiveQuerySuite.scala   | 16 +++++++++++++++-
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 4c4a0d7e26ca..7c730d24a2e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -76,7 +76,11 @@ private[sql] class HiveSessionCatalog(
 
   override def getDatabasePath(dbName: String, path: Option[String] = None): String = {
     val dbPath = path.getOrElse(context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE))
-    new Path(new Path(dbPath), dbName + ".db").toString
+    val returnPath = new Path(new Path(dbPath), dbName + ".db").toString
+    // scalastyle:off println
+    println(s"Return of getDatabasePath: $returnPath")
+    // scalastyle:on println
+    returnPath
   }
 
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 0c57ede9ed0a..b204d520a63c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -28,7 +28,6 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkException, SparkFiles}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
@@ -1181,10 +1180,25 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   test("current_database with multiple sessions") {
     sql("create database a")
+    // TestHiveContex
+    // scalastyle:off println
+    println("location of database a:" + hiveCatalog.getDatabase("a").locationUri)
+    println("list of databases:" + hiveCatalog.listDatabases())
+    // scalastyle:on println
+    sql("select current_database()").show()
     sql("use a")
+    sql("select current_database()").show()
     val s2 = newSession()
     s2.sql("create database b")
+    // scalastyle:off println
+    println("location of database a:" + hiveCatalog.getDatabase("a").locationUri)
+    println("location of database b:" + hiveCatalog.getDatabase("b").locationUri)
+    println("list of databases:" + hiveCatalog.listDatabases())
+    // scalastyle:on println
+    sql("select current_database()").show()
     s2.sql("use b")
+    sql("select current_database()").show()
+    s2.sql("select current_database()").show()
     assert(sql("select current_database()").first() === Row("a"))
     assert(s2.sql("select current_database()").first() === Row("b"))

From 4890c65757b7d64f9a35e5d06dc9acb7be7f73b6 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 10 Apr 2016 10:31:05 -0700
Subject: [PATCH 14/33] one more try

---
 .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index b204d520a63c..a72e3af7525a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1188,6 +1188,10 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     sql("select current_database()").show()
     sql("use a")
     sql("select current_database()").show()
+    sql("use a")
+    sql("select current_database()").show()
+    sql("use a")
+    sql("select current_database()").show()
     val s2 = newSession()
     s2.sql("create database b")

From 1618f505c4d99a9e557b7c10a85c70e85d2672cb Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 10 Apr 2016 10:39:16 -0700
Subject: [PATCH 15/33] style fix.

---
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 8e1e527046e3..dba7667e5bb6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.scalatest.BeforeAndAfterEach
-
 import org.apache.hadoop.fs.Path
+import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
@@ -28,7 +27,6 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
-
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
   import hiveContext.implicits._

From 924f132d8b2facdf8963488a4141cfbb1700dac7 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 10 Apr 2016 14:16:37 -0700
Subject: [PATCH 16/33] print more

---
 .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 3 +++
 .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala  | 3 +++
 .../org/apache/spark/sql/execution/command/commands.scala      | 3 +++
 .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 3 +++
 4 files changed, 12 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 3d570df80308..8affb313154d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -118,6 +118,9 @@ class SessionCatalog(
     if (!databaseExists(db)) {
       throw new AnalysisException(s"cannot set current database to non-existent '$db'")
     }
+    // scalastyle:off println
+    println(s"setCurrentDatabase - from '$currentDb' to '$db'")
+    // scalastyle:on println
     currentDb = db
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 3da715cdb330..c24922628107 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -65,6 +65,9 @@ class SparkSqlAstBuilder extends AstBuilder {
    * Create a [[SetDatabaseCommand]] logical plan.
    */
   override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
+    // scalastyle:off println
+    println("visitUse - db:" + ctx.db.getText)
+    // scalastyle:on println
     SetDatabaseCommand(ctx.db.getText)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 5d00c805a6af..b54124b40d69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -522,6 +522,9 @@ case class DescribeFunction(
 case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
+    // scalastyle:off println
+    println("SetDatabaseCommand - db:" + databaseName)
+    // scalastyle:on println
     sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
     Seq.empty[Row]
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 505e5c0bb62f..110857661524 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -69,6 +69,9 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
   override def foldable: Boolean = true
   override def nullable: Boolean = false
   override def eval(input: InternalRow): Any = {
+    // scalastyle:off println
+    println("CurrentDatabase - currentDb:" + ctx.sessionState.catalog.getCurrentDatabase)
+    // scalastyle:on println
     UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
   }
 }

From 04e1d83ec8bc947bac95734c3d0ea163cdb2c7bc Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 10 Apr 2016 16:42:40 -0700
Subject: [PATCH 17/33] clear the cache!!!!
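The explain(true) and CLEAR CACHE calls added below probe why
current_database() can report a stale value: either a cached result is
being reused, or the expression is folded away before execution. Since
CurrentDatabase declares foldable = true (see PATCH 16/33 above), the
optimizer is allowed to evaluate it once and inline the answer as a
literal. A sketch of what the probes look for (assumes a TestHive-style
hiveContext; illustrative, not part of this patch):

    val df = hiveContext.sql("select current_database()")
    df.explain(true)   // a folded plan shows a string literal instead of
                       // the CurrentDatabase expression
    hiveContext.sql("use a")
    df.show()          // a folded literal would still print the old database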
--- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index a72e3af7525a..a77a82f0ff6b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1185,12 +1185,17 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { println("location of database a:" + hiveCatalog.getDatabase("a").locationUri) println("list of databases:" + hiveCatalog.listDatabases()) // scalastyle:on println + sql("select current_database()").explain(true) sql("select current_database()").show() sql("use a") + sql("CLEAR CACHE") + sql("select current_database()").explain(true) sql("select current_database()").show() sql("use a") + sql("select current_database()").explain(true) sql("select current_database()").show() sql("use a") + sql("select current_database()").explain(true) sql("select current_database()").show() val s2 = newSession() s2.sql("create database b") From 3ae138b7507f2d924915bd46ace9db68bf072f9e Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 10 Apr 2016 22:32:14 -0700 Subject: [PATCH 18/33] code clean and avoid dropping the function current_database when reset the sessionCatalog --- .../sql/catalyst/catalog/SessionCatalog.scala | 10 +++++--- .../spark/sql/execution/SparkSqlParser.scala | 3 --- .../sql/execution/command/commands.scala | 3 --- .../apache/spark/sql/hive/HiveContext.scala | 3 --- .../spark/sql/hive/HiveSessionCatalog.scala | 6 +---- .../sql/hive/execution/HiveQuerySuite.scala | 25 +------------------ 6 files changed, 8 insertions(+), 42 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 8affb313154d..17dc99f2e6cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -118,9 +118,6 @@ class SessionCatalog( if (!databaseExists(db)) { throw new AnalysisException(s"cannot set current database to non-existent '$db'") } - // scalastyle:off println - println(s"setCurrentDatabase - from '$currentDb' to '$db'") - // scalastyle:on println currentDb = db } @@ -659,7 +656,12 @@ class SessionCatalog( dropDatabase(db, ignoreIfNotExists = false, cascade = true) } tempTables.clear() - functionRegistry.clear() + // Do not remove the function `current_database`, which is registered in each + // new session of HiveContext. Otherwise, it could load the Hive UDF function + // with the same function name. 
+    functionRegistry.listFunction().filter(_ != "current_database").foreach { f =>
+      functionRegistry.dropFunction(f)
+    }
     // restore built-in functions
     FunctionRegistry.builtin.listFunction().foreach { f =>
       val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c24922628107..3da715cdb330 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -65,9 +65,6 @@ class SparkSqlAstBuilder extends AstBuilder {
   * Create a [[SetDatabaseCommand]] logical plan.
   */
   override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
-    // scalastyle:off println
-    println("visitUse - db:" + ctx.db.getText)
-    // scalastyle:on println
     SetDatabaseCommand(ctx.db.getText)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index b54124b40d69..5d00c805a6af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -522,9 +522,6 @@ case class DescribeFunction(
 case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    // scalastyle:off println
-    println("SetDatabaseCommand - db:" + databaseName)
-    // scalastyle:on println
     sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
     Seq.empty[Row]
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 110857661524..505e5c0bb62f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -69,9 +69,6 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
   override def foldable: Boolean = true
   override def nullable: Boolean = false
   override def eval(input: InternalRow): Any = {
-    // scalastyle:off println
-    println("CurrentDatabase - currentDb:" + ctx.sessionState.catalog.getCurrentDatabase)
-    // scalastyle:on println
     UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index e5e31624a71b..8ea28d21858f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -78,11 +78,7 @@ private[sql] class HiveSessionCatalog(
 
   override def getDatabasePath(dbName: String, path: Option[String] = None): String = {
     val dbPath = path.getOrElse(context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE))
-    val returnPath = new Path(new Path(dbPath), dbName + ".db").toString
-    // scalastyle:off println
-    println(s"Return of getDatabasePath: $returnPath")
-    // scalastyle:on println
-    returnPath
+    new Path(new Path(dbPath), dbName + ".db").toString
   }
 
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index a77a82f0ff6b..0c57ede9ed0a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkException, SparkFiles}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
@@ -1180,34 +1181,10 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   test("current_database with multiple sessions") {
     sql("create database a")
-    // TestHiveContex
-    // scalastyle:off println
-    println("location of database a:" + hiveCatalog.getDatabase("a").locationUri)
-    println("list of databases:" + hiveCatalog.listDatabases())
-    // scalastyle:on println
-    sql("select current_database()").explain(true)
-    sql("select current_database()").show()
     sql("use a")
-    sql("CLEAR CACHE")
-    sql("select current_database()").explain(true)
-    sql("select current_database()").show()
-    sql("use a")
-    sql("select current_database()").explain(true)
-    sql("select current_database()").show()
-    sql("use a")
-    sql("select current_database()").explain(true)
-    sql("select current_database()").show()
     val s2 = newSession()
     s2.sql("create database b")
-    // scalastyle:off println
-    println("location of database a:" + hiveCatalog.getDatabase("a").locationUri)
-    println("location of database b:" + hiveCatalog.getDatabase("b").locationUri)
-    println("list of databases:" + hiveCatalog.listDatabases())
-    // scalastyle:on println
-    sql("select current_database()").show()
     s2.sql("use b")
-    sql("select current_database()").show()
-    s2.sql("select current_database()").show()
     assert(sql("select current_database()").first() === Row("a"))
     assert(s2.sql("select current_database()").first() === Row("b"))

From 9cf48e1adc070bbc0820efb13745b1538d8301b6 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sun, 10 Apr 2016 23:22:13 -0700
Subject: [PATCH 19/33] address comments.
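The Hive metastore stores database names in lower case, so the warehouse
directory for CREATE DATABASE `DataBase` must be database.db rather than
DataBase.db; both path builders below therefore lowercase the name before
appending the .db suffix. A minimal sketch of the convention (values are
illustrative):

    import org.apache.hadoop.fs.Path

    def dbDir(warehouse: String, dbName: String): Path =
      new Path(new Path(warehouse), dbName.toLowerCase + ".db")

    dbDir("/user/hive/warehouse", "DataBase")
    // => /user/hive/warehouse/database.db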
---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +-
 .../org/apache/spark/sql/execution/command/DDLSuite.scala  | 6 +++++-
 .../org/apache/spark/sql/hive/HiveSessionCatalog.scala     | 2 +-
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 4 ++--
 4 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index bd6430705189..9e67b48cc1f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -123,7 +123,7 @@ class SessionCatalog(
 
   def getDatabasePath(dbName: String, path: Option[String]): String = {
     val dbPath = path.getOrElse(System.getProperty("java.io.tmpdir"))
-    new Path(new Path(dbPath), dbName + ".db").toString
+    new Path(new Path(dbPath), dbName.toLowerCase() + ".db").toString
   }
 
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 4b85f94ddf11..242ded6c1b8d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -238,7 +238,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
 
     dropTable(catalog, tableIdent1)
+    assert(catalog.listDatabases().contains(dbName))
     sql(s"DROP DATABASE $dbName RESTRICT")
+    assert(!catalog.listDatabases().contains(dbName))
   }
 
   test("drop non-empty database in cascade mode") {
@@ -251,8 +253,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     createTable(catalog, tableIdent1)
 
     // drop a non-empty database in CASCADE mode
-    dropTable(catalog, tableIdent1)
+    assert(catalog.listTables(dbName).contains(tableIdent1))
+    assert(catalog.listDatabases().contains(dbName))
     sql(s"DROP DATABASE $dbName CASCADE")
+    assert(!catalog.listDatabases().contains(dbName))
   }
 
   test("alter table: rename") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 4c4a0d7e26ca..96c223d7e7b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -76,7 +76,7 @@ private[sql] class HiveSessionCatalog(
 
   override def getDatabasePath(dbName: String, path: Option[String] = None): String = {
     val dbPath = path.getOrElse(context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE))
-    new Path(new Path(dbPath), dbName + ".db").toString
+    new Path(new Path(dbPath), dbName.toLowerCase() + ".db").toString
   }
 
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index dba7667e5bb6..0e55ef2dcaef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -64,7 +64,7 @@ class HiveDDLSuite
     }
   }
 
-  test("drop managed tables") {
+  test("drop managed tables in default database") {
     withTempDir { tmpDir =>
       val tabName = "tab1"
       withTable(tabName) {
@@ -91,7 +91,7 @@ class HiveDDLSuite
     }
   }
 
-  test("drop external data source table") {
+  test("drop external data source table in default database") {
    withTempDir { tmpDir =>
      val tabName = "tab1"
      withTable(tabName) {

From 38130a80719a02c5a55e94cf9018f08f4880ce82 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 11 Apr 2016 16:46:01 -0700
Subject: [PATCH 20/33] address comments.

---
 .../sql/catalyst/catalog/SessionCatalog.scala |  8 +++---
 .../sql/execution/command/DDLSuite.scala      |  3 +-
 .../spark/sql/hive/HiveSessionCatalog.scala   | 14 ++-
 .../sql/hive/execution/HiveDDLSuite.scala     | 95 +++++++++++++++++--
 4 files changed, 105 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9e67b48cc1f6..863c29bafc82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -122,8 +122,12 @@ class SessionCatalog(
   }
 
   def getDatabasePath(dbName: String, path: Option[String]): String = {
-    val dbPath = path.getOrElse(System.getProperty("java.io.tmpdir"))
-    new Path(new Path(dbPath), dbName.toLowerCase() + ".db").toString
+    val dbPath =
+      path.map(new Path(_))
+        .getOrElse(
+          new Path(new Path(System.getProperty("java.io.tmpdir")), dbName.toLowerCase() + ".db"))
+
+    org.apache.commons.lang.StringUtils.removeEnd(dbPath.toString, "/")
   }
 
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 242ded6c1b8d..8133988e012b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -86,7 +86,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
   test("Create/Drop Database") {
     val catalog = sqlContext.sessionState.catalog
-
     val databaseNames = Seq("db1", "`database`")
 
     databaseNames.foreach { dbName =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 96c223d7e7b1..265cf4fcb6c7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
+import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
@@ -74,9 +75,18 @@ private[sql] class HiveSessionCatalog(
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
 
+  // This function is to get the path for creating a non-default database
   override def getDatabasePath(dbName: String, path: Option[String] = None): String = {
-    val dbPath = path.getOrElse(context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE))
-    new Path(new Path(dbPath), dbName.toLowerCase() + ".db").toString
+    val dbPath = path.map(new Path(_)).getOrElse {
+      val defaultPath = context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
+      if (StringUtils.isBlank(defaultPath)) {
+        throw new AnalysisException(
+          s"${HiveConf.ConfVars.METASTOREWAREHOUSE.varname} is not set in the config or blank")
+      }
+      new Path(new Path(defaultPath), dbName.toLowerCase() + ".db")
+    }
+    val fs = dbPath.getFileSystem(context.hiveconf)
+    new Path(fs.getUri.getScheme, fs.getUri.getAuthority, dbPath.toUri.getPath).toString
  }
 
  // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 0e55ef2dcaef..caaa42ab4d78 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -40,12 +40,17 @@ class HiveDDLSuite
     }
   }
 
   // check if the directory for recording the data of the table exists.
-  private def tableDirectoryExists(tableIdentifier: TableIdentifier): Boolean = {
+  private def tableDirectoryExists(
+      tableIdentifier: TableIdentifier,
+      dbPath: Option[String] = None): Boolean = {
     val expectedTablePath =
+      if (dbPath.isEmpty) {
+        new Path (hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier))
+      } else {
+        new Path(new Path(dbPath.get), tableIdentifier.table)
+      }
+    val fs = expectedTablePath.getFileSystem(sparkContext.hadoopConfiguration)
+    fs.exists(expectedTablePath)
   }
 
   test("drop tables") {
@@ -164,7 +169,38 @@ class HiveDDLSuite
     }
   }
 
-  test("create/drop database - checking directory") {
+  test("create/drop database - location") {
     val catalog = sqlContext.sessionState.catalog
+    withTempDir { tmpDir =>
+      val dbName = "db1"
+      val tabName = "tab1"
+      val path = catalog.getDatabasePath(dbName, Option(tmpDir.toString))
+      val fs = new Path(path).getFileSystem(hiveContext.hiveconf)
+      withTable(tabName) {
+        assert(tmpDir.listFiles.isEmpty)
+        sql(s"CREATE DATABASE $dbName Location '$tmpDir'")
+        val db1 = catalog.getDatabaseMetadata(dbName)
+        assert(db1 == CatalogDatabase(
+          dbName,
+          "",
+          path,
+          Map.empty))
+        sql("USE db1")
+
+        sql(s"CREATE TABLE $tabName as SELECT 1")
+        assert(tableDirectoryExists(TableIdentifier(tabName), Option(tmpDir.toString)))
+
+        assert(tmpDir.listFiles.nonEmpty)
+        sql(s"DROP TABLE $tabName")
+
+        assert(tmpDir.listFiles.isEmpty)
+        sql(s"DROP DATABASE $dbName")
+        assert(!fs.exists(new Path(tmpDir.toString)))
+      }
+    }
+  }
+
+  test("create/drop database - RESTRICT") {
     val catalog = sqlContext.sessionState.catalog
     val dbName = "db1"
     val path = catalog.getDatabasePath(dbName, None)
     val dbPath = new Path(path)
     val fs = dbPath.getFileSystem(hiveContext.hiveconf)
     // the database directory does not exist
     assert (!fs.exists(dbPath))
 
-    sql("CREATE DATABASE db1")
+    sql(s"CREATE DATABASE $dbName")
     val db1 = catalog.getDatabaseMetadata(dbName)
     assert(db1 == CatalogDatabase(
       dbName,
       "",
       path,
       Map.empty))
     // the database directory was created
-    assert (fs.exists(dbPath) && fs.isDirectory(dbPath))
-    sql("DROP DATABASE db1")
+    assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
+    sql("USE db1")
+
+    val tabName = "tab1"
+    assert(!tableDirectoryExists(TableIdentifier(tabName), Option(path)))
+    sql(s"CREATE TABLE $tabName as SELECT 1")
+    assert(tableDirectoryExists(TableIdentifier(tabName), Option(path)))
+    sql(s"DROP TABLE $tabName")
+    assert(!tableDirectoryExists(TableIdentifier(tabName), Option(path)))
+
+    sql(s"DROP DATABASE $dbName")
     // the database directory was removed
+    assert(!fs.exists(dbPath))
+  }
+
+  test("create/drop database - CASCADE") {
+    val catalog = sqlContext.sessionState.catalog
+    val dbName = "db1"
+    val path = catalog.getDatabasePath(dbName, None)
+    val dbPath = new Path(path)
+    val fs = dbPath.getFileSystem(hiveContext.hiveconf)
+    // the database directory does not exist
     assert (!fs.exists(dbPath))
+
+    sql(s"CREATE DATABASE $dbName")
+    assert(fs.exists(dbPath) && fs.isDirectory(dbPath))
+    sql("USE db1")
+
+    val tabName = "tab1"
+    assert(!tableDirectoryExists(TableIdentifier(tabName), Option(path)))
+    sql(s"CREATE TABLE $tabName as SELECT 1")
+    assert(tableDirectoryExists(TableIdentifier(tabName), Option(path)))
+    sql(s"DROP TABLE $tabName")
+    assert(!tableDirectoryExists(TableIdentifier(tabName), Option(path)))
+
+    sql(s"DROP DATABASE $dbName CASCADE")
+    // the database directory was removed and the inclusive table directories are also removed
+    assert(!fs.exists(dbPath))
+  }
+
+  test("drop default database") {
+    val message = intercept[AnalysisException] {
+      sql("DROP DATABASE default")
+    }.getMessage
+    assert(message.contains("Can not drop default database"))
+  }
 }

From eeec2c1b25bfee216727732b56b0e6a21a42fa20 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 11 Apr 2016 19:01:06 -0700
Subject: [PATCH 21/33] fix os specific separator

---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 6 ++++--
 .../org/apache/spark/sql/execution/command/DDLSuite.scala  | 5 ++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 863c29bafc82..cd255ff6653b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.io.File
+
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
@@ -126,8 +128,8 @@ class SessionCatalog(
       path.map(new Path(_))
         .getOrElse(
           new Path(new Path(System.getProperty("java.io.tmpdir")), dbName.toLowerCase() + ".db"))
-
-    org.apache.commons.lang.StringUtils.removeEnd(dbPath.toString, "/")
+    val dbLocation = dbPath.toString
+    if (dbLocation.endsWith(File.separator)) dbLocation.dropRight(1) else dbLocation
   }
 
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 8133988e012b..fcefad1ebf7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
@@ -110,6 +112,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   test("Create/Drop Database - location") {
     val catalog = sqlContext.sessionState.catalog
     val databaseNames = Seq("db1", "`database`")
+    val defaultPath = System.getProperty("java.io.tmpdir")
 
     databaseNames.foreach { dbName =>
       try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         sql(s"CREATE DATABASE $dbName Location '${System.getProperty("java.io.tmpdir")}'")
         val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
         assert(db1 == CatalogDatabase(
           dbNameWithoutBackTicks,
           "",
-          org.apache.commons.lang.StringUtils.removeEnd(System.getProperty("java.io.tmpdir"), "/"),
+          if (defaultPath.endsWith(File.separator)) defaultPath.dropRight(1) else defaultPath,
           Map.empty))
         sql(s"DROP DATABASE $dbName CASCADE")
         assert(!catalog.databaseExists(dbNameWithoutBackTicks))

From 3353a390acb2db05aef39d6ea20e96521b263d6e Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 12 Apr 2016 10:13:15 -0700
Subject: [PATCH 22/33] address comments

---
 .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 2 +-
 .../scala/org/apache/spark/sql/execution/command/ddl.scala | 2 +-
 .../org/apache/spark/sql/hive/HiveSessionCatalog.scala     | 6 +++++-
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 6 +++---
 4 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index cd255ff6653b..97e8e9f225f5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -123,7 +123,7 @@ class SessionCatalog(
     currentDb = db
   }
 
-  def getDatabasePath(dbName: String, path: Option[String]): String = {
+  def createDatabasePath(dbName: String, path: Option[String]): String = {
     val dbPath =
       path.map(new Path(_))
         .getOrElse(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 8f8894b373a2..812f73f39a37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -72,7 +72,7 @@ case class CreateDatabase(
       CatalogDatabase(
         databaseName,
         comment.getOrElse(""),
-        catalog.getDatabasePath(databaseName, path),
+        catalog.createDatabasePath(databaseName, path),
         props),
       ifNotExists)
     Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 265cf4fcb6c7..fb613b447857 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -76,7 +76,7 @@ private[sql] class HiveSessionCatalog(
 
   // This function is to get the path for creating a non-default database
-  override def getDatabasePath(dbName: String, path: Option[String] = None): String = {
+  override def createDatabasePath(dbName: String, path: Option[String] = None): String = {
     val dbPath = path.map(new Path(_)).getOrElse {
       val defaultPath = context.hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
       if (StringUtils.isBlank(defaultPath)) {
@@ -85,6 +85,10 @@ private[sql] class HiveSessionCatalog(
       }
       new Path(new Path(defaultPath), dbName.toLowerCase() + ".db")
     }
+    // When dbPath does not have scheme and authority, qualify the input path against the
+    // default file system indicated by the configuration. Otherwise, replace the scheme and
+    // authority of a path with the scheme and authority of the file system that it maps to.
+ // This is based on hive.metastore.Warehouse.getDnsPath val fs = dbPath.getFileSystem(context.hiveconf) new Path(fs.getUri.getScheme, fs.getUri.getAuthority, dbPath.toUri.getPath).toString } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index caaa42ab4d78..3b3b6f42515d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -174,7 +174,7 @@ class HiveDDLSuite withTempDir { tmpDir => val dbName = "db1" val tabName = "tab1" - val path = catalog.getDatabasePath(dbName, Option(tmpDir.toString)) + val path = catalog.createDatabasePath(dbName, Option(tmpDir.toString)) val fs = new Path(path).getFileSystem(hiveContext.hiveconf) withTable(tabName) { assert(tmpDir.listFiles.isEmpty) @@ -203,7 +203,7 @@ class HiveDDLSuite test("create/drop database - RESTRICT") { val catalog = sqlContext.sessionState.catalog val dbName = "db1" - val path = catalog.getDatabasePath(dbName, None) + val path = catalog.createDatabasePath(dbName, None) val dbPath = new Path(path) val fs = dbPath.getFileSystem(hiveContext.hiveconf) // the database directory does not exist @@ -235,7 +235,7 @@ class HiveDDLSuite test("create/drop database - CASCADE") { val catalog = sqlContext.sessionState.catalog val dbName = "db1" - val path = catalog.getDatabasePath(dbName, None) + val path = catalog.createDatabasePath(dbName, None) val dbPath = new Path(path) val fs = dbPath.getFileSystem(hiveContext.hiveconf) // the database directory does not exist From 4989110e4e30c57efb28fc37b3b1487c4c78ac76 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 13 Apr 2016 18:19:28 -0700 Subject: [PATCH 23/33] address comments. 
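The main change below replaces the endsWith/dropRight handling of a trailing separator in SessionCatalog.createDatabasePath with Scala's stripSuffix, which is a no-op when the suffix is absent. A minimal standalone illustration (the object name is hypothetical):

    import java.io.File

    object TrailingSeparator {
      // Equivalent to: if (location.endsWith(File.separator)) location.dropRight(1) else location
      def normalize(location: String): String = location.stripSuffix(File.separator)

      def main(args: Array[String]): Unit = {
        println(normalize("/tmp/db1.db/")) // prints /tmp/db1.db where File.separator is "/"
        println(normalize("/tmp/db1.db"))  // unchanged
      }
    }
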
--- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 10 ++++------ .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 2 +- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 97e8e9f225f5..7177f0d28725 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -124,12 +124,10 @@ class SessionCatalog( } def createDatabasePath(dbName: String, path: Option[String]): String = { - val dbPath = - path.map(new Path(_)) - .getOrElse( - new Path(new Path(System.getProperty("java.io.tmpdir")), dbName.toLowerCase() + ".db")) - val dbLocation = dbPath.toString - if (dbLocation.endsWith(File.separator)) dbLocation.dropRight(1) else dbLocation + val dbPath = path.map(new Path(_)).getOrElse { + new Path(new Path(System.getProperty("java.io.tmpdir")), dbName.toLowerCase() + ".db") + } + dbPath.toString.stripSuffix(File.separator) } // ---------------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 3b3b6f42515d..a63087bb50d5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -45,7 +45,7 @@ class HiveDDLSuite dbPath: Option[String] = None): Boolean = { val expectedTablePath = if (dbPath.isEmpty) { - new Path (hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)) + new Path(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)) } else { new Path(new Path(dbPath.get), tableIdentifier.table) } From 8f185bea355739ccf465702be76ac78730b7ac36 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 19 Apr 2016 22:59:05 -0700 Subject: [PATCH 24/33] merge --- .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 4839838daaaf..f49704b09337 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -370,7 +370,7 @@ class HiveDDLSuite val dbName = "db1" val tabName = "tab1" val path = catalog.createDatabasePath(dbName, Option(tmpDir.toString)) - val fs = new Path(path).getFileSystem(hiveContext.hiveconf) + val fs = new Path(path).getFileSystem(hiveContext.sessionState.hiveconf) withTable(tabName) { assert(tmpDir.listFiles.isEmpty) sql(s"CREATE DATABASE $dbName Location '$tmpDir'") @@ -400,9 +400,9 @@ class HiveDDLSuite val dbName = "db1" val path = catalog.createDatabasePath(dbName, None) val dbPath = new Path(path) - val fs = dbPath.getFileSystem(hiveContext.hiveconf) + val fs = dbPath.getFileSystem(hiveContext.sessionState.hiveconf) // the database directory does not exist - assert (!fs.exists(dbPath)) + assert(!fs.exists(dbPath)) sql(s"CREATE DATABASE $dbName") val db1 = catalog.getDatabaseMetadata(dbName) @@ -432,9 +432,9 @@ class HiveDDLSuite val dbName = "db1" val path 
= catalog.createDatabasePath(dbName, None) val dbPath = new Path(path) - val fs = dbPath.getFileSystem(hiveContext.hiveconf) + val fs = dbPath.getFileSystem(hiveContext.sessionState.hiveconf) // the database directory does not exist - assert (!fs.exists(dbPath)) + assert(!fs.exists(dbPath)) sql(s"CREATE DATABASE $dbName") assert(fs.exists(dbPath) && fs.isDirectory(dbPath)) From 44fdf83c9f519a3671c3db3aae404168e7613be6 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 21 Apr 2016 18:47:42 -0700 Subject: [PATCH 25/33] address comments. --- .../main/scala/org/apache/spark/sql/execution/command/ddl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index e6b87a92c0d2..6cc917c36aca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -57,7 +57,7 @@ abstract class NativeDDLCommand(val sql: String) extends RunnableCommand { * {{{ * CREATE (DATABASE|SCHEMA) [IF NOT EXISTS] database_name * [COMMENT database_comment] - * [LOCATION file_path] + * [LOCATION database_directory] * [WITH DBPROPERTIES (property_name=property_value, ...)]; * }}} */ From a2d71fdb1af71d67b744b2f89a08d12837559c9b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 25 Apr 2016 23:52:18 -0700 Subject: [PATCH 26/33] address comments. --- .../apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 7 +------ .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 7 +++++++ .../org/apache/spark/sql/hive/HiveSessionCatalog.scala | 6 +++--- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 323b05934e5a..fe5f8d00c53e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -124,6 +124,7 @@ class SessionCatalog( currentDb = db } + /** Get the path for creating a non-default database. */ def createDatabasePath(dbName: String, path: Option[String]): String = { val dbPath = path.map(new Path(_)).getOrElse { new Path(new Path(System.getProperty("java.io.tmpdir")), dbName.toLowerCase() + ".db") @@ -696,12 +697,6 @@ class SessionCatalog( dropDatabase(db, ignoreIfNotExists = false, cascade = true) } tempTables.clear() - // Do not remove the function `current_database`, which is registered in each - // new session of HiveContext. Otherwise, it could load the Hive UDF function - // with the same function name. 
-    functionRegistry.listFunction().filter(_ != "current_database").foreach { f =>
-      functionRegistry.dropFunction(f)
-    }
     // restore built-in functions
     FunctionRegistry.builtin.listFunction().foreach { f =>
       val expressionInfo = FunctionRegistry.builtin.lookupFunction(f)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bbc424cba684..eff5173f097b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -247,6 +247,11 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val HIVE_METASTORE_WAREHOUSE_DIR = SQLConfigBuilder("hive.metastore.warehouse.dir")
+    .doc("The location of the default database for the warehouse.")
+    .stringConf
+    .createWithDefault("/user/hive/warehouse")
+
   val HIVE_METASTORE_PARTITION_PRUNING =
     SQLConfigBuilder("spark.sql.hive.metastorePartitionPruning")
       .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -620,6 +625,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
 
+  def hiveMetastoreWarehouse: String = getConf(HIVE_METASTORE_WAREHOUSE_DIR)
+
   def partitionDiscoveryEnabled(): Boolean =
     getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index b1acbcc0b772..fa84fd282bfc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -74,13 +74,13 @@ private[sql] class HiveSessionCatalog(
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
 
-  // This function is to get the path for creating a non-default database
+  /** Get the path for creating a non-default database. */
   override def createDatabasePath(dbName: String, path: Option[String] = None): String = {
     val dbPath = path.map(new Path(_)).getOrElse {
-      val defaultPath = hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
+      val defaultPath = conf.hiveMetastoreWarehouse
       if (StringUtils.isBlank(defaultPath)) {
         throw new AnalysisException(
-          s"${HiveConf.ConfVars.METASTOREWAREHOUSE.varname} is not set in the config or blank")
+          "hive.metastore.warehouse.dir is not set in the config or is blank")
       }
       new Path(new Path(defaultPath), dbName.toLowerCase() + ".db")
     }

From 375b69258be2b9b632f681964258b7f963babb4c Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 26 Apr 2016 09:22:16 -0700
Subject: [PATCH 27/33] Unable to reproduce it in my local environment. Print more info.
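The temporary println added below checks that properties from the execution HiveConf actually reach SQLConf, so that conf.hiveMetastoreWarehouse picks up hive.metastore.warehouse.dir. A standalone sketch of that property copy (names hypothetical; it mirrors the conf.setConf(c.getAllProperties) call in HiveSessionState):

    import java.util.Properties
    import scala.collection.JavaConverters._

    object PropagateConf {
      // Copy every property into a plain map, the way HiveSessionState seeds
      // SQLConf from executionHive's configuration.
      def copyAll(source: Properties): Map[String, String] =
        source.stringPropertyNames.asScala.map(k => k -> source.getProperty(k)).toMap

      def main(args: Array[String]): Unit = {
        val hiveProps = new Properties()
        hiveProps.setProperty("hive.metastore.warehouse.dir", "/user/hive/warehouse")
        println(copyAll(hiveProps)("hive.metastore.warehouse.dir"))
      }
    }
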
--- .../org/apache/spark/sql/hive/HiveSessionCatalog.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/HiveSessionState.scala | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index fa84fd282bfc..32243ebeb606 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -47,8 +47,8 @@ private[sql] class HiveSessionCatalog( sparkSession: SparkSession, functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, - conf: SQLConf, - hiveconf: HiveConf) + hiveconf: HiveConf, + conf: SQLConf) extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) { override def setCurrentDatabase(db: String): Unit = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 4a8978e553f9..b31585180b70 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -63,6 +63,9 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) lazy val hiveconf: HiveConf = { val c = executionHive.conf conf.setConf(c.getAllProperties) + // scalastyle:off println + println("conf.hiveMetastoreWarehouse: " + conf.hiveMetastoreWarehouse) + // scalastyle:on println c } @@ -78,8 +81,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) sparkSession, functionResourceLoader, functionRegistry, - conf, - hiveconf) + hiveconf, + conf) } /** From 1da261a395bd799b6942e04fa142372a5bee3a8c Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 26 Apr 2016 17:10:28 -0700 Subject: [PATCH 28/33] reload the conf after reset --- .../apache/spark/sql/hive/HiveSessionState.scala | 15 +++++++++------ .../org/apache/spark/sql/hive/test/TestHive.scala | 1 + .../spark/sql/hive/execution/HiveQuerySuite.scala | 1 + 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index b31585180b70..d4ed22f74d6e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -61,12 +61,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) * set in the SQLConf *as well as* in the HiveConf. */ lazy val hiveconf: HiveConf = { - val c = executionHive.conf - conf.setConf(c.getAllProperties) - // scalastyle:off println - println("conf.hiveMetastoreWarehouse: " + conf.hiveMetastoreWarehouse) - // scalastyle:on println - c + loadHiveConfToSQLConf() + executionHive.conf } setDefaultOverrideConfs() @@ -147,6 +143,13 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) setConf(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false") } + /** + * Load all the params in HiveConf into SQLConf. 
+ */ + def loadHiveConfToSQLConf(): Unit = { + conf.setConf(executionHive.conf.getAllProperties) + } + override def setConf(key: String, value: String): Unit = { super.setConf(key, value) executionHive.runSqlHive(s"SET $key=$value") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index f74e5cd6f5c7..066a9008f0ea 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -450,6 +450,7 @@ private[hive] class TestHiveSparkSession( TestHiveContext.hiveClientConfigurations( sessionState.hiveconf, warehousePath, scratchDirPath, metastoreTemporaryConf) .foreach { case (k, v) => sessionState.metadataHive.runSqlHive(s"SET $k=$v") } + sessionState.loadHiveConfToSQLConf() sessionState.setDefaultOverrideConfs() sessionState.catalog.setCurrentDatabase("default") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index d7f6d18b5ebc..9f9489625dc4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -1177,6 +1177,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter { } conf.clear() + TestHive.reset() } test("current_database with multiple sessions") { From 95d9a00d0679b932bc6e2478b49c6f8b189054f5 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 1 May 2016 07:10:24 -0700 Subject: [PATCH 29/33] address comments. --- .../sql/execution/command/DDLSuite.scala | 231 +++++++++--------- 1 file changed, 118 insertions(+), 113 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 656b7c73a869..5ea28ff36078 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -104,52 +104,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { test("the qualified path of a database is stored in the catalog") { val catalog = sqlContext.sessionState.catalog - val path = System.getProperty("java.io.tmpdir") - // The generated temp path is not qualified. - assert(!path.startsWith("file:/")) - sql(s"CREATE DATABASE db1 LOCATION '$path'") - val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri - assert("file" === pathInCatalog.getScheme) - val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path - assert(expectedPath === pathInCatalog.getPath) - - withSQLConf( - SQLConf.WAREHOUSE_PATH.key -> System.getProperty("java.io.tmpdir")) { - sql(s"CREATE DATABASE db2") - val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri + withTempDir { tmpDir => + val path = tmpDir.toString + // The generated temp path is not qualified. 
+ assert(!path.startsWith("file:/")) + sql(s"CREATE DATABASE db1 LOCATION '$path'") + val pathInCatalog = new Path(catalog.getDatabaseMetadata("db1").locationUri).toUri assert("file" === pathInCatalog.getScheme) - val expectedPath = appendTrailingSlash(sqlContext.conf.warehousePath) + "db2.db" + val expectedPath = if (path.endsWith(File.separator)) path.dropRight(1) else path assert(expectedPath === pathInCatalog.getPath) - } - sql("DROP DATABASE db1") - sql("DROP DATABASE db2") + withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { + sql(s"CREATE DATABASE db2") + val pathInCatalog = new Path(catalog.getDatabaseMetadata("db2").locationUri).toUri + assert("file" === pathInCatalog.getScheme) + val expectedPath = appendTrailingSlash(sqlContext.conf.warehousePath) + "db2.db" + assert(expectedPath === pathInCatalog.getPath) + } + + sql("DROP DATABASE db1") + sql("DROP DATABASE db2") + } } test("Create/Drop Database") { - withSQLConf( - SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { - val catalog = sqlContext.sessionState.catalog - val databaseNames = Seq("db1", "`database`") + withTempDir { tmpDir => + val path = tmpDir.toString + withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { + val catalog = sqlContext.sessionState.catalog + val databaseNames = Seq("db1", "`database`") - databaseNames.foreach { dbName => - try { - val dbNameWithoutBackTicks = cleanIdentifier(dbName) + databaseNames.foreach { dbName => + try { + val dbNameWithoutBackTicks = cleanIdentifier(dbName) - sql(s"CREATE DATABASE $dbName") - val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) - val expectedLocation = - "file:" + appendTrailingSlash(System.getProperty("java.io.tmpdir")) + - s"$dbNameWithoutBackTicks.db" - assert(db1 == CatalogDatabase( - dbNameWithoutBackTicks, - "", - expectedLocation, - Map.empty)) - sql(s"DROP DATABASE $dbName CASCADE") - assert(!catalog.databaseExists(dbNameWithoutBackTicks)) - } finally { - catalog.reset() + sql(s"CREATE DATABASE $dbName") + val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) + val expectedLocation = + "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db" + assert(db1 == CatalogDatabase( + dbNameWithoutBackTicks, + "", + expectedLocation, + Map.empty)) + sql(s"DROP DATABASE $dbName CASCADE") + assert(!catalog.databaseExists(dbNameWithoutBackTicks)) + } finally { + catalog.reset() + } } } } @@ -158,50 +160,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { test("Create/Drop Database - location") { val catalog = sqlContext.sessionState.catalog val databaseNames = Seq("db1", "`database`") - val dbath = "file:" + System.getProperty("java.io.tmpdir") - - databaseNames.foreach { dbName => - try { - val dbNameWithoutBackTicks = cleanIdentifier(dbName) - sql(s"CREATE DATABASE $dbName Location '${System.getProperty("java.io.tmpdir")}'") - val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) - assert(db1 == CatalogDatabase( - dbNameWithoutBackTicks, - "", - if (dbath.endsWith(File.separator)) dbath.dropRight(1) else dbath, - Map.empty)) - sql(s"DROP DATABASE $dbName CASCADE") - assert(!catalog.databaseExists(dbNameWithoutBackTicks)) - } finally { - catalog.reset() - } - } - } - - test("Create Database - database already exists") { - withSQLConf( - SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { - val catalog = sqlContext.sessionState.catalog - val databaseNames = Seq("db1", "`database`") - + withTempDir { tmpDir => + val path = 
tmpDir.toString + val dbPath = "file:" + path databaseNames.foreach { dbName => try { val dbNameWithoutBackTicks = cleanIdentifier(dbName) - sql(s"CREATE DATABASE $dbName") + sql(s"CREATE DATABASE $dbName Location '$path'") val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) - val expectedLocation = - "file:" + appendTrailingSlash(System.getProperty("java.io.tmpdir")) + - s"$dbNameWithoutBackTicks.db" assert(db1 == CatalogDatabase( dbNameWithoutBackTicks, "", - expectedLocation, + if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath, Map.empty)) - - val message = intercept[AnalysisException] { - sql(s"CREATE DATABASE $dbName") - }.getMessage - assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) + sql(s"DROP DATABASE $dbName CASCADE") + assert(!catalog.databaseExists(dbNameWithoutBackTicks)) } finally { catalog.reset() } @@ -209,47 +182,79 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("Create Database - database already exists") { + withTempDir { tmpDir => + val path = tmpDir.toString + withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { + val catalog = sqlContext.sessionState.catalog + val databaseNames = Seq("db1", "`database`") + + databaseNames.foreach { dbName => + try { + val dbNameWithoutBackTicks = cleanIdentifier(dbName) + sql(s"CREATE DATABASE $dbName") + val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) + val expectedLocation = + "file:" + appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db" + assert(db1 == CatalogDatabase( + dbNameWithoutBackTicks, + "", + expectedLocation, + Map.empty)) + + val message = intercept[AnalysisException] { + sql(s"CREATE DATABASE $dbName") + }.getMessage + assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) + } finally { + catalog.reset() + } + } + } + } + } + test("Alter/Describe Database") { - withSQLConf( - SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) { - val catalog = sqlContext.sessionState.catalog - val databaseNames = Seq("db1", "`database`") + withTempDir { tmpDir => + val path = tmpDir.toString + withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { + val catalog = sqlContext.sessionState.catalog + val databaseNames = Seq("db1", "`database`") - databaseNames.foreach { dbName => - try { - val dbNameWithoutBackTicks = cleanIdentifier(dbName) - val location = - "file:" + appendTrailingSlash(System.getProperty("java.io.tmpdir")) + - s"$dbNameWithoutBackTicks.db" - - sql(s"CREATE DATABASE $dbName") - - checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), - Row("Database Name", dbNameWithoutBackTicks) :: - Row("Description", "") :: - Row("Location", location) :: - Row("Properties", "") :: Nil) - - sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") - - checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), - Row("Database Name", dbNameWithoutBackTicks) :: - Row("Description", "") :: - Row("Location", location) :: - Row("Properties", "((a,a), (b,b), (c,c))") :: Nil) - - sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") - - checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), - Row("Database Name", dbNameWithoutBackTicks) :: - Row("Description", "") :: - Row("Location", location) :: - Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil) - } finally { - catalog.reset() + databaseNames.foreach { dbName => + try { + val dbNameWithoutBackTicks = cleanIdentifier(dbName) + val location = "file:" + 
appendTrailingSlash(path) + s"$dbNameWithoutBackTicks.db" + + sql(s"CREATE DATABASE $dbName") + + checkAnswer( + sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + Row("Database Name", dbNameWithoutBackTicks) :: + Row("Description", "") :: + Row("Location", location) :: + Row("Properties", "") :: Nil) + + sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") + + checkAnswer( + sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + Row("Database Name", dbNameWithoutBackTicks) :: + Row("Description", "") :: + Row("Location", location) :: + Row("Properties", "((a,a), (b,b), (c,c))") :: Nil) + + sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") + + checkAnswer( + sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + Row("Database Name", dbNameWithoutBackTicks) :: + Row("Description", "") :: + Row("Location", location) :: + Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil) + } finally { + catalog.reset() + } } } } From 59a9805ceacfc129bb3f8fef1cd2cd7d8d7f7f70 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 3 May 2016 22:22:51 -0700 Subject: [PATCH 30/33] address comments. --- .../spark/sql/catalyst/catalog/SessionCatalog.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 9bb992dd15d4..53efb51a8364 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -149,14 +149,9 @@ class SessionCatalog( /** Get the path for creating a non-default database. */ def createDatabasePath(db: String, path: Option[String]): String = { val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase - val dbPath = path.map(new Path(_)).getOrElse { - val defaultPath = conf.warehousePath - if (org.apache.commons.lang.StringUtils.isBlank(defaultPath)) { - throw new AnalysisException("spark.sql.warehouse.dir is blank") - } - new Path(new Path(defaultPath), database + ".db") - } - dbPath.toString.stripSuffix(File.separator) + path.map(new Path(_)).getOrElse { + new Path(new Path(conf.warehousePath), database + ".db") + }.toString } // ---------------------------------------------------------------------------- From b885f7be20a4e6a5355edfea86aefa7b702a56f7 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 3 May 2016 22:49:41 -0700 Subject: [PATCH 31/33] added a new test case --- .../sql/hive/execution/HiveDDLSuite.scala | 62 ++++++++++++------- 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index ba3d885bfc7f..fe229c34a3c2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -365,34 +365,48 @@ class HiveDDLSuite } } - test("create/drop database - location") { + private def createDatabaseWithLocation(tmpDir: File, dirExists: Boolean): Unit = { val catalog = sqlContext.sessionState.catalog - withTempDir { tmpDir => - val dbName = "db1" - val tabName = "tab1" - val path = "file:" + catalog.createDatabasePath(dbName, Option(tmpDir.toString)) - val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf()) - withTable(tabName) { - 
assert(tmpDir.listFiles.isEmpty) - sql(s"CREATE DATABASE $dbName Location '$tmpDir'") - val db1 = catalog.getDatabaseMetadata(dbName) - assert(db1 == CatalogDatabase( - dbName, - "", - path, - Map.empty)) - sql("USE db1") - - sql(s"CREATE TABLE $tabName as SELECT 1") - assert(tableDirectoryExists(TableIdentifier(tabName), Option(tmpDir.toString))) - - assert(tmpDir.listFiles.nonEmpty) - sql(s"DROP TABLE $tabName") - + val dbName = "db1" + val tabName = "tab1" + val path = "file:" + catalog.createDatabasePath(dbName, Option(tmpDir.toString)) + val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf()) + withTable(tabName) { + if (dirExists) { assert(tmpDir.listFiles.isEmpty) - sql(s"DROP DATABASE $dbName") + } else { assert(!fs.exists(new Path(tmpDir.toString))) } + sql(s"CREATE DATABASE $dbName Location '$tmpDir'") + val db1 = catalog.getDatabaseMetadata(dbName) + assert(db1 == CatalogDatabase( + dbName, + "", + path, + Map.empty)) + sql("USE db1") + + sql(s"CREATE TABLE $tabName as SELECT 1") + assert(tableDirectoryExists(TableIdentifier(tabName), Option(tmpDir.toString))) + + assert(tmpDir.listFiles.nonEmpty) + sql(s"DROP TABLE $tabName") + + assert(tmpDir.listFiles.isEmpty) + sql(s"DROP DATABASE $dbName") + assert(!fs.exists(new Path(tmpDir.toString))) + } + } + + test("create/drop database - location without pre-created directory") { + withTempPath { tmpDir => + createDatabaseWithLocation(tmpDir, dirExists = false) + } + } + + test("create/drop database - location with pre-created directory") { + withTempDir { tmpDir => + createDatabaseWithLocation(tmpDir, dirExists = true) } } From 55cf518ec293eeec275aa8a2ca1f160c46d8eb58 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 4 May 2016 19:50:39 -0700 Subject: [PATCH 32/33] address comments --- .../sql/catalyst/catalog/SessionCatalog.scala | 11 +- .../spark/sql/execution/command/ddl.scala | 2 +- .../sql/hive/execution/HiveDDLSuite.scala | 129 ++++++++++-------- 3 files changed, 78 insertions(+), 64 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 53efb51a8364..eff420eb4c5a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -146,12 +146,13 @@ class SessionCatalog( currentDb = db } - /** Get the path for creating a non-default database. */ - def createDatabasePath(db: String, path: Option[String]): String = { + /** + * Get the path for creating a non-default database when database location is not provided + * by users. 
+ */ + def getDefaultDBPath(db: String): String = { val database = if (conf.caseSensitiveAnalysis) db else db.toLowerCase - path.map(new Path(_)).getOrElse { - new Path(new Path(conf.warehousePath), database + ".db") - }.toString + new Path(new Path(conf.warehousePath), database + ".db").toString } // ---------------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 7233a063e229..085bdaff4e03 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -60,7 +60,7 @@ case class CreateDatabase( CatalogDatabase( databaseName, comment.getOrElse(""), - catalog.createDatabasePath(databaseName, path), + path.getOrElse(catalog.getDefaultDBPath(databaseName)), props), ifNotExists) Seq.empty[Row] diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index fbe88a8f7d9d..d55ddb251d00 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -46,11 +46,11 @@ class HiveDDLSuite tableIdentifier: TableIdentifier, dbPath: Option[String] = None): Boolean = { val expectedTablePath = - if (dbPath.isEmpty) { - hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier) - } else { - new Path(new Path(dbPath.get), tableIdentifier.table).toString - } + if (dbPath.isEmpty) { + hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier) + } else { + new Path(new Path(dbPath.get), tableIdentifier.table).toString + } val filesystemPath = new Path(expectedTablePath) val fs = filesystemPath.getFileSystem(hiveContext.sessionState.newHadoopConf()) fs.exists(filesystemPath) @@ -386,8 +386,7 @@ class HiveDDLSuite val catalog = sqlContext.sessionState.catalog val dbName = "db1" val tabName = "tab1" - val path = "file:" + catalog.createDatabasePath(dbName, Option(tmpDir.toString)) - val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf()) + val fs = new Path(tmpDir.toString).getFileSystem(hiveContext.sessionState.newHadoopConf()) withTable(tabName) { if (dirExists) { assert(tmpDir.listFiles.isEmpty) @@ -396,10 +395,11 @@ class HiveDDLSuite } sql(s"CREATE DATABASE $dbName Location '$tmpDir'") val db1 = catalog.getDatabaseMetadata(dbName) + val dbPath = "file:" + tmpDir assert(db1 == CatalogDatabase( dbName, "", - path, + if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath, Map.empty)) sql("USE db1") @@ -427,61 +427,74 @@ class HiveDDLSuite } } - test("create/drop database - RESTRICT") { - val catalog = sqlContext.sessionState.catalog - val dbName = "db1" - val path = "file:" + catalog.createDatabasePath(dbName, None) - val dbPath = new Path(path) - val fs = dbPath.getFileSystem(hiveContext.sessionState.newHadoopConf()) - // the database directory does not exist - assert(!fs.exists(dbPath)) - - sql(s"CREATE DATABASE $dbName") - val db1 = catalog.getDatabaseMetadata(dbName) - assert(db1 == CatalogDatabase( - dbName, - "", - path, - Map.empty)) - // the database directory was created - assert(fs.exists(dbPath) && fs.isDirectory(dbPath)) - sql("USE db1") + private def appendTrailingSlash(path: String): String = { + if 
(!path.endsWith(File.separator)) path + File.separator else path + } - val tabName = "tab1" - assert(!tableDirectoryExists(TableIdentifier(tabName), Option(path))) - sql(s"CREATE TABLE $tabName as SELECT 1") - assert(tableDirectoryExists(TableIdentifier(tabName), Option(path))) - sql(s"DROP TABLE $tabName") - assert(!tableDirectoryExists(TableIdentifier(tabName), Option(path))) - - sql(s"DROP DATABASE $dbName") - // the database directory was removed - assert(!fs.exists(dbPath)) + private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = { + withTempPath { tmpDir => + val path = tmpDir.toString + withSQLConf(SQLConf.WAREHOUSE_PATH.key -> path) { + val dbName = "db1" + val fs = new Path(path).getFileSystem(hiveContext.sessionState.newHadoopConf()) + val dbPath = new Path(path) + // the database directory does not exist + assert(!fs.exists(dbPath)) + + sql(s"CREATE DATABASE $dbName") + val catalog = sqlContext.sessionState.catalog + val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db" + val db1 = catalog.getDatabaseMetadata(dbName) + assert(db1 == CatalogDatabase( + dbName, + "", + expectedDBLocation, + Map.empty)) + // the database directory was created + assert(fs.exists(dbPath) && fs.isDirectory(dbPath)) + sql(s"USE $dbName") + + val tabName = "tab1" + assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation))) + sql(s"CREATE TABLE $tabName as SELECT 1") + assert(tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation))) + + if (!tableExists) { + sql(s"DROP TABLE $tabName") + assert(!tableDirectoryExists(TableIdentifier(tabName), Option(expectedDBLocation))) + } + + val sqlDropDatabase = s"DROP DATABASE $dbName ${if (cascade) "CASCADE" else "RESTRICT"}" + if (tableExists && !cascade) { + val message = intercept[AnalysisException] { + sql(sqlDropDatabase) + }.getMessage + assert(message.contains(s"Database $dbName is not empty. 
One or more tables exist.")) + // the database directory was not removed + assert(fs.exists(new Path(expectedDBLocation))) + } else { + sql(sqlDropDatabase) + // the database directory was removed and the inclusive table directories are also removed + assert(!fs.exists(new Path(expectedDBLocation))) + } + } + } } - test("create/drop database - CASCADE") { - val catalog = sqlContext.sessionState.catalog - val dbName = "db1" - val path = catalog.createDatabasePath(dbName, None) - val dbPath = new Path(path) - val fs = dbPath.getFileSystem(hiveContext.sessionState.newHadoopConf()) - // the database directory does not exist - assert(!fs.exists(dbPath)) + test("drop database containing tables - CASCADE") { + dropDatabase(cascade = true, tableExists = true) + } - sql(s"CREATE DATABASE $dbName") - assert(fs.exists(dbPath) && fs.isDirectory(dbPath)) - sql("USE db1") + test("drop an empty database - CASCADE") { + dropDatabase(cascade = true, tableExists = false) + } - val tabName = "tab1" - assert(!tableDirectoryExists(TableIdentifier(tabName), Option(path))) - sql(s"CREATE TABLE $tabName as SELECT 1") - assert(tableDirectoryExists(TableIdentifier(tabName), Option(path))) - sql(s"DROP TABLE $tabName") - assert(!tableDirectoryExists(TableIdentifier(tabName), Option(path))) - - sql(s"DROP DATABASE $dbName CASCADE") - // the database directory was removed and the inclusive table directories are also removed - assert(!fs.exists(dbPath)) + test("drop database containing tables - RESTRICT") { + dropDatabase(cascade = false, tableExists = true) + } + + test("drop an empty database - RESTRICT") { + dropDatabase(cascade = false, tableExists = false) } test("drop default database") { From b190b86d36445056b0d15e903498e765cf1555a9 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 4 May 2016 19:53:31 -0700 Subject: [PATCH 33/33] address comments --- .../org/apache/spark/sql/execution/command/DDLSuite.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b77a4c3e4e04..6085098a709e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -95,10 +95,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false) } - private def dropTable(catalog: SessionCatalog, name: TableIdentifier): Unit = { - catalog.dropTable(name, ignoreIfNotExists = false) - } - private def appendTrailingSlash(path: String): String = { if (!path.endsWith(File.separator)) path + File.separator else path } @@ -303,7 +299,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { }.getMessage assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist")) - dropTable(catalog, tableIdent1) + catalog.dropTable(tableIdent1, ignoreIfNotExists = false) + assert(catalog.listDatabases().contains(dbName)) sql(s"DROP DATABASE $dbName RESTRICT") assert(!catalog.listDatabases().contains(dbName))
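
Taken together, the end state of the series can be exercised with DDL along the following lines. This is an illustrative fragment in the style of the suites above, not a test from the patch; sql and intercept come from the surrounding test harness, and the default location assumes no LOCATION clause:

    sql("CREATE DATABASE db1")           // stored under <spark.sql.warehouse.dir>/db1.db
    sql("USE db1")
    sql("CREATE TABLE tab1 AS SELECT 1")
    intercept[AnalysisException] {
      sql("DROP DATABASE db1 RESTRICT")  // rejected while db1 still contains tab1
    }
    sql("DROP DATABASE db1 CASCADE")     // drops tab1 and removes the db1.db directory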