diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7e9f85b64e4a..7954e0b547d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -755,14 +755,12 @@ class Analyzer( .map(view => i.copy(table = view)) .getOrElse(i) case u @ UnresolvedTable(ident) => - lookupTempView(ident) - .map(_ => UnresolvedTableWithViewExists( - ResolvedView(ident.asIdentifier, isTempView = true))) - .getOrElse(u) + lookupTempView(ident).foreach { _ => + u.failAnalysis(s"${ident.quoted} is a temp view not table.") + } + u case u @ UnresolvedTableOrView(ident) => - lookupTempView(ident) - .map(_ => ResolvedView(ident.asIdentifier, isTempView = true)) - .getOrElse(u) + lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u) } def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = { @@ -816,6 +814,14 @@ class Analyzer( lookupV2Relation(u.multipartIdentifier) .map(v2Relation => i.copy(table = v2Relation)) .getOrElse(i) + + case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) => + CatalogV2Util.loadRelation(u.catalog, u.tableName) + .map(rel => alter.copy(table = rel)) + .getOrElse(alter) + + case u: UnresolvedV2Relation => + CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u) } /** @@ -882,7 +888,8 @@ class Analyzer( case u @ UnresolvedTable(identifier) => lookupTableOrView(identifier).map { - case v: ResolvedView => UnresolvedTableWithViewExists(v) + case v: ResolvedView => + u.failAnalysis(s"${v.identifier.quoted} is a view not table.") case table => table }.getOrElse(u) @@ -895,7 +902,7 @@ class Analyzer( case SessionCatalogAndIdentifier(catalog, ident) => CatalogV2Util.loadTable(catalog, ident).map { case v1Table: V1Table if v1Table.v1Table.tableType == CatalogTableType.VIEW => - ResolvedView(ident, isTempView = false) + ResolvedView(ident) case table => ResolvedTable(catalog.asTableCatalog, ident, table) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 624c25d95c70..d6fc1dc6ddc3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.TypeUtils -import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType} +import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnType} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -87,20 +87,6 @@ trait CheckAnalysis extends PredicateHelper { } def checkAnalysis(plan: LogicalPlan): Unit = { - // Analysis that needs to be performed top down can be added here. - plan.foreach { - case p if p.analyzed => // Skip already analyzed sub-plans - - case alter: AlterTable => - alter.table match { - case u @ UnresolvedTableWithViewExists(view) if !view.isTempView => - u.failAnalysis("Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead") - case _ => - } - - case _ => // Analysis successful! - } - // We transform up and order the rules so as to catch the first possible failure instead // of the result of cascading resolution failures. plan.foreachUp { @@ -119,13 +105,23 @@ trait CheckAnalysis extends PredicateHelper { case u: UnresolvedRelation => u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}") - case u: UnresolvedTableWithViewExists => - val viewKind = if (u.view.isTempView) { "temp view" } else { "view" } - u.failAnalysis(s"${u.view.identifier.quoted} is a $viewKind not a table.") - case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) => failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}") + case u: UnresolvedV2Relation if isView(u.originalNameParts) => + u.failAnalysis( + s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") + + case u: UnresolvedV2Relation => + u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + + case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) => + u.failAnalysis( + s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.") + + case AlterTable(_, _, u: UnresolvedV2Relation, _) => + failAnalysis(s"Table not found: ${u.originalNameParts.quoted}") + case operator: LogicalPlan => // Check argument data types of higher-order functions downwards first. // If the arguments of the higher-order functions are resolved but the type check fails, @@ -429,9 +425,8 @@ trait CheckAnalysis extends PredicateHelper { case _ => } - case alter: AlterTable - if alter.childrenResolved && alter.table.isInstanceOf[ResolvedTable] => - val table = alter.table.asInstanceOf[ResolvedTable].table + case alter: AlterTable if alter.childrenResolved => + val table = alter.table def findField(operation: String, fieldName: Array[String]): StructField = { // include collections because structs nested in maps and arrays may be altered val field = table.schema.findNestedField(fieldName, includeCollections = true) @@ -484,8 +479,6 @@ trait CheckAnalysis extends PredicateHelper { throw new AnalysisException( s"Cannot change nullable column to non-nullable: $fieldName") } - case update: UpdateColumnPosition => - findField("update", update.fieldNames) case rename: RenameColumn => findField("rename", rename.fieldNames) case update: UpdateColumnComment => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala index a44877fc1b4d..88a3c0a73a10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange} /** * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements @@ -32,6 +32,71 @@ class ResolveCatalogs(val catalogManager: CatalogManager) import org.apache.spark.sql.connector.catalog.CatalogV2Util._ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case AlterTableAddColumnsStatement( + nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => + val changes = cols.map { col => + TableChange.addColumn( + col.name.toArray, + col.dataType, + col.nullable, + col.comment.orNull, + col.position.orNull) + } + createAlterTable(nameParts, catalog, tbl, changes) + + case a @ AlterTableAlterColumnStatement( + nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => + val colName = a.column.toArray + val typeChange = a.dataType.map { newDataType => + TableChange.updateColumnType(colName, newDataType) + } + val nullabilityChange = a.nullable.map { nullable => + TableChange.updateColumnNullability(colName, nullable) + } + val commentChange = a.comment.map { newComment => + TableChange.updateColumnComment(colName, newComment) + } + val positionChange = a.position.map { newPosition => + TableChange.updateColumnPosition(colName, newPosition) + } + createAlterTable( + nameParts, + catalog, + tbl, + typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange) + + case AlterTableRenameColumnStatement( + nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) => + val changes = Seq(TableChange.renameColumn(col.toArray, newName)) + createAlterTable(nameParts, catalog, tbl, changes) + + case AlterTableDropColumnsStatement( + nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) => + val changes = cols.map(col => TableChange.deleteColumn(col.toArray)) + createAlterTable(nameParts, catalog, tbl, changes) + + case AlterTableSetPropertiesStatement( + nameParts @ NonSessionCatalogAndTable(catalog, tbl), props) => + val changes = props.map { case (key, value) => + TableChange.setProperty(key, value) + }.toSeq + createAlterTable(nameParts, catalog, tbl, changes) + + // TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag. + case AlterTableUnsetPropertiesStatement( + nameParts @ NonSessionCatalogAndTable(catalog, tbl), keys, _) => + val changes = keys.map(key => TableChange.removeProperty(key)) + createAlterTable(nameParts, catalog, tbl, changes) + + case AlterTableSetLocationStatement( + nameParts @ NonSessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) => + if (partitionSpec.nonEmpty) { + throw new AnalysisException( + "ALTER TABLE SET LOCATION does not support partition for v2 tables.") + } + val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc)) + createAlterTable(nameParts, catalog, tbl, changes) + case AlterViewSetPropertiesStatement( NonSessionCatalogAndTable(catalog, tbl), props) => throw new AnalysisException( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index c7d977a3d4a8..608f39c2d86f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.types.{DataType, Metadata, StructType} /** @@ -59,6 +60,28 @@ object UnresolvedRelation { UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table) } +/** + * A variant of [[UnresolvedRelation]] which can only be resolved to a v2 relation + * (`DataSourceV2Relation`), not v1 relation or temp view. + * + * @param originalNameParts the original table identifier name parts before catalog is resolved. + * @param catalog The catalog which the table should be looked up from. + * @param tableName The name of the table to look up. + */ +case class UnresolvedV2Relation( + originalNameParts: Seq[String], + catalog: TableCatalog, + tableName: Identifier) + extends LeafNode with NamedRelation { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + override def name: String = originalNameParts.quoted + + override def output: Seq[Attribute] = Nil + + override lazy val resolved = false +} + /** * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala index c6300b0bb079..239f987e97a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, Table, TableCatalog} /** @@ -41,16 +41,6 @@ case class UnresolvedTable(multipartIdentifier: Seq[String]) extends LeafNode { override def output: Seq[Attribute] = Nil } -/** - * Holds the resolved view. It is used in a scenario where table is expected but the identifier - * is resolved to a (temp) view. - */ -case class UnresolvedTableWithViewExists(view: ResolvedView) extends LeafNode { - override lazy val resolved: Boolean = false - - override def output: Seq[Attribute] = Nil -} - /** * Holds the name of a table or view that has yet to be looked up in a catalog. It will * be resolved to [[ResolvedTable]] or [[ResolvedView]] during analysis. @@ -81,6 +71,6 @@ case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: T */ // TODO: create a generic representation for temp view, v1 view and v2 view, after we add view // support to v2 catalog. For now we only need the identifier to fallback to v1 command. -case class ResolvedView(identifier: Identifier, isTempView: Boolean) extends LeafNode { +case class ResolvedView(identifier: Identifier) extends LeafNode { override def output: Seq[Attribute] = Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a8a96f0f6803..e1dca4e94539 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2908,7 +2908,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Parse a [[AlterTableAddColumns]] command. + * Parse a [[AlterTableAddColumnsStatement]] command. * * For example: * {{{ @@ -2917,14 +2917,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = withOrigin(ctx) { - AlterTableAddColumns( - UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)), + AlterTableAddColumnsStatement( + visitMultipartIdentifier(ctx.multipartIdentifier), ctx.columns.qualifiedColTypeWithPosition.asScala.map(typedVisit[QualifiedColType]) ) } /** - * Parse a [[AlterTableRenameColumn]] command. + * Parse a [[AlterTableRenameColumnStatement]] command. * * For example: * {{{ @@ -2933,14 +2933,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging */ override def visitRenameTableColumn( ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) { - AlterTableRenameColumn( - UnresolvedTable(visitMultipartIdentifier(ctx.table)), + AlterTableRenameColumnStatement( + visitMultipartIdentifier(ctx.table), ctx.from.parts.asScala.map(_.getText), ctx.to.getText) } /** - * Parse a [[AlterTableAlterColumn]] command. + * Parse a [[AlterTableAlterColumnStatement]] command. * * For example: * {{{ @@ -2957,8 +2957,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging s"ALTER TABLE table $verb COLUMN requires a TYPE or a COMMENT or a FIRST/AFTER", ctx) } - AlterTableAlterColumn( - UnresolvedTable(visitMultipartIdentifier(ctx.table)), + AlterTableAlterColumnStatement( + visitMultipartIdentifier(ctx.table), typedVisit[Seq[String]](ctx.column), dataType = Option(ctx.dataType).map(typedVisit[DataType]), nullable = None, @@ -2967,7 +2967,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Parse a [[AlterTableAlterColumn]] command to change column nullability. + * Parse a [[AlterTableAlterColumnStatement]] command to change column nullability. * * For example: * {{{ @@ -2981,8 +2981,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case SqlBaseParser.SET => false case SqlBaseParser.DROP => true } - AlterTableAlterColumn( - UnresolvedTable(visitMultipartIdentifier(ctx.table)), + AlterTableAlterColumnStatement( + visitMultipartIdentifier(ctx.table), typedVisit[Seq[String]](ctx.column), dataType = None, nullable = Some(nullable), @@ -2992,7 +2992,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Parse a [[AlterTableAlterColumn]] command. This is Hive SQL syntax. + * Parse a [[AlterTableAlterColumnStatement]] command. This is Hive SQL syntax. * * For example: * {{{ @@ -3015,8 +3015,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging "please run ALTER COLUMN ... SET/DROP NOT NULL instead.") } - AlterTableAlterColumn( - UnresolvedTable(typedVisit[Seq[String]](ctx.table)), + AlterTableAlterColumnStatement( + typedVisit[Seq[String]](ctx.table), columnNameParts, dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]), nullable = None, @@ -3025,7 +3025,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging } /** - * Parse a [[AlterTableDropColumns]] command. + * Parse a [[AlterTableDropColumnsStatement]] command. * * For example: * {{{ @@ -3036,13 +3036,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging override def visitDropTableColumns( ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) { val columnsToDrop = ctx.columns.multipartIdentifier.asScala.map(typedVisit[Seq[String]]) - AlterTableDropColumns( - UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)), + AlterTableDropColumnsStatement( + visitMultipartIdentifier(ctx.multipartIdentifier), columnsToDrop) } /** - * Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetProperties]] commands. + * Parse [[AlterViewSetPropertiesStatement]] or [[AlterTableSetPropertiesStatement]] commands. * * For example: * {{{ @@ -3058,12 +3058,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (ctx.VIEW != null) { AlterViewSetPropertiesStatement(identifier, cleanedTableProperties) } else { - AlterTableSetProperties(UnresolvedTable(identifier), cleanedTableProperties) + AlterTableSetPropertiesStatement(identifier, cleanedTableProperties) } } /** - * Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetProperties]] commands. + * Parse [[AlterViewUnsetPropertiesStatement]] or [[AlterTableUnsetPropertiesStatement]] commands. * * For example: * {{{ @@ -3081,12 +3081,12 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging if (ctx.VIEW != null) { AlterViewUnsetPropertiesStatement(identifier, cleanedProperties, ifExists) } else { - AlterTableUnsetProperties(UnresolvedTable(identifier), cleanedProperties, ifExists) + AlterTableUnsetPropertiesStatement(identifier, cleanedProperties, ifExists) } } /** - * Create an [[AlterTableSetLocation]] command. + * Create an [[AlterTableSetLocationStatement]] command. * * For example: * {{{ @@ -3094,8 +3094,8 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging * }}} */ override def visitSetTableLocation(ctx: SetTableLocationContext): LogicalPlan = withOrigin(ctx) { - AlterTableSetLocation( - UnresolvedTable(visitMultipartIdentifier(ctx.multipartIdentifier)), + AlterTableSetLocationStatement( + visitMultipartIdentifier(ctx.multipartIdentifier), Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec), visitLocationSpec(ctx.locationSpec)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 2083a00cae0d..44f7b4143926 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -149,6 +149,62 @@ case class QualifiedColType( comment: Option[String], position: Option[ColumnPosition]) +/** + * ALTER TABLE ... ADD COLUMNS command, as parsed from SQL. + */ +case class AlterTableAddColumnsStatement( + tableName: Seq[String], + columnsToAdd: Seq[QualifiedColType]) extends ParsedStatement + +/** + * ALTER TABLE ... CHANGE COLUMN command, as parsed from SQL. + */ +case class AlterTableAlterColumnStatement( + tableName: Seq[String], + column: Seq[String], + dataType: Option[DataType], + nullable: Option[Boolean], + comment: Option[String], + position: Option[ColumnPosition]) extends ParsedStatement + +/** + * ALTER TABLE ... RENAME COLUMN command, as parsed from SQL. + */ +case class AlterTableRenameColumnStatement( + tableName: Seq[String], + column: Seq[String], + newName: String) extends ParsedStatement + +/** + * ALTER TABLE ... DROP COLUMNS command, as parsed from SQL. + */ +case class AlterTableDropColumnsStatement( + tableName: Seq[String], + columnsToDrop: Seq[Seq[String]]) extends ParsedStatement + +/** + * ALTER TABLE ... SET TBLPROPERTIES command, as parsed from SQL. + */ +case class AlterTableSetPropertiesStatement( + tableName: Seq[String], + properties: Map[String, String]) extends ParsedStatement + +/** + * ALTER TABLE ... UNSET TBLPROPERTIES command, as parsed from SQL. + */ +case class AlterTableUnsetPropertiesStatement( + tableName: Seq[String], + propertyKeys: Seq[String], + ifExists: Boolean) extends ParsedStatement + +/** + * ALTER TABLE ... SET LOCATION command, as parsed from SQL. + */ +case class AlterTableSetLocationStatement( + tableName: Seq[String], + partitionSpec: Option[TablePartitionSpec], + location: String) extends ParsedStatement + /** * ALTER TABLE ... RECOVER PARTITIONS command, as parsed from SQL. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 3e3c81c22b61..c04e56355a68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable} import org.apache.spark.sql.catalyst.plans.DescribeTableSchema import org.apache.spark.sql.connector.catalog._ -import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition +import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType} @@ -384,125 +384,37 @@ case class DropTable( ifExists: Boolean) extends Command /** - * The base class for ALTER TABLE commands that work for v2 tables. + * The logical plan of the ALTER TABLE command that works for v2 tables. */ -abstract class AlterTable extends Command { - def table: LogicalPlan - - def changes: Seq[TableChange] - - override def children: Seq[LogicalPlan] = Seq(table) +case class AlterTable( + catalog: TableCatalog, + ident: Identifier, + table: NamedRelation, + changes: Seq[TableChange]) extends Command { + + override lazy val resolved: Boolean = table.resolved && { + changes.forall { + case add: AddColumn => + add.fieldNames match { + case Array(_) => + // a top-level field can always be added + true + case _ => + // the parent field must exist + table.schema.findNestedField(add.fieldNames.init, includeCollections = true).isDefined + } - override lazy val resolved: Boolean = table.resolved -} + case colChange: ColumnChange => + // the column that will be changed must exist + table.schema.findNestedField(colChange.fieldNames, includeCollections = true).isDefined -/** - * The logical plan of the ALTER TABLE ... ADD COLUMNS command that works for v2 tables. - */ -case class AlterTableAddColumns( - table: LogicalPlan, - columnsToAdd: Seq[QualifiedColType]) extends AlterTable { - override lazy val changes: Seq[TableChange] = { - columnsToAdd.map { col => - TableChange.addColumn( - col.name.toArray, - col.dataType, - col.nullable, - col.comment.orNull, - col.position.orNull) + case _ => + // property changes require no resolution checks + true } } } -/** - * The logical plan of the ALTER TABLE ... CHANGE COLUMN command that works for v2 tables. - */ -case class AlterTableAlterColumn( - table: LogicalPlan, - column: Seq[String], - dataType: Option[DataType], - nullable: Option[Boolean], - comment: Option[String], - position: Option[ColumnPosition]) extends AlterTable { - override lazy val changes: Seq[TableChange] = { - val colName = column.toArray - val typeChange = dataType.map { newDataType => - TableChange.updateColumnType(colName, newDataType) - } - val nullabilityChange = nullable.map { nullable => - TableChange.updateColumnNullability(colName, nullable) - } - val commentChange = comment.map { newComment => - TableChange.updateColumnComment(colName, newComment) - } - val positionChange = position.map { newPosition => - TableChange.updateColumnPosition(colName, newPosition) - } - typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange - } -} - -/** - * The logical plan of the ALTER TABLE ... RENAME COLUMN command that works for v2 tables. - */ -case class AlterTableRenameColumn( - table: LogicalPlan, - column: Seq[String], - newName: String) extends AlterTable { - override lazy val changes: Seq[TableChange] = { - Seq(TableChange.renameColumn(column.toArray, newName)) - } -} - -/** - * The logical plan of the ALTER TABLE ... DROP COLUMNS command that works for v2 tables. - */ -case class AlterTableDropColumns( - table: LogicalPlan, - columnsToDrop: Seq[Seq[String]]) extends AlterTable { - override lazy val changes: Seq[TableChange] = { - columnsToDrop.map(col => TableChange.deleteColumn(col.toArray)) - } -} - -/** - * The logical plan of the ALTER TABLE ... SET TBLPROPERTIES command that works for v2 tables. - */ -case class AlterTableSetProperties( - table: LogicalPlan, - properties: Map[String, String]) extends AlterTable { - override lazy val changes: Seq[TableChange] = { - properties.map { case (key, value) => - TableChange.setProperty(key, value) - }.toSeq - } -} - -/** - * The logical plan of the ALTER TABLE ... UNSET TBLPROPERTIES command that works for v2 tables. - */ -// TODO: v2 `UNSET TBLPROPERTIES` should respect the ifExists flag. -case class AlterTableUnsetProperties( - table: LogicalPlan, - propertyKeys: Seq[String], - ifExists: Boolean) extends AlterTable { - override lazy val changes: Seq[TableChange] = { - propertyKeys.map(key => TableChange.removeProperty(key)) - } -} - -/** - * The logical plan of the ALTER TABLE ... SET LOCATION command that works for v2 tables. - */ -case class AlterTableSetLocation( - table: LogicalPlan, - partitionSpec: Option[TablePartitionSpec], - location: String) extends AlterTable { - override lazy val changes: Seq[TableChange] = { - Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, location)) - } -} - /** * The logical plan of the ALTER TABLE RENAME command that works for v2 tables. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index a4c7b4c3a289..67726c734352 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -22,7 +22,8 @@ import java.util.Collections import scala.collection.JavaConverters._ -import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException} +import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation} +import org.apache.spark.sql.catalyst.plans.logical.AlterTable import org.apache.spark.sql.connector.catalog.TableChange._ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -280,6 +281,17 @@ private[sql] object CatalogV2Util { properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName()) } + def createAlterTable( + originalNameParts: Seq[String], + catalog: CatalogPlugin, + tableName: Seq[String], + changes: Seq[TableChange]): AlterTable = { + val tableCatalog = catalog.asTableCatalog + val ident = tableName.asIdentifier + val unresolved = UnresolvedV2Relation(originalNameParts, tableCatalog, ident) + AlterTable(tableCatalog, ident, unresolved, changes) + } + def getTableProviderCatalog( provider: SupportsCatalogOptions, catalogManager: CatalogManager, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 0bcfccdd8b90..56d52571d1cc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -476,22 +476,22 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql1_table), - AlterTableSetProperties( - UnresolvedTable(Seq("table_name")), Map("test" -> "test", "comment" -> "new_comment"))) + AlterTableSetPropertiesStatement( + Seq("table_name"), Map("test" -> "test", "comment" -> "new_comment"))) comparePlans( parsePlan(sql2_table), - AlterTableUnsetProperties( - UnresolvedTable(Seq("table_name")), Seq("comment", "test"), ifExists = false)) + AlterTableUnsetPropertiesStatement( + Seq("table_name"), Seq("comment", "test"), ifExists = false)) comparePlans( parsePlan(sql3_table), - AlterTableUnsetProperties( - UnresolvedTable(Seq("table_name")), Seq("comment", "test"), ifExists = true)) + AlterTableUnsetPropertiesStatement( + Seq("table_name"), Seq("comment", "test"), ifExists = true)) } test("alter table: add column") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, None) ))) } @@ -499,7 +499,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add multiple columns") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, None), QualifiedColType(Seq("y"), StringType, true, None, None) ))) @@ -508,7 +508,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS x int"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, None) ))) } @@ -516,7 +516,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS (...)") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, None) ))) } @@ -524,7 +524,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COLUMNS (...) and COMMENT") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, true, Some("doc"), None) ))) } @@ -532,7 +532,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add non-nullable column") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int NOT NULL"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, false, None, None) ))) } @@ -540,7 +540,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with COMMENT") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, true, Some("doc"), None) ))) } @@ -548,13 +548,13 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with position") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, Some(first())) ))) comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x"), IntegerType, true, None, Some(after("y"))) ))) } @@ -562,7 +562,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add column with nested column name") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x", "y", "z"), IntegerType, true, Some("doc"), None) ))) } @@ -570,7 +570,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: add multiple columns with nested column name") { comparePlans( parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"), - AlterTableAddColumns(UnresolvedTable(Seq("table_name")), Seq( + AlterTableAddColumnsStatement(Seq("table_name"), Seq( QualifiedColType(Seq("x", "y", "z"), IntegerType, true, Some("doc"), None), QualifiedColType(Seq("a", "b"), StringType, true, None, Some(first())) ))) @@ -579,12 +579,12 @@ class DDLParserSuite extends AnalysisTest { test("alter table: set location") { comparePlans( parsePlan("ALTER TABLE a.b.c SET LOCATION 'new location'"), - AlterTableSetLocation(UnresolvedTable(Seq("a", "b", "c")), None, "new location")) + AlterTableSetLocationStatement(Seq("a", "b", "c"), None, "new location")) comparePlans( parsePlan("ALTER TABLE a.b.c PARTITION(ds='2017-06-10') SET LOCATION 'new location'"), - AlterTableSetLocation( - UnresolvedTable(Seq("a", "b", "c")), + AlterTableSetLocationStatement( + Seq("a", "b", "c"), Some(Map("ds" -> "2017-06-10")), "new location")) } @@ -592,8 +592,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: rename column") { comparePlans( parsePlan("ALTER TABLE table_name RENAME COLUMN a.b.c TO d"), - AlterTableRenameColumn( - UnresolvedTable(Seq("table_name")), + AlterTableRenameColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), "d")) } @@ -601,8 +601,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column type using ALTER") { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), Some(LongType), None, @@ -613,8 +613,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column type") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), Some(LongType), None, @@ -625,8 +625,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column comment") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), None, None, @@ -637,8 +637,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: update column position") { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), None, None, @@ -650,8 +650,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c " + "TYPE bigint COMMENT 'new comment' AFTER d"), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), Some(LongType), None, @@ -662,8 +662,8 @@ class DDLParserSuite extends AnalysisTest { test("alter table: SET/DROP NOT NULL") { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), None, Some(false), @@ -672,8 +672,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), None, Some(true), @@ -684,7 +684,7 @@ class DDLParserSuite extends AnalysisTest { test("alter table: drop column") { comparePlans( parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"), - AlterTableDropColumns(UnresolvedTable(Seq("table_name")), Seq(Seq("a", "b", "c")))) + AlterTableDropColumnsStatement(Seq("table_name"), Seq(Seq("a", "b", "c")))) } test("alter table: drop multiple columns") { @@ -692,8 +692,8 @@ class DDLParserSuite extends AnalysisTest { Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop => comparePlans( parsePlan(drop), - AlterTableDropColumns( - UnresolvedTable(Seq("table_name")), + AlterTableDropColumnsStatement( + Seq("table_name"), Seq(Seq("x"), Seq("y"), Seq("a", "b", "c")))) } } @@ -705,8 +705,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql1), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), Some(IntegerType), None, @@ -715,8 +715,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql2), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), Some(IntegerType), None, @@ -725,8 +725,8 @@ class DDLParserSuite extends AnalysisTest { comparePlans( parsePlan(sql3), - AlterTableAlterColumn( - UnresolvedTable(Seq("table_name")), + AlterTableAlterColumnStatement( + Seq("table_name"), Seq("a", "b", "c"), Some(IntegerType), None, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala index 0aaf9d7e2e1a..8b0d339dbb86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog, TableChange, V1Table} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, RefreshTable} @@ -47,63 +47,141 @@ class ResolveSessionCatalog( import org.apache.spark.sql.connector.catalog.CatalogV2Util._ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp { - case AlterTableAddColumns(ResolvedTable(_, ident, _: V1Table), cols) => - cols.foreach { c => - assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand") - if (!c.nullable) { - throw new AnalysisException( - "ADD COLUMN with v1 tables cannot specify NOT NULL.") + case AlterTableAddColumnsStatement( + nameParts @ SessionCatalogAndTable(catalog, tbl), cols) => + loadTable(catalog, tbl.asIdentifier).collect { + case v1Table: V1Table => + cols.foreach { c => + assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand") + if (!c.nullable) { + throw new AnalysisException( + "ADD COLUMN with v1 tables cannot specify NOT NULL.") + } + } + AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField)) + }.getOrElse { + val changes = cols.map { col => + TableChange.addColumn( + col.name.toArray, + col.dataType, + col.nullable, + col.comment.orNull, + col.position.orNull) } + createAlterTable(nameParts, catalog, tbl, changes) } - AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField)) - case a @ AlterTableAlterColumn(ResolvedTable(_, ident, _: V1Table), _, _, _, _, _) => - if (a.column.length > 1) { - throw new AnalysisException( - "ALTER COLUMN with qualified column is only supported with v2 tables.") - } - if (a.dataType.isEmpty) { - throw new AnalysisException( - "ALTER COLUMN with v1 tables must specify new data type.") - } - if (a.nullable.isDefined) { - throw new AnalysisException( - "ALTER COLUMN with v1 tables cannot specify NOT NULL.") - } - if (a.position.isDefined) { - throw new AnalysisException("" + - "ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.") - } - val builder = new MetadataBuilder - // Add comment to metadata - a.comment.map(c => builder.putString("comment", c)) - // Add Hive type string to metadata. - val cleanedDataType = HiveStringType.replaceCharType(a.dataType.get) - if (a.dataType.get != cleanedDataType) { - builder.putString(HIVE_TYPE_STRING, a.dataType.get.catalogString) + case a @ AlterTableAlterColumnStatement( + nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) => + loadTable(catalog, tbl.asIdentifier).collect { + case v1Table: V1Table => + if (a.column.length > 1) { + throw new AnalysisException( + "ALTER COLUMN with qualified column is only supported with v2 tables.") + } + if (a.dataType.isEmpty) { + throw new AnalysisException( + "ALTER COLUMN with v1 tables must specify new data type.") + } + if (a.nullable.isDefined) { + throw new AnalysisException( + "ALTER COLUMN with v1 tables cannot specify NOT NULL.") + } + if (a.position.isDefined) { + throw new AnalysisException("" + + "ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.") + } + val builder = new MetadataBuilder + // Add comment to metadata + a.comment.map(c => builder.putString("comment", c)) + // Add Hive type string to metadata. + val cleanedDataType = HiveStringType.replaceCharType(a.dataType.get) + if (a.dataType.get != cleanedDataType) { + builder.putString(HIVE_TYPE_STRING, a.dataType.get.catalogString) + } + val newColumn = StructField( + a.column(0), + cleanedDataType, + nullable = true, + builder.build()) + AlterTableChangeColumnCommand(tbl.asTableIdentifier, a.column(0), newColumn) + }.getOrElse { + val colName = a.column.toArray + val typeChange = a.dataType.map { newDataType => + TableChange.updateColumnType(colName, newDataType) + } + val nullabilityChange = a.nullable.map { nullable => + TableChange.updateColumnNullability(colName, nullable) + } + val commentChange = a.comment.map { newComment => + TableChange.updateColumnComment(colName, newComment) + } + val positionChange = a.position.map { newPosition => + TableChange.updateColumnPosition(colName, newPosition) + } + createAlterTable( + nameParts, + catalog, + tbl, + typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange) } - val newColumn = StructField( - a.column(0), - cleanedDataType, - nullable = true, - builder.build()) - AlterTableChangeColumnCommand(ident.asTableIdentifier, a.column(0), newColumn) - case AlterTableRenameColumn(ResolvedTable(_, _, _: V1Table), _, _) => - throw new AnalysisException("RENAME COLUMN is only supported with v2 tables.") + case AlterTableRenameColumnStatement( + nameParts @ SessionCatalogAndTable(catalog, tbl), col, newName) => + loadTable(catalog, tbl.asIdentifier).collect { + case v1Table: V1Table => + throw new AnalysisException("RENAME COLUMN is only supported with v2 tables.") + }.getOrElse { + val changes = Seq(TableChange.renameColumn(col.toArray, newName)) + createAlterTable(nameParts, catalog, tbl, changes) + } - case AlterTableDropColumns(ResolvedTable(_, _, _: V1Table), _) => - throw new AnalysisException("DROP COLUMN is only supported with v2 tables.") + case AlterTableDropColumnsStatement( + nameParts @ SessionCatalogAndTable(catalog, tbl), cols) => + loadTable(catalog, tbl.asIdentifier).collect { + case v1Table: V1Table => + throw new AnalysisException("DROP COLUMN is only supported with v2 tables.") + }.getOrElse { + val changes = cols.map(col => TableChange.deleteColumn(col.toArray)) + createAlterTable(nameParts, catalog, tbl, changes) + } - case AlterTableSetProperties(ResolvedTable(_, ident, _: V1Table), props) => - AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false) + case AlterTableSetPropertiesStatement( + nameParts @ SessionCatalogAndTable(catalog, tbl), props) => + loadTable(catalog, tbl.asIdentifier).collect { + case v1Table: V1Table => + AlterTableSetPropertiesCommand(tbl.asTableIdentifier, props, isView = false) + }.getOrElse { + val changes = props.map { case (key, value) => + TableChange.setProperty(key, value) + }.toSeq + createAlterTable(nameParts, catalog, tbl, changes) + } - case AlterTableUnsetProperties(ResolvedTable(_, ident, _: V1Table), keys, ifExists) => - AlterTableUnsetPropertiesCommand(ident.asTableIdentifier, keys, ifExists, isView = false) + case AlterTableUnsetPropertiesStatement( + nameParts @ SessionCatalogAndTable(catalog, tbl), keys, ifExists) => + loadTable(catalog, tbl.asIdentifier).collect { + case v1Table: V1Table => + AlterTableUnsetPropertiesCommand( + tbl.asTableIdentifier, keys, ifExists, isView = false) + }.getOrElse { + val changes = keys.map(key => TableChange.removeProperty(key)) + createAlterTable(nameParts, catalog, tbl, changes) + } - case AlterTableSetLocation( - ResolvedTable(_, ident, _: V1Table), partitionSpec, newLoc) => - AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, newLoc) + case AlterTableSetLocationStatement( + nameParts @ SessionCatalogAndTable(catalog, tbl), partitionSpec, newLoc) => + loadTable(catalog, tbl.asIdentifier).collect { + case v1Table: V1Table => + AlterTableSetLocationCommand(tbl.asTableIdentifier, partitionSpec, newLoc) + }.getOrElse { + if (partitionSpec.nonEmpty) { + throw new AnalysisException( + "ALTER TABLE SET LOCATION does not support partition for v2 tables.") + } + val changes = Seq(TableChange.setProperty(TableCatalog.PROP_LOCATION, newLoc)) + createAlterTable(nameParts, catalog, tbl, changes) + } // ALTER VIEW should always use v1 command if the resolved catalog is session catalog. case AlterViewSetPropertiesStatement(SessionCatalogAndTable(_, tbl), props) => @@ -140,7 +218,7 @@ class ResolveSessionCatalog( DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended) // Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet. - case DescribeRelation(ResolvedView(ident, _), partitionSpec, isExtended) => + case DescribeRelation(ResolvedView(ident), partitionSpec, isExtended) => DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended) case DescribeColumnStatement( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index ab47d640f470..a92fbdf25975 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -257,6 +257,14 @@ case class AlterTableAddColumnsCommand( table: TableIdentifier): CatalogTable = { val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table) + if (catalogTable.tableType == CatalogTableType.VIEW) { + throw new AnalysisException( + s""" + |ALTER ADD COLUMNS does not support views. + |You must drop and re-create the views for adding the new columns. Views: $table + """.stripMargin) + } + if (DDLUtils.isDatasourceTable(catalogTable)) { DataSource.lookupDataSource(catalogTable.provider.get, conf). getConstructor().newInstance() match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 7c8fd4e105ca..448a4354ddd6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -241,18 +241,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat case DropTable(catalog, ident, ifExists) => DropTableExec(catalog, ident, ifExists) :: Nil - case a @ AlterTableSetLocation(r: ResolvedTable, partitionSpec, _) => - if (partitionSpec.nonEmpty) { - throw new AnalysisException( - "ALTER TABLE SET LOCATION does not support partition for v2 tables.") - } - AlterTableExec(r.catalog, r.identifier, a.changes) :: Nil - - case a: AlterTable => - a.table match { - case r: ResolvedTable => AlterTableExec(r.catalog, r.identifier, a.changes) :: Nil - case _ => Nil - } + case AlterTable(catalog, ident, _, changes) => + AlterTableExec(catalog, ident, changes) :: Nil case RenameTable(catalog, oldIdent, newIdent) => RenameTableExec(catalog, oldIdent, newIdent) :: Nil diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out index bb5b4ae84d3b..82326346b361 100644 --- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out @@ -195,7 +195,7 @@ ALTER TABLE temp_view CHANGE a TYPE INT COMMENT 'this is column a' struct<> -- !query 20 output org.apache.spark.sql.AnalysisException -temp_view is a temp view not a table.; line 1 pos 0 +Invalid command: 'temp_view' is a view not a table.; line 1 pos 0 -- !query 21 @@ -212,7 +212,7 @@ ALTER TABLE global_temp.global_temp_view CHANGE a TYPE INT COMMENT 'this is colu struct<> -- !query 22 output org.apache.spark.sql.AnalysisException -global_temp.global_temp_view is a temp view not a table.; line 1 pos 0 +Invalid command: 'global_temp.global_temp_view' is a view not a table.; line 1 pos 0 -- !query 23 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 75c9bb7be05f..c19352a2267d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2201,7 +2201,7 @@ class DataSourceV2SQLSuite withTempView("v") { sql("create global temp view v as select 1") val e = intercept[AnalysisException](sql("COMMENT ON TABLE global_temp.v IS NULL")) - assert(e.getMessage.contains("global_temp.v is a temp view not a table.")) + assert(e.getMessage.contains("global_temp.v is a temp view not table.")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 4ed2506b35a8..9a393f19ce9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -145,13 +145,13 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { // For v2 ALTER TABLE statements, we have better error message saying view is not supported. assertAnalysisError( s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'", - s"$viewName is a temp view not a table") + s"'$viewName' is a view not a table") - // For the following v2 ALERT TABLE statements, relations are first resolved before - // unsupported operations are checked. + // For the following v2 ALERT TABLE statements, unsupported operations are checked first + // before resolving the relations. assertAnalysisError( s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'", - s"$viewName is a temp view not a table") + "ALTER TABLE SET LOCATION does not support partition for v2 tables") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index e3fb535ab4cd..1a9fe46bd6a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -2779,7 +2779,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { val e = intercept[AnalysisException] { sql("ALTER TABLE tmp_v ADD COLUMNS (c3 INT)") } - assert(e.message.contains("tmp_v is a temp view not a table")) + assert(e.message.contains("'tmp_v' is a view not a table")) } } @@ -2789,8 +2789,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { val e = intercept[AnalysisException] { sql("ALTER TABLE v1 ADD COLUMNS (c3 INT)") } - assert(e.message.contains( - "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")) + assert(e.message.contains("ALTER ADD COLUMNS does not support views")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 8c73b366fa85..5aa15a453ac7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -26,7 +26,7 @@ import org.mockito.invocation.InvocationOnMock import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, Analyzer, CTESubstitution, EmptyFunctionRegistry, NoSuchTableException, ResolveCatalogs, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar, UnresolvedSubqueryColumnAliases, UnresolvedV2Relation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, StringLiteral} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser @@ -132,7 +132,6 @@ class PlanResolutionSuite extends AnalysisTest { val rules = Seq( CTESubstitution, analyzer.ResolveRelations, - analyzer.ResolveTables, new ResolveCatalogs(catalogManager), new ResolveSessionCatalog(catalogManager, conf, _ == Seq("v")), analyzer.ResolveTables, @@ -716,24 +715,24 @@ class PlanResolutionSuite extends AnalysisTest { comparePlans(parsed3, expected3) } else { parsed1 match { - case a: AlterTable => - assert(a.changes == Seq( + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( TableChange.setProperty("test", "test"), TableChange.setProperty("comment", "new_comment"))) case _ => fail("expect AlterTable") } parsed2 match { - case a: AlterTable => - assert(a.changes == Seq( + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( TableChange.removeProperty("comment"), TableChange.removeProperty("test"))) case _ => fail("expect AlterTable") } parsed3 match { - case a: AlterTable => - assert(a.changes == Seq( + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( TableChange.removeProperty("comment"), TableChange.removeProperty("test"))) case _ => fail("expect AlterTable") @@ -746,9 +745,15 @@ class PlanResolutionSuite extends AnalysisTest { val parsed4 = parseAndResolve(sql4) val parsed5 = parseAndResolve(sql5) - // For non-existing tables, we expect `UnresolvedTable` in the resolved plan. - assert(parsed4.collect{ case u: UnresolvedTable => u }.length == 1) - assert(parsed5.collect{ case u: UnresolvedTable => u }.length == 1) + // For non-existing tables, we convert it to v2 command with `UnresolvedV2Table` + parsed4 match { + case AlterTable(_, _, _: UnresolvedV2Relation, _) => // OK + case _ => fail("Expect AlterTable, but got:\n" + parsed4.treeString) + } + parsed5 match { + case AlterTable(_, _, _: UnresolvedV2Relation, _) => // OK + case _ => fail("Expect AlterTable, but got:\n" + parsed5.treeString) + } } test("support for other types in TBLPROPERTIES") { @@ -769,8 +774,8 @@ class PlanResolutionSuite extends AnalysisTest { comparePlans(parsed, expected) } else { parsed match { - case a: AlterTable => - assert(a.changes == Seq( + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( TableChange.setProperty("a", "1"), TableChange.setProperty("b", "0.1"), TableChange.setProperty("c", "true"))) @@ -793,8 +798,8 @@ class PlanResolutionSuite extends AnalysisTest { comparePlans(parsed, expected) } else { parsed match { - case a: AlterTable => - assert(a.changes == Seq(TableChange.setProperty("location", "new location"))) + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq(TableChange.setProperty("location", "new location"))) case _ => fail("Expect AlterTable, but got:\n" + parsed.treeString) } } @@ -1035,23 +1040,23 @@ class PlanResolutionSuite extends AnalysisTest { val parsed3 = parseAndResolve(sql3) parsed1 match { - case a: AlterTable => - assert(a.changes == Seq( + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( TableChange.updateColumnType(Array("i"), LongType))) case _ => fail("expect AlterTable") } parsed2 match { - case a: AlterTable => - assert(a.changes == Seq( + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( TableChange.updateColumnType(Array("i"), LongType), TableChange.updateColumnComment(Array("i"), "new comment"))) case _ => fail("expect AlterTable") } parsed3 match { - case a: AlterTable => - assert(a.changes == Seq( + case AlterTable(_, _, _: DataSourceV2Relation, changes) => + assert(changes == Seq( TableChange.updateColumnComment(Array("i"), "new comment"))) case _ => fail("expect AlterTable") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index 9b12ac1d79e7..dbbf2b29fe8b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -158,7 +158,7 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto val message = intercept[AnalysisException] { sql("SHOW TBLPROPERTIES parquet_temp") }.getMessage - assert(message.contains("parquet_temp is a temp view not a table")) + assert(message.contains("parquet_temp is a temp view not table")) } }