4 changes: 2 additions & 2 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1536,10 +1536,10 @@ def test_list_databases(self):
         spark = self.spark
         spark.catalog._reset()
         databases = [db.name for db in spark.catalog.listDatabases()]
-        self.assertEquals(databases, ["default"])
+        self.assertEquals(databases, ["default", "information_schema"])
         spark.sql("CREATE DATABASE some_db")
         databases = [db.name for db in spark.catalog.listDatabases()]
-        self.assertEquals(sorted(databases), ["default", "some_db"])
+        self.assertEquals(sorted(databases), ["default", "information_schema", "some_db"])

     def test_list_tables(self):
         from pyspark.sql.catalog import Table
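For reference, the same expectation expressed against the Scala Catalog API (a sketch assuming a SparkSession named `spark`; not part of this PR):

    val names = spark.catalog.listDatabases().collect().map(_.name)
    // the system database now always appears alongside "default"
    assert(names.sorted.sameElements(Array("default", "information_schema")))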
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils

 object SessionCatalog {
   val DEFAULT_DATABASE = "default"
+  val INFORMATION_SCHEMA_DATABASE = "information_schema"
 }

 /**

@@ -153,6 +154,8 @@ class SessionCatalog(
     val dbName = formatDatabaseName(db)
     if (dbName == DEFAULT_DATABASE) {
       throw new AnalysisException(s"Can not drop default database")
+    } else if (dbName == INFORMATION_SCHEMA_DATABASE) {
+      throw new AnalysisException(s"Can not drop system database `$INFORMATION_SCHEMA_DATABASE`")
     } else if (dbName == getCurrentDatabase) {
       throw new AnalysisException(s"Can not drop current database `${dbName}`")
     }
@@ -173,7 +176,11 @@

   def databaseExists(db: String): Boolean = {
     val dbName = formatDatabaseName(db)
-    externalCatalog.databaseExists(dbName)
+    if (db == INFORMATION_SCHEMA_DATABASE) {
+      true
+    } else {
+      externalCatalog.databaseExists(dbName)
+    }
   }

   def listDatabases(): Seq[String] = {
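Taken together with the dropDatabase guard earlier in this file, the user-visible behavior of these two branches would look roughly like this (REPL sketch; exact output formatting is an assumption):

    scala> sql("SHOW DATABASES").show()
    +------------------+
    |      databaseName|
    +------------------+
    |           default|
    |information_schema|
    +------------------+

    scala> sql("DROP DATABASE information_schema")
    org.apache.spark.sql.AnalysisException: Can not drop system database `information_schema`;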
@@ -253,9 +260,21 @@
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
-    requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.getTable(db, table)
+    val normalizedName = normalizeTableIdentifier(name)
+    if (isTemporaryTable(normalizedName)) {
+      val tid = TableIdentifier(table)
+      CatalogTable(
+        identifier = tid,
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = tempTables(normalizedName.table).output.toStructType,
+        properties = Map(),
+        viewText = None)
+    } else {
+      requireDbExists(db)
+      requireTableExists(TableIdentifier(table, Some(db)))
+      externalCatalog.getTable(db, table)
+    }
   }

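For illustration, given a SessionCatalog instance `catalog` with the information_schema views registered, the new branch synthesizes view metadata locally instead of consulting the external catalog (a sketch against internal API; not part of this PR):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.catalog.CatalogTableType

    val meta = catalog.getTableMetadata(TableIdentifier("tables", Some("information_schema")))
    assert(meta.tableType == CatalogTableType.VIEW)  // synthesized VIEW entry
    assert(meta.identifier.database.isEmpty)         // temp views carry no database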
 /**

@@ -434,10 +453,11 @@
    */
   def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = {
     synchronized {
-      val db = formatDatabaseName(name.database.getOrElse(currentDb))
-      val table = formatTableName(name.table)
+      val normalizedName = normalizeTableIdentifier(name)
+      val db = formatDatabaseName(normalizedName.database.getOrElse(currentDb))
+      val table = formatTableName(normalizedName.table)
       val relationAlias = alias.getOrElse(table)
-      if (name.database.isDefined || !tempTables.contains(table)) {
+      if (normalizedName.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
         val view = Option(metadata.tableType).collect {
           case CatalogTableType.VIEW => name
@@ -460,7 +480,7 @@
   def tableExists(name: TableIdentifier): Boolean = synchronized {
     val db = formatDatabaseName(name.database.getOrElse(currentDb))
     val table = formatTableName(name.table)
-    if (isTemporaryTable(name)) {
+    if (isTemporaryTable(normalizeTableIdentifier(name))) {
       true
     } else {
       externalCatalog.tableExists(db, table)
@@ -477,6 +497,33 @@
     name.database.isEmpty && tempTables.contains(formatTableName(name.table))
   }

+  /**
+   * Normalize TableIdentifier by consistently ensuring the following two rules.
+   * 1. System temporary views should have None as database.
+   * 2. System temporary views should have prefixed table names.
+   * Currently, only INFORMATION_SCHEMA has temporary views.
+   */
+  protected def normalizeTableIdentifier(name: TableIdentifier): TableIdentifier = synchronized {
+    if (name.database.isDefined) {
+      if (name.database.contains(INFORMATION_SCHEMA_DATABASE)) {
+        TableIdentifier(s"$INFORMATION_SCHEMA_DATABASE.${name.table}", None)
+      } else {
+        name
+      }
+    } else {
+      val tableName = formatTableName(name.table)
+      if (tableName.startsWith(INFORMATION_SCHEMA_DATABASE + ".")) {
[Review comment on the line above]

Member: Can you explain which cases will enter this processing? Is it possible we could hit a backtick-quoted INFORMATION_SCHEMA_DATABASE here?

Member (PR author): A backtick-quoted one will not reach here:

    scala> sql("create table `aaa.bbb`(a int)")
    org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.ql.metadata.HiveException: [aaa.bbb]: is not a valid table name;

+        TableIdentifier(tableName, None)
+      } else if (currentDb == INFORMATION_SCHEMA_DATABASE) {
+        TableIdentifier(s"$INFORMATION_SCHEMA_DATABASE.$tableName", None)
+      } else if (tempTables.contains(tableName)) {
+        TableIdentifier(tableName, None)
+      } else {
+        TableIdentifier(name.table, Some(currentDb))
+      }
+    }
+  }
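To make the two normalization rules concrete, here is a self-contained sketch of the same decision logic with simplified types (lower-casing stands in for formatTableName; this mirrors, but is not, the PR's code):

    case class TableId(table: String, database: Option[String] = None)
    val INFO = "information_schema"

    def normalize(name: TableId, currentDb: String, temps: Set[String]): TableId =
      name.database match {
        case Some(INFO) => TableId(s"$INFO.${name.table}", None)  // rules 1 and 2
        case Some(_)    => name                                   // ordinary qualified name
        case None =>
          val t = name.table.toLowerCase
          if (t.startsWith(INFO + ".")) TableId(t, None)          // already prefixed
          else if (currentDb == INFO) TableId(s"$INFO.$t", None)  // inside the system db
          else if (temps.contains(t)) TableId(t, None)            // ordinary temp view
          else TableId(name.table, Some(currentDb))               // fall back to current db
      }

    assert(normalize(TableId("tables", Some(INFO)), "default", Set.empty)
      == TableId("information_schema.tables", None))
    assert(normalize(TableId("t1"), "default", Set.empty)
      == TableId("t1", Some("default")))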

   /**
    * List all tables in the specified database, including temporary tables.
    */

@@ -491,8 +538,11 @@
     val dbTables =
       externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) }
     synchronized {
-      val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
+      var _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern)
         .map { t => TableIdentifier(t) }
+      if (db != INFORMATION_SCHEMA_DATABASE) {
[Review comment on the line above]

Member: This part is not clear to me. What happens if users want to list tables in INFORMATION_SCHEMA_DATABASE? Will it return an empty result?

dongjoon-hyun (PR author), Jul 22, 2016: This is the current result:

    scala> sql("use information_schema")
    res11: org.apache.spark.sql.DataFrame = []

    scala> sql("show tables").show(false)
    +------------------------------------+-----------+
    |tableName                           |isTemporary|
    +------------------------------------+-----------+
    |information_schema.columns          |true       |
    |information_schema.databases        |true       |
    |information_schema.schemata         |true       |
    |information_schema.session_variables|true       |
    |information_schema.tables           |true       |
    |information_schema.views            |true       |
    +------------------------------------+-----------+
+        _tempTables = _tempTables.filterNot(_.table.startsWith(INFORMATION_SCHEMA_DATABASE + "."))
+      }
       dbTables ++ _tempTables
     }
   }
@@ -907,8 +957,8 @@
    */
   def reset(): Unit = synchronized {
     setCurrentDatabase(DEFAULT_DATABASE)
-    listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db =>
-      dropDatabase(db, ignoreIfNotExists = false, cascade = true)
+    listDatabases().filter(x => x != DEFAULT_DATABASE && x != INFORMATION_SCHEMA_DATABASE).foreach {
+      db => dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
     listTables(DEFAULT_DATABASE).foreach { table =>
       dropTable(table, ignoreIfNotExists = false, purge = false)
sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.systemcatalog.InformationSchema
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
 import org.apache.spark.sql.sources.BaseRelation
@@ -544,7 +545,11 @@
    *
    * @since 2.0.0
    */
-  @transient lazy val catalog: Catalog = new CatalogImpl(self)
+  @transient lazy val catalog: Catalog = {
+    val catalog = new CatalogImpl(self)
+    InformationSchema.registerInformationSchema(self)
+    catalog
+  }

   /**
    * Returns the specified table as a [[DataFrame]].
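End to end, the registration hook means the first use of the catalog makes the system views queryable. A usage sketch (the InformationSchema object itself is not part of this diff; the view name is taken from the SHOW TABLES output in the review thread above):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local").appName("info-schema-demo").getOrCreate()
    spark.catalog.listDatabases().collect()  // touches the lazy val, registering the views
    spark.sql("SELECT * FROM information_schema.tables").show(false)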