diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 5317af494d8e..89cd8e9648b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -755,12 +755,14 @@ class Analyzer( .map(view => i.copy(table = view)) .getOrElse(i) case u @ UnresolvedTable(ident) => - lookupTempView(ident).foreach { _ => - u.failAnalysis(s"${ident.quoted} is a temp view not table.") - } - u + lookupTempView(ident) + .map(_ => UnresolvedTableWithViewExists( + ResolvedView(ident.asIdentifier, isTempView = true))) + .getOrElse(u) case u @ UnresolvedTableOrView(ident) => - lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u) + lookupTempView(ident) + .map(_ => ResolvedView(ident.asIdentifier, isTempView = true)) + .getOrElse(u) } def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = { @@ -814,14 +816,6 @@ class Analyzer( lookupV2Relation(u.multipartIdentifier) .map(v2Relation => i.copy(table = v2Relation)) .getOrElse(i) - - case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) => - CatalogV2Util.loadRelation(u.catalog, u.tableName) - .map(rel => alter.copy(table = rel)) - .getOrElse(alter) - - case u: UnresolvedV2Relation => - CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u) } /** @@ -888,8 +882,7 @@ class Analyzer( case u @ UnresolvedTable(identifier) => lookupTableOrView(identifier).map { - case v: ResolvedView => - u.failAnalysis(s"${v.identifier.quoted} is a view not table.") + case v: ResolvedView => UnresolvedTableWithViewExists(v) case table => table }.getOrElse(u) @@ -902,7 +895,7 @@ class Analyzer( case SessionCatalogAndIdentifier(catalog, ident) => CatalogV2Util.loadTable(catalog, ident).map { case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => - ResolvedView(ident) + ResolvedView(ident, isTempView = false) case table => ResolvedTable(catalog.asTableCatalog, ident, table) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 1d44c84f3123..65a38aacecd4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnType} +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -86,6 +87,20 @@ trait CheckAnalysis extends PredicateHelper { } def checkAnalysis(plan: LogicalPlan): Unit = { + // Analysis that needs to be performed top down can be added here. + plan.foreach { + case p if p.analyzed => // Skip already analyzed sub-plans + + case alter: AlterTable => + alter.table match { + case u @ UnresolvedTableWithViewExists(view) if !view.isTempView => + u.failAnalysis("Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead") + case _ => + } + + case _ => // Analysis successful! + } + // We transform up and order the rules so as to catch the first possible failure instead // of the result of cascading resolution failures. plan.foreachUp { @@ -104,23 +119,13 @@ trait CheckAnalysis extends PredicateHelper { case u: UnresolvedRelation => u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}") + case u: UnresolvedTableWithViewExists => + val viewKind = if (u.view.isTempView) { "temp view" } else { "view" } + u.failAnalysis(s"${u.view.identifier.quoted} is a $viewKind not a table.") + case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) => failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}") - case u: UnresolvedV2Relation if isView(u.originalNameParts) => - u.failAnalysis( - s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") - - case u: UnresolvedV2Relation => - u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") - - case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) => - u.failAnalysis( - s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") - - case AlterTable(_, _, u: UnresolvedV2Relation, _) => - failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") - case operator: LogicalPlan => // Check argument data types of higher-order functions downwards first. // If the arguments of the higher-order functions are resolved but the type check fails, @@ -419,8 +424,9 @@ trait CheckAnalysis extends PredicateHelper { case _ => } - case alter: AlterTable if alter.childrenResolved => - val table = alter.table + case alter: AlterTable + if alter.childrenResolved && alter.table.isInstanceOf[ResolvedTable] => + val table = alter.table.asInstanceOf[ResolvedTable].table def findField(operation: String, fieldName: Array[String]): StructField = { // include collections because structs nested in maps and arrays may be altered val field = table.schema.findNestedField(fieldName, includeCollections = true) @@ -469,6 +475,8 @@ trait CheckAnalysis extends PredicateHelper { throw new AnalysisException( s"Cannot change nullable column to non-nullable: $fieldName") } + case update: UpdateColumnPosition => + findField("update", update.fieldNames) case rename: RenameColumn => findField("rename", rename.fieldNames) case update: UpdateColumnComment => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index 88a3c0a73a10..a44877fc1b4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog} /** * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements @@ -32,71 +32,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager) import org.apache.spark.sql.connector.catalog.CatalogV2Util._ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case AlterTableAddColumnsStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => - val changes = cols.map { col => - TableChange.addColumn( - col.name.toArray, - col.dataType, - col.nullable, - col.comment.orNull, - col.position.orNull) - } - createAlterTable(nameParts, catalog, tbl, changes) - - case a @ AlterTableAlterColumnStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => - val colName = a.column.toArray - val typeChange = a.dataType.map { newDataType => - TableChange.updateColumnType(colName, newDataType) - } - val nullabilityChange = a.nullable.map { nullable => - TableChange.updateColumnNullability(colName, nullable) - } - val commentChange = a.comment.map { newComment => - TableChange.updateColumnComment(colName, newComment) - } - val positionChange = a.position.map { newPosition => - TableChange.updateColumnPosition(colName, newPosition) - } - createAlterTable( - nameParts, - catalog, - tbl, - typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange) - - case AlterTableRenameColumnStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) => - val changes = Seq(TableChange.renameColumn(col.toArray, newName)) - createAlterTable(nameParts, catalog, tbl, changes) - - case AlterTableDropColumnsStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => - val changes = cols.map(col => TableChange.deleteColumn(col.toArray)) - createAlterTable(nameParts, catalog, tbl, changes) - - case AlterTableSetPropertiesStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), props) => - val changes = props.map { case (key, value) => - TableChange.setProperty(key, value) - }.toSeq - createAlterTable(nameParts, catalog, tbl, changes) - - // TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag. - case AlterTableUnsetPropertiesStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), keys, _) => - val changes = keys.map(key => TableChange.removeProperty(key)) - createAlterTable(nameParts, catalog, tbl, changes) - - case AlterTableSetLocationStatement( - nameParts @ NonSessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) => - if (partitionSpec.nonEmpty) { - throw new AnalysisException( - "ALTER TABLE SET LOCATION does not support partition for v2 tables.") - } - val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc)) - createAlterTable(nameParts, catalog, tbl, changes) - case AlterViewSetPropertiesStatement( NonSessionCatalogAndTable(catalog, tbl), props) => throw new AnalysisException( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 608f39c2d86f..c7d977a3d4a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.types.{DataType, Metadata, StructType} /** @@ -60,28 +59,6 @@ object UnresolvedRelation { UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table) } -/** - * A variant of [[UnresolvedRelation]] which can only be resolved to a v2 relation - * (`DataSourceV2Relation`), not v1 relation or temp view. - * - * @param originalNameParts the original table identifier name parts before catalog is resolved. - * @param catalog The catalog which the table should be looked up from. - * @param tableName The name of the table to look up. - */ -case class UnresolvedV2Relation( - originalNameParts: Seq[String], - catalog: TableCatalog, - tableName: Identifier) - extends LeafNode with NamedRelation { - import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ - - override def name: String = originalNameParts.quoted - - override def output: Seq[Attribute] = Nil - - override lazy val resolved = false -} - /** * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index 239f987e97a7..c6300b0bb079 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.LeafNode import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, Table, TableCatalog} /** @@ -41,6 +41,16 @@ case class UnresolvedTable(multipartIdentifier: Seq[String]) extends LeafNode { override def output: Seq[Attribute] = Nil } +/** + * Holds the resolved view. It is used in a scenario where table is expected but the identifier + * is resolved to a (temp) view. + */ +case class UnresolvedTableWithViewExists(view: ResolvedView) extends LeafNode { + override lazy val resolved: Boolean = false + + override def output: Seq[Attribute] = Nil +} + /** * Holds the name of a table or view that has yet to be looked up in a catalog. It will * be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis. @@ -71,6 +81,6 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T */ // TODO: create a generic representation for temp view, v1 view and v2 view, after we add view // support to v2 catalog. For now we only need the identifier to fallback to v1 command. -case class ResolvedView(identifier: Identifier) extends LeafNode { +case class ResolvedView(identifier: Identifier, isTempView: Boolean) extends LeafNode { override def output: Seq[Attribute] = Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f744787c9082..d09c53ed919c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2907,7 +2907,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Parse a [[AlterTableAddColumnsStatement]] command. + * Parse a [[AlterTableAddColumns]] command. * * For example: * {{{ @@ -2916,14 +2916,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) { - AlterTableAddColumnsStatement( - visitMultipartIdentifier(ctx.multipartIdentifier), + AlterTableAddColumns( + UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)), ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]) ) } /** - * Parse a [[AlterTableRenameColumnStatement]] command. + * Parse a [[AlterTableRenameColumn]] command. * * For example: * {{{ @@ -2932,14 +2932,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging */ override def visitRenameTableColumn( ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) { - AlterTableRenameColumnStatement( - visitMultipartIdentifier(ctx.table), + AlterTableRenameColumn( + UnresolvedTable(visitMultipartIdentifier(ctx.table)), ctx.from.parts.asScala.map(_.getText), ctx.to.getText) } /** - * Parse a [[AlterTableAlterColumnStatement]] command. + * Parse a [[AlterTableAlterColumn]] command. * * For example: * {{{ @@ -2956,8 +2956,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER", ctx) } - AlterTableAlterColumnStatement( - visitMultipartIdentifier(ctx.table), + AlterTableAlterColumn( + UnresolvedTable(visitMultipartIdentifier(ctx.table)), typedVisit[Seq[String]](ctx.column), dataType = Option(ctx.dataType).map(typedVisit[DataType]), nullable = None, @@ -2966,7 +2966,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Parse a [[AlterTableAlterColumnStatement]] command to change column nullability. + * Parse a [[AlterTableAlterColumn]] command to change column nullability. * * For example: * {{{ @@ -2980,8 +2980,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case SqlBaseParser.SET => false case SqlBaseParser.DROP => true } - AlterTableAlterColumnStatement( - visitMultipartIdentifier(ctx.table), + AlterTableAlterColumn( + UnresolvedTable(visitMultipartIdentifier(ctx.table)), typedVisit[Seq[String]](ctx.column), dataType = None, nullable = Some(nullable), @@ -2991,7 +2991,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Parse a [[AlterTableAlterColumnStatement]] command. This is Hive SQL syntax. + * Parse a [[AlterTableAlterColumn]] command. This is Hive SQL syntax. * * For example: * {{{ @@ -3014,8 +3014,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging "please run ALTER COLUMN ... SET/DROP NOT NULL instead.") } - AlterTableAlterColumnStatement( - typedVisit[Seq[String]](ctx.table), + AlterTableAlterColumn( + UnresolvedTable(typedVisit[Seq[String]](ctx.table)), columnNameParts, dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]), nullable = None, @@ -3024,7 +3024,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Parse a [[AlterTableDropColumnsStatement]] command. + * Parse a [[AlterTableDropColumns]] command. * * For example: * {{{ @@ -3035,13 +3035,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitDropTableColumns( ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) { val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]]) - AlterTableDropColumnsStatement( - visitMultipartIdentifier(ctx.multipartIdentifier), + AlterTableDropColumns( + UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)), columnsToDrop) } /** - * Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetPropertiesStatement]] commands. + * Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetProperties]] commands. * * For example: * {{{ @@ -3057,12 +3057,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (ctx.VIEW != null) { AlterViewSetPropertiesStatement(identifier, cleanedTableProperties) } else { - AlterTableSetPropertiesStatement(identifier, cleanedTableProperties) + AlterTableSetProperties(UnresolvedTable(identifier), cleanedTableProperties) } } /** - * Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetPropertiesStatement]] commands. + * Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetProperties]] commands. * * For example: * {{{ @@ -3080,12 +3080,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (ctx.VIEW != null) { AlterViewUnsetPropertiesStatement(identifier, cleanedProperties, ifExists) } else { - AlterTableUnsetPropertiesStatement(identifier, cleanedProperties, ifExists) + AlterTableUnsetProperties(UnresolvedTable(identifier), cleanedProperties, ifExists) } } /** - * Create an [[AlterTableSetLocationStatement]] command. + * Create an [[AlterTableSetLocation]] command. * * For example: * {{{ @@ -3093,8 +3093,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) { - AlterTableSetLocationStatement( - visitMultipartIdentifier(ctx.multipartIdentifier), + AlterTableSetLocation( + UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)), Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), visitLocationSpec(ctx.locationSpec)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 44f7b4143926..2083a00cae0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -149,62 +149,6 @@ case class QualifiedColType( comment: Option[String], position: Option[ColumnPosition]) -/** - * ALTER TABLE ... ADD COLUMNS command, as parsed from SQL. - */ -case class AlterTableAddColumnsStatement( - tableName: Seq[String], - columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement - -/** - * ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL. - */ -case class AlterTableAlterColumnStatement( - tableName: Seq[String], - column: Seq[String], - dataType: Option[DataType], - nullable: Option[Boolean], - comment: Option[String], - position: Option[ColumnPosition]) extends ParsedStatement - -/** - * ALTER TABLE ... RENAME COLUMN command, as parsed from SQL. - */ -case class AlterTableRenameColumnStatement( - tableName: Seq[String], - column: Seq[String], - newName: String) extends ParsedStatement - -/** - * ALTER TABLE ... DROP COLUMNS command, as parsed from SQL. - */ -case class AlterTableDropColumnsStatement( - tableName: Seq[String], - columnsToDrop: Seq[Seq[String]]) extends ParsedStatement - -/** - * ALTER TABLE ... SET TBLPROPERTIES command, as parsed from SQL. - */ -case class AlterTableSetPropertiesStatement( - tableName: Seq[String], - properties: Map[String, String]) extends ParsedStatement - -/** - * ALTER TABLE ... UNSET TBLPROPERTIES command, as parsed from SQL. - */ -case class AlterTableUnsetPropertiesStatement( - tableName: Seq[String], - propertyKeys: Seq[String], - ifExists: Boolean) extends ParsedStatement - -/** - * ALTER TABLE ... SET LOCATION command, as parsed from SQL. - */ -case class AlterTableSetLocationStatement( - tableName: Seq[String], - partitionSpec: Option[TablePartitionSpec], - location: String) extends ParsedStatement - /** * ALTER TABLE ... RECOVER PARTITIONS command, as parsed from SQL. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index e1e7eac4cc08..d62869e91298 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable} import org.apache.spark.sql.catalyst.plans.DescribeTableSchema import org.apache.spark.sql.connector.catalog._ -import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange} +import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType} @@ -394,37 +394,125 @@ case class DropTable( ifExists: Boolean) extends Command /** - * The logical plan of the ALTER TABLE command that works for v2 tables. + * The base class for ALTER TABLE commands that work for v2 tables. */ -case class AlterTable( - catalog: TableCatalog, - ident: Identifier, - table: NamedRelation, - changes: Seq[TableChange]) extends Command { - - override lazy val resolved: Boolean = table.resolved && { - changes.forall { - case add: AddColumn => - add.fieldNames match { - case Array(_) => - // a top-level field can always be added - true - case _ => - // the parent field must exist - table.schema.findNestedField(add.fieldNames.init, includeCollections = true).isDefined - } +abstract class AlterTable extends Command { + def table: LogicalPlan - case colChange: ColumnChange => - // the column that will be changed must exist - table.schema.findNestedField(colChange.fieldNames, includeCollections = true).isDefined + def changes: Seq[TableChange] + + override def children: Seq[LogicalPlan] = Seq(table) + + override lazy val resolved: Boolean = table.resolved +} - case _ => - // property changes require no resolution checks - true +/** + * The logical plan of the ALTER TABLE ... ADD COLUMNS command that works for v2 tables. + */ +case class AlterTableAddColumns( + table: LogicalPlan, + columnsToAdd: Seq[QualifiedColType]) extends AlterTable { + override lazy val changes: Seq[TableChange] = { + columnsToAdd.map { col => + TableChange.addColumn( + col.name.toArray, + col.dataType, + col.nullable, + col.comment.orNull, + col.position.orNull) } } } +/** + * The logical plan of the ALTER TABLE ... CHANGE COLUMN command that works for v2 tables. + */ +case class AlterTableAlterColumn( + table: LogicalPlan, + column: Seq[String], + dataType: Option[DataType], + nullable: Option[Boolean], + comment: Option[String], + position: Option[ColumnPosition]) extends AlterTable { + override lazy val changes: Seq[TableChange] = { + val colName = column.toArray + val typeChange = dataType.map { newDataType => + TableChange.updateColumnType(colName, newDataType) + } + val nullabilityChange = nullable.map { nullable => + TableChange.updateColumnNullability(colName, nullable) + } + val commentChange = comment.map { newComment => + TableChange.updateColumnComment(colName, newComment) + } + val positionChange = position.map { newPosition => + TableChange.updateColumnPosition(colName, newPosition) + } + typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange + } +} + +/** + * The logical plan of the ALTER TABLE ... RENAME COLUMN command that works for v2 tables. + */ +case class AlterTableRenameColumn( + table: LogicalPlan, + column: Seq[String], + newName: String) extends AlterTable { + override lazy val changes: Seq[TableChange] = { + Seq(TableChange.renameColumn(column.toArray, newName)) + } +} + +/** + * The logical plan of the ALTER TABLE ... DROP COLUMNS command that works for v2 tables. + */ +case class AlterTableDropColumns( + table: LogicalPlan, + columnsToDrop: Seq[Seq[String]]) extends AlterTable { + override lazy val changes: Seq[TableChange] = { + columnsToDrop.map(col => TableChange.deleteColumn(col.toArray)) + } +} + +/** + * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command that works for v2 tables. + */ +case class AlterTableSetProperties( + table: LogicalPlan, + properties: Map[String, String]) extends AlterTable { + override lazy val changes: Seq[TableChange] = { + properties.map { case (key, value) => + TableChange.setProperty(key, value) + }.toSeq + } +} + +/** + * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command that works for v2 tables. + */ +// TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag. +case class AlterTableUnsetProperties( + table: LogicalPlan, + propertyKeys: Seq[String], + ifExists: Boolean) extends AlterTable { + override lazy val changes: Seq[TableChange] = { + propertyKeys.map(key => TableChange.removeProperty(key)) + } +} + +/** + * The logical plan of the ALTER TABLE ... SET LOCATION command that works for v2 tables. + */ +case class AlterTableSetLocation( + table: LogicalPlan, + partitionSpec: Option[TablePartitionSpec], + location: String) extends AlterTable { + override lazy val changes: Seq[TableChange] = { + Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location)) + } +} + /** * The logical plan of the ALTER TABLE RENAME command that works for v2 tables. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index f8fc4bc7ce23..3ee22548ca3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -22,8 +22,7 @@ import java.util.Collections import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation} -import org.apache.spark.sql.catalyst.plans.logical.AlterTable +import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException} import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -276,17 +275,6 @@ private[sql] object CatalogV2Util { location.map(TableCatalog.PROP_LOCATION -> _) } - def createAlterTable( - originalNameParts: Seq[String], - catalog: CatalogPlugin, - tableName: Seq[String], - changes: Seq[TableChange]): AlterTable = { - val tableCatalog = catalog.asTableCatalog - val ident = tableName.asIdentifier - val unresolved = UnresolvedV2Relation(originalNameParts, tableCatalog, ident) - AlterTable(tableCatalog, ident, unresolved, changes) - } - def getTableProviderCatalog( provider: SupportsCatalogOptions, catalogManager: CatalogManager, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 47387fa18411..5e45b57187f1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -476,22 +476,22 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql1_table), - AlterTableSetPropertiesStatement( - Seq("table_name"), Map("test" -> "test", "comment" -> "new_comment"))) + AlterTableSetProperties( + UnresolvedTable(Seq("table_name")), Map("test" -> "test", "comment" -> "new_comment"))) comparePlans( parsePlan(sql2_table), - AlterTableUnsetPropertiesStatement( - Seq("table_name"), Seq("comment", "test"), ifExists = false)) + AlterTableUnsetProperties( + UnresolvedTable(Seq("table_name")), Seq("comment", "test"), ifExists = false)) comparePlans( parsePlan(sql3_table), - AlterTableUnsetPropertiesStatement( - Seq("table_name"), Seq("comment", "test"), ifExists = true)) + AlterTableUnsetProperties( + UnresolvedTable(Seq("table_name")), Seq("comment", "test"), ifExists = true)) } test("alter table: add column") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, None) ))) } @@ -499,7 +499,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add multiple columns") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, None), QualifiedColType(Seq("y"), StringType, true, None, None) ))) @@ -508,7 +508,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS x int"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, None) ))) } @@ -516,7 +516,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS (...)") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, None) ))) } @@ -524,7 +524,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS (...) and COMMENT") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, true, Some("doc"), None) ))) } @@ -532,7 +532,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add non-nullable column") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int NOT NULL"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, false, None, None) ))) } @@ -540,7 +540,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COMMENT") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, true, Some("doc"), None) ))) } @@ -548,13 +548,13 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with position") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, Some(first())) ))) comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, Some(after("y"))) ))) } @@ -562,7 +562,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with nested column name") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x", "y", "z"), IntegerType, true, Some("doc"), None) ))) } @@ -570,7 +570,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add multiple columns with nested column name") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"), - AlterTableAddColumnsStatement(Seq("table_name"), Seq( + AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( QualifiedColType(Seq("x", "y", "z"), IntegerType, true, Some("doc"), None), QualifiedColType(Seq("a", "b"), StringType, true, None, Some(first())) ))) @@ -579,12 +579,12 @@ class DDLParserSuite extends AnalysisTest { test("alter table: set location") { comparePlans( parsePlan("ALTER TABLE a.b.c SET LOCATION 'new location'"), - AlterTableSetLocationStatement(Seq("a", "b", "c"), None, "new location")) + AlterTableSetLocation(UnresolvedTable(Seq("a", "b", "c")), None, "new location")) comparePlans( parsePlan("ALTER TABLE a.b.c PARTITION(ds='2017-06-10') SET LOCATION 'new location'"), - AlterTableSetLocationStatement( - Seq("a", "b", "c"), + AlterTableSetLocation( + UnresolvedTable(Seq("a", "b", "c")), Some(Map("ds" -> "2017-06-10")), "new location")) } @@ -592,8 +592,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: rename column") { comparePlans( parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"), - AlterTableRenameColumnStatement( - Seq("table_name"), + AlterTableRenameColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), "d")) } @@ -601,8 +601,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column type using ALTER") { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), Some(LongType), None, @@ -613,8 +613,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column type") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), Some(LongType), None, @@ -625,8 +625,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column comment") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), None, None, @@ -637,8 +637,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column position") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), None, None, @@ -650,8 +650,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c " + "TYPE bigint COMMENT 'new comment' AFTER d"), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), Some(LongType), None, @@ -662,8 +662,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: SET/DROP NOT NULL") { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), None, Some(false), @@ -672,8 +672,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), None, Some(true), @@ -684,7 +684,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: drop column") { comparePlans( parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"), - AlterTableDropColumnsStatement(Seq("table_name"), Seq(Seq("a", "b", "c")))) + AlterTableDropColumns(UnresolvedTable(Seq("table_name")), Seq(Seq("a", "b", "c")))) } test("alter table: drop multiple columns") { @@ -692,8 +692,8 @@ class DDLParserSuite extends AnalysisTest { Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop => comparePlans( parsePlan(drop), - AlterTableDropColumnsStatement( - Seq("table_name"), + AlterTableDropColumns( + UnresolvedTable(Seq("table_name")), Seq(Seq("x"), Seq("y"), Seq("a", "b", "c")))) } } @@ -705,8 +705,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql1), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), Some(IntegerType), None, @@ -715,8 +715,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql2), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), Some(IntegerType), None, @@ -725,8 +725,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql3), - AlterTableAlterColumnStatement( - Seq("table_name"), + AlterTableAlterColumn( + UnresolvedTable(Seq("table_name")), Seq("a", "b", "c"), Some(IntegerType), None, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 8b0d339dbb86..0aaf9d7e2e1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable} @@ -47,141 +47,63 @@ class ResolveSessionCatalog( import org.apache.spark.sql.connector.catalog.CatalogV2Util._ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case AlterTableAddColumnsStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), cols) => - loadTable(catalog, tbl.asIdentifier).collect { - case v1Table: V1Table => - cols.foreach { c => - assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand") - if (!c.nullable) { - throw new AnalysisException( - "ADD COLUMN with v1 tables cannot specify NOT NULL.") - } - } - AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField)) - }.getOrElse { - val changes = cols.map { col => - TableChange.addColumn( - col.name.toArray, - col.dataType, - col.nullable, - col.comment.orNull, - col.position.orNull) + case AlterTableAddColumns(ResolvedTable(_, ident, _: V1Table), cols) => + cols.foreach { c => + assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand") + if (!c.nullable) { + throw new AnalysisException( + "ADD COLUMN with v1 tables cannot specify NOT NULL.") } - createAlterTable(nameParts, catalog, tbl, changes) } + AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField)) - case a @ AlterTableAlterColumnStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => - loadTable(catalog, tbl.asIdentifier).collect { - case v1Table: V1Table => - if (a.column.length > 1) { - throw new AnalysisException( - "ALTER COLUMN with qualified column is only supported with v2 tables.") - } - if (a.dataType.isEmpty) { - throw new AnalysisException( - "ALTER COLUMN with v1 tables must specify new data type.") - } - if (a.nullable.isDefined) { - throw new AnalysisException( - "ALTER COLUMN with v1 tables cannot specify NOT NULL.") - } - if (a.position.isDefined) { - throw new AnalysisException("" + - "ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.") - } - val builder = new MetadataBuilder - // Add comment to metadata - a.comment.map(c => builder.putString("comment", c)) - // Add Hive type string to metadata. - val cleanedDataType = HiveStringType.replaceCharType(a.dataType.get) - if (a.dataType.get != cleanedDataType) { - builder.putString(HIVE_TYPE_STRING, a.dataType.get.catalogString) - } - val newColumn = StructField( - a.column(0), - cleanedDataType, - nullable = true, - builder.build()) - AlterTableChangeColumnCommand(tbl.asTableIdentifier, a.column(0), newColumn) - }.getOrElse { - val colName = a.column.toArray - val typeChange = a.dataType.map { newDataType => - TableChange.updateColumnType(colName, newDataType) - } - val nullabilityChange = a.nullable.map { nullable => - TableChange.updateColumnNullability(colName, nullable) - } - val commentChange = a.comment.map { newComment => - TableChange.updateColumnComment(colName, newComment) - } - val positionChange = a.position.map { newPosition => - TableChange.updateColumnPosition(colName, newPosition) - } - createAlterTable( - nameParts, - catalog, - tbl, - typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange) + case a @ AlterTableAlterColumn(ResolvedTable(_, ident, _: V1Table), _, _, _, _, _) => + if (a.column.length > 1) { + throw new AnalysisException( + "ALTER COLUMN with qualified column is only supported with v2 tables.") } - - case AlterTableRenameColumnStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), col, newName) => - loadTable(catalog, tbl.asIdentifier).collect { - case v1Table: V1Table => - throw new AnalysisException("RENAME COLUMN is only supported with v2 tables.") - }.getOrElse { - val changes = Seq(TableChange.renameColumn(col.toArray, newName)) - createAlterTable(nameParts, catalog, tbl, changes) + if (a.dataType.isEmpty) { + throw new AnalysisException( + "ALTER COLUMN with v1 tables must specify new data type.") } - - case AlterTableDropColumnsStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), cols) => - loadTable(catalog, tbl.asIdentifier).collect { - case v1Table: V1Table => - throw new AnalysisException("DROP COLUMN is only supported with v2 tables.") - }.getOrElse { - val changes = cols.map(col => TableChange.deleteColumn(col.toArray)) - createAlterTable(nameParts, catalog, tbl, changes) + if (a.nullable.isDefined) { + throw new AnalysisException( + "ALTER COLUMN with v1 tables cannot specify NOT NULL.") } - - case AlterTableSetPropertiesStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), props) => - loadTable(catalog, tbl.asIdentifier).collect { - case v1Table: V1Table => - AlterTableSetPropertiesCommand(tbl.asTableIdentifier, props, isView = false) - }.getOrElse { - val changes = props.map { case (key, value) => - TableChange.setProperty(key, value) - }.toSeq - createAlterTable(nameParts, catalog, tbl, changes) + if (a.position.isDefined) { + throw new AnalysisException("" + + "ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.") } - - case AlterTableUnsetPropertiesStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), keys, ifExists) => - loadTable(catalog, tbl.asIdentifier).collect { - case v1Table: V1Table => - AlterTableUnsetPropertiesCommand( - tbl.asTableIdentifier, keys, ifExists, isView = false) - }.getOrElse { - val changes = keys.map(key => TableChange.removeProperty(key)) - createAlterTable(nameParts, catalog, tbl, changes) + val builder = new MetadataBuilder + // Add comment to metadata + a.comment.map(c => builder.putString("comment", c)) + // Add Hive type string to metadata. + val cleanedDataType = HiveStringType.replaceCharType(a.dataType.get) + if (a.dataType.get != cleanedDataType) { + builder.putString(HIVE_TYPE_STRING, a.dataType.get.catalogString) } + val newColumn = StructField( + a.column(0), + cleanedDataType, + nullable = true, + builder.build()) + AlterTableChangeColumnCommand(ident.asTableIdentifier, a.column(0), newColumn) - case AlterTableSetLocationStatement( - nameParts @ SessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) => - loadTable(catalog, tbl.asIdentifier).collect { - case v1Table: V1Table => - AlterTableSetLocationCommand(tbl.asTableIdentifier, partitionSpec, newLoc) - }.getOrElse { - if (partitionSpec.nonEmpty) { - throw new AnalysisException( - "ALTER TABLE SET LOCATION does not support partition for v2 tables.") - } - val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc)) - createAlterTable(nameParts, catalog, tbl, changes) - } + case AlterTableRenameColumn(ResolvedTable(_, _, _: V1Table), _, _) => + throw new AnalysisException("RENAME COLUMN is only supported with v2 tables.") + + case AlterTableDropColumns(ResolvedTable(_, _, _: V1Table), _) => + throw new AnalysisException("DROP COLUMN is only supported with v2 tables.") + + case AlterTableSetProperties(ResolvedTable(_, ident, _: V1Table), props) => + AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false) + + case AlterTableUnsetProperties(ResolvedTable(_, ident, _: V1Table), keys, ifExists) => + AlterTableUnsetPropertiesCommand(ident.asTableIdentifier, keys, ifExists, isView = false) + + case AlterTableSetLocation( + ResolvedTable(_, ident, _: V1Table), partitionSpec, newLoc) => + AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, newLoc) // ALTER VIEW should always use v1 command if the resolved catalog is session catalog. case AlterViewSetPropertiesStatement(SessionCatalogAndTable(_, tbl), props) => @@ -218,7 +140,7 @@ class ResolveSessionCatalog( DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended) // Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet. - case DescribeRelation(ResolvedView(ident), partitionSpec, isExtended) => + case DescribeRelation(ResolvedView(ident, _), partitionSpec, isExtended) => DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended) case DescribeColumnStatement( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index a92fbdf25975..ab47d640f470 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -257,14 +257,6 @@ case class AlterTableAddColumnsCommand( table: TableIdentifier): CatalogTable = { val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table) - if (catalogTable.tableType == CatalogTableType.VIEW) { - throw new AnalysisException( - s""" - |ALTER ADD COLUMNS does not support views. - |You must drop and re-create the views for adding the new columns. Views: $table - """.stripMargin) - } - if (DDLUtils.isDatasourceTable(catalogTable)) { DataSource.lookupDataSource(catalogTable.provider.get, conf). getConstructor().newInstance() match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index d848e3464866..7169a437359a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -235,8 +235,18 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case DropTable(catalog, ident, ifExists) => DropTableExec(catalog, ident, ifExists) :: Nil - case AlterTable(catalog, ident, _, changes) => - AlterTableExec(catalog, ident, changes) :: Nil + case a @ AlterTableSetLocation(r: ResolvedTable, partitionSpec, _) => + if (partitionSpec.nonEmpty) { + throw new AnalysisException( + "ALTER TABLE SET LOCATION does not support partition for v2 tables.") + } + AlterTableExec(r.catalog, r.identifier, a.changes) :: Nil + + case a: AlterTable => + a.table match { + case r: ResolvedTable => AlterTableExec(r.catalog, r.identifier, a.changes) :: Nil + case _ => Nil + } case RenameTable(catalog, oldIdent, newIdent) => RenameTableExec(catalog, oldIdent, newIdent) :: Nil diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index 82326346b361..bb5b4ae84d3b 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -195,7 +195,7 @@ ALTER TABLE temp_view CHANGE a TYPE INT COMMENT 'this is column a' struct<> -- !query 20 output org.apache.spark.sql.AnalysisException -Invalid command: 'temp_view' is a view not a table.; line 1 pos 0 +temp_view is a temp view not a table.; line 1 pos 0 -- !query 21 @@ -212,7 +212,7 @@ ALTER TABLE global_temp.global_temp_view CHANGE a TYPE INT COMMENT 'this is colu struct<> -- !query 22 output org.apache.spark.sql.AnalysisException -Invalid command: 'global_temp.global_temp_view' is a view not a table.; line 1 pos 0 +global_temp.global_temp_view is a temp view not a table.; line 1 pos 0 -- !query 23 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 4ee8a6803ea5..4c5b1d95b12d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2194,7 +2194,7 @@ class DataSourceV2SQLSuite withTempView("v") { sql("create global temp view v as select 1") val e = intercept[AnalysisException](sql("COMMENT ON TABLE global_temp.v IS NULL")) - assert(e.getMessage.contains("global_temp.v is a temp view not table.")) + assert(e.getMessage.contains("global_temp.v is a temp view not a table.")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 9a393f19ce9b..4ed2506b35a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -145,13 +145,13 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { // For v2 ALTER TABLE statements, we have better error message saying view is not supported. assertAnalysisError( s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'", - s"'$viewName' is a view not a table") + s"$viewName is a temp view not a table") - // For the following v2 ALERT TABLE statements, unsupported operations are checked first - // before resolving the relations. + // For the following v2 ALERT TABLE statements, relations are first resolved before + // unsupported operations are checked. assertAnalysisError( s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'", - "ALTER TABLE SET LOCATION does not support partition for v2 tables") + s"$viewName is a temp view not a table") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 11c6487e25e9..64c85c19ba48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2779,7 +2779,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { val e = intercept[AnalysisException] { sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)") } - assert(e.message.contains("'tmp_v' is a view not a table")) + assert(e.message.contains("tmp_v is a temp view not a table")) } } @@ -2789,7 +2789,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { val e = intercept[AnalysisException] { sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)") } - assert(e.message.contains("ALTER ADD COLUMNS does not support views")) + assert(e.message.contains( + "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 0901c66cccce..abc20049735c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -26,7 +26,7 @@ import org.mockito.invocation.InvocationOnMock import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, CTESubstitution, EmptyFunctionRegistry, NoSuchTableException, ResolveCatalogs, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedV2Relation} +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, CTESubstitution, EmptyFunctionRegistry, NoSuchTableException, ResolveCatalogs, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedTable} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{EqualTo, InSubquery, IntegerLiteral, ListQuery, StringLiteral} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser @@ -131,9 +131,9 @@ class PlanResolutionSuite extends AnalysisTest { val rules = Seq( CTESubstitution, analyzer.ResolveRelations, + analyzer.ResolveTables, new ResolveCatalogs(catalogManager), - new ResolveSessionCatalog(catalogManager, conf, _ == Seq("v")), - analyzer.ResolveTables) + new ResolveSessionCatalog(catalogManager, conf, _ == Seq("v"))) rules.foldLeft(parsePlan(query)) { case (plan, rule) => rule.apply(plan) } @@ -711,24 +711,24 @@ class PlanResolutionSuite extends AnalysisTest { comparePlans(parsed3, expected3) } else { parsed1 match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( + case a: AlterTable => + assert(a.changes == Seq( TableChange.setProperty("test", "test"), TableChange.setProperty("comment", "new_comment"))) case _ => fail("expect AlterTable") } parsed2 match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( + case a: AlterTable => + assert(a.changes == Seq( TableChange.removeProperty("comment"), TableChange.removeProperty("test"))) case _ => fail("expect AlterTable") } parsed3 match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( + case a: AlterTable => + assert(a.changes == Seq( TableChange.removeProperty("comment"), TableChange.removeProperty("test"))) case _ => fail("expect AlterTable") @@ -741,15 +741,9 @@ class PlanResolutionSuite extends AnalysisTest { val parsed4 = parseAndResolve(sql4) val parsed5 = parseAndResolve(sql5) - // For non-existing tables, we convert it to v2 command with `UnresolvedV2Table` - parsed4 match { - case AlterTable(_, _, _: UnresolvedV2Relation, _) => // OK - case _ => fail("Expect AlterTable, but got:\n" + parsed4.treeString) - } - parsed5 match { - case AlterTable(_, _, _: UnresolvedV2Relation, _) => // OK - case _ => fail("Expect AlterTable, but got:\n" + parsed5.treeString) - } + // For non-existing tables, we expect `UnresolvedTable` in the resolved plan. + assert(parsed4.collect{ case u: UnresolvedTable => u }.length == 1) + assert(parsed5.collect{ case u: UnresolvedTable => u }.length == 1) } test("support for other types in TBLPROPERTIES") { @@ -770,8 +764,8 @@ class PlanResolutionSuite extends AnalysisTest { comparePlans(parsed, expected) } else { parsed match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( + case a: AlterTable => + assert(a.changes == Seq( TableChange.setProperty("a", "1"), TableChange.setProperty("b", "0.1"), TableChange.setProperty("c", "true"))) @@ -794,8 +788,8 @@ class PlanResolutionSuite extends AnalysisTest { comparePlans(parsed, expected) } else { parsed match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq(TableChange.setProperty("location", "new location"))) + case a: AlterTable => + assert(a.changes == Seq(TableChange.setProperty("location", "new location"))) case _ => fail("Expect AlterTable, but got:\n" + parsed.treeString) } } @@ -1036,23 +1030,23 @@ class PlanResolutionSuite extends AnalysisTest { val parsed3 = parseAndResolve(sql3) parsed1 match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( + case a: AlterTable => + assert(a.changes == Seq( TableChange.updateColumnType(Array("i"), LongType))) case _ => fail("expect AlterTable") } parsed2 match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( + case a: AlterTable => + assert(a.changes == Seq( TableChange.updateColumnType(Array("i"), LongType), TableChange.updateColumnComment(Array("i"), "new comment"))) case _ => fail("expect AlterTable") } parsed3 match { - case AlterTable(_, _, _: DataSourceV2Relation, changes) => - assert(changes == Seq( + case a: AlterTable => + assert(a.changes == Seq( TableChange.updateColumnComment(Array("i"), "new comment"))) case _ => fail("expect AlterTable") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index dbbf2b29fe8b..9b12ac1d79e7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -158,7 +158,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto val message = intercept[AnalysisException] { sql("SHOW TBLPROPERTIES parquet_temp") }.getMessage - assert(message.contains("parquet_temp is a temp view not table")) + assert(message.contains("parquet_temp is a temp view not a table")) } }