diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index 88ab51c1d70b..20c22388b0ef 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -20,6 +20,9 @@ import org.apache.spark.annotation.Experimental; import org.apache.spark.sql.types.DataType; +import java.util.Arrays; +import java.util.Objects; + /** * TableChange subclasses represent requested changes to a table. These are passed to * {@link TableCatalog#alterTable}. For example, @@ -210,6 +213,20 @@ public String property() { public String value() { return value; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SetProperty that = (SetProperty) o; + return property.equals(that.property) && + value.equals(that.value); + } + + @Override + public int hashCode() { + return Objects.hash(property, value); + } } /** @@ -227,6 +244,19 @@ private RemoveProperty(String property) { public String property() { return property; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RemoveProperty that = (RemoveProperty) o; + return property.equals(that.property); + } + + @Override + public int hashCode() { + return Objects.hash(property); + } } interface ColumnChange extends TableChange { @@ -269,6 +299,24 @@ public boolean isNullable() { public String comment() { return comment; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AddColumn addColumn = (AddColumn) o; + return isNullable == addColumn.isNullable && + Arrays.equals(fieldNames, addColumn.fieldNames) && + dataType.equals(addColumn.dataType) && + comment.equals(addColumn.comment); + } + + @Override + public int hashCode() { + int result = Objects.hash(dataType, isNullable, comment); + result = 31 * result + Arrays.hashCode(fieldNames); + return result; + } } /** @@ -296,6 +344,22 @@ public String[] fieldNames() { public String newName() { return newName; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RenameColumn that = (RenameColumn) o; + return Arrays.equals(fieldNames, that.fieldNames) && + newName.equals(that.newName); + } + + @Override + public int hashCode() { + int result = Objects.hash(newName); + result = 31 * result + Arrays.hashCode(fieldNames); + return result; + } } /** @@ -328,6 +392,23 @@ public DataType newDataType() { public boolean isNullable() { return isNullable; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UpdateColumnType that = (UpdateColumnType) o; + return isNullable == that.isNullable && + Arrays.equals(fieldNames, that.fieldNames) && + newDataType.equals(that.newDataType); + } + + @Override + public int hashCode() { + int result = Objects.hash(newDataType, isNullable); + result = 31 * result + Arrays.hashCode(fieldNames); + return result; + } } /** @@ -354,6 +435,22 @@ public String[] fieldNames() { public String newComment() { return newComment; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o 
== null || getClass() != o.getClass()) return false; + UpdateColumnComment that = (UpdateColumnComment) o; + return Arrays.equals(fieldNames, that.fieldNames) && + newComment.equals(that.newComment); + } + + @Override + public int hashCode() { + int result = Objects.hash(newComment); + result = 31 * result + Arrays.hashCode(fieldNames); + return result; + } } /** @@ -372,6 +469,19 @@ private DeleteColumn(String[] fieldNames) { public String[] fieldNames() { return fieldNames; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteColumn that = (DeleteColumn) o; + return Arrays.equals(fieldNames, that.fieldNames); + } + + @Override + public int hashCode() { + return Arrays.hashCode(fieldNames); + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5e22d4ecde2a..4fb713b8108c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -128,6 +128,8 @@ class Analyzer( private val catalog: SessionCatalog = catalogManager.v1SessionCatalog + override def isView(nameParts: Seq[String]): Boolean = catalog.isView(nameParts) + // Only for tests. def this(catalog: SessionCatalog, conf: SQLConf) = { this( @@ -195,8 +197,7 @@ class Analyzer( new SubstituteUnresolvedOrdinals(conf)), Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: - ResolveAlterTable :: - ResolveDescribeTable :: + new ResolveCatalogs(catalogManager) :: ResolveInsertInto :: ResolveTables :: ResolveRelations :: @@ -680,6 +681,11 @@ class Analyzer( lookupV2Relation(u.multipartIdentifier) .map(v2Relation => i.copy(table = v2Relation)) .getOrElse(i) + + case u: UnresolvedV2Relation => + CatalogV2Util.loadTable(u.catalog, u.tableName).map { table => + DataSourceV2Relation.create(table) + }.getOrElse(u) } } @@ -910,82 +916,6 @@ class Analyzer( } } - /** - * Resolve ALTER TABLE statements that use a DSv2 catalog. - * - * This rule converts unresolved ALTER TABLE statements to v2 when a v2 catalog is responsible - * for the table identifier. A v2 catalog is responsible for an identifier when the identifier - * has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and - * the table identifier does not include a catalog. 
- */ - object ResolveAlterTable extends Rule[LogicalPlan] { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case alter @ AlterTableAddColumnsStatement(tableName, cols) => - val changes = cols.map { col => - TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull) - } - resolveV2Alter(tableName, changes).getOrElse(alter) - - case alter @ AlterTableAlterColumnStatement(tableName, colName, dataType, comment) => - val typeChange = dataType.map { newDataType => - TableChange.updateColumnType(colName.toArray, newDataType, true) - } - - val commentChange = comment.map { newComment => - TableChange.updateColumnComment(colName.toArray, newComment) - } - - resolveV2Alter(tableName, typeChange.toSeq ++ commentChange.toSeq).getOrElse(alter) - - case alter @ AlterTableRenameColumnStatement(tableName, col, newName) => - val changes = Seq(TableChange.renameColumn(col.toArray, newName)) - resolveV2Alter(tableName, changes).getOrElse(alter) - - case alter @ AlterTableDropColumnsStatement(tableName, cols) => - val changes = cols.map(col => TableChange.deleteColumn(col.toArray)) - resolveV2Alter(tableName, changes).getOrElse(alter) - - case alter @ AlterTableSetPropertiesStatement(tableName, props) => - val changes = props.map { case (key, value) => - TableChange.setProperty(key, value) - } - - resolveV2Alter(tableName, changes.toSeq).getOrElse(alter) - - case alter @ AlterTableUnsetPropertiesStatement(tableName, keys, _) => - resolveV2Alter(tableName, keys.map(key => TableChange.removeProperty(key))).getOrElse(alter) - - case alter @ AlterTableSetLocationStatement(tableName, newLoc) => - resolveV2Alter(tableName, Seq(TableChange.setProperty("location", newLoc))).getOrElse(alter) - } - - private def resolveV2Alter( - tableName: Seq[String], - changes: Seq[TableChange]): Option[AlterTable] = { - lookupV2RelationAndCatalog(tableName).map { - case (relation, catalog, ident) => - AlterTable(catalog.asTableCatalog, ident, relation, changes) - } - } - } - - /** - * Resolve DESCRIBE TABLE statements that use a DSv2 catalog. - * - * This rule converts unresolved DESCRIBE TABLE statements to v2 when a v2 catalog is responsible - * for the table identifier. A v2 catalog is responsible for an identifier when the identifier - * has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and - * the table identifier does not include a catalog. - */ - object ResolveDescribeTable extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case describe @ DescribeTableStatement( - CatalogObjectIdentifier(Some(v2Catalog), ident), _, isExtended) => - DescribeTable(UnresolvedRelation(describe.tableName), isExtended) - } - } - /** * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from * a logical plan node's children. 
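Note: with ResolveAlterTable and ResolveDescribeTable removed, the Analyzer no longer looks up the target table of ALTER TABLE / DESCRIBE TABLE itself. The new ResolveCatalogs rule (and ResolveSessionCatalog in sql/core) rewrites those statements into v2 commands that wrap an UnresolvedV2Relation; the UnresolvedV2Relation case added to the Analyzer above loads the actual table later, and the CheckAnalysis changes below raise "Table not found" or the view error if it never resolves. A minimal sketch of the intermediate plan produced by CatalogV2Util.createAlterTable, using only APIs from this patch (the catalog plugin and the `ns.tbl` identifier are made-up examples):

    import org.apache.spark.sql.catalyst.analysis.UnresolvedV2Relation
    import org.apache.spark.sql.catalyst.plans.logical.AlterTable
    import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange}

    // Roughly what ALTER TABLE testcat.ns.tbl SET TBLPROPERTIES ('k'='v') becomes
    // after ResolveCatalogs, before the wrapped relation is resolved:
    def exampleAlterPlan(testcat: TableCatalog): AlterTable = {
      val ident = Identifier.of(Array("ns"), "tbl")
      AlterTable(
        testcat,
        ident,
        UnresolvedV2Relation(Seq("testcat", "ns", "tbl"), testcat, ident),
        Seq(TableChange.setProperty("k", "v")))
    }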
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 4a19806bd80f..db4ed47fa54c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableStatement, InsertIntoStatement} +import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -34,6 +34,8 @@ import org.apache.spark.sql.types._ */ trait CheckAnalysis extends PredicateHelper { + protected def isView(nameParts: Seq[String]): Boolean + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ /** @@ -96,6 +98,13 @@ trait CheckAnalysis extends PredicateHelper { case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) => failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}") + case u: UnresolvedV2Relation if isView(u.originalNameParts) => + u.failAnalysis( + s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") + + case u: UnresolvedV2Relation => + u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + case operator: LogicalPlan => // Check argument data types of higher-order functions downwards first. // If the arguments of the higher-order functions are resolved but the type check fails, @@ -357,9 +366,6 @@ trait CheckAnalysis extends PredicateHelper { case _ => } - case alter: AlterTableStatement => - alter.failAnalysis(s"Table or view not found: ${alter.tableName.quoted}") - case alter: AlterTable if alter.childrenResolved => val table = alter.table def findField(operation: String, fieldName: Array[String]): StructField = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala new file mode 100644 index 000000000000..3757569443e7 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.sql._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange} + +/** + * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements + * to the corresponding v2 commands if the resolved catalog is not the session catalog. + */ +class ResolveCatalogs(val catalogManager: CatalogManager) + extends Rule[LogicalPlan] with LookupCatalog { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + import org.apache.spark.sql.connector.catalog.CatalogV2Util._ + + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case AlterTableAddColumnsStatement( + nameParts @ NonSessionCatalog(catalog, tableName), cols) => + val changes = cols.map { col => + TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull) + } + createAlterTable(nameParts, catalog, tableName, changes) + + case AlterTableAlterColumnStatement( + nameParts @ NonSessionCatalog(catalog, tableName), colName, dataType, comment) => + val typeChange = dataType.map { newDataType => + TableChange.updateColumnType(colName.toArray, newDataType, true) + } + val commentChange = comment.map { newComment => + TableChange.updateColumnComment(colName.toArray, newComment) + } + createAlterTable(nameParts, catalog, tableName, typeChange.toSeq ++ commentChange) + + case AlterTableRenameColumnStatement( + nameParts @ NonSessionCatalog(catalog, tableName), col, newName) => + val changes = Seq(TableChange.renameColumn(col.toArray, newName)) + createAlterTable(nameParts, catalog, tableName, changes) + + case AlterTableDropColumnsStatement( + nameParts @ NonSessionCatalog(catalog, tableName), cols) => + val changes = cols.map(col => TableChange.deleteColumn(col.toArray)) + createAlterTable(nameParts, catalog, tableName, changes) + + case AlterTableSetPropertiesStatement( + nameParts @ NonSessionCatalog(catalog, tableName), props) => + val changes = props.map { case (key, value) => + TableChange.setProperty(key, value) + }.toSeq + createAlterTable(nameParts, catalog, tableName, changes) + + // TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag. 
+ case AlterTableUnsetPropertiesStatement( + nameParts @ NonSessionCatalog(catalog, tableName), keys, _) => + val changes = keys.map(key => TableChange.removeProperty(key)) + createAlterTable(nameParts, catalog, tableName, changes) + + case AlterTableSetLocationStatement( + nameParts @ NonSessionCatalog(catalog, tableName), newLoc) => + val changes = Seq(TableChange.setProperty("location", newLoc)) + createAlterTable(nameParts, catalog, tableName, changes) + + case AlterViewSetPropertiesStatement( + NonSessionCatalog(catalog, tableName), props) => + throw new AnalysisException( + s"Can not specify catalog `${catalog.name}` for view ${tableName.quoted} " + + s"because view support in catalog has not been implemented yet") + + case AlterViewUnsetPropertiesStatement( + NonSessionCatalog(catalog, tableName), keys, ifExists) => + throw new AnalysisException( + s"Can not specify catalog `${catalog.name}` for view ${tableName.quoted} " + + s"because view support in catalog has not been implemented yet") + + case DeleteFromStatement( + nameParts @ NonSessionCatalog(catalog, tableName), tableAlias, condition) => + val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier) + val aliased = tableAlias.map(SubqueryAlias(_, r)).getOrElse(r) + DeleteFromTable(aliased, condition) + + case update: UpdateTableStatement => + throw new AnalysisException(s"UPDATE TABLE is not supported temporarily.") + + case DescribeTableStatement( + nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, isExtended) => + if (partitionSpec.nonEmpty) { + throw new AnalysisException("DESCRIBE TABLE does not support partition for v2 tables.") + } + val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier) + DescribeTable(r, isExtended) + + case DescribeColumnStatement( + NonSessionCatalog(catalog, tableName), colNameParts, isExtended) => + throw new AnalysisException("Describing columns is not supported for v2 tables.") + + case c @ CreateTableStatement( + NonSessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) => + CreateV2Table( + catalog.asTableCatalog, + tableName.asIdentifier, + c.tableSchema, + // convert the bucket spec and add it as a transform + c.partitioning ++ c.bucketSpec.map(_.asTransform), + convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider), + ignoreIfExists = c.ifNotExists) + + case c @ CreateTableAsSelectStatement( + NonSessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) => + CreateTableAsSelect( + catalog.asTableCatalog, + tableName.asIdentifier, + // convert the bucket spec and add it as a transform + c.partitioning ++ c.bucketSpec.map(_.asTransform), + c.asSelect, + convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider), + writeOptions = c.options.filterKeys(_ != "path"), + ignoreIfExists = c.ifNotExists) + + case c @ ReplaceTableStatement( + NonSessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) => + ReplaceTable( + catalog.asTableCatalog, + tableName.asIdentifier, + c.tableSchema, + // convert the bucket spec and add it as a transform + c.partitioning ++ c.bucketSpec.map(_.asTransform), + convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider), + orCreate = c.orCreate) + + case c @ ReplaceTableAsSelectStatement( + NonSessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) => + ReplaceTableAsSelect( + catalog.asTableCatalog, + tableName.asIdentifier, + // convert the bucket spec and add it as a transform + 
c.partitioning ++ c.bucketSpec.map(_.asTransform), + c.asSelect, + convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider), + writeOptions = c.options.filterKeys(_ != "path"), + orCreate = c.orCreate) + + case DropTableStatement(NonSessionCatalog(catalog, tableName), ifExists, _) => + DropTable(catalog.asTableCatalog, tableName.asIdentifier, ifExists) + + case DropViewStatement(NonSessionCatalog(catalog, viewName), _) => + throw new AnalysisException( + s"Can not specify catalog `${catalog.name}` for view ${viewName.quoted} " + + s"because view support in catalog has not been implemented yet") + + case ShowNamespacesStatement(Some(NonSessionCatalog(catalog, nameParts)), pattern) => + val namespace = if (nameParts.isEmpty) None else Some(nameParts) + ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern) + + // TODO (SPARK-29014): we should check if the current catalog is not session catalog here. + case ShowNamespacesStatement(None, pattern) if defaultCatalog.isDefined => + ShowNamespaces(defaultCatalog.get.asNamespaceCatalog, None, pattern) + + case ShowTablesStatement(Some(NonSessionCatalog(catalog, nameParts)), pattern) => + ShowTables(catalog.asTableCatalog, nameParts, pattern) + + // TODO (SPARK-29014): we should check if the current catalog is not session catalog here. + case ShowTablesStatement(None, pattern) if defaultCatalog.isDefined => + ShowTables(defaultCatalog.get.asTableCatalog, catalogManager.currentNamespace, pattern) + + case UseStatement(isNamespaceSet, nameParts) => + if (isNamespaceSet) { + SetCatalogAndNamespace(catalogManager, None, Some(nameParts)) + } else { + val CurrentCatalogAndNamespace(catalog, namespace) = nameParts + val ns = if (namespace.isEmpty) { None } else { Some(namespace) } + SetCatalogAndNamespace(catalogManager, Some(catalog.name()), ns) + } + } + + object NonSessionCatalog { + def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match { + case CatalogAndIdentifierParts(catalog, parts) if !isSessionCatalog(catalog) => + Some(catalog -> parts) + case _ => None + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 40f25fb7f972..e5a6f30c330e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.types.{DataType, Metadata, StructType} /** @@ -59,6 +60,28 @@ object UnresolvedRelation { UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table) } +/** + * A variant of [[UnresolvedRelation]] which can only be resolved to a v2 relation + * (`DataSourceV2Relation`), not v1 relation or temp view. + * + * @param originalNameParts the original table identifier name parts before catalog is resolved. + * @param catalog The catalog which the table should be looked up from. + * @param tableName The name of the table to look up. 
+ */ +case class UnresolvedV2Relation( + originalNameParts: Seq[String], + catalog: TableCatalog, + tableName: Identifier) + extends LeafNode with NamedRelation { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override def name: String = originalNameParts.quoted + + override def output: Seq[Attribute] = Nil + + override lazy val resolved = false +} + /** * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 4d2dee5da383..be8526454f9f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -781,6 +781,18 @@ class SessionCatalog( } } + def isView(nameParts: Seq[String]): Boolean = { + nameParts.length <= 2 && { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val ident = nameParts.asTableIdentifier + try { + getTempViewOrPermanentTableMetadata(ident).tableType == CatalogTableType.VIEW + } catch { + case _: NoSuchTableException => false + } + } + } + /** * List all tables in the specified database, including local temporary views. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/AlterTableStatements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/AlterTableStatements.scala index 8c1b54be46cf..9d7dec9ae0ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/AlterTableStatements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/AlterTableStatements.scala @@ -24,16 +24,12 @@ import org.apache.spark.sql.types.DataType */ case class QualifiedColType(name: Seq[String], dataType: DataType, comment: Option[String]) -trait AlterTableStatement extends ParsedStatement { - val tableName: Seq[String] -} - /** * ALTER TABLE ... ADD COLUMNS command, as parsed from SQL. */ case class AlterTableAddColumnsStatement( tableName: Seq[String], - columnsToAdd: Seq[QualifiedColType]) extends AlterTableStatement + columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement /** * ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL. @@ -42,7 +38,7 @@ case class AlterTableAlterColumnStatement( tableName: Seq[String], column: Seq[String], dataType: Option[DataType], - comment: Option[String]) extends AlterTableStatement + comment: Option[String]) extends ParsedStatement /** * ALTER TABLE ... RENAME COLUMN command, as parsed from SQL. @@ -50,21 +46,21 @@ case class AlterTableAlterColumnStatement( case class AlterTableRenameColumnStatement( tableName: Seq[String], column: Seq[String], - newName: String) extends AlterTableStatement + newName: String) extends ParsedStatement /** * ALTER TABLE ... DROP COLUMNS command, as parsed from SQL. */ case class AlterTableDropColumnsStatement( tableName: Seq[String], - columnsToDrop: Seq[Seq[String]]) extends AlterTableStatement + columnsToDrop: Seq[Seq[String]]) extends ParsedStatement /** * ALTER TABLE ... SET TBLPROPERTIES command, as parsed from SQL. 
*/ case class AlterTableSetPropertiesStatement( tableName: Seq[String], - properties: Map[String, String]) extends AlterTableStatement + properties: Map[String, String]) extends ParsedStatement /** * ALTER TABLE ... UNSET TBLPROPERTIES command, as parsed from SQL. @@ -72,11 +68,11 @@ case class AlterTableSetPropertiesStatement( case class AlterTableUnsetPropertiesStatement( tableName: Seq[String], propertyKeys: Seq[String], - ifExists: Boolean) extends AlterTableStatement + ifExists: Boolean) extends ParsedStatement /** * ALTER TABLE ... SET LOCATION command, as parsed from SQL. */ case class AlterTableSetLocationStatement( tableName: Seq[String], - location: String) extends AlterTableStatement + location: String) extends ParsedStatement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala index 031e0586a5f0..4e5341839a72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connector.catalog import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform} import org.apache.spark.sql.types.StructType @@ -93,9 +94,21 @@ private[sql] object CatalogV2Implicits { } implicit class MultipartIdentifierHelper(parts: Seq[String]) { - def quoted: String = parts.map(quote).mkString(".") + if (parts.isEmpty) { + throw new AnalysisException("multi-part identifier cannot be empty.") + } def asIdentifier: Identifier = Identifier.of(parts.init.toArray, parts.last) + + def asTableIdentifier: TableIdentifier = parts match { + case Seq(tblName) => TableIdentifier(tblName) + case Seq(dbName, tblName) => TableIdentifier(tblName, Some(dbName)) + case _ => + throw new AnalysisException( + s"$quoted is not a valid TableIdentifier as it has more than 2 name parts.") + } + + def quoted: String = parts.map(quote).mkString(".") } private def quote(part: String): String = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index a61a55b22285..6d8c6f845641 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -21,8 +21,11 @@ import java.util import java.util.Collections import scala.collection.JavaConverters._ +import scala.collection.mutable -import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException} +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation} +import org.apache.spark.sql.catalyst.plans.logical.AlterTable import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -220,4 +223,59 @@ private[sql] object CatalogV2Util { case _: NoSuchDatabaseException => None case _: NoSuchNamespaceException => None } + + def 
isSessionCatalog(catalog: CatalogPlugin): Boolean = { + catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME) + } + + def convertTableProperties( + properties: Map[String, String], + options: Map[String, String], + location: Option[String], + comment: Option[String], + provider: String): Map[String, String] = { + if (options.contains("path") && location.isDefined) { + throw new AnalysisException( + "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + + "you can only specify one of them.") + } + + if ((options.contains("comment") || properties.contains("comment")) + && comment.isDefined) { + throw new AnalysisException( + "COMMENT and option/property 'comment' are both used to set the table comment, you can " + + "only specify one of them.") + } + + if (options.contains("provider") || properties.contains("provider")) { + throw new AnalysisException( + "USING and option/property 'provider' are both used to set the provider implementation, " + + "you can only specify one of them.") + } + + val filteredOptions = options.filterKeys(_ != "path") + + // create table properties from TBLPROPERTIES and OPTIONS clauses + val tableProperties = new mutable.HashMap[String, String]() + tableProperties ++= properties + tableProperties ++= filteredOptions + + // convert USING, LOCATION, and COMMENT clauses to table properties + tableProperties += ("provider" -> provider) + comment.map(text => tableProperties += ("comment" -> text)) + location.orElse(options.get("path")).map(loc => tableProperties += ("location" -> loc)) + + tableProperties.toMap + } + + def createAlterTable( + originalNameParts: Seq[String], + catalog: CatalogPlugin, + tableName: Seq[String], + changes: Seq[TableChange]): AlterTable = { + val tableCatalog = catalog.asTableCatalog + val ident = tableName.asIdentifier + val unresolved = UnresolvedV2Relation(originalNameParts, tableCatalog, ident) + AlterTable(tableCatalog, ident, unresolved, changes) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index f99ea61527b3..be4a7eba5910 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.connector.catalog -import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.TableIdentifier @@ -155,4 +154,20 @@ private[sql] trait LookupCatalog extends Logging { None } } + + /** + * Extract catalog and the rest name parts from a multi-part identifier. + */ + object CatalogAndIdentifierParts { + def unapply(nameParts: Seq[String]): Some[(CatalogPlugin, Seq[String])] = { + assert(nameParts.nonEmpty) + try { + Some((catalogManager.catalog(nameParts.head), nameParts.tail)) + } catch { + case _: CatalogNotFoundException => + // TODO (SPARK-29014): use current catalog here. 
+ Some((defaultCatalog.getOrElse(sessionCatalog), nameParts)) + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala new file mode 100644 index 000000000000..1f3b4319c0ea --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.sql._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, Table, TableChange, V1Table} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand} +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource} +import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileDataSourceV2} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType} + +/** + * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements + * to the corresponding v1 or v2 commands if the resolved catalog is the session catalog. + * + * We can remove this rule once we implement all the catalog functionality in `V2SessionCatalog`. 
+ */ +class ResolveSessionCatalog( + val catalogManager: CatalogManager, + conf: SQLConf, + isView: Seq[String] => Boolean) + extends Rule[LogicalPlan] with LookupCatalog { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + import org.apache.spark.sql.connector.catalog.CatalogV2Util._ + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { + case AlterTableAddColumnsStatement( + nameParts @ SessionCatalog(catalog, tableName), cols) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + cols.foreach(c => assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")) + AlterTableAddColumnsCommand(tableName.asTableIdentifier, cols.map(convertToStructField)) + }.getOrElse { + val changes = cols.map { col => + TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull) + } + createAlterTable(nameParts, catalog, tableName, changes) + } + + case AlterTableAlterColumnStatement( + nameParts @ SessionCatalog(catalog, tableName), colName, dataType, comment) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + // TODO(SPARK-29353): we should fallback to the v1 `AlterTableChangeColumnCommand`. + throw new AnalysisException("ALTER COLUMN is only supported with v2 tables.") + }.getOrElse { + val typeChange = dataType.map { newDataType => + TableChange.updateColumnType(colName.toArray, newDataType, true) + } + val commentChange = comment.map { newComment => + TableChange.updateColumnComment(colName.toArray, newComment) + } + createAlterTable(nameParts, catalog, tableName, typeChange.toSeq ++ commentChange) + } + + case AlterTableRenameColumnStatement( + nameParts @ SessionCatalog(catalog, tableName), col, newName) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + throw new AnalysisException("RENAME COLUMN is only supported with v2 tables.") + }.getOrElse { + val changes = Seq(TableChange.renameColumn(col.toArray, newName)) + createAlterTable(nameParts, catalog, tableName, changes) + } + + case AlterTableDropColumnsStatement( + nameParts @ SessionCatalog(catalog, tableName), cols) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + throw new AnalysisException("DROP COLUMN is only supported with v2 tables.") + }.getOrElse { + val changes = cols.map(col => TableChange.deleteColumn(col.toArray)) + createAlterTable(nameParts, catalog, tableName, changes) + } + + case AlterTableSetPropertiesStatement( + nameParts @ SessionCatalog(catalog, tableName), props) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + AlterTableSetPropertiesCommand(tableName.asTableIdentifier, props, isView = false) + }.getOrElse { + val changes = props.map { case (key, value) => + TableChange.setProperty(key, value) + }.toSeq + createAlterTable(nameParts, catalog, tableName, changes) + } + + case AlterTableUnsetPropertiesStatement( + nameParts @ SessionCatalog(catalog, tableName), keys, ifExists) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + AlterTableUnsetPropertiesCommand( + tableName.asTableIdentifier, keys, ifExists, isView = false) + }.getOrElse { + val changes = keys.map(key => TableChange.removeProperty(key)) + createAlterTable(nameParts, catalog, tableName, changes) + } + + case AlterTableSetLocationStatement( + nameParts @ SessionCatalog(catalog, tableName), newLoc) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + 
AlterTableSetLocationCommand(tableName.asTableIdentifier, None, newLoc) + }.getOrElse { + val changes = Seq(TableChange.setProperty("location", newLoc)) + createAlterTable(nameParts, catalog, tableName, changes) + } + + // ALTER VIEW should always use v1 command if the resolved catalog is session catalog. + case AlterViewSetPropertiesStatement(SessionCatalog(catalog, tableName), props) => + AlterTableSetPropertiesCommand(tableName.asTableIdentifier, props, isView = true) + + case AlterViewUnsetPropertiesStatement(SessionCatalog(catalog, tableName), keys, ifExists) => + AlterTableUnsetPropertiesCommand(tableName.asTableIdentifier, keys, ifExists, isView = true) + + case DeleteFromStatement( + nameParts @ SessionCatalog(catalog, tableName), tableAlias, condition) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + throw new AnalysisException("DELETE FROM is only supported with v2 tables.") + }.getOrElse { + val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier) + val aliased = tableAlias.map(SubqueryAlias(_, r)).getOrElse(r) + DeleteFromTable(aliased, condition) + } + + case DescribeTableStatement( + nameParts @ SessionCatalog(catalog, tableName), partitionSpec, isExtended) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + DescribeTableCommand(tableName.asTableIdentifier, partitionSpec, isExtended) + }.getOrElse { + // The v1 `DescribeTableCommand` can describe view as well. + if (isView(tableName)) { + DescribeTableCommand(tableName.asTableIdentifier, partitionSpec, isExtended) + } else { + if (partitionSpec.nonEmpty) { + throw new AnalysisException("DESCRIBE TABLE does not support partition for v2 tables.") + } + val r = UnresolvedV2Relation(nameParts, catalog.asTableCatalog, tableName.asIdentifier) + DescribeTable(r, isExtended) + } + } + + case DescribeColumnStatement(SessionCatalog(catalog, tableName), colNameParts, isExtended) => + loadTable(catalog, tableName.asIdentifier).collect { + case v1Table: V1Table => + DescribeColumnCommand(tableName.asTableIdentifier, colNameParts, isExtended) + }.getOrElse { + if (isView(tableName)) { + DescribeColumnCommand(tableName.asTableIdentifier, colNameParts, isExtended) + } else { + throw new AnalysisException("Describing columns is not supported for v2 tables.") + } + } + + // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the + // session catalog and the table provider is not v2. 
+ case c @ CreateTableStatement( + SessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) => + if (!isV2Provider(c.provider)) { + val tableDesc = buildCatalogTable(c.tableName.asTableIdentifier, c.tableSchema, + c.partitioning, c.bucketSpec, c.properties, c.provider, c.options, c.location, + c.comment, c.ifNotExists) + val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTable(tableDesc, mode, None) + } else { + CreateV2Table( + catalog.asTableCatalog, + tableName.asIdentifier, + c.tableSchema, + // convert the bucket spec and add it as a transform + c.partitioning ++ c.bucketSpec.map(_.asTransform), + convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider), + ignoreIfExists = c.ifNotExists) + } + + case c @ CreateTableAsSelectStatement( + SessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) => + if (!isV2Provider(c.provider)) { + val tableDesc = buildCatalogTable(c.tableName.asTableIdentifier, new StructType, + c.partitioning, c.bucketSpec, c.properties, c.provider, c.options, c.location, + c.comment, c.ifNotExists) + val mode = if (c.ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists + CreateTable(tableDesc, mode, Some(c.asSelect)) + } else { + CreateTableAsSelect( + catalog.asTableCatalog, + tableName.asIdentifier, + // convert the bucket spec and add it as a transform + c.partitioning ++ c.bucketSpec.map(_.asTransform), + c.asSelect, + convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider), + writeOptions = c.options.filterKeys(_ != "path"), + ignoreIfExists = c.ifNotExists) + } + + // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the + // session catalog and the table provider is not v2. + case c @ ReplaceTableStatement( + SessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) => + if (!isV2Provider(c.provider)) { + throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.") + } else { + ReplaceTable( + catalog.asTableCatalog, + tableName.asIdentifier, + c.tableSchema, + // convert the bucket spec and add it as a transform + c.partitioning ++ c.bucketSpec.map(_.asTransform), + convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider), + orCreate = c.orCreate) + } + + case c @ ReplaceTableAsSelectStatement( + SessionCatalog(catalog, tableName), _, _, _, _, _, _, _, _, _) => + if (!isV2Provider(c.provider)) { + throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.") + } else { + ReplaceTableAsSelect( + catalog.asTableCatalog, + tableName.asIdentifier, + // convert the bucket spec and add it as a transform + c.partitioning ++ c.bucketSpec.map(_.asTransform), + c.asSelect, + convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider), + writeOptions = c.options.filterKeys(_ != "path"), + orCreate = c.orCreate) + } + + case d @ DropTableStatement(SessionCatalog(catalog, tableName), ifExists, purge) => + DropTableCommand(d.tableName.asTableIdentifier, ifExists, isView = false, purge = purge) + + case DropViewStatement(SessionCatalog(catalog, viewName), ifExists) => + DropTableCommand(viewName.asTableIdentifier, ifExists, isView = true, purge = false) + + case ShowNamespacesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) => + throw new AnalysisException( + "SHOW NAMESPACES is not supported with the session catalog.") + + // TODO (SPARK-29014): we should check if the current catalog is session catalog here. 
+ case ShowNamespacesStatement(None, pattern) if defaultCatalog.isEmpty => + throw new AnalysisException( + "SHOW NAMESPACES is not supported with the session catalog.") + + case ShowTablesStatement(Some(SessionCatalog(catalog, nameParts)), pattern) => + if (nameParts.length != 1) { + throw new AnalysisException( + s"The database name is not valid: ${nameParts.quoted}") + } + ShowTablesCommand(Some(nameParts.head), pattern) + + // TODO (SPARK-29014): we should check if the current catalog is session catalog here. + case ShowTablesStatement(None, pattern) if defaultCatalog.isEmpty => + ShowTablesCommand(None, pattern) + } + + private def buildCatalogTable( + table: TableIdentifier, + schema: StructType, + partitioning: Seq[Transform], + bucketSpec: Option[BucketSpec], + properties: Map[String, String], + provider: String, + options: Map[String, String], + location: Option[String], + comment: Option[String], + ifNotExists: Boolean): CatalogTable = { + + val storage = DataSource.buildStorageFormatFromOptions(options) + if (location.isDefined && storage.locationUri.isDefined) { + throw new AnalysisException( + "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " + + "you can only specify one of them.") + } + val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI)) + + val tableType = if (customLocation.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } + + CatalogTable( + identifier = table, + tableType = tableType, + storage = storage.copy(locationUri = customLocation), + schema = schema, + provider = Some(provider), + partitionColumnNames = partitioning.asPartitionColumns, + bucketSpec = bucketSpec, + properties = properties, + comment = comment) + } + + object SessionCatalog { + def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match { + case CatalogAndIdentifierParts(catalog, parts) if isSessionCatalog(catalog) => + Some(catalog -> parts) + case _ => None + } + } + + private def assertTopLevelColumn(colName: Seq[String], command: String): Unit = { + if (colName.length > 1) { + throw new AnalysisException(s"$command does not support nested column: ${colName.quoted}") + } + } + + private def convertToStructField(col: QualifiedColType): StructField = { + val builder = new MetadataBuilder + col.comment.foreach(builder.putString("comment", _)) + + val cleanedDataType = HiveStringType.replaceCharType(col.dataType) + if (col.dataType != cleanedDataType) { + builder.putString(HIVE_TYPE_STRING, col.dataType.catalogString) + } + + StructField( + col.name.head, + cleanedDataType, + nullable = true, + builder.build()) + } + + private def isV2Provider(provider: String): Boolean = { + DataSource.lookupDataSourceV2(provider, conf) match { + // TODO(SPARK-28396): Currently file source v2 can't work with tables. + case Some(_: FileDataSourceV2) => false + case Some(_) => true + case _ => false + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala deleted file mode 100644 index 33e1ecd359ad..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources - -import scala.collection.mutable - -import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables, SubqueryAlias} -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement, UseStatement} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog, TableCatalog} -import org.apache.spark.sql.connector.expressions.Transform -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand} -import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType} - -case class DataSourceResolution( - conf: SQLConf, - catalogManager: CatalogManager) - extends Rule[LogicalPlan] with CastSupport with LookupCatalog { - - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case CreateTableStatement( - AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties, - V1Provider(provider), options, location, comment, ifNotExists) => - // the source is v1, the identifier has no catalog, and there is no default v2 catalog - val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties, - provider, options, location, comment, ifNotExists) - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - CreateTable(tableDesc, mode, None) - - case create: CreateTableStatement => - // the provider was not a v1 source or a v2 catalog is the default, convert to a v2 plan - 
val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName - maybeCatalog match { - case Some(catalog) => - // the identifier had a catalog, or there is a default v2 catalog - convertCreateTable(catalog.asTableCatalog, identifier, create) - case _ => - // the identifier had no catalog and no default catalog is set, but the source is v2. - // use the v2 session catalog, which delegates to the global v1 session catalog - convertCreateTable(sessionCatalog.asTableCatalog, identifier, create) - } - - case CreateTableAsSelectStatement( - AsTableIdentifier(table), query, partitionCols, bucketSpec, properties, - V1Provider(provider), options, location, comment, ifNotExists) => - // the source is v1, the identifier has no catalog, and there is no default v2 catalog - val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec, - properties, provider, options, location, comment, ifNotExists) - val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists - - CreateTable(tableDesc, mode, Some(query)) - - case create: CreateTableAsSelectStatement => - // the provider was not a v1 source or a v2 catalog is the default, convert to a v2 plan - val CatalogObjectIdentifier(maybeCatalog, identifier) = create.tableName - maybeCatalog match { - case Some(catalog) => - // the identifier had a catalog, or there is a default v2 catalog - convertCTAS(catalog.asTableCatalog, identifier, create) - case _ => - // the identifier had no catalog and no default catalog is set, but the source is v2. - // use the v2 session catalog, which delegates to the global v1 session catalog - convertCTAS(sessionCatalog.asTableCatalog, identifier, create) - } - - case DescribeColumnStatement( - AsTableIdentifier(tableName), colName, isExtended) => - DescribeColumnCommand(tableName, colName, isExtended) - - case DescribeColumnStatement( - CatalogObjectIdentifier(Some(catalog), ident), colName, isExtended) => - throw new AnalysisException("Describing columns is not supported for v2 tables.") - - case DescribeTableStatement( - AsTableIdentifier(tableName), partitionSpec, isExtended) => - DescribeTableCommand(tableName, partitionSpec, isExtended) - - case ReplaceTableStatement( - AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties, - V1Provider(provider), options, location, comment, orCreate) => - throw new AnalysisException( - s"Replacing tables is not supported using the legacy / v1 Spark external catalog" + - s" API. Write provider name: $provider, identifier: $table.") - - case ReplaceTableAsSelectStatement( - AsTableIdentifier(table), query, partitionCols, bucketSpec, properties, - V1Provider(provider), options, location, comment, orCreate) => - throw new AnalysisException( - s"Replacing tables is not supported using the legacy / v1 Spark external catalog" + - s" API. 
Write provider name: $provider, identifier: $table.") - - case replace: ReplaceTableStatement => - // the provider was not a v1 source, convert to a v2 plan - val CatalogObjectIdentifier(maybeCatalog, identifier) = replace.tableName - val catalog = maybeCatalog.getOrElse(sessionCatalog).asTableCatalog - convertReplaceTable(catalog, identifier, replace) - - case rtas: ReplaceTableAsSelectStatement => - // the provider was not a v1 source, convert to a v2 plan - val CatalogObjectIdentifier(maybeCatalog, identifier) = rtas.tableName - val catalog = maybeCatalog.getOrElse(sessionCatalog).asTableCatalog - convertRTAS(catalog, identifier, rtas) - - case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) => - DropTable(catalog.asTableCatalog, ident, ifExists) - - case DropTableStatement(AsTableIdentifier(tableName), ifExists, purge) => - DropTableCommand(tableName, ifExists, isView = false, purge) - - case DropViewStatement(CatalogObjectIdentifier(Some(catalog), ident), _) => - throw new AnalysisException( - s"Can not specify catalog `${catalog.name}` for view $ident " + - s"because view support in catalog has not been implemented yet") - - case DropViewStatement(AsTableIdentifier(tableName), ifExists) => - DropTableCommand(tableName, ifExists, isView = true, purge = false) - - case AlterTableSetPropertiesStatement(AsTableIdentifier(table), properties) => - AlterTableSetPropertiesCommand(table, properties, isView = false) - - case AlterViewSetPropertiesStatement(AsTableIdentifier(table), properties) => - AlterTableSetPropertiesCommand(table, properties, isView = true) - - case AlterTableUnsetPropertiesStatement(AsTableIdentifier(table), propertyKeys, ifExists) => - AlterTableUnsetPropertiesCommand(table, propertyKeys, ifExists, isView = false) - - case AlterViewUnsetPropertiesStatement(AsTableIdentifier(table), propertyKeys, ifExists) => - AlterTableUnsetPropertiesCommand(table, propertyKeys, ifExists, isView = true) - - case AlterTableSetLocationStatement(AsTableIdentifier(table), newLocation) => - AlterTableSetLocationCommand(table, None, newLocation) - - case AlterTableAddColumnsStatement(AsTableIdentifier(table), newColumns) - if newColumns.forall(_.name.size == 1) => - // only top-level adds are supported using AlterTableAddColumnsCommand - AlterTableAddColumnsCommand(table, newColumns.map(convertToStructField)) - - case DeleteFromStatement(AsTableIdentifier(table), tableAlias, condition) => - throw new AnalysisException( - s"Delete from tables is not supported using the legacy / v1 Spark external catalog" + - s" API. 
Identifier: $table.") - - case delete: DeleteFromStatement => - val relation = UnresolvedRelation(delete.tableName) - val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation) - DeleteFromTable(aliased, delete.condition) - - case ShowNamespacesStatement(None, pattern) => - defaultCatalog match { - case Some(catalog) => - ShowNamespaces(catalog.asNamespaceCatalog, None, pattern) - case None => - throw new AnalysisException("No default v2 catalog is set.") - } - - case ShowNamespacesStatement(Some(namespace), pattern) => - val DefaultCatalogAndNamespace(maybeCatalog, ns) = namespace - maybeCatalog match { - case Some(catalog) => - ShowNamespaces(catalog.asNamespaceCatalog, Some(ns), pattern) - case None => - throw new AnalysisException( - s"No v2 catalog is available for ${namespace.quoted}") - } - - case update: UpdateTableStatement => - throw new AnalysisException(s"Update table is not supported temporarily.") - - case ShowTablesStatement(None, pattern) => - defaultCatalog match { - case Some(catalog) => - ShowTables( - catalog.asTableCatalog, - catalogManager.currentNamespace, - pattern) - case None => - ShowTablesCommand(None, pattern) - } - - case ShowTablesStatement(Some(namespace), pattern) => - val DefaultCatalogAndNamespace(maybeCatalog, ns) = namespace - maybeCatalog match { - case Some(catalog) => - ShowTables(catalog.asTableCatalog, ns, pattern) - case None => - if (namespace.length != 1) { - throw new AnalysisException( - s"The database name is not valid: ${namespace.quoted}") - } - ShowTablesCommand(Some(namespace.quoted), pattern) - } - - case UseStatement(isNamespaceSet, nameParts) => - if (isNamespaceSet) { - SetCatalogAndNamespace(catalogManager, None, Some(nameParts)) - } else { - val CurrentCatalogAndNamespace(catalog, namespace) = nameParts - val ns = if (namespace.isEmpty) { None } else { Some(namespace) } - SetCatalogAndNamespace(catalogManager, Some(catalog.name()), ns) - } - } - - object V1Provider { - def unapply(provider: String): Option[String] = { - DataSource.lookupDataSourceV2(provider, conf) match { - // TODO(SPARK-28396): Currently file source v2 can't work with tables. 
-        case Some(_: FileDataSourceV2) => Some(provider)
-        case Some(_) => None
-        case _ => Some(provider)
-      }
-    }
-  }
-
-  private def buildCatalogTable(
-      table: TableIdentifier,
-      schema: StructType,
-      partitioning: Seq[Transform],
-      bucketSpec: Option[BucketSpec],
-      properties: Map[String, String],
-      provider: String,
-      options: Map[String, String],
-      location: Option[String],
-      comment: Option[String],
-      ifNotExists: Boolean): CatalogTable = {
-
-    val storage = DataSource.buildStorageFormatFromOptions(options)
-    if (location.isDefined && storage.locationUri.isDefined) {
-      throw new AnalysisException(
-        "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
-          "you can only specify one of them.")
-    }
-    val customLocation = storage.locationUri.orElse(location.map(CatalogUtils.stringToURI))
-
-    val tableType = if (customLocation.isDefined) {
-      CatalogTableType.EXTERNAL
-    } else {
-      CatalogTableType.MANAGED
-    }
-
-    CatalogTable(
-      identifier = table,
-      tableType = tableType,
-      storage = storage.copy(locationUri = customLocation),
-      schema = schema,
-      provider = Some(provider),
-      partitionColumnNames = partitioning.asPartitionColumns,
-      bucketSpec = bucketSpec,
-      properties = properties,
-      comment = comment)
-  }
-
-  private def convertCTAS(
-      catalog: TableCatalog,
-      identifier: Identifier,
-      ctas: CreateTableAsSelectStatement): CreateTableAsSelect = {
-    // convert the bucket spec and add it as a transform
-    val partitioning = ctas.partitioning ++ ctas.bucketSpec.map(_.asTransform)
-    val properties = convertTableProperties(
-      ctas.properties, ctas.options, ctas.location, ctas.comment, ctas.provider)
-
-    CreateTableAsSelect(
-      catalog,
-      identifier,
-      partitioning,
-      ctas.asSelect,
-      properties,
-      writeOptions = ctas.options.filterKeys(_ != "path"),
-      ignoreIfExists = ctas.ifNotExists)
-  }
-
-  private def convertCreateTable(
-      catalog: TableCatalog,
-      identifier: Identifier,
-      create: CreateTableStatement): CreateV2Table = {
-    // convert the bucket spec and add it as a transform
-    val partitioning = create.partitioning ++ create.bucketSpec.map(_.asTransform)
-    val properties = convertTableProperties(
-      create.properties, create.options, create.location, create.comment, create.provider)
-
-    CreateV2Table(
-      catalog,
-      identifier,
-      create.tableSchema,
-      partitioning,
-      properties,
-      ignoreIfExists = create.ifNotExists)
-  }
-
-  private def convertRTAS(
-      catalog: TableCatalog,
-      identifier: Identifier,
-      rtas: ReplaceTableAsSelectStatement): ReplaceTableAsSelect = {
-    // convert the bucket spec and add it as a transform
-    val partitioning = rtas.partitioning ++ rtas.bucketSpec.map(_.asTransform)
-    val properties = convertTableProperties(
-      rtas.properties, rtas.options, rtas.location, rtas.comment, rtas.provider)
-
-    ReplaceTableAsSelect(
-      catalog,
-      identifier,
-      partitioning,
-      rtas.asSelect,
-      properties,
-      writeOptions = rtas.options.filterKeys(_ != "path"),
-      orCreate = rtas.orCreate)
-  }
-
-  private def convertReplaceTable(
-      catalog: TableCatalog,
-      identifier: Identifier,
-      replace: ReplaceTableStatement): ReplaceTable = {
-    // convert the bucket spec and add it as a transform
-    val partitioning = replace.partitioning ++ replace.bucketSpec.map(_.asTransform)
-    val properties = convertTableProperties(
-      replace.properties, replace.options, replace.location, replace.comment, replace.provider)
-
-    ReplaceTable(
-      catalog,
-      identifier,
-      replace.tableSchema,
-      partitioning,
-      properties,
-      orCreate = replace.orCreate)
-  }
-
-  private def convertTableProperties(
-      properties: Map[String, String],
-      options: Map[String, String],
-      location: Option[String],
-      comment: Option[String],
-      provider: String): Map[String, String] = {
-    if (options.contains("path") && location.isDefined) {
-      throw new AnalysisException(
-        "LOCATION and 'path' in OPTIONS are both used to indicate the custom table path, " +
-          "you can only specify one of them.")
-    }
-
-    if ((options.contains("comment") || properties.contains("comment"))
-        && comment.isDefined) {
-      throw new AnalysisException(
-        "COMMENT and option/property 'comment' are both used to set the table comment, you can " +
-          "only specify one of them.")
-    }
-
-    if (options.contains("provider") || properties.contains("provider")) {
-      throw new AnalysisException(
-        "USING and option/property 'provider' are both used to set the provider implementation, " +
-          "you can only specify one of them.")
-    }
-
-    val filteredOptions = options.filterKeys(_ != "path")
-
-    // create table properties from TBLPROPERTIES and OPTIONS clauses
-    val tableProperties = new mutable.HashMap[String, String]()
-    tableProperties ++= properties
-    tableProperties ++= filteredOptions
-
-    // convert USING, LOCATION, and COMMENT clauses to table properties
-    tableProperties += ("provider" -> provider)
-    comment.map(text => tableProperties += ("comment" -> text))
-    location.orElse(options.get("path")).map(loc => tableProperties += ("location" -> loc))
-
-    tableProperties.toMap
-  }
-
-  private def convertToStructField(col: QualifiedColType): StructField = {
-    val builder = new MetadataBuilder
-    col.comment.foreach(builder.putString("comment", _))
-
-    val cleanedDataType = HiveStringType.replaceCharType(col.dataType)
-    if (col.dataType != cleanedDataType) {
-      builder.putString(HIVE_TYPE_STRING, col.dataType.catalogString)
-    }
-
-    StructField(
-      col.name.head,
-      cleanedDataType,
-      nullable = true,
-      builder.build())
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 1e1e86be1502..17ef9019b520 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.internal
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.Unstable
 import org.apache.spark.sql.{ExperimentalMethods, SparkSession, UDFRegistration, _}
-import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, ResolveSessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -174,7 +174,7 @@ abstract class BaseSessionStateBuilder(
       new FindDataSourceTable(session) +:
       new ResolveSQLOnFile(session) +:
       new FallBackFileSourceV2(session) +:
-      DataSourceResolution(conf, this.catalogManager) +:
+      new ResolveSessionCatalog(catalogManager, conf, catalog.isView) +:
       customResolutionRules

   override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 03cefa0d2e77..eed07aeff090 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -50,7 +50,7 @@ trait AlterTableTests extends SharedSparkSession {
       }

       assert(exc.getMessage.contains(s"${catalogAndNamespace}table_name"))
-      assert(exc.getMessage.contains("Table or view not found"))
+      assert(exc.getMessage.contains("Table not found"))
     }
   }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ff3d828ff312..ddb8938cea90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
+import org.apache.spark.sql.sources.SimpleScanSource
 import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -784,7 +785,8 @@ class DataSourceV2SQLSuite
       sql("SHOW NAMESPACES")
     }

-    assert(exception.getMessage.contains("No default v2 catalog is set"))
+    assert(exception.getMessage.contains(
+      "SHOW NAMESPACES is not supported with the session catalog"))
   }

   test("ShowNamespaces: default v2 catalog doesn't support namespace") {
@@ -812,12 +814,13 @@ class DataSourceV2SQLSuite
     assert(exception.getMessage.contains("does not support namespaces"))
   }

-  test("ShowNamespaces: no v2 catalog is available") {
+  test("ShowNamespaces: session catalog") {
     val exception = intercept[AnalysisException] {
       sql("SHOW NAMESPACES in dummy")
     }

-    assert(exception.getMessage.contains("No v2 catalog is available"))
+    assert(exception.getMessage.contains(
+      "SHOW NAMESPACES is not supported with the session catalog"))
   }

   private def testShowNamespaces(
@@ -1053,6 +1056,13 @@ class DataSourceV2SQLSuite
     }
   }

+  test("REPLACE TABLE: v1 table") {
+    val e = intercept[AnalysisException] {
+      sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")
+    }
+    assert(e.message.contains("REPLACE TABLE is only supported with v2 tables"))
+  }
+
   test("DeleteFrom: basic - delete all") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
@@ -1128,7 +1138,7 @@ class DataSourceV2SQLSuite
           |(4L, 'Frank', 33, 3)
         """.stripMargin)
     }
-    val errMsg = "Update table is not supported temporarily"
+    val errMsg = "UPDATE TABLE is not supported temporarily"
     testCreateAnalysisError(
       s"UPDATE $t SET name='Robert', age=32",
       errMsg
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index ff84b0571367..8ff293146127 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -136,12 +136,14 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       assertNoSuchTable(s"ALTER TABLE $viewName SET SERDE 'whatever'")
       assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a=1, b=2) SET SERDE 'whatever'")
       assertNoSuchTable(s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')")
-      assertNoSuchTable(s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'")
       assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'")
assertNoSuchTable(s"ALTER TABLE $viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')") assertNoSuchTable(s"ALTER TABLE $viewName DROP PARTITION (a='4', b='8')") assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')") assertNoSuchTable(s"ALTER TABLE $viewName RECOVER PARTITIONS") + + // For v2 ALTER TABLE statements, we have better error message saying view is not supported. + assertViewNotSupported(s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'") } } @@ -175,6 +177,11 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { } } + private def assertViewNotSupported(query: String): Unit = { + val e = intercept[AnalysisException](sql(query)) + assert(e.message.contains("'testView' is a view not a table")) + } + test("error handling: insert/load/truncate table commands against a view") { val viewName = "testView" withView(viewName) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index f30300b5aa25..70b1db8e5f0d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2661,7 +2661,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { val e = intercept[AnalysisException] { sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)") } - assert(e.message.contains("ALTER ADD COLUMNS does not support views")) + assert(e.message.contains("'tmp_v' is a view not a table")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 7110f13e2ead..104c845bfcc1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -26,34 +26,63 @@ import org.mockito.invocation.InvocationOnMock import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.AnalysisTest -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolveCatalogs, ResolveSessionCatalog, UnresolvedV2Relation} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan} -import org.apache.spark.sql.connector.{InMemoryTableCatalog, InMemoryTableProvider} -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, TableCatalog} -import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution} -import org.apache.spark.sql.internal.SQLConf.DEFAULT_V2_CATALOG +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelect, CreateV2Table, DescribeTable, DropTable, LogicalPlan} +import org.apache.spark.sql.connector.InMemoryTableProvider +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, TableCatalog, 
TableChange, V1Table} +import org.apache.spark.sql.execution.datasources.CreateTable +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructType} -import org.apache.spark.sql.util.CaseInsensitiveStringMap class PlanResolutionSuite extends AnalysisTest { import CatalystSqlParser._ private val v2Format = classOf[InMemoryTableProvider].getName + private val table: Table = { + val t = mock(classOf[Table]) + when(t.schema()).thenReturn(new StructType().add("i", "int")) + t + } + private val testCat: TableCatalog = { - val newCatalog = new InMemoryTableCatalog - newCatalog.initialize("testcat", CaseInsensitiveStringMap.empty()) + val newCatalog = mock(classOf[TableCatalog]) + when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => { + invocation.getArgument[Identifier](0).name match { + case "tab" => + table + case name => + throw new NoSuchTableException(name) + } + }) + when(newCatalog.name()).thenReturn("testcat") newCatalog } - private val v2SessionCatalog = { - val newCatalog = new InMemoryTableCatalog - newCatalog.initialize("session", CaseInsensitiveStringMap.empty()) + private val v2SessionCatalog: TableCatalog = { + val newCatalog = mock(classOf[TableCatalog]) + when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => { + invocation.getArgument[Identifier](0).name match { + case "v1Table" => + mock(classOf[V1Table]) + case "v2Table" => + table + case name => + throw new NoSuchTableException(name) + } + }) + when(newCatalog.name()).thenReturn("session") newCatalog } + private val v1SessionCatalog: SessionCatalog = new SessionCatalog( + new InMemoryCatalog, + EmptyFunctionRegistry, + new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) + private val catalogManagerWithDefault = { val manager = mock(classOf[CatalogManager]) when(manager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => { @@ -66,6 +95,7 @@ class PlanResolutionSuite extends AnalysisTest { }) when(manager.defaultCatalog).thenReturn(Some(testCat)) when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog) + when(manager.v1SessionCatalog).thenReturn(v1SessionCatalog) manager } @@ -81,18 +111,24 @@ class PlanResolutionSuite extends AnalysisTest { }) when(manager.defaultCatalog).thenReturn(None) when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog) + when(manager.v1SessionCatalog).thenReturn(v1SessionCatalog) manager } def parseAndResolve(query: String, withDefault: Boolean = false): LogicalPlan = { - val newConf = conf.copy() - newConf.setConfString(DEFAULT_V2_CATALOG.key, "testcat") val catalogManager = if (withDefault) { catalogManagerWithDefault } else { catalogManagerWithoutDefault } - DataSourceResolution(newConf, catalogManager).apply(parsePlan(query)) + val analyzer = new Analyzer(catalogManager, conf) + val rules = Seq( + new ResolveCatalogs(catalogManager), + new ResolveSessionCatalog(catalogManager, conf, _ == Seq("v")), + analyzer.ResolveTables) + rules.foldLeft(parsePlan(query)) { + case (plan, rule) => rule.apply(plan) + } } private def parseResolveCompare(query: String, expected: LogicalPlan): Unit = @@ -642,51 +678,166 @@ class PlanResolutionSuite extends AnalysisTest { // ALTER TABLE table_name SET TBLPROPERTIES ('comment' = new_comment); // ALTER TABLE table_name UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key'); test("alter table: alter table properties") { - val sql1_table = "ALTER TABLE table_name 
SET TBLPROPERTIES ('test' = 'test', " + - "'comment' = 'new_comment')" - val sql2_table = "ALTER TABLE table_name UNSET TBLPROPERTIES ('comment', 'test')" - val sql3_table = "ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'test')" + Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach { + case (tblName, useV1Command) => + val sql1 = s"ALTER TABLE $tblName SET TBLPROPERTIES ('test' = 'test', " + + "'comment' = 'new_comment')" + val sql2 = s"ALTER TABLE $tblName UNSET TBLPROPERTIES ('comment', 'test')" + val sql3 = s"ALTER TABLE $tblName UNSET TBLPROPERTIES IF EXISTS ('comment', 'test')" + + val parsed1 = parseAndResolve(sql1) + val parsed2 = parseAndResolve(sql2) + val parsed3 = parseAndResolve(sql3) + + val tableIdent = TableIdentifier(tblName, None) + if (useV1Command) { + val expected1 = AlterTableSetPropertiesCommand( + tableIdent, Map("test" -> "test", "comment" -> "new_comment"), isView = false) + val expected2 = AlterTableUnsetPropertiesCommand( + tableIdent, Seq("comment", "test"), ifExists = false, isView = false) + val expected3 = AlterTableUnsetPropertiesCommand( + tableIdent, Seq("comment", "test"), ifExists = true, isView = false) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + } else { + parsed1 match { + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( + TableChange.setProperty("test", "test"), + TableChange.setProperty("comment", "new_comment"))) + case _ => fail("expect AlterTable") + } + + parsed2 match { + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( + TableChange.removeProperty("comment"), + TableChange.removeProperty("test"))) + case _ => fail("expect AlterTable") + } + + parsed3 match { + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( + TableChange.removeProperty("comment"), + TableChange.removeProperty("test"))) + case _ => fail("expect AlterTable") + } + } + } - val parsed1_table = parseAndResolve(sql1_table) - val parsed2_table = parseAndResolve(sql2_table) - val parsed3_table = parseAndResolve(sql3_table) + val sql4 = "ALTER TABLE non_exist SET TBLPROPERTIES ('test' = 'test')" + val sql5 = "ALTER TABLE non_exist UNSET TBLPROPERTIES ('test')" + val parsed4 = parseAndResolve(sql4) + val parsed5 = parseAndResolve(sql5) - val tableIdent = TableIdentifier("table_name", None) - val expected1_table = AlterTableSetPropertiesCommand( - tableIdent, Map("test" -> "test", "comment" -> "new_comment"), isView = false) - val expected2_table = AlterTableUnsetPropertiesCommand( - tableIdent, Seq("comment", "test"), ifExists = false, isView = false) - val expected3_table = AlterTableUnsetPropertiesCommand( - tableIdent, Seq("comment", "test"), ifExists = true, isView = false) - - comparePlans(parsed1_table, expected1_table) - comparePlans(parsed2_table, expected2_table) - comparePlans(parsed3_table, expected3_table) + // For non-existing tables, we convert it to v2 command with `UnresolvedV2Table` + parsed4 match { + case AlterTable(_, _, _: UnresolvedV2Relation, _) => // OK + case _ => fail("unexpected plan:\n" + parsed4.treeString) + } + parsed5 match { + case AlterTable(_, _, _: UnresolvedV2Relation, _) => // OK + case _ => fail("unexpected plan:\n" + parsed5.treeString) + } } test("support for other types in TBLPROPERTIES") { - val sql = - """ - |ALTER TABLE table_name - |SET TBLPROPERTIES ('a' = 1, 'b' = 0.1, 'c' = TRUE) - """.stripMargin - val parsed = 
parseAndResolve(sql) - val expected = AlterTableSetPropertiesCommand( - TableIdentifier("table_name"), - Map("a" -> "1", "b" -> "0.1", "c" -> "true"), - isView = false) - - comparePlans(parsed, expected) + Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach { + case (tblName, useV1Command) => + val sql = + s""" + |ALTER TABLE $tblName + |SET TBLPROPERTIES ('a' = 1, 'b' = 0.1, 'c' = TRUE) + """.stripMargin + val parsed = parseAndResolve(sql) + if (useV1Command) { + val expected = AlterTableSetPropertiesCommand( + TableIdentifier(tblName), + Map("a" -> "1", "b" -> "0.1", "c" -> "true"), + isView = false) + + comparePlans(parsed, expected) + } else { + parsed match { + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( + TableChange.setProperty("a", "1"), + TableChange.setProperty("b", "0.1"), + TableChange.setProperty("c", "true"))) + case _ => fail("expect AlterTable") + } + } + } } test("alter table: set location") { - val sql1 = "ALTER TABLE table_name SET LOCATION 'new location'" - val parsed1 = parseAndResolve(sql1) - val tableIdent = TableIdentifier("table_name", None) - val expected1 = AlterTableSetLocationCommand( - tableIdent, - None, - "new location") - comparePlans(parsed1, expected1) + Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach { + case (tblName, useV1Command) => + val sql = s"ALTER TABLE $tblName SET LOCATION 'new location'" + val parsed = parseAndResolve(sql) + if (useV1Command) { + val expected = AlterTableSetLocationCommand( + TableIdentifier(tblName, None), + None, + "new location") + comparePlans(parsed, expected) + } else { + parsed match { + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq(TableChange.setProperty("location", "new location"))) + case _ => fail("expect AlterTable") + } + } + } + } + + test("describe table") { + Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach { + case (tblName, useV1Command) => + val sql1 = s"DESC TABLE $tblName" + val sql2 = s"DESC TABLE EXTENDED $tblName" + val parsed1 = parseAndResolve(sql1) + val parsed2 = parseAndResolve(sql2) + if (useV1Command) { + val expected1 = DescribeTableCommand(TableIdentifier(tblName, None), Map.empty, false) + val expected2 = DescribeTableCommand(TableIdentifier(tblName, None), Map.empty, true) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } else { + parsed1 match { + case DescribeTable(_: DataSourceV2Relation, isExtended) => + assert(!isExtended) + case _ => fail("expect DescribeTable") + } + + parsed2 match { + case DescribeTable(_: DataSourceV2Relation, isExtended) => + assert(isExtended) + case _ => fail("expect DescribeTable") + } + } + + val sql3 = s"DESC TABLE $tblName PARTITION(a=1)" + if (useV1Command) { + val parsed3 = parseAndResolve(sql3) + val expected3 = DescribeTableCommand( + TableIdentifier(tblName, None), Map("a" -> "1"), false) + comparePlans(parsed3, expected3) + } else { + val e = intercept[AnalysisException](parseAndResolve(sql3)) + assert(e.message.contains("DESCRIBE TABLE does not support partition for v2 tables")) + } + } + + // use v1 command to describe views. + val sql4 = "DESC TABLE v" + val parsed4 = parseAndResolve(sql4) + assert(parsed4.isInstanceOf[DescribeTableCommand]) } + + // TODO: add tests for more commands. 
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 75b21a4458fd..3df77fec2099 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive

 import org.apache.spark.annotation.Unstable
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -73,7 +73,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       new FindDataSourceTable(session) +:
       new ResolveSQLOnFile(session) +:
       new FallBackFileSourceV2(session) +:
-      DataSourceResolution(conf, this.catalogManager) +:
+      new ResolveSessionCatalog(catalogManager, conf, catalog.isView) +:
       customResolutionRules

   override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =