diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 4d5d125ecdd7e..5afb2acd3ba6e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -231,7 +231,7 @@ describeFuncName
     ;
 
 describeColName
-    : identifier ('.' (identifier | STRING))*
+    : identifier ('.' colpathIdentifier)*
     ;
 
 ctes
@@ -454,6 +454,10 @@ tableIdentifier
     : (db=identifier '.')? table=identifier
     ;
 
+colpathIdentifier
+    : identifier | ELEM_TYPE | KEY_TYPE | VALUE_TYPE
+    ;
+
 namedExpression
     : expression (AS? (identifier | identifierList))?
     ;
@@ -902,6 +906,9 @@ OPTION: 'OPTION';
 ANTI: 'ANTI';
 LOCAL: 'LOCAL';
 INPATH: 'INPATH';
+KEY_TYPE: '$KEY$';
+VALUE_TYPE: '$VALUE$';
+ELEM_TYPE: '$ELEM$';
 
 STRING
     : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b06f24bc4866a..acc9bbf9146fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, SimpleFunctionRegistry}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
-import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.util.StringUtils
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 5efaf8f2010f2..eb2d6d9a42975 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -110,6 +110,14 @@ case class CatalogTable(
   def partitionColumns: Seq[CatalogColumn] =
     schema.filter { c => partitionColumnNames.contains(c.name) }
 
+  /** Columns this table is bucketed by. */
+  def bucketColumns: Seq[CatalogColumn] =
+    schema.filter { c => bucketColumnNames.contains(c.name) }
+
+  /** Columns this table is sorted by. */
+  def sortColumns: Seq[CatalogColumn] =
+    schema.filter { c => sortColumnNames.contains(c.name) }
+
   /** Return the database this table was specified to belong to, assuming it exists. */
   def database: String = identifier.database.getOrElse {
     throw new AnalysisException(s"table $identifier did not specify database")
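Reviewer note (illustration only, not part of the patch): the new colpathIdentifier rule together with the $ELEM$/$KEY$/$VALUE$ tokens is what lets a DESCRIBE target drill into nested types. A minimal sketch of the statement shapes the grammar is meant to accept, written against a hypothetical `spark` session with made-up table and column names:

    // Illustrative only; tab, col1, col2 and col4 are made-up names.
    spark.sql("DESCRIBE tab col1")           // a plain top-level column
    spark.sql("DESCRIBE tab col2.f1")        // a field of a struct column
    spark.sql("DESCRIBE tab col1.$key$")     // the key type of a map column
    spark.sql("DESCRIBE tab col1.$value$")   // the value type of a map column
    spark.sql("DESCRIBE tab col4.$elem$")    // the element type of an array column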
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 8128a6efe3ccb..04e6a0f7fc8d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -218,32 +218,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
     // Create the explain comment.
     val statement = plan(ctx.statement)
-    if (isExplainableStatement(statement)) {
-      ExplainCommand(statement, extended = options.exists(_.EXTENDED != null),
-        codegen = options.exists(_.CODEGEN != null))
-    } else {
-      ExplainCommand(OneRowRelation)
-    }
-  }
-
-  /**
-   * Determine if a plan should be explained at all.
-   */
-  protected def isExplainableStatement(plan: LogicalPlan): Boolean = plan match {
-    case _: DescribeTableCommand => false
-    case _ => true
+    ExplainCommand(statement, extended = options.exists(_.EXTENDED != null),
+      codegen = options.exists(_.CODEGEN != null))
   }
 
   /**
-   * Create a [[DescribeTableCommand]] logical plan.
+   * Create a [[DescribeTableCommand]] logical plan that describes a table in the given database.
+   * If a database name is not given, the current database will be used.
+   * The syntax of the command in SQL is:
+   * {{{
+   *   DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec]
+   * }}}
    */
   override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
-    // FORMATTED and columns are not supported. Return null and let the parser decide what to do
-    // with this (create an exception or pass it on to a different system).
-    if (ctx.describeColName != null || ctx.FORMATTED != null || ctx.partitionSpec != null) {
+    // FORMATTED is not supported. Return null and let the parser decide what to do with this
+    // (create an exception or pass it on to a different system).
+    if (ctx.FORMATTED != null) {
       null
     } else {
-      DescribeTableCommand(visitTableIdentifier(ctx.tableIdentifier), ctx.EXTENDED != null)
+      val partitionKeys = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
+      val columnPath = Option(ctx.describeColName).map(visitDescribeColName)
+      DescribeTableCommand(
+        visitTableIdentifier(ctx.tableIdentifier),
+        partitionKeys,
+        columnPath,
+        ctx.EXTENDED != null)
     }
   }
 
@@ -349,6 +348,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
   }
 
+  /**
+   * A column path can be specified as a parameter of the DESCRIBE command. It is a dot-separated
+   * list of identifiers with three special identifiers, '$elem$', '$key$' and '$value$', which
+   * represent an array element, a map key and a map value respectively.
+   */
+  override def visitDescribeColName(ctx: DescribeColNameContext): String = {
+    var result = ctx.identifier.getText
+    if (!ctx.colpathIdentifier.isEmpty) {
+      result = result + "." + ctx.colpathIdentifier.asScala.map(_.getText).mkString(".")
+    }
+    result
+  }
+
   /**
    * Create a [[CreateDatabase]] command.
    *
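Reviewer note (sketch, not part of the patch): with the builder changes above, DESCRIBE statements should map onto the extended DescribeTableCommand roughly as follows. Here `parser` stands for a SparkSqlParser instance; the authoritative assertions are the DDLCommandSuite cases further down.

    parser.parsePlan("DESCRIBE tab1 PARTITION (c1 = 'val1')")
    // => DescribeTableCommand(TableIdentifier("tab1", None),
    //      Some(Map("c1" -> "val1")), None, isExtended = false)

    parser.parsePlan("DESCRIBE EXTENDED tab1 col1.field1.$elem$")
    // => DescribeTableCommand(TableIdentifier("tab1", None),
    //      None, Some("col1.field1.$elem$"), isExtended = true)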
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 6078918316d9e..d876021e40731 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -24,11 +24,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, UnaryNode}
-import org.apache.spark.sql.types.{BooleanType, MetadataBuilder, StringType}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 case class CreateTableAsSelectLogicalPlan(
@@ -268,12 +270,26 @@ case class LoadData(
 }
 
 /**
- * Command that looks like
+ * A command for users to describe a table in the given database. If a database name is not
+ * given, the current database will be used.
+ * The syntax of the command in SQL is:
  * {{{
- *   DESCRIBE (EXTENDED) table_name;
+ *   DESCRIBE [EXTENDED|FORMATTED] [db_name.]table_name [column_name] [PARTITION partition_spec]
  * }}}
+ * Note: the FORMATTED option is not supported.
+ *
+ * @param table the table to be described.
+ * @param partSpec if specified, the specified partition is described. It is effective only when
+ *                 the table is a Hive table.
+ * @param colPath if specified, only the specified column is described. It is effective only when
+ *                the table is a Hive table.
+ * @param isExtended true if "DESCRIBE EXTENDED" is used, false otherwise. It is effective only
+ *                   when the table is a Hive table.
  */
-case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
+case class DescribeTableCommand(
+    table: TableIdentifier,
+    partSpec: Option[TablePartitionSpec],
+    colPath: Option[String],
+    isExtended: Boolean)
   extends RunnableCommand {
 
   override val output: Seq[Attribute] = Seq(
@@ -286,20 +302,147 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
       new MetadataBuilder().putString("comment", "comment of the column").build())()
   )
 
+  private def formatColumns(cols: Seq[CatalogColumn]): String = {
+    cols.map { col =>
+      s"""
+         |${col.getClass.getSimpleName}
+         |(name:${col.name}
+         |type:${col.dataType}
+         |comment:${col.comment.orNull})
+       """.stripMargin
+    }.mkString(",")
+  }
+
+  private def formatProperties(props: Map[String, String]): String = {
+    props.map {
+      case (k, v) => s"$k=$v"
+    }.mkString("{", ", ", "}")
+  }
+
+  private def getPartValues(part: CatalogTablePartition, cols: Seq[String]): String = {
+    cols.map { name =>
+      PartitioningUtils.escapePathName(part.spec(name))
+    }.mkString(", ")
+  }
+
+  private def descColPath(table: CatalogTable, colPath: String): Array[Row] = {
+    val names = colPath.split("\\.")
+    val lastName = names(names.length - 1)
+    val fields = table.schema.map { c =>
+      StructField(c.name, CatalystSqlParser.parseDataType(c.dataType), c.nullable)
+    }
+    // Walk the dot-separated path, starting from a struct made of the table's columns.
+    var dataType: DataType = StructType(fields)
+    for (i <- 0 until names.length) {
+      dataType match {
+        case s: StructType =>
+          try {
+            dataType = s.apply(names(i)).dataType
+          } catch {
+            case e: Exception =>
+              throw new AnalysisException(s"Column name/path: ${colPath} does not exist.")
+          }
+        case m: MapType if names(i) == "$key$" => dataType = m.keyType
+        case m: MapType if names(i) == "$value$" => dataType = m.valueType
+        case a: ArrayType if names(i) == "$elem$" => dataType = a.elementType
+        case _ => throw new AnalysisException(s"Column name/path: ${colPath} does not exist.")
+      }
+    }
+
+    val result: Seq[Row] = dataType match {
+      case s: StructType =>
+        s.map { f => Row(f.name, f.dataType.simpleString, "from deserializer") }
+      case d: DataType => Seq(Row(lastName, dataType.simpleString, "from deserializer"))
+    }
+    result.toArray
+  }
+
+  private def descStorageFormat(
+      table: CatalogTable,
+      storage: CatalogStorageFormat): String = {
+    // TODO - check with Lian - compress, skewedInfo and storedAsSubDirectories from StorageDesc
+    // are not available, so they are dropped from the output.
+    val storageLocationStr =
+      s"""
+         |${storage.getClass.getSimpleName}(location:${storage.locationUri.orNull},
+         | inputFormat:${storage.inputFormat.orNull},
+         | outputFormat:${storage.outputFormat.orNull},
+         | numBuckets:${table.numBuckets},
+         | serializationLib:${storage.serde.orNull},
+         | parameters:${formatProperties(storage.serdeProperties)},
+         | bucketCols:[${formatColumns(table.bucketColumns)}],
+         | sortCols:[${formatColumns(table.sortColumns)}])
+       """.stripMargin.replaceAll("\n", "").trim
+    storageLocationStr
+  }
+
+  private def descPartExtended(table: CatalogTable, part: CatalogTablePartition): String = {
+    val result = StringBuilder.newBuilder
+    val clsName = part.getClass.getSimpleName
+    result ++= s"${clsName}(values:[${getPartValues(part, table.partitionColumnNames)}], "
+    result ++= s"dbName:${table.database}, "
+    // TODO - check with Lian - no owner info available.
+    result ++= s"createTime:${table.createTime}, "
+    result ++= s"lastAccessTime:${table.lastAccessTime}, "
+    // TODO - check with Lian - no retention info available.
+
+    result ++= s"sd:${descStorageFormat(table, part.storage)}, "
+    // TODO - check with Lian - Hive prints the partition keys here. Since we already output the
+    // partitioning keys with the schema at the start, we don't output them here again.
+    result ++= s"parameters:${formatProperties(table.properties)}, "
+    result ++= s"viewOriginalText:${table.viewOriginalText.orNull}, "
+    result ++= s"viewExpandedText:${table.viewText.orNull}, "
+    result ++= s"tableType:${table.tableType})"
+    result.toString
+  }
+
+  private def descTableExtended(table: CatalogTable): String = {
+    val result = StringBuilder.newBuilder
+    result ++= s"${table.getClass.getSimpleName}(tableName:${table.identifier.table}, "
+    result ++= s"dbName:${table.database}, "
+    // TODO - check with Lian - no owner info available.
+    result ++= s"createTime:${table.createTime}, "
+    result ++= s"lastAccessTime:${table.lastAccessTime}, "
+    // TODO - check with Lian - no retention info available.
+
+    result ++= s"sd:${descStorageFormat(table, table.storage)}, "
+    // TODO - check with Lian - Hive prints the partition keys here. Since we already output the
+    // partitioning keys with the schema at the start, we don't output them here again.
+    result ++= s"parameters:${formatProperties(table.properties)}, "
+    result ++= s"viewOriginalText:${table.viewOriginalText.orNull}, "
+    result ++= s"viewExpandedText:${table.viewText.orNull}, "
+    result ++= s"tableType:${table.tableType})"
+    result.toString
+  }
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val result = new ArrayBuffer[Row]
-    sparkSession.sessionState.catalog.lookupRelation(table) match {
+    val catalog = sparkSession.sessionState.catalog
+    catalog.lookupRelation(table) match {
       case catalogRelation: CatalogRelation =>
-        catalogRelation.catalogTable.schema.foreach { column =>
-          result += Row(column.name, column.dataType, column.comment.orNull)
-        }
-
-        if (catalogRelation.catalogTable.partitionColumns.nonEmpty) {
-          result += Row("# Partition Information", "", "")
-          result += Row(s"# ${output(0).name}", output(1).name, output(2).name)
+        val tab = catalogRelation.catalogTable
+        val part = partSpec.flatMap(p => Option(catalog.getPartition(table, p)))
+        if (colPath.nonEmpty) {
+          result ++= descColPath(tab, colPath.get)
+        } else {
+          tab.schema.foreach { column =>
+            result += Row(column.name, column.dataType, column.comment.orNull)
+          }
+          if (tab.partitionColumns.nonEmpty) {
+            result += Row("# Partition Information", "", "")
+            result += Row(s"# ${output(0).name}", output(1).name, output(2).name)
 
-          catalogRelation.catalogTable.partitionColumns.foreach { col =>
-            result += Row(col.name, col.dataType, col.comment.orNull)
+            tab.partitionColumns.foreach { col =>
+              result += Row(col.name, col.dataType, col.comment.orNull)
+            }
+          }
+          if (isExtended) {
+            if (partSpec.isEmpty) {
+              result += Row("Detailed Table Information", descTableExtended(tab), "")
+            } else {
+              result +=
+                Row("Detailed Partition Information", descPartExtended(tab, part.get), "")
+            }
           }
         }
 
@@ -315,7 +458,6 @@ case class DescribeTableCommand(table: TableIdentifier, isExtended: Boolean)
   }
 }
 
-
 /**
  * A command for users to get tables in the given database.
 * If a databaseName is not given, the current database will be used.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index be0f4d78a5237..71352070be0c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -757,4 +757,28 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed2, expected2)
     comparePlans(parsed3, expected3)
   }
+
+  test("describe table") {
+    val parsed1 = parser.parsePlan("DESCRIBE tab1")
+    val expected1 = DescribeTableCommand(TableIdentifier("tab1", None), None, None, false)
+    val parsed2 = parser.parsePlan("DESCRIBE db1.tab1")
+    val expected2 = DescribeTableCommand(TableIdentifier("tab1", Some("db1")), None, None, false)
+    val parsed3 = parser.parsePlan("DESCRIBE tab1 col1")
+    val expected3 = DescribeTableCommand(TableIdentifier("tab1", None), None, Some("col1"), false)
+    val parsed4 = parser.parsePlan("DESCRIBE tab1 PARTITION (c1 = 'val1')")
+    val expected4 = DescribeTableCommand(TableIdentifier("tab1", None),
+      Some(Map("c1" -> "val1")), None, false)
+    val parsed5 = parser.parsePlan("DESCRIBE EXTENDED tab1 PARTITION (c1 = 'val1')")
+    val expected5 = DescribeTableCommand(TableIdentifier("tab1", None),
+      Some(Map("c1" -> "val1")), None, true)
+    val parsed6 = parser.parsePlan("DESCRIBE EXTENDED tab1 tab1.col1.field1.$elem$")
+    val expected6 = DescribeTableCommand(TableIdentifier("tab1", None),
+      None, Some("tab1.col1.field1.$elem$"), true)
+    comparePlans(parsed1, expected1)
+    comparePlans(parsed2, expected2)
+    comparePlans(parsed3, expected3)
+    comparePlans(parsed4, expected4)
+    comparePlans(parsed5, expected5)
+    comparePlans(parsed6, expected6)
+  }
 }
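Reviewer note (sketch, not part of the patch): end to end, this is the shape of the DESCRIBE ... PARTITION output that the Hive suites below pin down. `spark` stands for a SparkSession, and the table is the partitioned test_describe_commands1 table used in HiveQuerySuite.

    spark.sql("DESCRIBE test_describe_commands1 PARTITION (dt='2008-06-08')").show()
    // Per the updated HiveQuerySuite assertion, (col_name, data_type) comes back as:
    //   key                      int
    //   value                    string
    //   dt                       string
    //   # Partition Information
    //   # col_name               data_type
    //   dt                       string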
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 8b3f2d1a0cd07..b843a06de3c67 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -18,8 +18,8 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, NoSuchTableException}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 
 class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
@@ -61,6 +61,11 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       |PARTITION(year = 2016, month = 4, hour = 10, minute = 10, sec = 10, extra = 1) SELECT 3, 3
       """.stripMargin)
     sql("CREATE VIEW parquet_view1 as select * from parquet_tab4")
+    sql(
+      """
+        |CREATE TABLE tab_complex (col1 map<int, string>,
+        |col2 struct<f1: int, f2: string>, col3 map<int, struct<f3: int>>)
+      """.stripMargin)
   }
 
   override protected def afterAll(): Unit = {
@@ -71,6 +76,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       sql("DROP VIEW IF EXISTS parquet_view1")
       sql("DROP TABLE IF EXISTS parquet_tab4")
       sql("DROP TABLE IF EXISTS parquet_tab5")
+      sql("DROP TABLE IF EXISTS tab_complex")
     } finally {
       super.afterAll()
     }
@@ -152,6 +158,47 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("describe table - negative tests") {
+    val message1 = intercept[NoSuchTableException] {
+      sql("DESCRIBE bad_table")
+    }.getMessage
+    assert(message1.contains("Table or View bad_table not found"))
+    val message2 = intercept[AnalysisException] {
+      sql("DESCRIBE parquet_tab2 PARTITION (day=31)")
+    }.getMessage
+    assert(message2.contains("table is not partitioned but partition spec exists: {day=31}"))
+    val message3 = intercept[AnalysisException] {
+      sql("DESCRIBE parquet_tab4 PARTITION (Year=2000, month='10')")
+    }.getMessage
+    assert(message3.contains("Partition not found in table parquet_tab4 database default"))
+    val message4 = intercept[AnalysisException] {
+      sql("DESCRIBE parquet_tab2 invalidCol")
+    }.getMessage
+    assert(message4.contains("Column name/path: invalidCol does not exist"))
+  }
+
+  test("describe column - nested") {
+    checkAnswer(
+      sql("describe tab_complex col1.$key$"),
+      Row("$key$", "int", "from deserializer") :: Nil)
+    checkAnswer(
+      sql("describe tab_complex col1.$value$"),
+      Row("$value$", "string", "from deserializer") :: Nil)
+    checkAnswer(
+      sql("describe tab_complex col1"),
+      Row("col1", "map<int,string>", "from deserializer") :: Nil)
+    checkAnswer(
+      sql("describe tab_complex col2.f1"),
+      Row("f1", "int", "from deserializer") :: Nil)
+    checkAnswer(
+      sql("describe tab_complex col2"),
+      Row("f1", "int", "from deserializer") ::
+      Row("f2", "string", "from deserializer") :: Nil)
+    checkAnswer(
+      sql("describe tab_complex col3.$value$.f3"),
+      Row("f3", "int", "from deserializer") :: Nil)
+  }
+
   test("LOAD DATA") {
     withTable("non_part_table", "part_table") {
       sql(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 3bf0e84267419..43f1b03e768b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -819,6 +819,35 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
         .collect()
     }
 
+    // Describe a column of a table.
+    assertResult(Array(Row("value", "string", "from deserializer"))) {
+      sql("DESCRIBE test_describe_commands1 value")
+        .select('col_name, 'data_type, 'comment)
+        .collect()
+    }
+
+    // Describe a column of a table, using a database-qualified table name.
+    assertResult(Array(Row("value", "string", "from deserializer"))) {
+      sql("DESCRIBE default.test_describe_commands1 value")
+        .select('col_name, 'data_type, 'comment)
+        .collect()
+    }
+
+    // Describe a partition of a table.
+    assertResult(
+      Array(
+        Row("key", "int"),
+        Row("value", "string"),
+        Row("dt", "string"),
+        Row("# Partition Information", ""),
+        Row("# col_name", "data_type"),
+        Row("dt", "string"))
+    ) {
+      sql("DESCRIBE test_describe_commands1 PARTITION (dt='2008-06-08')")
+        .select('col_name, 'data_type)
+        .collect()
+    }
+
     // Describe a registered temporary table.
     val testData = TestHive.sparkContext.parallelize(