Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.spark.sql.catalyst.catalog

import java.lang.reflect.InvocationTargetException
import java.net.URI
import java.util.Locale
import java.util.concurrent.Callable
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.hadoop.conf.Configuration
Expand All @@ -41,7 +39,6 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

Expand All @@ -52,7 +49,7 @@ object SessionCatalog {
/**
* An internal catalog that is used by a Spark Session. This internal catalog serves as a
* proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary
* tables and functions of the Spark Session that it belongs to.
* views and functions of the Spark Session that it belongs to.
*
* This class must be thread-safe.
*/
Expand Down Expand Up @@ -90,13 +87,13 @@ class SessionCatalog(
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
}

/** List of temporary tables, mapping from table name to their logical plan. */
Copy link
Member

@viirya viirya Sep 5, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code comment at the beginning of `SessionCatalog` also includes the wording "...it also manages temporary tables and ...".

/** List of temporary views, mapping from table name to their logical plan. */
@GuardedBy("this")
protected val tempTables = new mutable.HashMap[String, LogicalPlan]
protected val tempViews = new mutable.HashMap[String, LogicalPlan]

// Note: we track current database here because certain operations do not explicitly
// specify the database (e.g. DROP TABLE my_table). In these cases we must first
// check whether the temporary table or function exists, then, if not, operate on
// check whether the temporary view or function exists, then, if not, operate on
// the corresponding item in the current database.
@GuardedBy("this")
protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
Expand Down Expand Up @@ -272,8 +269,8 @@ class SessionCatalog(
// ----------------------------------------------------------------------------
// Tables
// ----------------------------------------------------------------------------
// There are two kinds of tables, temporary tables and metastore tables.
// Temporary tables are isolated across sessions and do not belong to any
// There are two kinds of tables, temporary views and metastore tables.
// Temporary views are isolated across sessions and do not belong to any
// particular database. Metastore tables can be used across multiple
// sessions as their metadata is persisted in the underlying catalog.
// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -462,10 +459,10 @@ class SessionCatalog(
tableDefinition: LogicalPlan,
overrideIfExists: Boolean): Unit = synchronized {
val table = formatTableName(name)
if (tempTables.contains(table) && !overrideIfExists) {
if (tempViews.contains(table) && !overrideIfExists) {
throw new TempTableAlreadyExistsException(name)
}
tempTables.put(table, tableDefinition)
tempViews.put(table, tableDefinition)
}

/**
Expand All @@ -487,7 +484,7 @@ class SessionCatalog(
viewDefinition: LogicalPlan): Boolean = synchronized {
val viewName = formatTableName(name.table)
if (name.database.isEmpty) {
if (tempTables.contains(viewName)) {
if (tempViews.contains(viewName)) {
createTempView(viewName, viewDefinition, overrideIfExists = true)
true
} else {
Expand All @@ -504,7 +501,7 @@ class SessionCatalog(
* Return a local temporary view exactly as it was stored.
*/
def getTempView(name: String): Option[LogicalPlan] = synchronized {
tempTables.get(formatTableName(name))
tempViews.get(formatTableName(name))
}

/**
Expand All @@ -520,7 +517,7 @@ class SessionCatalog(
* Returns true if this view is dropped successfully, false otherwise.
*/
def dropTempView(name: String): Boolean = synchronized {
tempTables.remove(formatTableName(name)).isDefined
tempViews.remove(formatTableName(name)).isDefined
}

/**
Expand Down Expand Up @@ -572,7 +569,7 @@ class SessionCatalog(
* Rename a table.
*
* If a database is specified in `oldName`, this will rename the table in that database.
* If no database is specified, this will first attempt to rename a temporary table with
* If no database is specified, this will first attempt to rename a temporary view with
* the same name, then, if that does not exist, rename the table in the current database.
*
* This assumes the database specified in `newName` matches the one in `oldName`.
Expand All @@ -592,24 +589,24 @@ class SessionCatalog(
globalTempViewManager.rename(oldTableName, newTableName)
} else {
requireDbExists(db)
if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
if (oldName.database.isDefined || !tempViews.contains(oldTableName)) {
requireTableExists(TableIdentifier(oldTableName, Some(db)))
requireTableNotExists(TableIdentifier(newTableName, Some(db)))
validateName(newTableName)
externalCatalog.renameTable(db, oldTableName, newTableName)
} else {
if (newName.database.isDefined) {
throw new AnalysisException(
s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': cannot specify database " +
s"RENAME TEMPORARY VIEW from '$oldName' to '$newName': cannot specify database " +
s"name '${newName.database.get}' in the destination table")
}
if (tempTables.contains(newTableName)) {
throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': " +
if (tempViews.contains(newTableName)) {
throw new AnalysisException(s"RENAME TEMPORARY VIEW from '$oldName' to '$newName': " +
"destination table already exists")
}
val table = tempTables(oldTableName)
tempTables.remove(oldTableName)
tempTables.put(newTableName, table)
val table = tempViews(oldTableName)
tempViews.remove(oldTableName)
tempViews.put(newTableName, table)
}
}
}
Expand All @@ -618,7 +615,7 @@ class SessionCatalog(
* Drop a table.
*
* If a database is specified in `name`, this will drop the table from that database.
* If no database is specified, this will first attempt to drop a temporary table with
* If no database is specified, this will first attempt to drop a temporary view with
* the same name, then, if that does not exist, drop the table from the current database.
*/
def dropTable(
Expand All @@ -633,7 +630,7 @@ class SessionCatalog(
throw new NoSuchTableException(globalTempViewManager.database, table)
}
} else {
if (name.database.isDefined || !tempTables.contains(table)) {
if (name.database.isDefined || !tempViews.contains(table)) {
requireDbExists(db)
// When ignoreIfNotExists is false, no exception is issued when the table does not exist.
// Instead, log it as an error message.
Expand All @@ -643,7 +640,7 @@ class SessionCatalog(
throw new NoSuchTableException(db = db, table = table)
}
} else {
tempTables.remove(table)
tempViews.remove(table)
}
}
}
Expand All @@ -652,7 +649,7 @@ class SessionCatalog(
* Return a [[LogicalPlan]] that represents the given table or view.
*
* If a database is specified in `name`, this will return the table/view from that database.
* If no database is specified, this will first attempt to return a temporary table/view with
* If no database is specified, this will first attempt to return a temporary view with
* the same name, then, if that does not exist, return the table/view from the current database.
*
* Note that, the global temp view database is also valid here, this will return the global temp
Expand All @@ -671,7 +668,7 @@ class SessionCatalog(
globalTempViewManager.get(table).map { viewDef =>
SubqueryAlias(table, viewDef)
}.getOrElse(throw new NoSuchTableException(db, table))
} else if (name.database.isDefined || !tempTables.contains(table)) {
} else if (name.database.isDefined || !tempViews.contains(table)) {
val metadata = externalCatalog.getTable(db, table)
if (metadata.tableType == CatalogTableType.VIEW) {
val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
Expand All @@ -687,21 +684,21 @@ class SessionCatalog(
SubqueryAlias(table, UnresolvedCatalogRelation(metadata))
}
} else {
SubqueryAlias(table, tempTables(table))
SubqueryAlias(table, tempViews(table))
}
}
}

/**
* Return whether a table with the specified name is a temporary table.
* Return whether a table with the specified name is a temporary view.
*
* Note: The temporary table cache is checked only when database is not
* Note: The temporary view cache is checked only when database is not
* explicitly specified.
*/
def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
val table = formatTableName(name.table)
if (name.database.isEmpty) {
tempTables.contains(table)
tempViews.contains(table)
} else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
globalTempViewManager.get(table).isDefined
} else {
Expand All @@ -710,15 +707,15 @@ class SessionCatalog(
}

/**
* List all tables in the specified database, including local temporary tables.
* List all tables in the specified database, including local temporary views.
*
* Note that, if the specified database is global temporary view database, we will list global
* temporary views.
*/
def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")

/**
* List all matching tables in the specified database, including local temporary tables.
* List all matching tables in the specified database, including local temporary views.
*
* Note that, if the specified database is global temporary view database, we will list global
* temporary views.
Expand All @@ -736,7 +733,7 @@ class SessionCatalog(
}
}
val localTempViews = synchronized {
StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name =>
StringUtils.filterPattern(tempViews.keys.toSeq, pattern).map { name =>
TableIdentifier(name)
}
}
Expand All @@ -750,11 +747,11 @@ class SessionCatalog(
val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
val tableName = formatTableName(name.table)

// Go through temporary tables and invalidate them.
// Go through temporary views and invalidate them.
// If the database is defined, this may be a global temporary view.
// If the database is not defined, there is a good chance this is a temp table.
// If the database is not defined, there is a good chance this is a temp view.
if (name.database.isEmpty) {
tempTables.get(tableName).foreach(_.refresh())
tempViews.get(tableName).foreach(_.refresh())
} else if (dbName == globalTempViewManager.database) {
globalTempViewManager.get(tableName).foreach(_.refresh())
}
Expand All @@ -765,11 +762,11 @@ class SessionCatalog(
}

/**
* Drop all existing temporary tables.
* Drop all existing temporary views.
* For testing only.
*/
def clearTempTables(): Unit = synchronized {
tempTables.clear()
tempViews.clear()
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -1337,7 +1334,7 @@ class SessionCatalog(
*/
private[sql] def copyStateTo(target: SessionCatalog): Unit = synchronized {
target.currentDb = currentDb
// copy over temporary tables
tempTables.foreach(kv => target.tempTables.put(kv._1, kv._2))
// copy over temporary views
tempViews.foreach(kv => target.tempViews.put(kv._1, kv._2))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
checkAnswer(spark.table("teachers"), df)
}

test("rename temporary table - destination table with database name") {
test("rename temporary view - destination table with database name") {
withTempView("tab1") {
sql(
"""
Expand All @@ -812,15 +812,15 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
sql("ALTER TABLE tab1 RENAME TO default.tab2")
}
assert(e.getMessage.contains(
"RENAME TEMPORARY TABLE from '`tab1`' to '`default`.`tab2`': " +
"RENAME TEMPORARY VIEW from '`tab1`' to '`default`.`tab2`': " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can rename this test from `rename temporary table...` to `rename temporary view...`.

"cannot specify database name 'default' in the destination table"))

val catalog = spark.sessionState.catalog
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At L823 there is also a test named `rename temporary table`.

assert(catalog.listTables("default") == Seq(TableIdentifier("tab1")))
}
}

test("rename temporary table") {
test("rename temporary view") {
withTempView("tab1", "tab2") {
spark.range(10).createOrReplaceTempView("tab1")
sql("ALTER TABLE tab1 RENAME TO tab2")
Expand All @@ -832,7 +832,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
}
}

test("rename temporary table - destination table already exists") {
test("rename temporary view - destination table already exists") {
withTempView("tab1", "tab2") {
sql(
"""
Expand Down Expand Up @@ -860,7 +860,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
sql("ALTER TABLE tab1 RENAME TO tab2")
}
assert(e.getMessage.contains(
"RENAME TEMPORARY TABLE from '`tab1`' to '`tab2`': destination table already exists"))
"RENAME TEMPORARY VIEW from '`tab1`' to '`tab2`': destination table already exists"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto: this test is also named `rename temporary table...`.


val catalog = spark.sessionState.catalog
assert(catalog.listTables("default") == Seq(TableIdentifier("tab1"), TableIdentifier("tab2")))
Expand Down