From 919e41a226760f9fd6fc3c91fcb486dcfae70824 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 20 Sep 2016 08:14:59 +0800 Subject: [PATCH 01/13] implement global temp view --- .../spark/sql/catalyst/parser/SqlBase.g4 | 8 +- .../catalog/GlobalTempViewManager.scala | 96 +++++++++ .../sql/catalyst/catalog/SessionCatalog.scala | 196 ++++++++++++++---- .../scala/org/apache/spark/sql/Dataset.scala | 39 +++- .../apache/spark/sql/catalog/Catalog.scala | 11 +- .../spark/sql/execution/SparkSqlParser.scala | 19 +- .../spark/sql/execution/command/tables.scala | 8 +- .../spark/sql/execution/command/views.scala | 189 ++++++++++------- .../spark/sql/execution/datasources/ddl.scala | 27 --- .../spark/sql/internal/CatalogImpl.scala | 21 +- .../spark/sql/internal/SessionState.scala | 2 + .../spark/sql/internal/SharedState.scala | 16 +- .../sql/execution/GlobalTempViewSuite.scala | 107 ++++++++++ .../spark/sql/hive/HiveSessionCatalog.scala | 6 +- .../spark/sql/hive/HiveSessionState.scala | 2 + .../sql/hive/execution/HiveDDLSuite.scala | 2 +- .../sql/hive/execution/SQLViewSuite.scala | 6 +- 17 files changed, 579 insertions(+), 176 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index de2f9ee6bc7a..42e02ba6b7eb 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -111,11 +111,12 @@ statement | ALTER TABLE tableIdentifier RECOVER PARTITIONS #recoverPartitions | DROP TABLE (IF EXISTS)? tableIdentifier PURGE? #dropTable | DROP VIEW (IF EXISTS)? tableIdentifier #dropTable - | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier + | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? + VIEW (IF NOT EXISTS)? tableIdentifier identifierCommentList? (COMMENT STRING)? (PARTITIONED ON identifierList)? (TBLPROPERTIES tablePropertyList)? AS query #createView - | CREATE (OR REPLACE)? TEMPORARY VIEW + | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW tableIdentifier ('(' colTypeList ')')? tableProvider (OPTIONS tablePropertyList)? #createTempViewUsing | ALTER VIEW tableIdentifier AS? 
query                                                    #alterViewQuery
@@ -669,7 +670,7 @@ nonReserved
     | MAP | ARRAY | STRUCT
     | LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES | RECORDREADER
     | DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | LINES | SEPARATED
-    | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | TEMPORARY | OPTIONS
+    | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY | OPTIONS
     | GROUPING | CUBE | ROLLUP
     | EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
     | TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
@@ -857,6 +858,7 @@ CACHE: 'CACHE';
 UNCACHE: 'UNCACHE';
 LAZY: 'LAZY';
 FORMATTED: 'FORMATTED';
+GLOBAL: 'GLOBAL';
 TEMPORARY: 'TEMPORARY' | 'TEMP';
 OPTIONS: 'OPTIONS';
 UNSET: 'UNSET';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
new file mode 100644
index 000000000000..cdb778d28acc
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.StringUtils
+
+
+/**
+ * A thread-safe manager for global temporary views, providing atomic operations to manage them,
+ * e.g. create, update, remove, etc.
+ *
+ * Note that the view name is always case-sensitive here; callers are responsible for formatting
+ * the view name w.r.t. the case-sensitivity config.
+ */
+class GlobalTempViewManager {
+
+  /** List of view definitions, mapping from view name to logical plan. */
+  @GuardedBy("this")
+  private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
+
+  def get(name: String): Option[LogicalPlan] = synchronized {
+    viewDefinitions.get(name)
+  }
+
+  def create(
+      name: String,
+      viewDefinition: LogicalPlan,
+      overrideIfExists: Boolean): Unit = synchronized {
+    if (!overrideIfExists && viewDefinitions.contains(name)) {
+      throw new TempTableAlreadyExistsException(name)
+    }
+    viewDefinitions.put(name, viewDefinition)
+  }
+
+  def update(
+      name: String,
+      viewDefinition: LogicalPlan): Boolean = synchronized {
+    // Only update it when the view with the given name exists.
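+    // The contains-then-put sequence below is atomic, since the whole method body
+    // runs while holding this manager's monitor (`synchronized`).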
+ if (viewDefinitions.contains(name)) { + viewDefinitions.put(name, viewDefinition) + true + } else { + false + } + } + + def remove(name: String): Boolean = synchronized { + viewDefinitions.remove(name).isDefined + } + + def rename(oldName: String, newName: String): Boolean = synchronized { + if (viewDefinitions.contains(oldName)) { + if (viewDefinitions.contains(newName)) { + throw new AnalysisException( + s"rename temporary view from '$oldName' to '$newName': destination view already exists") + } + + val viewDefinition = viewDefinitions(oldName) + viewDefinitions.remove(oldName) + viewDefinitions.put(newName, viewDefinition) + true + } else { + false + } + } + + def listNames(pattern: String): Seq[String] = synchronized { + StringUtils.filterPattern(viewDefinitions.keys.toSeq, pattern) + } + + def clear(): Unit = synchronized { + viewDefinitions.clear() + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 8c01c7a3f2bd..fd99d6085bfb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -36,6 +36,9 @@ import org.apache.spark.sql.catalyst.util.StringUtils object SessionCatalog { val DEFAULT_DATABASE = "default" + + val GLOBAL_TEMP_DB_CONF_KEY = "spark.sql.database.globalTemp" + val DEFAULT_GLOBAL_TEMP_DB = "global_temp" } /** @@ -47,6 +50,8 @@ object SessionCatalog { */ class SessionCatalog( externalCatalog: ExternalCatalog, + globalTempDB: String, + globalTempViews: GlobalTempViewManager, functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, conf: CatalystConf, @@ -61,6 +66,8 @@ class SessionCatalog( conf: CatalystConf) { this( externalCatalog, + SessionCatalog.DEFAULT_GLOBAL_TEMP_DB, + new GlobalTempViewManager, DummyFunctionResourceLoader, functionRegistry, conf, @@ -142,8 +149,12 @@ class SessionCatalog( // ---------------------------------------------------------------------------- def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = { - val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString val dbName = formatDatabaseName(dbDefinition.name) + if (dbName == globalTempDB) { + throw new AnalysisException(s"$globalTempDB is a system preserved database, " + + "you cannot create a database with this name.") + } + val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString externalCatalog.createDatabase( dbDefinition.copy(name = dbName, locationUri = qualifiedPath), ignoreIfExists) @@ -154,7 +165,7 @@ class SessionCatalog( if (dbName == DEFAULT_DATABASE) { throw new AnalysisException(s"Can not drop default database") } else if (dbName == getCurrentDatabase) { - throw new AnalysisException(s"Can not drop current database `${dbName}`") + throw new AnalysisException(s"Can not drop current database `$dbName`") } externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade) } @@ -188,6 +199,10 @@ class SessionCatalog( def setCurrentDatabase(db: String): Unit = { val dbName = formatDatabaseName(db) + if (dbName == globalTempDB) { + throw new AnalysisException(s"$globalTempDB is a system preserved database, " + + "you cannot use it as current database.") + } requireDbExists(dbName) synchronized { currentDb = dbName } } @@ -275,6 +290,7 @@ class SessionCatalog( */ def getTableMetadataOption(name: TableIdentifier): 
Option[CatalogTable] = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + if (db == globalTempDB) return None val table = formatTableName(name.table) requireDbExists(db) externalCatalog.getTableOption(db, table) @@ -329,33 +345,77 @@ class SessionCatalog( // ---------------------------------------------- /** - * Create a temporary table. + * Create a local temporary view. */ def createTempView( name: String, - tableDefinition: LogicalPlan, + viewDefinition: LogicalPlan, overrideIfExists: Boolean): Unit = synchronized { - val table = formatTableName(name) - if (tempTables.contains(table) && !overrideIfExists) { + val viewName = formatTableName(name) + if (tempTables.contains(viewName) && !overrideIfExists) { throw new TempTableAlreadyExistsException(name) } - tempTables.put(table, tableDefinition) + tempTables.put(viewName, viewDefinition) + } + + /** + * Create a global temporary view. + */ + def createGlobalTempView( + name: String, + viewDefinition: LogicalPlan, + overrideIfExists: Boolean): Unit = { + globalTempViews.create(formatTableName(name), viewDefinition, overrideIfExists) } /** - * Return a temporary view exactly as it was stored. + * Alter the definition of a local/global temp view matching the given name, returns true if a + * temp view is matched and altered, false otherwise. + */ + def alterTempViewDefinition( + name: TableIdentifier, + viewDefinition: LogicalPlan): Boolean = synchronized { + val viewName = formatTableName(name.table) + if (name.database.isEmpty) { + if (tempTables.contains(viewName)) { + createTempView(viewName, viewDefinition, overrideIfExists = true) + true + } else { + false + } + } else if (formatDatabaseName(name.database.get) == globalTempDB) { + globalTempViews.update(viewName, viewDefinition) + } else { + false + } + } + + /** + * Return a local temporary view exactly as it was stored. */ def getTempView(name: String): Option[LogicalPlan] = synchronized { tempTables.get(formatTableName(name)) } /** - * Drop a temporary view. + * Return a global temporary view exactly as it was stored. + */ + def getGlobalTempView(name: String): Option[LogicalPlan] = { + globalTempViews.get(formatTableName(name)) + } + + /** + * Drop a local temporary view. */ def dropTempView(name: String): Unit = synchronized { tempTables.remove(formatTableName(name)) } + /** + * Drop a global temporary view. 
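+   * Returns true if the view was found and dropped, false otherwise.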
+ */ + def dropGlobalTempView(name: String): Boolean = globalTempViews.remove(formatTableName(name)) + // ------------------------------------------------------------- // | Methods that interact with temporary and metastore tables | // ------------------------------------------------------------- @@ -371,9 +431,7 @@ class SessionCatalog( */ def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized { val table = formatTableName(name.table) - if (name.database.isDefined) { - getTableMetadata(name) - } else { + if (name.database.isEmpty) { getTempView(table).map { plan => CatalogTable( identifier = TableIdentifier(table), @@ -381,6 +439,16 @@ class SessionCatalog( storage = CatalogStorageFormat.empty, schema = plan.output.toStructType) }.getOrElse(getTableMetadata(name)) + } else if (formatDatabaseName(name.database.get) == globalTempDB) { + globalTempViews.get(table).map { plan => + CatalogTable( + identifier = TableIdentifier(table), + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = plan.output.toStructType) + }.getOrElse(throw new NoSuchTableException(globalTempDB, table)) + } else { + getTableMetadata(name) } } @@ -393,21 +461,25 @@ class SessionCatalog( */ def renameTable(oldName: TableIdentifier, newName: String): Unit = synchronized { val db = formatDatabaseName(oldName.database.getOrElse(currentDb)) - requireDbExists(db) val oldTableName = formatTableName(oldName.table) val newTableName = formatTableName(newName) - if (oldName.database.isDefined || !tempTables.contains(oldTableName)) { - requireTableExists(TableIdentifier(oldTableName, Some(db))) - requireTableNotExists(TableIdentifier(newTableName, Some(db))) - externalCatalog.renameTable(db, oldTableName, newTableName) + if (db == globalTempDB) { + globalTempViews.rename(oldTableName, newTableName) } else { - if (tempTables.contains(newTableName)) { - throw new AnalysisException( - s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': destination table already exists") + requireDbExists(db) + if (oldName.database.isDefined || !tempTables.contains(oldTableName)) { + requireTableExists(TableIdentifier(oldTableName, Some(db))) + requireTableNotExists(TableIdentifier(newTableName, Some(db))) + externalCatalog.renameTable(db, oldTableName, newTableName) + } else { + if (tempTables.contains(newTableName)) { + throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " + + "destination table already exists") + } + val table = tempTables(oldTableName) + tempTables.remove(oldTableName) + tempTables.put(newTableName, table) } - val table = tempTables(oldTableName) - tempTables.remove(oldTableName) - tempTables.put(newTableName, table) } } @@ -424,17 +496,24 @@ class SessionCatalog( purge: Boolean): Unit = synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) - if (name.database.isDefined || !tempTables.contains(table)) { - requireDbExists(db) - // When ignoreIfNotExists is false, no exception is issued when the table does not exist. - // Instead, log it as an error message. 
-    if (tableExists(TableIdentifier(table, Option(db)))) {
-      externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
-    } else if (!ignoreIfNotExists) {
-      throw new NoSuchTableException(db = db, table = table)
+    if (db == globalTempDB) {
+      val viewExists = globalTempViews.remove(table)
+      if (!viewExists && !ignoreIfNotExists) {
+        throw new NoSuchTableException(globalTempDB, table)
       }
     } else {
-      tempTables.remove(table)
+      if (name.database.isDefined || !tempTables.contains(table)) {
+        requireDbExists(db)
+        // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
+        // Instead, log it as an error message.
+        if (tableExists(TableIdentifier(table, Option(db)))) {
+          externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
+        } else if (!ignoreIfNotExists) {
+          throw new NoSuchTableException(db = db, table = table)
+        }
+      } else {
+        tempTables.remove(table)
+      }
     }
   }
@@ -453,7 +532,11 @@ class SessionCatalog(
     val db = formatDatabaseName(name.database.getOrElse(currentDb))
     val table = formatTableName(name.table)
     val relationAlias = alias.getOrElse(table)
-    if (name.database.isDefined || !tempTables.contains(table)) {
+    if (db == globalTempDB) {
+      globalTempViews.get(table).map { viewDef =>
+        SubqueryAlias(relationAlias, viewDef, Some(name))
+      }.getOrElse(throw new NoSuchTableException(db, table))
+    } else if (name.database.isDefined || !tempTables.contains(table)) {
       val metadata = externalCatalog.getTable(db, table)
       val view = Option(metadata.tableType).collect {
         case CatalogTableType.VIEW => name
@@ -472,26 +555,46 @@ class SessionCatalog(
    * explicitly specified.
    */
   def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
-    name.database.isEmpty && tempTables.contains(formatTableName(name.table))
+    val table = formatTableName(name.table)
+    if (name.database.isEmpty) {
+      tempTables.contains(table)
+    } else if (formatDatabaseName(name.database.get) == globalTempDB) {
+      globalTempViews.get(table).isDefined
+    } else {
+      false
+    }
   }

   /**
-   * List all tables in the specified database, including temporary tables.
+   * List all tables in the specified database, including local temporary tables. Returns pairs of
+   * table identifier and isTemporary flag.
+   *
+   * Note that, if the specified database is the global temp database, we will list global temp views.
    */
-  def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
+  def listTables(db: String): Seq[(TableIdentifier, Boolean)] = listTables(db, "*")

   /**
-   * List all matching tables in the specified database, including temporary tables.
+   * List all matching tables in the specified database, including local temporary tables. Returns
+   * pairs of table identifier and isTemporary flag.
+   *
+   * Note that, if the specified database is the global temp database, we will list global temp views.
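+   * For example, `listTables(globalTempDB, "*")` returns every global temp view as a pair like
+   * `(TableIdentifier("v1", Some(globalTempDB)), true)`.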
*/ - def listTables(db: String, pattern: String): Seq[TableIdentifier] = { + def listTables(db: String, pattern: String): Seq[(TableIdentifier, Boolean)] = { val dbName = formatDatabaseName(db) - requireDbExists(dbName) - val dbTables = - externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) } + val dbTables: Seq[(TableIdentifier, Boolean)] = if (dbName == globalTempDB) { + globalTempViews.listNames(pattern).map { name => + TableIdentifier(name, Some(globalTempDB)) -> true + } + } else { + requireDbExists(dbName) + externalCatalog.listTables(dbName, pattern).map { name => + TableIdentifier(name, Some(dbName)) -> false + } + } synchronized { - val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern) - .map { t => TableIdentifier(t) } - dbTables ++ _tempTables + dbTables ++ StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name => + TableIdentifier(name) -> true + } } } @@ -504,6 +607,8 @@ class SessionCatalog( // If the database is not defined, there is a good chance this is a temp table. if (name.database.isEmpty) { tempTables.get(formatTableName(name.table)).foreach(_.refresh()) + } else if (formatDatabaseName(name.database.get) == globalTempDB) { + globalTempViews.get(formatTableName(name.table)).foreach(_.refresh()) } } @@ -908,7 +1013,7 @@ class SessionCatalog( listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db => dropDatabase(db, ignoreIfNotExists = false, cascade = true) } - listTables(DEFAULT_DATABASE).foreach { table => + listTables(DEFAULT_DATABASE).foreach { case (table, _) => dropTable(table, ignoreIfNotExists = false, purge = false) } listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func => @@ -919,6 +1024,7 @@ class SessionCatalog( } } tempTables.clear() + globalTempViews.clear() functionRegistry.clear() // restore built-in functions FunctionRegistry.builtin.listFunction().foreach { f => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 9cfbdffd0258..6cf43748926a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} -import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand} +import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython @@ -2433,7 +2433,7 @@ class Dataset[T] private[sql]( } /** - * Creates a temporary view using the given name. The lifetime of this + * Creates a local temporary view using the given name. The lifetime of this * temporary view is tied to the [[SparkSession]] that was used to create this Dataset. * * @throws AnalysisException if the view name already exists @@ -2443,21 +2443,46 @@ class Dataset[T] private[sql]( */ @throws[AnalysisException] def createTempView(viewName: String): Unit = withPlan { - createViewCommand(viewName, replace = false) + createTempViewCommand(viewName, replace = false, global = false) } + + /** - * Creates a temporary view using the given name. 
The lifetime of this
+   * Creates a local temporary view using the given name. The lifetime of this
    * temporary view is tied to the [[SparkSession]] that was used to create this Dataset.
    *
    * @group basic
    * @since 2.0.0
    */
   def createOrReplaceTempView(viewName: String): Unit = withPlan {
-    createViewCommand(viewName, replace = true)
+    createTempViewCommand(viewName, replace = true, global = false)
   }

-  private def createViewCommand(viewName: String, replace: Boolean): CreateViewCommand = {
+  /**
+   * Creates a global temporary view using the given name. The lifetime of this
+   * temporary view is tied to this Spark application.
+   *
+   * @throws TempTableAlreadyExistsException if the view name already exists
+   *
+   * @group basic
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]
+  def createGlobalTempView(viewName: String): Unit = withPlan {
+    createTempViewCommand(viewName, replace = false, global = true)
+  }
+
+  private def createTempViewCommand(
+      viewName: String,
+      replace: Boolean,
+      global: Boolean): CreateViewCommand = {
+    val viewType = if (global) {
+      GlobalTempView
+    } else {
+      LocalTempView
+    }
+
     CreateViewCommand(
       name = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
       userSpecifiedColumns = Nil,
@@ -2467,7 +2492,7 @@ class Dataset[T] private[sql](
       child = logicalPlan,
       allowExisting = false,
       replace = replace,
-      isTemporary = true)
+      viewType = viewType)
   }

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 1aed245fdd33..9e7667861b6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -178,7 +178,7 @@ abstract class Catalog {
       options: Map[String, String]): DataFrame

   /**
-   * Drops the temporary view with the given view name in the catalog.
+   * Drops the local temporary view with the given view name in the catalog.
    * If the view has been cached before, then it will also be uncached.
    *
    * @param viewName the name of the view to be dropped.
@@ -186,6 +186,15 @@ abstract class Catalog {
    */
   def dropTempView(viewName: String): Unit

+  /**
+   * Drops the global temporary view with the given view name in the catalog.
+   * If the view has been cached before, then it will also be uncached.
+   *
+   * @param viewName the name of the view to be dropped.
+   * @return true if the view was found and dropped, false otherwise.
+   * @since 2.1.0
+   */
+  def dropGlobalTempView(viewName: String): Boolean
+
   /**
    * Returns true if the table is currently cached in-memory.
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5359cedc8097..1bcde069649e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType

 /**
  * Concrete parser for Spark SQL statements.
@@ -364,7 +364,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
         "CREATE TEMPORARY VIEW ... USING ... instead")
-      CreateTempViewUsing(table, schema, replace = true, provider, options)
+      CreateTempViewUsing(table, schema, replace = true, global = false, provider, options)
     } else {
       CreateTable(tableDesc, mode, None)
     }
@@ -380,6 +380,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
       userSpecifiedSchema = Option(ctx.colTypeList()).map(createStructType),
       replace = ctx.REPLACE != null,
+      global = ctx.GLOBAL != null,
       provider = ctx.tableProvider.qualifiedName.getText,
       options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
   }
@@ -1248,7 +1249,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    *
    * For example:
    * {{{
-   *   CREATE [OR REPLACE] [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
+   *   CREATE [OR REPLACE] [[GLOBAL] TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
    *   [(column_name [COMMENT column_comment], ...) ]
    *   [COMMENT view_comment]
    *   [TBLPROPERTIES (property_name = property_value, ...)]
@@ -1265,6 +1266,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       }
     }

+    val viewType = if (ctx.TEMPORARY == null) {
+      PermanentView
+    } else if (ctx.GLOBAL != null) {
+      GlobalTempView
+    } else {
+      LocalTempView
+    }
+
     CreateViewCommand(
       name = visitTableIdentifier(ctx.tableIdentifier),
       userSpecifiedColumns = userSpecifiedColumns,
@@ -1274,7 +1283,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       child = plan(ctx.query),
       allowExisting = ctx.EXISTS != null,
       replace = ctx.REPLACE != null,
-      isTemporary = ctx.TEMPORARY != null)
+      viewType = viewType)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 0f61629317c8..adede8a90249 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -517,7 +517,8 @@ case class ShowTablesCommand(
   // The result of SHOW TABLES has three columns: database, tableName and isTemporary.
   override val output: Seq[Attribute] = {
-    AttributeReference("tableName", StringType, nullable = false)() ::
+    AttributeReference("database", StringType, nullable = false)() ::
+      AttributeReference("tableName", StringType, nullable = false)() ::
       AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
   }
@@ -528,9 +529,8 @@ case class ShowTablesCommand(
     val db = databaseName.getOrElse(catalog.getCurrentDatabase)
     val tables =
       tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
-    tables.map { t =>
-      val isTemp = t.database.isEmpty
-      Row(t.table, isTemp)
+    tables.map { case (tableIdent, isTemp) =>
+      Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 15340ee921f6..f6b42a491ced 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -19,13 +19,46 @@ package org.apache.spark.sql.execution.command

 import scala.util.control.NonFatal

-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+
+
+/**
+ * ViewType is used to specify the expected view type when we want to create or replace a view in
+ * [[CreateViewCommand]].
+ */
+sealed trait ViewType
+
+/**
+ * LocalTempView means a session-scoped local temporary view. Its lifetime is the lifetime of the
+ * session that created it, i.e. it is automatically dropped when the session terminates. It is not
+ * tied to any database, i.e. we cannot use `db1.view1` to reference a local temporary view.
+ */
+object LocalTempView extends ViewType
+
+/**
+ * GlobalTempView means a cross-session global temporary view. Its lifetime is the lifetime of the
+ * Spark application, i.e. it is automatically dropped when the application terminates. It is tied
+ * to the system preserved database `global_temp`, and we must use a qualified name to refer to a
+ * global temp view, e.g. SELECT * FROM global_temp.view1.
+ */
+object GlobalTempView extends ViewType
+
+/**
+ * PermanentView means a cross-session permanent view. Permanent views stay until they are
+ * explicitly dropped by a user command. A permanent view is always tied to a database, defaulting
+ * to the current database if none is specified.
+ *
+ * Note that an existing permanent view with the same name is not visible to the current session
+ * while a local temporary view with that name exists, unless the view name is qualified with its
+ * database.
+ */
+object PermanentView extends ViewType


 /**
@@ -46,10 +79,7 @@ import org.apache.spark.sql.types.StructType
  *                already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
  *                already exists, throws analysis exception.
- * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped - * at the end of current Spark session. Existing permanent relations with the same - * name are not visible to the current session while the temporary view exists, - * unless they are specified with full qualified table name with database prefix. + * @param viewType the expected view type to be created with this command. */ case class CreateViewCommand( name: TableIdentifier, @@ -60,20 +90,21 @@ case class CreateViewCommand( child: LogicalPlan, allowExisting: Boolean, replace: Boolean, - isTemporary: Boolean) + viewType: ViewType) extends RunnableCommand { override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child) - if (!isTemporary) { - require(originalText.isDefined, - "The table to created with CREATE VIEW must have 'originalText'.") + if (viewType == PermanentView) { + require(originalText.isDefined, "'originalText' must be provided to create permanent view") } if (allowExisting && replace) { throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.") } + private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView + // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE' if (allowExisting && isTemporary) { throw new AnalysisException( @@ -99,72 +130,53 @@ case class CreateViewCommand( s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " + s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).") } - val sessionState = sparkSession.sessionState - if (isTemporary) { - createTemporaryView(sparkSession, analyzedPlan) - } else { - // Adds default database for permanent table if it doesn't exist, so that tableExists() - // only check permanent tables. - val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase) - val qualifiedName = name.copy(database = Option(database)) - - if (sessionState.catalog.tableExists(qualifiedName)) { - val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName) - if (allowExisting) { - // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view - // already exists. - } else if (tableMetadata.tableType != CatalogTableType.VIEW) { - throw new AnalysisException(s"$qualifiedName is not a view") - } else if (replace) { - // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` - sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan)) - } else { - // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already - // exists. - throw new AnalysisException( - s"View $qualifiedName already exists. If you want to update the view definition, " + - "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS") - } - } else { - // Create the view if it doesn't exist. 
-        sessionState.catalog.createTable(
-          prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
-      }
-    }
-    Seq.empty[Row]
-  }
-
-  private def createTemporaryView(sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
-    val catalog = sparkSession.sessionState.catalog
-
-    // Projects column names to alias names
-    val logicalPlan = if (userSpecifiedColumns.isEmpty) {
+    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
       analyzedPlan
     } else {
       val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, _)) => Alias(attr, colName)()
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
       }
       sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }

-    catalog.createTempView(name.table, logicalPlan, replace)
+    val catalog = sparkSession.sessionState.catalog
+    if (viewType == LocalTempView) {
+      catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
+    } else if (viewType == GlobalTempView) {
+      catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
+    } else if (catalog.tableExists(name)) {
+      val tableMetadata = catalog.getTableMetadata(name)
+      if (allowExisting) {
+        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+        // already exists.
+      } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
+        throw new AnalysisException(s"$name is not a view")
+      } else if (replace) {
+        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+        catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+      } else {
+        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+        // exists.
+        throw new AnalysisException(
+          s"View $name already exists. If you want to update the view definition, " +
+            "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+      }
+    } else {
+      // Create the view if it doesn't exist.
+      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+    }
+    Seq.empty[Row]
   }

   /**
    * Returns a [[CatalogTable]] that can be used to save in the catalog. This method canonicalizes
    * the SQL based on the analyzed plan, and also creates the proper schema for the view.
    */
-  private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, _)) => Alias(attr, colName)()
-      }
-      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-
+  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
     val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL

     // Validate the view SQL - make sure we can parse it and analyze it.
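(Aside: a minimal sketch of the three view types `CreateViewCommand` now distinguishes, seen from the user-facing API. This is illustrative only and not part of the patch; the local-mode session and all view names are assumptions, and the default global temp database name `global_temp` is taken from the patch itself.)

import org.apache.spark.sql.SparkSession

object ViewTypeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("view-types").getOrCreate()

    // LocalTempView: session-scoped, referenced by its bare, unqualified name.
    spark.sql("CREATE TEMPORARY VIEW v_local AS SELECT 1 AS id")
    spark.sql("SELECT * FROM v_local").show()

    // GlobalTempView: application-scoped; it lives in the system preserved database
    // (global_temp by default) and must be referenced through that database.
    spark.sql("CREATE GLOBAL TEMPORARY VIEW v_global AS SELECT 1 AS id")
    spark.sql("SELECT * FROM global_temp.v_global").show()

    // PermanentView: persisted in the external catalog, so it outlives the session
    // (assuming the catalog in use supports persistent views).
    spark.sql("CREATE VIEW v_perm AS SELECT 1 AS id")

    spark.stop()
  }
}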
@@ -176,19 +188,11 @@ case class CreateViewCommand(
       throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
     }

-    val viewSchema = if (userSpecifiedColumns.isEmpty) {
-      aliasedPlan.schema
-    } else {
-      StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
-        case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
-      })
-    }
-
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
-      schema = viewSchema,
+      schema = aliasedPlan.schema,
       properties = properties,
       viewOriginalText = originalText,
       viewText = Some(viewSQL),
@@ -197,6 +201,45 @@ case class CreateViewCommand(
   }
 }

+
+/**
+ * Create or replace a local/global temporary view with the given data source.
+ */
+case class CreateTempViewUsing(
+    tableIdent: TableIdentifier,
+    userSpecifiedSchema: Option[StructType],
+    replace: Boolean,
+    global: Boolean,
+    provider: String,
+    options: Map[String, String]) extends RunnableCommand {
+
+  if (tableIdent.database.isDefined) {
+    throw new AnalysisException(
+      s"Temporary view '$tableIdent' should not have specified a database")
+  }
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    val dataSource = DataSource(
+      sparkSession,
+      userSpecifiedSchema = userSpecifiedSchema,
+      className = provider,
+      options = options)
+
+    val catalog = sparkSession.sessionState.catalog
+    val viewDefinition = Dataset.ofRows(
+      sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan
+
+    if (global) {
+      catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
+    } else {
+      catalog.createTempView(tableIdent.table, viewDefinition, replace)
+    }
+
+    Seq.empty[Row]
+  }
+}
+
+
 /**
  * Alter a view with the given query plan. If the view name contains a database prefix, this command
  * will alter a permanent view matching the given name, or throw an exception if the view does not exist. Else,
@@ -222,8 +265,8 @@ case class AlterViewAsCommand(
     qe.assertAnalyzed()
     val analyzedPlan = qe.analyzed

-    if (session.sessionState.catalog.isTemporaryTable(name)) {
-      session.sessionState.catalog.createTempView(name.table, analyzedPlan, overrideIfExists = true)
+    if (session.sessionState.catalog.alterTempViewDefinition(name, analyzedPlan)) {
+      // a local/global temp view has been altered, we are done.
     } else {
       alterPermanentView(session, analyzedPlan)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fa95af2648cf..598247f8d422 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -40,33 +40,6 @@ case class CreateTable(
   override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
 }

-case class CreateTempViewUsing(
-    tableIdent: TableIdentifier,
-    userSpecifiedSchema: Option[StructType],
-    replace: Boolean,
-    provider: String,
-    options: Map[String, String]) extends RunnableCommand {
-
-  if (tableIdent.database.isDefined) {
-    throw new AnalysisException(
-      s"Temporary table '$tableIdent' should not have specified a database")
-  }
-
-  def run(sparkSession: SparkSession): Seq[Row] = {
-    val dataSource = DataSource(
-      sparkSession,
-      userSpecifiedSchema = userSpecifiedSchema,
-      className = provider,
-      options = options)
-    sparkSession.sessionState.catalog.createTempView(
-      tableIdent.table,
-      Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
-      replace)
-
-    Seq.empty[Row]
-  }
-}
-
 case class RefreshTable(tableIdent: TableIdentifier) extends RunnableCommand {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f25253576589..33fa46d24deb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -92,9 +92,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-    requireDatabaseExists(dbName)
-    val tables = sessionCatalog.listTables(dbName).map { tableIdent =>
-      val isTemp = tableIdent.database.isEmpty
+    val tables = sessionCatalog.listTables(dbName).map { case (tableIdent, isTemp) =>
       val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent))
       new Table(
         name = tableIdent.identifier,
@@ -277,7 +275,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }

   /**
-   * Drops the temporary view with the given view name in the catalog.
+   * Drops the local temporary view with the given view name in the catalog.
    * If the view has been cached/persisted before, it's also unpersisted.
    *
    * @param viewName the name of the view to be dropped.
@@ -291,6 +289,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     }
   }

+  /**
+   * Drops the global temporary view with the given view name in the catalog.
+   * If the view has been cached/persisted before, it's also unpersisted.
+   *
+   * @param viewName the name of the view to be dropped.
+   * @return true if the view was found and dropped, false otherwise.
+   * @group ddl_ops
+   * @since 2.1.0
+   */
+  override def dropGlobalTempView(viewName: String): Boolean = {
+    sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
+      sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, viewDef))
+      sessionCatalog.dropGlobalTempView(viewName)
+    }
+  }
+
   /**
    * Returns true if the table is currently cached in-memory.
* diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index c899773b6b36..40a95ff5ff73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -94,6 +94,8 @@ private[sql] class SessionState(sparkSession: SparkSession) { */ lazy val catalog = new SessionCatalog( sparkSession.sharedState.externalCatalog, + sparkSession.sharedState.globalTempDB, + sparkSession.sharedState.globalTempViews, functionResourceLoader, functionRegistry, conf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 6387f0150631..90555500951c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -22,11 +22,11 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.config._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog} +import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.util.{MutableURLClassLoader, Utils} @@ -37,6 +37,18 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils} */ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { + val globalTempDB = sparkContext.conf.get( + SessionCatalog.GLOBAL_TEMP_DB_CONF_KEY, SessionCatalog.DEFAULT_GLOBAL_TEMP_DB) + + val globalTempViews = { + if (externalCatalog.databaseExists(globalTempDB)) { + throw new SparkException( + s"$globalTempDB is a system preserved database, please rename your existing database " + + "to resolve the name conflict and launch your Spark application again.") + } + new GlobalTempViewManager + } + /** * Class for caching query results reused in future executions. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala new file mode 100644 index 000000000000..1346d81445fe --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StructType + +class GlobalTempViewSuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + override protected def beforeAll(): Unit = { + super.beforeAll() + globalTempDB = spark.sharedState.globalTempDB + } + + private var globalTempDB: String = _ + + test("basic semantic") { + sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'") + + // If there is no database in table name, we should try local temp view first, if not found, + // try table/view in current database, which is "default" in this case. So we expect + // NoSuchTableException here. + intercept[NoSuchTableException](spark.table("src")) + + // Use qualified name to refer to the global temp view explicitly. + checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a")) + + // Table name without database will never refer to a global temp view. + intercept[NoSuchTableException](sql("DROP VIEW src")) + + sql(s"DROP VIEW $globalTempDB.src") + // The global temp view should be dropped successfully. + intercept[NoSuchTableException](spark.table(s"$globalTempDB.src")) + + // We can also use Dataset API to create global temp view + Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src") + checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a")) + + // Use qualified name to rename a global temp view. + sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2") + intercept[NoSuchTableException](spark.table(s"$globalTempDB.src")) + checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a")) + + // Use qualified name to alter a global temp view. 
+ sql(s"ALTER VIEW $globalTempDB.src2 AS SELECT 2, 'b'") + checkAnswer(spark.table(s"$globalTempDB.src2"), Row(2, "b")) + + // We can also use Catalog API to drop global temp view + spark.catalog.dropGlobalTempView("src2") + intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2")) + } + + test("global temp view database should be preserved") { + val e = intercept[AnalysisException](sql(s"CREATE DATABASE $globalTempDB")) + assert(e.message.contains("system preserved database")) + + val e2 = intercept[AnalysisException](sql(s"USE $globalTempDB")) + assert(e2.message.contains("system preserved database")) + } + + test("CREATE TABLE LIKE should work for global temp view") { + try { + sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1 AS a, '2' AS b") + sql(s"CREATE TABLE cloned LIKE ${globalTempDB}.src") + val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("cloned")) + assert(tableMeta.schema == new StructType().add("a", "int", false).add("b", "string", false)) + } finally { + spark.catalog.dropGlobalTempView("src") + sql("DROP TABLE default.cloned") + } + } + + test("list global temp views") { + try { + sql("CREATE GLOBAL TEMP VIEW v1 AS SELECT 3, 4") + sql("CREATE TEMP VIEW v2 AS SELECT 1, 2") + + checkAnswer(sql(s"SHOW TABLES IN $globalTempDB"), + Row(globalTempDB, "v1", true) :: + Row("", "v2", true) :: Nil) + + assert(spark.catalog.listTables(globalTempDB).collect().toSeq.map(_.name) == Seq("v1", "v2")) + } finally { + spark.catalog.dropTempView("v1") + spark.catalog.dropGlobalTempView("v2") + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 85c509847d8e..ad0b319c2b98 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.FunctionRegistry import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder -import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.Rule @@ -41,6 +41,8 @@ import org.apache.spark.util.Utils private[sql] class HiveSessionCatalog( externalCatalog: HiveExternalCatalog, + globalTempDB: String, + globalTempViews: GlobalTempViewManager, sparkSession: SparkSession, functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, @@ -48,6 +50,8 @@ private[sql] class HiveSessionCatalog( hadoopConf: Configuration) extends SessionCatalog( externalCatalog, + globalTempDB, + globalTempViews, functionResourceLoader, functionRegistry, conf, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index eb10c11382e8..0e41e4db58da 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -45,6 +45,8 @@ private[hive] class 
HiveSessionState(sparkSession: SparkSession) override lazy val catalog = { new HiveSessionCatalog( sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog], + sparkSession.sharedState.globalTempDB, + sparkSession.sharedState.globalTempViews, sparkSession, functionResourceLoader, functionRegistry, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index c927e5d802c9..7b09ca45b0a1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -920,7 +920,7 @@ class HiveDDLSuite client.runSqlHive( s"CREATE INDEX $indexName ON TABLE $tabName (a) AS 'COMPACT' WITH DEFERRED REBUILD") val indexTabName = - spark.sessionState.catalog.listTables("default", s"*$indexName*").head.table + spark.sessionState.catalog.listTables("default", s"*$indexName*").head._1.table intercept[TableAlreadyExistsException] { sql(s"CREATE TABLE $indexTabName(b int)") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index a215c70da0c5..9884269c1457 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -62,15 +62,15 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { var e = intercept[AnalysisException] { sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt") }.getMessage - assert(e.contains("`default`.`tab1` is not a view")) + assert(e.contains("`tab1` is not a view")) e = intercept[AnalysisException] { sql("CREATE VIEW tab1 AS SELECT * FROM jt") }.getMessage - assert(e.contains("`default`.`tab1` is not a view")) + assert(e.contains("`tab1` is not a view")) e = intercept[AnalysisException] { sql("ALTER VIEW tab1 AS SELECT * FROM jt") }.getMessage - assert(e.contains("`default`.`tab1` is not a view")) + assert(e.contains("`tab1` is not a view")) } } From 67e459a48c82ef2b13ffedbc23f3921db0721204 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 23 Sep 2016 17:06:54 +0800 Subject: [PATCH 02/13] address comments --- .../spark/internal/config/package.scala | 7 ++ .../catalog/GlobalTempViewManager.scala | 31 ++++- .../sql/catalyst/catalog/SessionCatalog.scala | 116 +++++++++--------- .../scala/org/apache/spark/sql/Dataset.scala | 11 +- .../apache/spark/sql/catalog/Catalog.scala | 11 +- .../spark/sql/execution/SparkSqlParser.scala | 2 +- .../spark/sql/execution/command/tables.scala | 3 +- .../spark/sql/execution/command/views.scala | 47 +------ .../spark/sql/execution/datasources/ddl.scala | 37 ++++++ .../spark/sql/internal/CatalogImpl.scala | 5 +- .../spark/sql/internal/SessionState.scala | 3 +- .../spark/sql/internal/SharedState.scala | 13 +- .../apache/spark/sql/SQLContextSuite.scala | 11 +- .../sql/execution/GlobalTempViewSuite.scala | 18 ++- .../sql/execution/command/DDLSuite.scala | 10 +- .../spark/sql/hive/HiveSessionCatalog.scala | 6 +- .../spark/sql/hive/HiveSessionState.scala | 3 +- .../sql/hive/execution/HiveDDLSuite.scala | 2 +- 18 files changed, 202 insertions(+), 134 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index d536cc5097b2..0896e68eca7d 100644 --- 
a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -98,6 +98,13 @@ package object config {
     .checkValues(Set("hive", "in-memory"))
     .createWithDefault("in-memory")

+  // Note: This is a SQL config but needs to be in core because it's cross-session and cannot be
+  // put in SQLConf.
+  private[spark] val GLOBAL_TEMP_DATABASE = ConfigBuilder("spark.sql.globalTempDatabase")
+    .internal()
+    .stringConf
+    .createWithDefault("global_temp")
+
   private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
       .intConf
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
index cdb778d28acc..6095ac0bc9c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
@@ -33,17 +33,26 @@ import org.apache.spark.sql.catalyst.util.StringUtils
  *
  * Note that the view name is always case-sensitive here; callers are responsible for formatting
  * the view name w.r.t. the case-sensitivity config.
+ *
+ * @param database The system preserved virtual database that keeps all the global temporary views.
  */
-class GlobalTempViewManager {
+class GlobalTempViewManager(val database: String) {

   /** List of view definitions, mapping from view name to logical plan. */
   @GuardedBy("this")
   private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]

+  /**
+   * Returns the global view definition which matches the given name, or None if not found.
+   */
   def get(name: String): Option[LogicalPlan] = synchronized {
     viewDefinitions.get(name)
   }

+  /**
+   * Creates a global temp view, or issues an exception if the view already exists and
+   * `overrideIfExists` is false.
+   */
   def create(
       name: String,
       viewDefinition: LogicalPlan,
       overrideIfExists: Boolean): Unit = synchronized {
@@ -54,10 +63,12 @@ class GlobalTempViewManager {
     viewDefinitions.put(name, viewDefinition)
   }

+  /**
+   * Updates the global temp view if it exists, returns true if updated, false otherwise.
+   */
   def update(
       name: String,
       viewDefinition: LogicalPlan): Boolean = synchronized {
-    // Only update it when the view with the given name exists.
     if (viewDefinitions.contains(name)) {
       viewDefinitions.put(name, viewDefinition)
       true
@@ -66,10 +77,18 @@ class GlobalTempViewManager {
     }
   }

+  /**
+   * Removes the global temp view if it exists, returns true if removed, false otherwise.
+   */
   def remove(name: String): Boolean = synchronized {
     viewDefinitions.remove(name).isDefined
   }

+  /**
+   * Renames the global temp view if the source view exists and the destination view does not
+   * exist, or issues an exception if the source view exists but the destination view already
+   * exists. Returns true if renamed, false otherwise.
+   */
   def rename(oldName: String, newName: String): Boolean = synchronized {
     if (viewDefinitions.contains(oldName)) {
       if (viewDefinitions.contains(newName)) {
@@ -86,10 +105,16 @@ class GlobalTempViewManager {
     }
   }

-  def listNames(pattern: String): Seq[String] = synchronized {
+  /**
+   * Lists the names of all global temporary views that match the given pattern.
+   */
+  def listViewNames(pattern: String): Seq[String] = synchronized {
     StringUtils.filterPattern(viewDefinitions.keys.toSeq, pattern)
   }

+  /**
+   * Clears all the global temporary views.
+ */ def clear(): Unit = synchronized { viewDefinitions.clear() } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index fd99d6085bfb..a2eb434df417 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.GLOBAL_TEMP_DATABASE import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} @@ -36,9 +37,6 @@ import org.apache.spark.sql.catalyst.util.StringUtils object SessionCatalog { val DEFAULT_DATABASE = "default" - - val GLOBAL_TEMP_DB_CONF_KEY = "spark.sql.database.globalTemp" - val DEFAULT_GLOBAL_TEMP_DB = "global_temp" } /** @@ -50,8 +48,7 @@ object SessionCatalog { */ class SessionCatalog( externalCatalog: ExternalCatalog, - globalTempDB: String, - globalTempViews: GlobalTempViewManager, + globalTempViewManager: GlobalTempViewManager, functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, conf: CatalystConf, @@ -66,8 +63,7 @@ class SessionCatalog( conf: CatalystConf) { this( externalCatalog, - SessionCatalog.DEFAULT_GLOBAL_TEMP_DB, - new GlobalTempViewManager, + new GlobalTempViewManager(GLOBAL_TEMP_DATABASE.defaultValueString), DummyFunctionResourceLoader, functionRegistry, conf, @@ -150,9 +146,10 @@ class SessionCatalog( def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = { val dbName = formatDatabaseName(dbDefinition.name) - if (dbName == globalTempDB) { - throw new AnalysisException(s"$globalTempDB is a system preserved database, " + - "you cannot create a database with this name.") + if (dbName == globalTempViewManager.database) { + throw new AnalysisException( + s"${globalTempViewManager.database} is a system preserved database, " + + "you cannot create a database with this name.") } val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString externalCatalog.createDatabase( @@ -184,7 +181,7 @@ class SessionCatalog( def databaseExists(db: String): Boolean = { val dbName = formatDatabaseName(db) - externalCatalog.databaseExists(dbName) + dbName == globalTempViewManager.database || externalCatalog.databaseExists(dbName) } def listDatabases(): Seq[String] = { @@ -199,9 +196,10 @@ class SessionCatalog( def setCurrentDatabase(db: String): Unit = { val dbName = formatDatabaseName(db) - if (dbName == globalTempDB) { - throw new AnalysisException(s"$globalTempDB is a system preserved database, " + - "you cannot use it as current database.") + if (dbName == globalTempViewManager.database) { + throw new AnalysisException( + s"${globalTempViewManager.database} is a system preserved database, " + + "you cannot use it as current database.") } requireDbExists(dbName) synchronized { currentDb = dbName } @@ -290,7 +288,7 @@ class SessionCatalog( */ def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) - if (db == globalTempDB) return None + if (db == globalTempViewManager.database) return None val table = formatTableName(name.table) requireDbExists(db) 
externalCatalog.getTableOption(db, table) @@ -349,13 +347,13 @@ class SessionCatalog( */ def createTempView( name: String, - viewDefinition: LogicalPlan, + tableDefinition: LogicalPlan, overrideIfExists: Boolean): Unit = synchronized { - val viewName = formatTableName(name) - if (tempTables.contains(viewName) && !overrideIfExists) { + val table = formatTableName(name) + if (tempTables.contains(table) && !overrideIfExists) { throw new TempTableAlreadyExistsException(name) } - tempTables.put(viewName, viewDefinition) + tempTables.put(table, tableDefinition) } /** @@ -365,7 +363,7 @@ class SessionCatalog( name: String, viewDefinition: LogicalPlan, overrideIfExists: Boolean): Unit = { - globalTempViews.create(formatTableName(name), viewDefinition, overrideIfExists) + globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists) } /** @@ -383,8 +381,8 @@ class SessionCatalog( } else { false } - } else if (formatDatabaseName(name.database.get) == globalTempDB) { - globalTempViews.update(viewName, viewDefinition) + } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) { + globalTempViewManager.update(viewName, viewDefinition) } else { false } @@ -401,7 +399,7 @@ class SessionCatalog( * Return a global temporary view exactly as it was stored. */ def getGlobalTempView(name: String): Option[LogicalPlan] = { - globalTempViews.get(formatTableName(name)) + globalTempViewManager.get(formatTableName(name)) } /** @@ -414,7 +412,9 @@ class SessionCatalog( /** * Drop a global temporary view. */ - def dropGlobalTempView(name: String): Boolean = globalTempViews.remove(formatTableName(name)) + def dropGlobalTempView(name: String): Boolean = { + globalTempViewManager.remove(formatTableName(name)) + } // ------------------------------------------------------------- // | Methods that interact with temporary and metastore tables | @@ -439,14 +439,14 @@ class SessionCatalog( storage = CatalogStorageFormat.empty, schema = plan.output.toStructType) }.getOrElse(getTableMetadata(name)) - } else if (formatDatabaseName(name.database.get) == globalTempDB) { - globalTempViews.get(table).map { plan => + } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) { + globalTempViewManager.get(table).map { plan => CatalogTable( identifier = TableIdentifier(table), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, schema = plan.output.toStructType) - }.getOrElse(throw new NoSuchTableException(globalTempDB, table)) + }.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table)) } else { getTableMetadata(name) } @@ -463,8 +463,8 @@ class SessionCatalog( val db = formatDatabaseName(oldName.database.getOrElse(currentDb)) val oldTableName = formatTableName(oldName.table) val newTableName = formatTableName(newName) - if (db == globalTempDB) { - globalTempViews.rename(oldTableName, newTableName) + if (db == globalTempViewManager.database) { + globalTempViewManager.rename(oldTableName, newTableName) } else { requireDbExists(db) if (oldName.database.isDefined || !tempTables.contains(oldTableName)) { @@ -496,15 +496,15 @@ class SessionCatalog( purge: Boolean): Unit = synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) - if (db == globalTempDB) { - val viewExists = globalTempViews.remove(table) + if (db == globalTempViewManager.database) { + val viewExists = globalTempViewManager.remove(table) if (!viewExists && !ignoreIfNotExists) { - throw new 
NoSuchTableException(globalTempDB, table)
+        throw new NoSuchTableException(globalTempViewManager.database, table)
       }
     } else {
       if (name.database.isDefined || !tempTables.contains(table)) {
         requireDbExists(db)
-        // When ignoreIfNotExists is false, no lexception is issued when the table does not exist.
+        // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
         // Instead, log it as an error message.
         if (tableExists(TableIdentifier(table, Option(db)))) {
           externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
@@ -524,6 +524,9 @@
    * If no database is specified, this will first attempt to return a temporary table/view with
    * the same name, then, if that does not exist, return the table/view from the current database.
    *
+   * Note that the global temp view database is also valid here: in that case this will return
+   * the global temp view matching the given name.
+   *
    * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
    * track the name of the view.
    */
@@ -532,8 +535,8 @@
       val db = formatDatabaseName(name.database.getOrElse(currentDb))
       val table = formatTableName(name.table)
       val relationAlias = alias.getOrElse(table)
-      if (db == globalTempDB) {
-        globalTempViews.get(table).map { viewDef =>
+      if (db == globalTempViewManager.database) {
+        globalTempViewManager.get(table).map { viewDef =>
           SubqueryAlias(relationAlias, viewDef, Some(name))
         }.getOrElse(throw new NoSuchTableException(db, table))
       } else if (name.database.isDefined || !tempTables.contains(table)) {
@@ -558,44 +561,45 @@
     val table = formatTableName(name.table)
     if (name.database.isEmpty) {
       tempTables.contains(table)
-    } else if (formatDatabaseName(name.database.get) == globalTempDB) {
-      globalTempViews.get(table).isDefined
+    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+      globalTempViewManager.get(table).isDefined
     } else {
       false
     }
   }
 
   /**
-   * List all tables in the specified database, including local temporary tables. Returns pairs of
-   * table identifier and isTemporary flag.
+   * List all tables in the specified database, including local temporary tables.
    *
-   * Note that, if the specified database is global temp database, we will list global temp views.
+   * Note that if the specified database is the global temporary view database, we will list
+   * global temporary views.
    */
-  def listTables(db: String): Seq[(TableIdentifier, Boolean)] = listTables(db, "*")
+  def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
 
   /**
-   * List all matching tables in the specified database, including local temporary tables. Returns
-   * pairs of table identifier and isTemporary flag.
+   * List all matching tables in the specified database, including local temporary tables.
    *
-   * Note that, if the specified database is global temp database, we will list global temp views.
+   * Note that if the specified database is the global temporary view database, we will list
+   * global temporary views.
*/ - def listTables(db: String, pattern: String): Seq[(TableIdentifier, Boolean)] = { + def listTables(db: String, pattern: String): Seq[TableIdentifier] = { val dbName = formatDatabaseName(db) - val dbTables: Seq[(TableIdentifier, Boolean)] = if (dbName == globalTempDB) { - globalTempViews.listNames(pattern).map { name => - TableIdentifier(name, Some(globalTempDB)) -> true + val dbTables = if (dbName == globalTempViewManager.database) { + globalTempViewManager.listViewNames(pattern).map { name => + TableIdentifier(name, Some(globalTempViewManager.database)) } } else { requireDbExists(dbName) externalCatalog.listTables(dbName, pattern).map { name => - TableIdentifier(name, Some(dbName)) -> false + TableIdentifier(name, Some(dbName)) } } - synchronized { - dbTables ++ StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name => - TableIdentifier(name) -> true + val localTempViews = synchronized { + StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name => + TableIdentifier(name) } } + dbTables ++ localTempViews } /** @@ -607,8 +611,8 @@ class SessionCatalog( // If the database is not defined, there is a good chance this is a temp table. if (name.database.isEmpty) { tempTables.get(formatTableName(name.table)).foreach(_.refresh()) - } else if (formatDatabaseName(name.database.get) == globalTempDB) { - globalTempViews.get(formatTableName(name.table)).foreach(_.refresh()) + } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) { + globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh()) } } @@ -1013,7 +1017,7 @@ class SessionCatalog( listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db => dropDatabase(db, ignoreIfNotExists = false, cascade = true) } - listTables(DEFAULT_DATABASE).foreach { case (table, _) => + listTables(DEFAULT_DATABASE).foreach { table => dropTable(table, ignoreIfNotExists = false, purge = false) } listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func => @@ -1024,7 +1028,7 @@ class SessionCatalog( } } tempTables.clear() - globalTempViews.clear() + globalTempViewManager.clear() functionRegistry.clear() // restore built-in functions FunctionRegistry.builtin.listFunction().foreach { f => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 6cf43748926a..4b52508740bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2436,6 +2436,10 @@ class Dataset[T] private[sql]( * Creates a local temporary view using the given name. The lifetime of this * temporary view is tied to the [[SparkSession]] that was used to create this Dataset. * + * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that + * created it, i.e. it will be automatically dropped when the session terminates. It's not + * tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view. + * * @throws AnalysisException if the view name already exists * * @group basic @@ -2463,10 +2467,15 @@ class Dataset[T] private[sql]( * Creates a global temporary view using the given name. The lifetime of this * temporary view is tied to this Spark application. * + * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark application, + * i.e. it will be automatically dropped when the application terminates. 
It's tied to a system + * preserved database `_global_temp`, and we must use the qualified name to refer a global temp + * view, e.g. `SELECT * FROM _global_temp.view1`. + * * @throws TempTableAlreadyExistsException if the view name already exists * * @group basic - * @since 2.0.1 + * @since 2.1.0 */ @throws[AnalysisException] def createGlobalTempView(viewName: String): Unit = withPlan { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 9e7667861b6b..9aca20b176a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -181,6 +181,10 @@ abstract class Catalog { * Drops the local temporary view with the given view name in the catalog. * If the view has been cached before, then it will also be uncached. * + * Local temporary view is session-scoped. Its lifetime is the lifetime of the session that + * created it, i.e. it will be automatically dropped when the session terminates. It's not + * tied to any databases, i.e. we can't use `db1.view1` to reference a local temporary view. + * * @param viewName the name of the view to be dropped. * @since 2.0.0 */ @@ -190,8 +194,13 @@ abstract class Catalog { * Drops the global temporary view with the given view name in the catalog. * If the view has been cached before, then it will also be uncached. * + * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark application, + * i.e. it will be automatically dropped when the application terminates. It's tied to a system + * preserved database `_global_temp`, and we must use the qualified name to refer a global temp + * view, e.g. `SELECT * FROM _global_temp.view1`. + * * @param viewName the name of the view to be dropped. 
-   * @since 2.0.0
+   * @since 2.1.0
    */
   def dropGlobalTempView(viewName: String): Boolean
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 1bcde069649e..7ce407fae3e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1267,7 +1267,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
 
     val viewType = if (ctx.TEMPORARY == null) {
-      PermanentView
+      PersistedView
     } else if (ctx.GLOBAL != null) {
       GlobalTempView
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index adede8a90249..b812b2fe29b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -529,7 +529,8 @@ case class ShowTablesCommand(
     val db = databaseName.getOrElse(catalog.getCurrentDatabase)
     val tables =
       tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
-    tables.map { case (tableIdent, isTemp) =>
+    tables.map { tableIdent =>
+      val isTemp = catalog.isTemporaryTable(tableIdent)
       Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index f6b42a491ced..bbcd9c4ef564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -51,14 +51,14 @@ object LocalTempView extends ViewType
 object GlobalTempView extends ViewType
 
 /**
- * PermanentView means cross-session permanent views. Permanent views stay until they are
+ * PersistedView means cross-session persisted views. Persisted views stay until they are
  * explicitly dropped by user command. It's always tied to a database, defaulting to the current
  * database if not specified.
  *
- * Note that, Existing permanent view with the same name are not visible to the current session
+ * Note that an existing persisted view with the same name is not visible to the current session
  * while the local temporary view exists, unless the view name is qualified by database.
  */
-object PermanentView extends ViewType
+object PersistedView extends ViewType
 
 
 /**
@@ -95,7 +95,7 @@ case class CreateViewCommand(
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
-  if (viewType == PermanentView) {
+  if (viewType == PersistedView) {
     require(originalText.isDefined, "'originalText' must be provided to create permanent view")
   }
 
@@ -201,45 +201,6 @@ case class CreateViewCommand(
     }
   }
 
-
-/**
- * Create or replace a local/global temporary view with given data source.
- */ -case class CreateTempViewUsing( - tableIdent: TableIdentifier, - userSpecifiedSchema: Option[StructType], - replace: Boolean, - global: Boolean, - provider: String, - options: Map[String, String]) extends RunnableCommand { - - if (tableIdent.database.isDefined) { - throw new AnalysisException( - s"Temporary view '$tableIdent' should not have specified a database") - } - - def run(sparkSession: SparkSession): Seq[Row] = { - val dataSource = DataSource( - sparkSession, - userSpecifiedSchema = userSpecifiedSchema, - className = provider, - options = options) - - val catalog = sparkSession.sessionState.catalog - val viewDefinition = Dataset.ofRows( - sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan - - if (global) { - catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace) - } else { - catalog.createTempView(tableIdent.table, viewDefinition, replace) - } - - Seq.empty[Row] - } -} - - /** * Alter a view with given query plan. If the view name contains database prefix, this command will * alter a permanent view matching the given name, or throw an exception if view not exist. Else, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 598247f8d422..59fb48ffea59 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -40,6 +40,43 @@ case class CreateTable( override def innerChildren: Seq[QueryPlan[_]] = query.toSeq } +/** + * Create or replace a local/global temporary view with given data source. + */ +case class CreateTempViewUsing( + tableIdent: TableIdentifier, + userSpecifiedSchema: Option[StructType], + replace: Boolean, + global: Boolean, + provider: String, + options: Map[String, String]) extends RunnableCommand { + + if (tableIdent.database.isDefined) { + throw new AnalysisException( + s"Temporary view '$tableIdent' should not have specified a database") + } + + def run(sparkSession: SparkSession): Seq[Row] = { + val dataSource = DataSource( + sparkSession, + userSpecifiedSchema = userSpecifiedSchema, + className = provider, + options = options) + + val catalog = sparkSession.sessionState.catalog + val viewDefinition = Dataset.ofRows( + sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan + + if (global) { + catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace) + } else { + catalog.createTempView(tableIdent.table, viewDefinition, replace) + } + + Seq.empty[Row] + } +} + case class RefreshTable(tableIdent: TableIdentifier) extends RunnableCommand { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 33fa46d24deb..ac9e9b616394 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -92,7 +92,8 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { */ @throws[AnalysisException]("database does not exist") override def listTables(dbName: String): Dataset[Table] = { - val tables = sessionCatalog.listTables(dbName).map { case (tableIdent, isTemp) => + val tables = sessionCatalog.listTables(dbName).map { tableIdent => + val isTemp = sessionCatalog.isTemporaryTable(tableIdent) val metadata = if (isTemp) None else 
Some(sessionCatalog.getTableMetadata(tableIdent)) new Table( name = tableIdent.identifier, @@ -295,7 +296,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * * @param viewName the name of the view to be dropped. * @group ddl_ops - * @since 2.0.1 + * @since 2.1.0 */ override def dropGlobalTempView(viewName: String): Boolean = { sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 40a95ff5ff73..4b4301ee281f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -94,8 +94,7 @@ private[sql] class SessionState(sparkSession: SparkSession) { */ lazy val catalog = new SessionCatalog( sparkSession.sharedState.externalCatalog, - sparkSession.sharedState.globalTempDB, - sparkSession.sharedState.globalTempViews, + sparkSession.sharedState.globalTempViewManager, functionResourceLoader, functionRegistry, conf, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 90555500951c..7e055d29a72e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -26,7 +26,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.config._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} -import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog} import org.apache.spark.sql.execution.CacheManager import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.util.{MutableURLClassLoader, Utils} @@ -37,16 +37,17 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils} */ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { - val globalTempDB = sparkContext.conf.get( - SessionCatalog.GLOBAL_TEMP_DB_CONF_KEY, SessionCatalog.DEFAULT_GLOBAL_TEMP_DB) - - val globalTempViews = { + val globalTempViewManager = { + // System preserved database should not exists in metastore. However it's hard to guarantee it + // for every session, because case-sensitivity differs. Here we always lowercase it to make our + // life easier. 
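+    // For illustration: the reserved database name comes from the internal core config
+    // `spark.sql.globalTempDatabase` defined above, so it could in principle be changed at
+    // application start (a sketch; the config is internal and defaults to "global_temp"):
+    //   spark-submit --conf spark.sql.globalTempDatabase=my_global_temp ...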
+ val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase if (externalCatalog.databaseExists(globalTempDB)) { throw new SparkException( s"$globalTempDB is a system preserved database, please rename your existing database " + "to resolve the name conflict and launch your Spark application again.") } - new GlobalTempViewManager + new GlobalTempViewManager(globalTempDB) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 001c1a1d8531..2b35db411e2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -88,11 +88,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { df.createOrReplaceTempView("listtablessuitetable") assert( sqlContext.tables().filter("tableName = 'listtablessuitetable'").collect().toSeq == - Row("listtablessuitetable", true) :: Nil) + Row("", "listtablessuitetable", true) :: Nil) assert( sqlContext.sql("SHOW tables").filter("tableName = 'listtablessuitetable'").collect().toSeq == - Row("listtablessuitetable", true) :: Nil) + Row("", "listtablessuitetable", true) :: Nil) sqlContext.sessionState.catalog.dropTable( TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false) @@ -105,11 +105,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { df.createOrReplaceTempView("listtablessuitetable") assert( sqlContext.tables("default").filter("tableName = 'listtablessuitetable'").collect().toSeq == - Row("listtablessuitetable", true) :: Nil) + Row("", "listtablessuitetable", true) :: Nil) assert( sqlContext.sql("show TABLES in default").filter("tableName = 'listtablessuitetable'") - .collect().toSeq == Row("listtablessuitetable", true) :: Nil) + .collect().toSeq == Row("", "listtablessuitetable", true) :: Nil) sqlContext.sessionState.catalog.dropTable( TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false) @@ -122,7 +122,8 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext { df.createOrReplaceTempView("listtablessuitetable") val expectedSchema = StructType( - StructField("tableName", StringType, false) :: + StructField("database", StringType, false) :: + StructField("tableName", StringType, false) :: StructField("isTemporary", BooleanType, false) :: Nil) Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala index 1346d81445fe..6e7c77a4f8c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala @@ -28,7 +28,7 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { override protected def beforeAll(): Unit = { super.beforeAll() - globalTempDB = spark.sharedState.globalTempDB + globalTempDB = spark.sharedState.globalTempViewManager.database } private var globalTempDB: String = _ @@ -104,4 +104,20 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { spark.catalog.dropGlobalTempView("v2") } } + + test("should lookup global temp view if and only if global temp db is specified") { + try { + sql("CREATE GLOBAL TEMP VIEW same_name AS SELECT 3, 4") + sql("CREATE TEMP VIEW same_name AS SELECT 1, 2") + + 
checkAnswer(sql("SELECT * FROM same_name"), Row(1, 2)) + + // we never lookup global temp views if database is not specified in table name + spark.catalog.dropTempView("same_name") + intercept[NoSuchTableException](sql("SELECT * FROM same_name")) + + // Use qualified name to lookup a global temp view. + checkAnswer(sql(s"SELECT * FROM $globalTempDB.same_name"), Row(3, 4)) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b5499f2884c6..5489bc61a4ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -969,17 +969,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { """.stripMargin) checkAnswer( sql("SHOW TABLES IN default 'show1*'"), - Row("show1a", true) :: Nil) + Row("", "show1a", true) :: Nil) checkAnswer( sql("SHOW TABLES IN default 'show1*|show2*'"), - Row("show1a", true) :: - Row("show2b", true) :: Nil) + Row("", "show1a", true) :: + Row("", "show2b", true) :: Nil) checkAnswer( sql("SHOW TABLES 'show1*|show2*'"), - Row("show1a", true) :: - Row("show2b", true) :: Nil) + Row("", "show1a", true) :: + Row("", "show2b", true) :: Nil) assert( sql("SHOW TABLES").count() >= 2) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index ad0b319c2b98..85ecf0ce7075 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -41,8 +41,7 @@ import org.apache.spark.util.Utils private[sql] class HiveSessionCatalog( externalCatalog: HiveExternalCatalog, - globalTempDB: String, - globalTempViews: GlobalTempViewManager, + globalTempViewManager: GlobalTempViewManager, sparkSession: SparkSession, functionResourceLoader: FunctionResourceLoader, functionRegistry: FunctionRegistry, @@ -50,8 +49,7 @@ private[sql] class HiveSessionCatalog( hadoopConf: Configuration) extends SessionCatalog( externalCatalog, - globalTempDB, - globalTempViews, + globalTempViewManager, functionResourceLoader, functionRegistry, conf, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 0e41e4db58da..6d4fe1a941a9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -45,8 +45,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override lazy val catalog = { new HiveSessionCatalog( sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog], - sparkSession.sharedState.globalTempDB, - sparkSession.sharedState.globalTempViews, + sparkSession.sharedState.globalTempViewManager, sparkSession, functionResourceLoader, functionRegistry, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 7b09ca45b0a1..c927e5d802c9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -920,7 +920,7 @@ class HiveDDLSuite client.runSqlHive( 
s"CREATE INDEX $indexName ON TABLE $tabName (a) AS 'COMPACT' WITH DEFERRED REBUILD") val indexTabName = - spark.sessionState.catalog.listTables("default", s"*$indexName*").head._1.table + spark.sessionState.catalog.listTables("default", s"*$indexName*").head.table intercept[TableAlreadyExistsException] { sql(s"CREATE TABLE $indexTabName(b int)") } From 273253177c3c46c0ac3932eea62fa343551ead65 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 27 Sep 2016 13:26:27 +0800 Subject: [PATCH 03/13] fix test --- project/MimaExcludes.scala | 4 +++- .../sql/catalyst/analysis/Analyzer.scala | 10 ++++---- .../sql/catalyst/catalog/SessionCatalog.scala | 3 +-- .../spark/sql/execution/command/ddl.scala | 24 ++++++++++--------- .../sql/execution/GlobalTempViewSuite.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 2 +- 6 files changed, 24 insertions(+), 21 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 8024fbd21bbf..0568d9519d88 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -46,7 +46,9 @@ object MimaExcludes { // [SPARK-16967] Move Mesos to Module ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkMasterRegex.MESOS_REGEX"), // [SPARK-16240] ML persistence backward compatibility for LDA - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$") + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$"), + // [SPARK-17338][SQL] add global temp view + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView") ) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ae8869ff25f2..536d38777f89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -458,12 +458,12 @@ class Analyzer( i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u))) case u: UnresolvedRelation => val table = u.tableIdentifier - if (table.database.isDefined && conf.runSQLonFile && + if (table.database.isDefined && conf.runSQLonFile && !catalog.isTemporaryTable(table) && (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) { - // If the table does not exist, and the database part is specified, and we support - // running SQL directly on files, then let's just return the original UnresolvedRelation. - // It is possible we are matching a query like "select * from parquet.`/path/to/query`". - // The plan will get resolved later. + // If the database part is specified, and we support running SQL directly on files, and + // it's not a temporary view, and the table does not exist, then let's just return the + // original UnresolvedRelation. It is possible we are matching a query like "select * + // from parquet.`/path/to/query`". The plan will get resolved later. // Note that we are testing (!db_exists || !table_exists) because the catalog throws // an exception from tableExists if the database does not exist. 
u diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index a2eb434df417..4fa941a47713 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -181,7 +181,7 @@ class SessionCatalog( def databaseExists(db: String): Boolean = { val dbName = formatDatabaseName(db) - dbName == globalTempViewManager.database || externalCatalog.databaseExists(dbName) + externalCatalog.databaseExists(dbName) } def listDatabases(): Seq[String] = { @@ -288,7 +288,6 @@ class SessionCatalog( */ def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) - if (db == globalTempViewManager.database) return None val table = formatTableName(name.table) requireDbExists(db) externalCatalog.getTableOption(db, table) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 01ac89868d10..2b05cfb3856c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -183,17 +183,19 @@ case class DropTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view - // issue an exception. - catalog.getTableMetadataOption(tableName).map(_.tableType match { - case CatalogTableType.VIEW if !isView => - throw new AnalysisException( - "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead") - case o if o != CatalogTableType.VIEW && isView => - throw new AnalysisException( - s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead") - case _ => - }) + if (tableName.database.exists(catalog.databaseExists) && catalog.tableExists(tableName)) { + // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view + // issue an exception. + catalog.getTableMetadata(tableName) match { + case CatalogTableType.VIEW if !isView => + throw new AnalysisException( + "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead") + case o if o != CatalogTableType.VIEW && isView => + throw new AnalysisException( + s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead") + case _ => + } + } try { sparkSession.sharedState.cacheManager.uncacheQuery( sparkSession.table(tableName.quotedString)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala index 6e7c77a4f8c0..cca697d4ef6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala @@ -114,7 +114,7 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { // we never lookup global temp views if database is not specified in table name spark.catalog.dropTempView("same_name") - intercept[NoSuchTableException](sql("SELECT * FROM same_name")) + intercept[AnalysisException](sql("SELECT * FROM same_name")) // Use qualified name to lookup a global temp view. 
checkAnswer(sql(s"SELECT * FROM $globalTempDB.same_name"), Row(3, 4)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 8ae6868c9848..51670649ad1d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -984,7 +984,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv checkAnswer( spark.sql("show TABLES in testdb8156").filter("tableName = 'ttt3'"), - Row("ttt3", false)) + Row("testdb8156", "ttt3", false)) spark.sql("""use default""") spark.sql("""drop database if exists testdb8156 CASCADE""") } From 754d75be19afddb11a434fafd7cc8b605f607537 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 27 Sep 2016 19:59:37 +0800 Subject: [PATCH 04/13] fixed --- .../spark/sql/internal/SharedState.scala | 73 ++++++++++--------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 7e055d29a72e..c1f1992d8201 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -37,17 +37,28 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils} */ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { - val globalTempViewManager = { - // System preserved database should not exists in metastore. However it's hard to guarantee it - // for every session, because case-sensitivity differs. Here we always lowercase it to make our - // life easier. - val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase - if (externalCatalog.databaseExists(globalTempDB)) { - throw new SparkException( - s"$globalTempDB is a system preserved database, please rename your existing database " + - "to resolve the name conflict and launch your Spark application again.") + { + // Set the Hive metastore warehouse path to the one we use + val tempConf = new SQLConf + sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) } + val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") + if (hiveWarehouseDir != null && !tempConf.contains(SQLConf.WAREHOUSE_PATH.key)) { + // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set, + // we will respect the value of hive.metastore.warehouse.dir. + tempConf.setConfString(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir) + sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir) + logInfo(s"${SQLConf.WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " + + s"is set. Setting ${SQLConf.WAREHOUSE_PATH.key} to the value of " + + s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').") + } else { + // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using + // the value of spark.sql.warehouse.dir. + // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set, + // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir. 
+      sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
+    }
-    new GlobalTempViewManager(globalTempDB)
+
+    logInfo(s"Warehouse path is '${tempConf.warehousePath}'.")
   }
 
   /**
@@ -70,42 +81,34 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   /**
    * A catalog that interacts with external systems.
    */
-  lazy val externalCatalog: ExternalCatalog =
+  val externalCatalog: ExternalCatalog =
     SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
       SharedState.externalCatalogClassName(sparkContext.conf),
       sparkContext.conf,
       sparkContext.hadoopConfiguration)
 
+  /**
+   * A manager for global temporary views.
+   */
+  val globalTempViewManager = {
+    // System preserved database should not exist in metastore. However, it's hard to guarantee it
+    // for every session, because case-sensitivity differs. Here we always lowercase it to make our
+    // life easier.
+    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase
+    if (externalCatalog.databaseExists(globalTempDB)) {
+      throw new SparkException(
+        s"$globalTempDB is a system preserved database, please rename your existing database " +
+          "to resolve the name conflict and launch your Spark application again.")
+    }
+    new GlobalTempViewManager(globalTempDB)
+  }
+
   /**
    * A classloader used to load all user-added jar.
    */
   val jarClassLoader = new NonClosableMutableURLClassLoader(
     org.apache.spark.util.Utils.getContextOrSparkClassLoader)
 
-  {
-    // Set the Hive metastore warehouse path to the one we use
-    val tempConf = new SQLConf
-    sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
-    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !tempConf.contains(SQLConf.WAREHOUSE_PATH.key)) {
-      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
-      // we will respect the value of hive.metastore.warehouse.dir.
-      tempConf.setConfString(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
-      sparkContext.conf.set(SQLConf.WAREHOUSE_PATH.key, hiveWarehouseDir)
-      logInfo(s"${SQLConf.WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
-        s"is set. Setting ${SQLConf.WAREHOUSE_PATH.key} to the value of " +
-        s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
-    } else {
-      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
-      // the value of spark.sql.warehouse.dir.
-      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
-      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      sparkContext.conf.set("hive.metastore.warehouse.dir", tempConf.warehousePath)
-    }
-
-    logInfo(s"Warehouse path is '${tempConf.warehousePath}'.")
-  }
-
   /**
    * Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
*/

From 6183400275837f51a84b6259e7aa5e011d77c2ea Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 28 Sep 2016 00:07:00 +0800
Subject: [PATCH 05/13] fix hive tests

---
 .../apache/spark/sql/execution/QueryExecution.scala    |  8 +++++---
 .../org/apache/spark/sql/execution/command/ddl.scala   |  4 ++--
 .../spark/sql/hive/HiveContextCompatibilitySuite.scala |  4 ++--
 .../org/apache/spark/sql/hive/ListTablesSuite.scala    |  8 ++++----
 .../spark/sql/hive/execution/HiveCommandSuite.scala    | 10 +++++-----
 5 files changed, 18 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 383b3a233fc2..cb45a6d78b9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -21,15 +21,14 @@ import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
 import org.apache.spark.util.Utils
 
@@ -125,6 +124,9 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
             .mkString("\t")
         }
       }
+    // SHOW TABLES in Hive only outputs table names, while ours outputs database, table name, isTemp.
+    case command: ExecutedCommandExec if command.cmd.isInstanceOf[ShowTablesCommand] =>
+      command.executeCollect().map(_.getString(1))
     case command: ExecutedCommandExec =>
       command.executeCollect().map(_.getString(0))
     case other =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 2b05cfb3856c..7fa2e9e943d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,10 +183,10 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (tableName.database.exists(catalog.databaseExists) && catalog.tableExists(tableName)) {
+    if (tableName.database.forall(catalog.databaseExists) && catalog.tableExists(tableName)) {
       // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
       // issue an exception.
-      catalog.getTableMetadata(tableName) match {
+      catalog.getTableMetadata(tableName).tableType match {
         case CatalogTableType.VIEW if !isView =>
           throw new AnalysisException(
             "Cannot drop a view with DROP TABLE. 
Please use DROP VIEW instead") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala index 57363b7259c6..939fd71b4f1e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala @@ -87,11 +87,11 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac assert( hc.sql("SELECT * FROM moo_table order by name").collect().toSeq == df.collect().toSeq.sortBy(_.getString(0))) - val tables = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) + val tables = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0)) assert(tables.toSet == Set("moo_table", "mee_table")) hc.sql("DROP TABLE moo_table") hc.sql("DROP TABLE mee_table") - val tables2 = hc.sql("SHOW TABLES IN mee_db").collect().map(_.getString(0)) + val tables2 = hc.sql("SHOW TABLES IN mee_db").select("tableName").collect().map(_.getString(0)) assert(tables2.isEmpty) hc.sql("USE default") hc.sql("DROP DATABASE mee_db CASCADE") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala index 6eeb67510c73..15ba61646d03 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala @@ -58,10 +58,10 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft // We are using default DB. checkAnswer( allTables.filter("tableName = 'listtablessuitetable'"), - Row("listtablessuitetable", true)) + Row("", "listtablessuitetable", true)) checkAnswer( allTables.filter("tableName = 'hivelisttablessuitetable'"), - Row("hivelisttablessuitetable", false)) + Row("default", "hivelisttablessuitetable", false)) assert(allTables.filter("tableName = 'hiveindblisttablessuitetable'").count() === 0) } } @@ -71,11 +71,11 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAft case allTables => checkAnswer( allTables.filter("tableName = 'listtablessuitetable'"), - Row("listtablessuitetable", true)) + Row("", "listtablessuitetable", true)) assert(allTables.filter("tableName = 'hivelisttablessuitetable'").count() === 0) checkAnswer( allTables.filter("tableName = 'hiveindblisttablessuitetable'"), - Row("hiveindblisttablessuitetable", false)) + Row("listtablessuitedb", "hiveindblisttablessuitetable", false)) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index b2103b3bfc36..2c772ce2155e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -94,15 +94,15 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql("CREATE TABLE show2b(c2 int)") checkAnswer( sql("SHOW TABLES IN default 'show1*'"), - Row("show1a", false) :: Nil) + Row("default", "show1a", false) :: Nil) checkAnswer( sql("SHOW TABLES IN default 'show1*|show2*'"), - Row("show1a", false) :: - Row("show2b", false) :: Nil) + Row("default", "show1a", false) :: + Row("default", "show2b", false) :: Nil) checkAnswer( sql("SHOW TABLES 'show1*|show2*'"), - 
Row("show1a", false) :: - Row("show2b", false) :: Nil) + Row("default", "show1a", false) :: + Row("default", "show2b", false) :: Nil) assert( sql("SHOW TABLES").count() >= 2) assert( From a683fba7fdff0dd101be231a84a88fb5b8f3aef4 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 28 Sep 2016 13:25:13 +0800 Subject: [PATCH 06/13] fix init order --- .../apache/spark/sql/internal/SharedState.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index c1f1992d8201..105d7c59c922 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -37,7 +37,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils} */ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { + // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on + // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf. { + val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml") + if (configFile != null) { + sparkContext.hadoopConfiguration.addResource(configFile) + } + // Set the Hive metastore warehouse path to the one we use val tempConf = new SQLConf sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) } @@ -71,13 +78,6 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { */ val listener: SQLListener = createListenerAndUI(sparkContext) - { - val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml") - if (configFile != null) { - sparkContext.hadoopConfiguration.addResource(configFile) - } - } - /** * A catalog that interacts with external systems. 
*/ From 0a70679eb666dc3dd854d7583517743c5e859fbf Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 28 Sep 2016 16:35:19 +0800 Subject: [PATCH 07/13] fix python --- python/pyspark/sql/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 7482be8bda5c..8264dcf8a97d 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -386,7 +386,7 @@ def tables(self, dbName=None): >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.tables() >>> df2.filter("tableName = 'table1'").first() - Row(tableName=u'table1', isTemporary=True) + Row(database=u'', tableName=u'table1', isTemporary=True) """ if dbName is None: return DataFrame(self._ssql_ctx.tables(), self) From 88e684e4e69b9acfc3e9c395ec5ffdc4dfdf1610 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 29 Sep 2016 11:05:34 +0800 Subject: [PATCH 08/13] improve tests --- .../sql/execution/GlobalTempViewSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala index cca697d4ef6a..a2d261621123 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala @@ -77,6 +77,20 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { assert(e2.message.contains("system preserved database")) } + test("CREATE GLOBAL TEMP VIEW USING") { + withTempPath { path => + try { + Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath) + sql(s"CREATE GLOBAL TEMP VIEW src USING parquet OPTIONS (PATH '${path.getAbsolutePath}')") + checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a")) + sql(s"INSERT INTO $globalTempDB.src SELECT 2, 'b'") + checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a") :: Row(2, "b") :: Nil) + } finally { + spark.catalog.dropGlobalTempView("src") + } + } + } + test("CREATE TABLE LIKE should work for global temp view") { try { sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1 AS a, '2' AS b") @@ -118,6 +132,9 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { // Use qualified name to lookup a global temp view. checkAnswer(sql(s"SELECT * FROM $globalTempDB.same_name"), Row(3, 4)) + } finally { + spark.catalog.dropTempView("same_name") + spark.catalog.dropGlobalTempView("same_name") } } } From 952f13cd92e5e448622ad80377845d8448399b86 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 30 Sep 2016 09:43:02 +0800 Subject: [PATCH 09/13] fix comment --- .../scala/org/apache/spark/sql/execution/command/tables.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 6c0c787cf35d..424ef58d76c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -579,7 +579,7 @@ case class ShowTablesCommand( databaseName: Option[String], tableIdentifierPattern: Option[String]) extends RunnableCommand { - // The result of SHOW TABLES has two columns, tableName and isTemporary. + // The result of SHOW TABLES has three columns: database, tableName and isTemporary. 
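+  // For illustration (rows are hypothetical): a persisted table now prints as
+  // ("default", "src", false), while a local temporary view prints as ("", "view1", true),
+  // i.e. local temporary views carry an empty database column.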
override val output: Seq[Attribute] = {
     AttributeReference("database", StringType, nullable = false)() ::
       AttributeReference("tableName", StringType, nullable = false)() ::

From 39f5995d608e69d9be555217caa54a1f049941f4 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Fri, 30 Sep 2016 20:28:29 +0800
Subject: [PATCH 10/13] add python API and document

---
 docs/sql-programming-guide.md                 | 44 ++++++++++++++++---
 .../examples/sql/JavaSparkSQLExample.java     | 30 ++++++++++++-
 examples/src/main/python/sql/basic.py         | 25 +++++++++++
 .../spark/examples/sql/SparkSQLExample.scala  | 25 +++++++++++
 python/pyspark/sql/catalog.py                 | 18 +++++++-
 python/pyspark/sql/dataframe.py               | 25 ++++++++++-
 .../sql/catalyst/catalog/SessionCatalog.scala |  2 +-
 .../spark/sql/internal/CatalogImpl.scala      | 24 ++++------
 .../sql/execution/GlobalTempViewSuite.scala   | 28 ++++++++++++
 9 files changed, 195 insertions(+), 26 deletions(-)

diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 71bdd19c16db..fa02d8c058f3 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -220,6 +220,40 @@ The `sql` function enables applications to run SQL queries programmatically and

+## Global Temporary View
+
+Temporary views in Spark SQL are session-scoped and will disappear if the session that creates them
+terminates. If you want to have a temporary view that is shared among all sessions and kept alive
+until the Spark application terminates, you can create a global temporary view. A global temporary
+view is tied to a system preserved database `global_temp`, and we must use the qualified name to
+refer to it, e.g. `SELECT * FROM global_temp.view1`.
+
+<div class="codetabs">
+
+<div data-lang="scala"  markdown="1">
+{% include_example global_temp_view scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}
+</div>
+
+<div data-lang="java"  markdown="1">
+{% include_example global_temp_view java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example global_temp_view python/sql/basic.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl
+
+SELECT * FROM global_temp.temp_view
+
+{% endhighlight %}
+
+</div>
+</div>
+ + ## Creating Datasets Datasets are similar to RDDs, however, instead of using Java serialization or Kryo they use @@ -1058,14 +1092,14 @@ the Data Sources API. The following options are supported: The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). - + truncate - This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g. indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. + This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g. indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. - + createTableOptions @@ -1101,11 +1135,11 @@ USING org.apache.spark.sql.jdbc OPTIONS ( url "jdbc:postgresql:dbserver", dbtable "schema.tablename", - user 'username', + user 'username', password 'password' ) -INSERT INTO TABLE jdbcTable +INSERT INTO TABLE jdbcTable SELECT * FROM resultTable {% endhighlight %} diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java index cff9032f52b5..c5770d147a6b 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java @@ -54,6 +54,7 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; // $example off:programmatic_schema$ +import org.apache.spark.sql.AnalysisException; // $example on:untyped_ops$ // col("...") is preferable to df.col("...") @@ -84,7 +85,7 @@ public void setAge(int age) { } // $example off:create_ds$ - public static void main(String[] args) { + public static void main(String[] args) throws AnalysisException { // $example on:init_session$ SparkSession spark = SparkSession .builder() @@ -101,7 +102,7 @@ public static void main(String[] args) { spark.stop(); } - private static void runBasicDataFrameExample(SparkSession spark) { + private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException { // $example on:create_df$ Dataset df = spark.read().json("examples/src/main/resources/people.json"); @@ -176,6 +177,31 @@ private static void runBasicDataFrameExample(SparkSession spark) { // | 19| Justin| // +----+-------+ // $example off:run_sql$ + + // $example on:global_temp_view$ + // Register the DataFrame as a global temporary view + df.createGlobalTempView("people"); + + // Global temporary view is tied to a system preserved database `global_temp` + spark.sql("SELECT * FROM global_temp.people").show(); + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + + // Global temporary view is cross-session + spark.newSession().sql("SELECT * FROM global_temp.people").show(); + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + // $example off:global_temp_view$ } private 
static void runDatasetCreationExample(SparkSession spark) { diff --git a/examples/src/main/python/sql/basic.py b/examples/src/main/python/sql/basic.py index fdc017aed97c..ebcf66995b47 100644 --- a/examples/src/main/python/sql/basic.py +++ b/examples/src/main/python/sql/basic.py @@ -114,6 +114,31 @@ def basic_df_example(spark): # +----+-------+ # $example off:run_sql$ + # $example on:global_temp_view$ + # Register the DataFrame as a global temporary view + df.createGlobalTempView("people") + + # Global temporary view is tied to a system preserved database `global_temp` + spark.sql("SELECT * FROM global_temp.people").show() + # +----+-------+ + # | age| name| + # +----+-------+ + # |null|Michael| + # | 30| Andy| + # | 19| Justin| + # +----+-------+ + + # Global temporary view is cross-session + spark.newSession().sql("SELECT * FROM global_temp.people").show() + # +----+-------+ + # | age| name| + # +----+-------+ + # |null|Michael| + # | 30| Andy| + # | 19| Justin| + # +----+-------+ + # $example off:global_temp_view$ + def schema_inference_example(spark): # $example on:schema_inferring$ diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala index 129b81d5fbbf..f27c403c5b38 100644 --- a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala @@ -135,6 +135,31 @@ object SparkSQLExample { // | 19| Justin| // +----+-------+ // $example off:run_sql$ + + // $example on:global_temp_view$ + // Register the DataFrame as a global temporary view + df.createGlobalTempView("people") + + // Global temporary view is tied to a system preserved database `global_temp` + spark.sql("SELECT * FROM global_temp.people").show() + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + + // Global temporary view is cross-session + spark.newSession().sql("SELECT * FROM global_temp.people").show() + // +----+-------+ + // | age| name| + // +----+-------+ + // |null|Michael| + // | 30| Andy| + // | 19| Justin| + // +----+-------+ + // $example off:global_temp_view$ } private def runDatasetCreationExample(spark: SparkSession): Unit = { diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 3c5030722f30..df3bf4254d4d 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -167,7 +167,7 @@ def createExternalTable(self, tableName, path=None, source=None, schema=None, ** @since(2.0) def dropTempView(self, viewName): - """Drops the temporary view with the given view name in the catalog. + """Drops the local temporary view with the given view name in the catalog. If the view has been cached before, then it will also be uncached. >>> spark.createDataFrame([(1, 1)]).createTempView("my_table") @@ -181,6 +181,22 @@ def dropTempView(self, viewName): """ self._jcatalog.dropTempView(viewName) + @since(2.1) + def dropGlobalTempView(self, viewName): + """Drops the global temporary view with the given view name in the catalog. + If the view has been cached before, then it will also be uncached. 
+ + >>> spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table") + >>> spark.table("global_temp.my_table").collect() + [Row(_1=1, _2=1)] + >>> spark.catalog.dropGlobalTempView("my_table") + >>> spark.table("global_temp.my_table") # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + AnalysisException: ... + """ + self._jcatalog.dropGlobalTempView(viewName) + @ignore_unicode_prefix @since(2.0) def registerFunction(self, name, f, returnType=StringType()): diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 0ac481a8a8b5..14e80ea4615e 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -131,7 +131,7 @@ def registerTempTable(self, name): @since(2.0) def createTempView(self, name): - """Creates a temporary view with this DataFrame. + """Creates a local temporary view with this DataFrame. The lifetime of this temporary table is tied to the :class:`SparkSession` that was used to create this :class:`DataFrame`. @@ -153,7 +153,7 @@ def createTempView(self, name): @since(2.0) def createOrReplaceTempView(self, name): - """Creates or replaces a temporary view with this DataFrame. + """Creates or replaces a local temporary view with this DataFrame. The lifetime of this temporary table is tied to the :class:`SparkSession` that was used to create this :class:`DataFrame`. @@ -169,6 +169,27 @@ def createOrReplaceTempView(self, name): """ self._jdf.createOrReplaceTempView(name) + @since(2.1) + def createGlobalTempView(self, name): + """Creates a global temporary view with this DataFrame. + + The lifetime of this temporary view is tied to this Spark application. + throws :class:`TempTableAlreadyExistsException`, if the view name already exists in the + catalog. + + >>> df.createGlobalTempView("people") + >>> df2 = spark.sql("select * from global_temp.people") + >>> sorted(df.collect()) == sorted(df2.collect()) + True + >>> df.createGlobalTempView("people") # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... 
+ AnalysisException: u"Temporary table 'people' already exists;" + >>> spark.catalog.dropGlobalTempView("people") + + """ + self._jdf.createGlobalTempView(name) + @property @since(1.4) def write(self): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 4fa941a47713..1f670f805167 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -441,7 +441,7 @@ class SessionCatalog( } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) { globalTempViewManager.get(table).map { plan => CatalogTable( - identifier = TableIdentifier(table), + identifier = TableIdentifier(table, Some(globalTempViewManager.database)), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, schema = plan.output.toStructType) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 75d822190b86..2716d37a0654 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -96,19 +96,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { @throws[AnalysisException]("database does not exist") override def listTables(dbName: String): Dataset[Table] = { val tables = sessionCatalog.listTables(dbName).map { tableIdent => - val isTemp = sessionCatalog.isTemporaryTable(tableIdent) - makeTable(tableIdent, isTemp) + makeTable(tableIdent) } CatalogImpl.makeDataset(tables, sparkSession) } - private def makeTable(tableIdent: TableIdentifier, isTemp: Boolean): Table = { - val metadata = if (isTemp) None else Some(sessionCatalog.getTableMetadata(tableIdent)) + private def makeTable(tableIdent: TableIdentifier): Table = { + val isTemp = sessionCatalog.isTemporaryTable(tableIdent) + val tableMeta = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent) new Table( - name = tableIdent.identifier, - database = metadata.flatMap(_.identifier.database).orNull, - description = metadata.flatMap(_.comment).orNull, - tableType = metadata.map(_.tableType.name).getOrElse("TEMPORARY"), + name = tableMeta.identifier.table, + database = tableMeta.identifier.database.orNull, + description = tableMeta.comment.orNull, + tableType = if (isTemp) "TEMPORARY" else tableMeta.tableType.name, isTemporary = isTemp) } @@ -202,13 +202,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * [[AnalysisException]] when no [[Table]] can be found. 
*/ override def findTable(dbName: String, tableName: String): Table = { - val tableIdent = TableIdentifier(tableName, Option(dbName)) - val isTemporary = sessionCatalog.isTemporaryTable(tableIdent) - if (isTemporary || sessionCatalog.tableExists(tableIdent)) { - makeTable(tableIdent, isTemporary) - } else { - throw new AnalysisException(s"The specified table $tableIdent does not exist.") - } + makeTable(TableIdentifier(tableName, Option(dbName))) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala index a2d261621123..13dafb7774e3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalog.Table import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.test.SharedSQLContext @@ -69,6 +70,17 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2")) } + test("global temp view is shared among all sessions") { + try { + sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2") + checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, 2)) + val newSession = spark.newSession() + checkAnswer(newSession.table(s"$globalTempDB.src"), Row(1, 2)) + } finally { + spark.catalog.dropGlobalTempView("src") + } + } + test("global temp view database should be preserved") { val e = intercept[AnalysisException](sql(s"CREATE DATABASE $globalTempDB")) assert(e.message.contains("system preserved database")) @@ -137,4 +149,20 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { spark.catalog.dropGlobalTempView("same_name") } } + + test("public Catalog should recognize global temp view") { + try { + sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2") + + assert(spark.catalog.tableExists(globalTempDB, "src")) + assert(spark.catalog.findTable(globalTempDB, "src").toString == new Table( + name = "src", + database = globalTempDB, + description = null, + tableType = "TEMPORARY", + isTemporary = true).toString) + } finally { + spark.catalog.dropGlobalTempView("src") + } + } } From cbbe122299a690cba7aff6c1a320d366513d42c9 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 3 Oct 2016 23:34:14 +0800 Subject: [PATCH 11/13] fix --- docs/sql-programming-guide.md | 1 + .../org/apache/spark/sql/execution/GlobalTempViewSuite.scala | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index fa02d8c058f3..835cb6981f5b 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -252,6 +252,7 @@ SELECT * FROM global_temp.temp_view {% endhighlight %}
+ ## Creating Datasets diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala index 13dafb7774e3..391bcb8b35d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala @@ -155,7 +155,7 @@ class GlobalTempViewSuite extends QueryTest with SharedSQLContext { sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2") assert(spark.catalog.tableExists(globalTempDB, "src")) - assert(spark.catalog.findTable(globalTempDB, "src").toString == new Table( + assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table( name = "src", database = globalTempDB, description = null, From fb96f1cc5461bf70241e0ed8476ff449a57ea41c Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 9 Oct 2016 17:33:42 +0800 Subject: [PATCH 12/13] address comments from yhuai --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 3 ++- .../scala/org/apache/spark/sql/execution/command/ddl.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 1f670f805167..58b1a1f00734 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -199,7 +199,8 @@ class SessionCatalog( if (dbName == globalTempViewManager.database) { throw new AnalysisException( s"${globalTempViewManager.database} is a system preserved database, " + - "you cannot use it as current database.") + "you cannot use it as current database. To access global temporary views, you should " + + s"use qualified name, e.g. ${globalTempViewManager.database}.viewName.") } requireDbExists(dbName) synchronized { currentDb = dbName } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 7fa2e9e943d8..45fa293e5895 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -183,7 +183,8 @@ case class DropTableCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog - if (tableName.database.forall(catalog.databaseExists) && catalog.tableExists(tableName)) { + + if (!catalog.isTemporaryTable(tableName) && catalog.tableExists(tableName)) { // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view // issue an exception. 
catalog.getTableMetadata(tableName).tableType match { From 29e292a954f1b07d80d03d0fd6c4ad4605b41ab7 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 10 Oct 2016 10:45:40 +0800 Subject: [PATCH 13/13] improve error message --- .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala | 3 ++- .../main/scala/org/apache/spark/sql/internal/SharedState.scala | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 58b1a1f00734..e44e30ec648f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -200,7 +200,8 @@ class SessionCatalog( throw new AnalysisException( s"${globalTempViewManager.database} is a system preserved database, " + "you cannot use it as current database. To access global temporary views, you should " + - s"use qualified name, e.g. ${globalTempViewManager.database}.viewName.") + "use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM " + + s"${globalTempViewManager.database}.viewName.") } requireDbExists(dbName) synchronized { currentDb = dbName } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 105d7c59c922..c555a43cd258 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -98,7 +98,8 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { if (externalCatalog.databaseExists(globalTempDB)) { throw new SparkException( s"$globalTempDB is a system preserved database, please rename your existing database " + - "to resolve the name conflict and launch your Spark application again.") + "to resolve the name conflict, or set a different value for " + + s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.") } new GlobalTempViewManager(globalTempDB) }
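
Taken together, the patches above add the `createGlobalTempView`/`dropGlobalTempView` APIs, the system-preserved `global_temp` database, and the qualified-name lookup path through `SessionCatalog`. The standalone sketch below exercises that surface end to end. It is an illustration written against the APIs these diffs introduce, not part of the patch series itself; the object name, app name, master URL, and sample data are arbitrary.

```scala
import org.apache.spark.sql.SparkSession

object GlobalTempViewDemo {
  def main(args: Array[String]): Unit = {
    // Arbitrary local-mode session for illustration.
    val spark = SparkSession.builder()
      .appName("GlobalTempViewDemo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Register a global temp view; it lands in the system-preserved
    // `global_temp` database rather than the current session's catalog.
    Seq(1 -> "a", 2 -> "b").toDF("i", "j").createGlobalTempView("src")

    // Lookups must use the qualified name, as the SessionCatalog error
    // messages refined in PATCH 12 and 13 point out.
    spark.sql("SELECT * FROM global_temp.src").show()

    // Unlike a local temp view, it is visible from other sessions of the
    // same application (cf. the "shared among all sessions" test above).
    spark.newSession().sql("SELECT * FROM global_temp.src").show()

    // Dropped explicitly, or automatically when the application terminates.
    spark.catalog.dropGlobalTempView("src")

    spark.stop()
  }
}
```

Note how this mirrors the semantics the test suite pins down: the view is invisible as a bare `src`, the `global_temp` database cannot be set as the current database, and dropping the view from one session removes it for all sessions.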