From e832f5ba8be965e75107697fcfacc623572eb751 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sat, 9 Jul 2016 05:54:30 -0700 Subject: [PATCH] [SPARK-16452][SQL] Basic INFORMATION_SCHEMA support --- python/pyspark/sql/tests.py | 4 +- .../sql/catalyst/catalog/SessionCatalog.scala | 72 +++- .../org/apache/spark/sql/SparkSession.scala | 7 +- .../systemcatalog/InformationSchema.scala | 312 ++++++++++++++++++ .../sql/execution/command/DDLSuite.scala | 29 +- .../InformationSchemaSuite.scala | 159 +++++++++ .../spark/sql/internal/CatalogSuite.scala | 7 +- .../spark/sql/hive/HiveSessionCatalog.scala | 7 +- 8 files changed, 575 insertions(+), 22 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchemaSuite.scala diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index c2171c277cac..d7acd3c5f986 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1536,10 +1536,10 @@ def test_list_databases(self): spark = self.spark spark.catalog._reset() databases = [db.name for db in spark.catalog.listDatabases()] - self.assertEquals(databases, ["default"]) + self.assertEquals(databases, ["default", "information_schema"]) spark.sql("CREATE DATABASE some_db") databases = [db.name for db in spark.catalog.listDatabases()] - self.assertEquals(sorted(databases), ["default", "some_db"]) + self.assertEquals(sorted(databases), ["default", "information_schema", "some_db"]) def test_list_tables(self): from pyspark.sql.catalog import Table diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index ef29c75c0189..071cd58111b5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils object SessionCatalog { val DEFAULT_DATABASE = "default" + val INFORMATION_SCHEMA_DATABASE = "information_schema" } /** @@ -153,6 +154,8 @@ class SessionCatalog( val dbName = formatDatabaseName(db) if (dbName == DEFAULT_DATABASE) { throw new AnalysisException(s"Can not drop default database") + } else if (dbName == INFORMATION_SCHEMA_DATABASE) { + throw new AnalysisException(s"Can not drop system database `$INFORMATION_SCHEMA_DATABASE`") } else if (dbName == getCurrentDatabase) { throw new AnalysisException(s"Can not drop current database `${dbName}`") } @@ -173,7 +176,11 @@ class SessionCatalog( def databaseExists(db: String): Boolean = { val dbName = formatDatabaseName(db) - externalCatalog.databaseExists(dbName) + if (db == INFORMATION_SCHEMA_DATABASE) { + true + } else { + externalCatalog.databaseExists(dbName) + } } def listDatabases(): Seq[String] = { @@ -253,9 +260,21 @@ class SessionCatalog( def getTableMetadata(name: TableIdentifier): CatalogTable = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) - requireDbExists(db) - requireTableExists(TableIdentifier(table, Some(db))) - externalCatalog.getTable(db, table) + val normalizedName = normalizeTableIdentifier(name) + if (isTemporaryTable(normalizedName)) { + val tid = TableIdentifier(table) + CatalogTable( + identifier = tid, + tableType = CatalogTableType.VIEW, + storage 
= CatalogStorageFormat.empty, + schema = tempTables(normalizedName.table).output.toStructType, + properties = Map(), + viewText = None) + } else { + requireDbExists(db) + requireTableExists(TableIdentifier(table, Some(db))) + externalCatalog.getTable(db, table) + } } /** @@ -434,10 +453,11 @@ class SessionCatalog( */ def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = { synchronized { - val db = formatDatabaseName(name.database.getOrElse(currentDb)) - val table = formatTableName(name.table) + val normalizedName = normalizeTableIdentifier(name) + val db = formatDatabaseName(normalizedName.database.getOrElse(currentDb)) + val table = formatTableName(normalizedName.table) val relationAlias = alias.getOrElse(table) - if (name.database.isDefined || !tempTables.contains(table)) { + if (normalizedName.database.isDefined || !tempTables.contains(table)) { val metadata = externalCatalog.getTable(db, table) val view = Option(metadata.tableType).collect { case CatalogTableType.VIEW => name @@ -460,7 +480,7 @@ class SessionCatalog( def tableExists(name: TableIdentifier): Boolean = synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) - if (isTemporaryTable(name)) { + if (isTemporaryTable(normalizeTableIdentifier(name))) { true } else { externalCatalog.tableExists(db, table) @@ -477,6 +497,33 @@ class SessionCatalog( name.database.isEmpty && tempTables.contains(formatTableName(name.table)) } + /** + * Normalize TableIdentifier by consistently ensuring the following two rules. + * 1. System temporary views should have None as database. + * 2. System temporary views should have prefixed table names. + * Currently, only INFORMATION_SCHEMA has temporary views. + */ + protected def normalizeTableIdentifier(name: TableIdentifier): TableIdentifier = synchronized { + if (name.database.isDefined) { + if (name.database.contains(INFORMATION_SCHEMA_DATABASE)) { + TableIdentifier(s"$INFORMATION_SCHEMA_DATABASE.${name.table}", None) + } else { + name + } + } else { + val tableName = formatTableName(name.table) + if (tableName.startsWith(INFORMATION_SCHEMA_DATABASE + ".")) { + TableIdentifier(tableName, None) + } else if (currentDb == INFORMATION_SCHEMA_DATABASE) { + TableIdentifier(s"$INFORMATION_SCHEMA_DATABASE.$tableName", None) + } else if (tempTables.contains(tableName)) { + TableIdentifier(tableName, None) + } else { + TableIdentifier(name.table, Some(currentDb)) + } + } + } + /** * List all tables in the specified database, including temporary tables. 
*/ @@ -491,8 +538,11 @@ class SessionCatalog( val dbTables = externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) } synchronized { - val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern) + var _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, pattern) .map { t => TableIdentifier(t) } + if (db != INFORMATION_SCHEMA_DATABASE) { + _tempTables = _tempTables.filterNot(_.table.startsWith(INFORMATION_SCHEMA_DATABASE + ".")) + } dbTables ++ _tempTables } } @@ -907,8 +957,8 @@ class SessionCatalog( */ def reset(): Unit = synchronized { setCurrentDatabase(DEFAULT_DATABASE) - listDatabases().filter(_ != DEFAULT_DATABASE).foreach { db => - dropDatabase(db, ignoreIfNotExists = false, cascade = true) + listDatabases().filter(x => x != DEFAULT_DATABASE && x != INFORMATION_SCHEMA_DATABASE).foreach { + db => dropDatabase(db, ignoreIfNotExists = false, cascade = true) } listTables(DEFAULT_DATABASE).foreach { table => dropTable(table, ignoreIfNotExists = false, purge = false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 0f6292db6217..797283999696 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.execution.systemcatalog.InformationSchema import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} import org.apache.spark.sql.sources.BaseRelation @@ -544,7 +545,11 @@ class SparkSession private( * * @since 2.0.0 */ - @transient lazy val catalog: Catalog = new CatalogImpl(self) + @transient lazy val catalog: Catalog = { + val catalog = new CatalogImpl(self) + InformationSchema.registerInformationSchema(self) + catalog + } /** * Returns the specified table as a [[DataFrame]]. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala new file mode 100644 index 000000000000..8d0135985592 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchema.scala @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.execution.systemcatalog
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.{DEFAULT_DATABASE, INFORMATION_SCHEMA_DATABASE}
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.CatalogImpl
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+/**
+ * INFORMATION_SCHEMA is a database consisting of views that provide information about all of the
+ * tables, views, and columns in a database.
+ *
+ * These views are populated by this package so that they remain independent of the Spark
+ * catalog. To keep the dependency minimal, INFORMATION_SCHEMA views are currently implemented as
+ * Spark temporary views with a database prefix: `SessionCatalog.INFORMATION_SCHEMA_DATABASE`.
+ *
+ * The following is the class hierarchy in this package rooted at InformationSchemaRelationProvider.
+ *
+ * InformationSchemaRelationProvider
+ *   -> DatabasesRelationProvider
+ *   -> TablesRelationProvider
+ *   -> ViewsRelationProvider
+ *   -> ColumnsRelationProvider
+ *   -> SessionVariablesRelationProvider
+ */
+
+/**
+ * The InformationSchema object provides bootstrap and utility functions.
+ */
+object InformationSchema {
+
+  /**
+   * Register the INFORMATION_SCHEMA database. SparkSession.catalog invokes this function.
+   */
+  def registerInformationSchema(sparkSession: SparkSession): Unit = {
+    sparkSession.sql(s"CREATE DATABASE IF NOT EXISTS $INFORMATION_SCHEMA_DATABASE")
+    registerView(sparkSession, new DatabasesRelationProvider, Seq("schemata", "databases"))
+    registerView(sparkSession, new TablesRelationProvider, Seq("tables"))
+    registerView(sparkSession, new ViewsRelationProvider, Seq("views"))
+    registerView(sparkSession, new ColumnsRelationProvider, Seq("columns"))
+    registerView(sparkSession, new SessionVariablesRelationProvider, Seq("session_variables"))
+  }
+
+  /**
+   * Register an INFORMATION_SCHEMA relation provider as a temporary view in the Spark catalog.
+   */
+  private def registerView(
+      sparkSession: SparkSession,
+      relationProvider: RelationProvider,
+      names: Seq[String]) {
+    val plan =
+      LogicalRelation(relationProvider.createRelation(sparkSession.sqlContext, parameters = null))
+        .analyze
+    val projectList = plan.output.zip(plan.schema).map {
+      case (attr, col) => Alias(attr, col.name)()
+    }
+    sparkSession.sessionState.executePlan(Project(projectList, plan))
+
+    for (name <- names) {
+      sparkSession.sessionState.catalog.createTempView(s"$INFORMATION_SCHEMA_DATABASE.$name",
+        plan, overrideIfExists = true)
+    }
+  }
+
+  /**
+   * Only EqualTo filters are handled in INFORMATION_SCHEMA data sources.
+   */
+  def unhandledFilters(filters: Array[Filter], columnName: String): Array[Filter] = {
+    import org.apache.spark.sql.sources.EqualTo
+    filters.filter {
+      case EqualTo(attribute, _) if attribute.equalsIgnoreCase(columnName) => false
+      case _ => true
+    }
+  }
+
+  /**
+   * Return a DataFrame of table metadata, filtered by an `EqualTo` predicate on `columnName` if present.
+   */
+  def getFilteredTables(sparkSession: SparkSession, filters: Seq[Expression], columnName: String)
+    : DataFrame = {
+    import org.apache.spark.sql.catalyst.expressions.EqualTo
+    val database = filters.filter {
+      case EqualTo(AttributeReference(name, _, _, _), Literal(_, StringType))
+        if name.equalsIgnoreCase(columnName) => true
+      case _ => false
+    }.map(_.asInstanceOf[EqualTo].right.asInstanceOf[Literal].eval().toString()).headOption
+
+    val tableList = if (database.nonEmpty) {
+      sparkSession.catalog.listTables(database.get)
+    } else {
+      // Look up table information from all non-system databases.
+      val allTables = new ArrayBuffer[org.apache.spark.sql.catalog.Table]
+      val nonSystemDBs = sparkSession.catalog.listDatabases()
+        .filter(_.name != INFORMATION_SCHEMA_DATABASE).collect()
+      for (db <- nonSystemDBs) {
+        allTables ++= sparkSession.catalog.listTables(db.name).collect()
+      }
+      CatalogImpl.makeDataset(allTables, sparkSession)
+    }
+
+    tableList.selectExpr("'default'", s"IFNULL(database, '$DEFAULT_DATABASE')", "name",
+      "IF(tableType='VIEW', 'VIEW', IF(isTemporary, 'VIEW', 'TABLE'))",
+      "IFNULL(description, 'VIEW')", "isTemporary"
+    ).toDF("TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE", "VIEW_DEFINITION",
+      "isTemporary")
+  }
+}
+
+/**
+ * A base class for all INFORMATION_SCHEMA relation providers.
+ */
+private abstract class InformationSchemaRelationProvider extends RelationProvider {
+  def relationSchema: StructType
+
+  def makeRDD(
+      sparkSession: SparkSession,
+      requiredColumns: Seq[Attribute],
+      filters: Seq[Expression]): RDD[Row]
+
+  def providerUnhandledFilters(filters: Array[Filter]): Array[Filter] = filters
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    val sparkSession = sqlContext.sparkSession
+
+    new BaseRelation with CatalystScan {
+      override def sqlContext: SQLContext = sparkSession.sqlContext
+
+      override def schema: StructType = relationSchema
+
+      override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] =
+        makeRDD(sparkSession, requiredColumns, filters)
+
+      override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
+        providerUnhandledFilters(filters)
+    }
+  }
+}
+
+/**
+ * Provides the DATABASES relation consisting of all databases.
+ * Note that filters are not supported.
+ */
+private class DatabasesRelationProvider extends InformationSchemaRelationProvider {
+  def relationSchema: StructType = StructType(Seq(
+    StructField("CATALOG_NAME", StringType, nullable = false),
+    StructField("SCHEMA_NAME", StringType, nullable = false)
+  ))
+
+  def makeRDD(
+      sparkSession: SparkSession,
+      requiredColumns: Seq[Attribute],
+      filters: Seq[Expression]): RDD[Row] = {
+    sparkSession.catalog.listDatabases()
+      .selectExpr(
+        "'default' as CATALOG_NAME",
+        s"regexp_replace(name, '$INFORMATION_SCHEMA_DATABASE.', '') as SCHEMA_NAME")
+      .selectExpr(requiredColumns.map(_.name): _*).rdd
+  }
+}
+
+/**
+ * Provides the TABLES relation containing all tables and views.
+ * Note that only `TABLE_SCHEMA='database_name'` is supported for pushdown predicates.
+ */
+private class TablesRelationProvider extends InformationSchemaRelationProvider {
+  override def relationSchema: StructType = StructType(Seq(
+    StructField("TABLE_CATALOG", StringType, nullable = false),
+    StructField("TABLE_SCHEMA", StringType, nullable = false),
+    StructField("TABLE_NAME", StringType, nullable = false),
+    StructField("TABLE_TYPE", StringType, nullable = false)
+  ))
+
+  def makeRDD(
+      sparkSession: SparkSession,
+      requiredColumns: Seq[Attribute],
+      filters: Seq[Expression]): RDD[Row] = {
+    InformationSchema.getFilteredTables(sparkSession, filters, "TABLE_SCHEMA")
+      .selectExpr(requiredColumns.map(_.name): _*).rdd
+  }
+
+  override def providerUnhandledFilters(filters: Array[Filter]): Array[Filter] =
+    InformationSchema.unhandledFilters(filters, "TABLE_SCHEMA")
+}
+
+/**
+ * Provides the VIEWS relation containing all views.
+ * Note that only `TABLE_SCHEMA='database_name'` is supported for pushdown predicates.
+ */
+private class ViewsRelationProvider extends InformationSchemaRelationProvider {
+  override def relationSchema: StructType = StructType(Seq(
+    StructField("TABLE_CATALOG", StringType, nullable = false),
+    StructField("TABLE_SCHEMA", StringType, nullable = false),
+    StructField("TABLE_NAME", StringType, nullable = false),
+    StructField("VIEW_DEFINITION", StringType, nullable = false)
+  ))
+
+  def makeRDD(
+      sparkSession: SparkSession,
+      requiredColumns: Seq[Attribute],
+      filters: Seq[Expression]): RDD[Row] = {
+    InformationSchema.getFilteredTables(sparkSession, filters, "TABLE_SCHEMA")
+      .toDF("TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE", "VIEW_DEFINITION",
+        "isTemporary")
+      .where("isTemporary OR TABLE_TYPE = 'VIEW'")
+      .selectExpr(requiredColumns.map(_.name): _*).rdd
+  }
+
+  override def providerUnhandledFilters(filters: Array[Filter]): Array[Filter] =
+    InformationSchema.unhandledFilters(filters, "TABLE_SCHEMA")
+}
+
+/**
+ * Provides the COLUMNS relation containing all columns.
+ * Note that only `TABLE_SCHEMA='database_name'` is supported for pushdown predicates.
+ */ +private class ColumnsRelationProvider extends InformationSchemaRelationProvider { + override def relationSchema: StructType = StructType(Seq( + StructField("TABLE_CATALOG", StringType, nullable = false), + StructField("TABLE_SCHEMA", StringType, nullable = false), + StructField("TABLE_NAME", StringType, nullable = false), + StructField("COLUMN_NAME", StringType, nullable = false), + StructField("ORDINAL_POSITION", LongType, nullable = false), + StructField("IS_NULLABLE", BooleanType, nullable = false), + StructField("DATA_TYPE", StringType, nullable = false) + )) + + def makeRDD( + sparkSession: SparkSession, + requiredColumns: Seq[Attribute], + filters: Seq[Expression]): RDD[Row] = { + val tables = + InformationSchema.getFilteredTables(sparkSession, filters, "TABLE_SCHEMA").collect() + val result = new ArrayBuffer[Row] + for (t <- tables) { + val database = t(1).toString + val name = t(2).toString + val columnList = if (database == null) { + sparkSession.catalog.listColumns(name) + } else if (database == INFORMATION_SCHEMA_DATABASE) { + sparkSession.catalog.listColumns(s"$database.$name") + } else { + sparkSession.catalog.listColumns(database, name) + } + result ++= columnList.rdd.zipWithIndex.map { + case (col, index) => + Row("default", if (database != null) database else DEFAULT_DATABASE, + name, col.name, index, col.nullable, col.dataType) + }.collect() + } + sparkSession + .createDataFrame(sparkSession.sparkContext.parallelize(result), relationSchema) + .orderBy("TABLE_CATALOG", "TABLE_SCHEMA", "TABLE_NAME", "ORDINAL_POSITION") + .selectExpr(requiredColumns.map(_.name): _*).rdd + } + + override def providerUnhandledFilters(filters: Array[Filter]): Array[Filter] = + InformationSchema.unhandledFilters(filters, "TABLE_SCHEMA") +} + +/** + * Provides SESSION_VARIABLE relation containing all key-value pairs of session environment. + * Note that filters are not supported. 
+ */ +private class SessionVariablesRelationProvider extends InformationSchemaRelationProvider { + override def relationSchema: StructType = StructType(Seq( + StructField("VARIABLE_NAME", StringType, nullable = false), + StructField("VARIABLE_VALUE", StringType, nullable = false) + )) + + def makeRDD( + sparkSession: SparkSession, + requiredColumns: Seq[Attribute], + filters: Seq[Expression]): RDD[Row] = { + val runtimeConfig = sparkSession.conf.getAll.toArray + val sqlConfig = sparkSession.sparkContext.conf.getAll + val allConfig = runtimeConfig ++ sqlConfig + val rdd = sparkSession.sparkContext.parallelize(allConfig).map { case (k, v) => Row(k, v) } + sparkSession + .createDataFrame(rdd, relationSchema) + .distinct() + .orderBy("VARIABLE_NAME") + .selectExpr(requiredColumns.map(_.name): _*).rdd + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b5499f2884c6..4efa61183c11 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -671,7 +671,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { catalog.setCurrentDatabase("dbx") // rename without explicitly specifying database sql("ALTER TABLE tab2 RENAME TO tab1") - assert(catalog.listTables("dbx") == Seq(tableIdent1)) + assert(catalog.listTables("dbx").contains(tableIdent1)) + assert(!catalog.listTables("dbx").contains(tableIdent2)) // table to rename does not exist intercept[AnalysisException] { sql("ALTER TABLE dbx.does_not_exist RENAME TO tab2") @@ -993,7 +994,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql("CREATE DATABASE showdb1A") // check the result as well as its order - checkDataset(sql("SHOW DATABASES"), Row("default"), Row("showdb1a"), Row("showdb2b")) + checkDataset( + sql("SHOW DATABASES"), + Row("default"), Row("information_schema"), Row("showdb1a"), Row("showdb2b")) checkAnswer( sql("SHOW DATABASES LIKE '*db1A'"), @@ -1593,6 +1596,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { } } + test("drop information_schema database") { + Seq("true", "false").foreach { caseSensitive => + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) { + var message = intercept[AnalysisException] { + sql(s"DROP DATABASE ${SessionCatalog.INFORMATION_SCHEMA_DATABASE}") + }.getMessage + assert(message.contains( + s"Can not drop system database `${SessionCatalog.INFORMATION_SCHEMA_DATABASE}`")) + + message = intercept[AnalysisException] { + sql("DROP DATABASE Information_ScheMA") + }.getMessage + if (caseSensitive == "true") { + assert(message.contains("Database 'Information_ScheMA' not found")) + } else { + assert(message.contains( + s"Can not drop system database `${SessionCatalog.INFORMATION_SCHEMA_DATABASE}`")) + } + } + } + } + test("truncate table - datasource table") { import testImplicits._ val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchemaSuite.scala new file mode 100644 index 000000000000..2f755d9d705f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/systemcatalog/InformationSchemaSuite.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.systemcatalog + +import java.util.UUID + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.test.SharedSQLContext + +/** + * Information Schema Suite + */ +class InformationSchemaSuite extends QueryTest with SharedSQLContext { + val TESTDB = s"db_${UUID.randomUUID().toString.replace('-', '_')}" + + override def beforeAll(): Unit = { + super.beforeAll() + InformationSchema.registerInformationSchema(spark) + sql(s"create database if not exists $TESTDB") + sql(s"create table $TESTDB.t1(a int)") + sql(s"create view $TESTDB.v1 as select 1") + sql("use default") + } + + override def afterAll(): Unit = { + try { + sql("use default") + sql(s"drop database $TESTDB cascade") + } finally { + super.afterAll() + } + } + + test("use information_schema") { + sql("use information_schema") + sql("select * from databases") + sql("select * from schemata") + sql("select * from tables") + sql("select * from views") + sql("select * from columns") + sql("select * from session_variables") + sql("use default") + } + + test("catalog functions") { + sql("use default") + assert(spark.catalog.listTables().collect().length == 0) + spark.catalog.listColumns("information_schema.tables") + intercept[NoSuchTableException] { + spark.catalog.listColumns("tables") + } + + sql("use information_schema") + assert(spark.catalog.listTables().collect().length == 6) + spark.catalog.listColumns("tables") + + sql("use default") + } + + test("databases/schemata") { + checkAnswer( + sql("select * from information_schema.databases"), + Seq(Row("default", "default"), Row("default", "information_schema"), Row("default", TESTDB))) + + checkAnswer( + sql("select * from information_schema.schemata"), + Seq(Row("default", "default"), Row("default", "information_schema"), Row("default", TESTDB))) + + checkAnswer( + sql("select SCHEMA_NAME from information_schema.databases"), + Row(TESTDB) :: Row("default") :: Row("information_schema") :: Nil) + + checkAnswer( + sql("select SCHEMA_NAME from information_schema.schemata"), + Row(TESTDB) :: Row("default") :: Row("information_schema") :: Nil) + } + + test("tables") { + checkAnswer( + sql("select * from information_schema.tables"), + Row("default", TESTDB, "t1", "TABLE") :: + Row("default", TESTDB, "v1", "VIEW") :: Nil) + + checkAnswer( + sql("select TABLE_NAME from information_schema.tables"), + Row("t1") :: Row("v1") :: Nil) + + checkAnswer( + sql(s"select * from information_schema.tables where TABLE_SCHEMA='$TESTDB'"), + Row("default", TESTDB, "t1", "TABLE") :: + Row("default", TESTDB, "v1", "VIEW") :: Nil) + + checkAnswer( + sql("select * from information_schema.tables where 
TABLE_SCHEMA='default'"), + Nil) + } + + test("views") { + checkAnswer( + sql("select * from information_schema.views"), + Row("default", TESTDB, "v1", "VIEW") :: Nil) + + checkAnswer( + sql("select TABLE_NAME from information_schema.views"), + Row("v1") :: Nil) + + checkAnswer( + sql(s"select * from information_schema.views where TABLE_SCHEMA='$TESTDB'"), + Row("default", TESTDB, "v1", "VIEW") :: Nil) + + checkAnswer( + sql("select * from information_schema.views where TABLE_SCHEMA='default'"), + Nil) + } + + test("columns") { + checkAnswer( + sql("select * from information_schema.columns"), + Row("default", TESTDB, "t1", "a", 0, true, "int") :: + Row("default", TESTDB, "v1", "1", 0, false, "int") :: + Nil) + + checkAnswer( + sql("select COLUMN_NAME from information_schema.columns"), + Row("a") :: Row("1") :: Nil) + + checkAnswer( + sql("select COLUMN_NAME from information_schema.columns WHERE TABLE_NAME = 't1'"), + Row("a") :: Nil) + } + + test("session_variables") { + val df = sql("select * from information_schema.session_variables").collect() + assert(df.forall(_.length == 2)) + assert(df.exists(row => row(0) == "spark.app.id")) + assert(df.exists(row => row(0) == "spark.app.name")) + + val df2 = sql("select VARIABLE_NAME from information_schema.session_variables").collect() + assert(df2.forall(_.length == 1)) + assert(df2.exists(row => row(0) == "spark.app.id")) + assert(df2.exists(row => row(0) == "spark.app.name")) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 3dc67ffafb04..27e213ff07a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -122,14 +122,15 @@ class CatalogSuite } test("list databases") { - assert(spark.catalog.listDatabases().collect().map(_.name).toSet == Set("default")) + assert(spark.catalog.listDatabases().collect().map(_.name).toSet == + Set("default", "information_schema")) createDatabase("my_db1") createDatabase("my_db2") assert(spark.catalog.listDatabases().collect().map(_.name).toSet == - Set("default", "my_db1", "my_db2")) + Set("default", "information_schema", "my_db1", "my_db2")) dropDatabase("my_db1") assert(spark.catalog.listDatabases().collect().map(_.name).toSet == - Set("default", "my_db2")) + Set("default", "information_schema", "my_db2")) } test("list tables") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 85c509847d8e..2effaa443e95 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -54,9 +54,10 @@ private[sql] class HiveSessionCatalog( hadoopConf) { override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = { - val table = formatTableName(name.table) - if (name.database.isDefined || !tempTables.contains(table)) { - val database = name.database.map(formatDatabaseName) + val normalizeName = normalizeTableIdentifier(name) + val table = formatTableName(normalizeName.table) + if (normalizeName.database.isDefined || !tempTables.contains(table)) { + val database = normalizeName.database.map(formatDatabaseName) val newName = name.copy(database = database, table = table) metastoreCatalog.lookupRelation(newName, alias) } else {
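A minimal usage sketch, for illustration only, assuming this patch is applied to a Spark 2.0 build. It mirrors the setup used in InformationSchemaSuite; the database name `exdb` and the application name are made up for the example. Note that accessing `spark.catalog` is what triggers `InformationSchema.registerInformationSchema` through the lazy `catalog` val in SparkSession.

  // Illustrative sketch: querying the INFORMATION_SCHEMA views added by this patch.
  import org.apache.spark.sql.SparkSession

  object InformationSchemaExample {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .master("local[2]")
        .appName("information-schema-example")
        .getOrCreate()

      // Touching spark.catalog evaluates the lazy `catalog` val, which registers the
      // information_schema database and its views (schemata/databases, tables, views,
      // columns, session_variables).
      spark.catalog.listDatabases().show()

      // Set up an example database and table, as in InformationSchemaSuite.
      spark.sql("CREATE DATABASE IF NOT EXISTS exdb")
      spark.sql("CREATE TABLE exdb.t1(a INT)")

      // DATABASES / SCHEMATA list all databases.
      spark.sql("SELECT SCHEMA_NAME FROM information_schema.databases").show()

      // Only `TABLE_SCHEMA = '<db>'` equality predicates are pushed down to the providers.
      spark.sql("SELECT TABLE_NAME, TABLE_TYPE FROM information_schema.tables " +
        "WHERE TABLE_SCHEMA = 'exdb'").show()

      // COLUMNS exposes column name, ordinal position, nullability, and data type.
      spark.sql("SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.columns " +
        "WHERE TABLE_NAME = 't1'").show()

      spark.stop()
    }
  }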