Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Post-Hoc Resolution", Once,
Seq(ResolveCommandsWithIfExists) ++
postHocResolutionRules: _*),
Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames),
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove ResolveAlterTableChanges once all the alter table commands are migrated to ResolveAlterTableCommands.

Batch("Remove Unresolved Hints", Once,
new ResolveHints.RemoveAllHints),
Expand Down Expand Up @@ -3515,6 +3516,33 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

/**
 * Resolves and case-normalizes the column names referenced by ALTER TABLE commands.
 *
 * Each [[UnresolvedFieldName]] under an [[AlterTableCommand]] whose target table is already
 * resolved is looked up in the table schema and rewritten to a [[ResolvedFieldName]] carrying
 * the schema's exact field-name casing. Names that cannot be found are left unresolved so that
 * CheckAnalysis can raise a proper error for them.
 */
object ResolveFieldNames extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case cmd: AlterTableCommand if cmd.table.resolved =>
      // Safe: a resolved AlterTableCommand target is always a ResolvedTable.
      val resolvedTable = cmd.table.asInstanceOf[ResolvedTable]
      cmd.transformExpressions {
        case unresolved: UnresolvedFieldName =>
          resolveFieldNames(resolvedTable.schema, unresolved.name)
            .map(ResolvedFieldName(_))
            .getOrElse(unresolved)
      }
  }

  /**
   * Returns the case-normalized full path of the field if it exists in `schema`, or None when
   * the column is not found. Missing columns are reported later, in CheckAnalysis.
   */
  private def resolveFieldNames(
      schema: StructType,
      fieldNames: Seq[String]): Option[Seq[String]] = {
    schema
      .findNestedField(fieldNames, includeCollections = true, conf.resolver)
      .map { case (parentPath, field) => parentPath :+ field.name }
  }
}

/** Rule to mostly resolve, normalize and rewrite column names based on case sensitivity. */
object ResolveAlterTableChanges extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
case write: V2WriteCommand if write.resolved =>
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))

case alter: AlterTableCommand if alter.table.resolved =>
alter.transformExpressions {
case u: UnresolvedFieldName =>
val table = alter.table.asInstanceOf[ResolvedTable]
alter.failAnalysis(
s"Cannot ${alter.operation} missing field ${u.name.quoted} in ${table.name} " +
s"schema: ${table.schema.treeString}")
}

case alter: AlterTable if alter.table.resolved =>
val table = alter.table
def findField(operation: String, fieldName: Array[String]): StructField = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
createAlterTable(nameParts, catalog, tbl, changes)

case AlterTableDropColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
createAlterTable(nameParts, catalog, tbl, changes)

case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ case class UnresolvedPartitionSpec(
override lazy val resolved = false
}

/**
 * A reference to a (possibly nested) column of a table, used by ALTER TABLE commands.
 * A field name is a pure name path and carries no data type or nullability of its own,
 * so those members must never be queried.
 */
sealed trait FieldName extends LeafExpression with Unevaluable {
  /** The field name path, e.g. Seq("a", "b", "c") for column `a.b.c`. */
  def name: Seq[String]
  // Fix: the messages previously named "UnresolvedFieldName", but this implementation is
  // shared by every FieldName subclass (including ResolvedFieldName), so name the trait.
  override def dataType: DataType = throw new IllegalStateException(
    "FieldName.dataType should not be called.")
  override def nullable: Boolean = throw new IllegalStateException(
    "FieldName.nullable should not be called.")
}

/**
 * A field name that has not yet been checked against a table schema. The analyzer rewrites it
 * to [[ResolvedFieldName]]; any instance surviving analysis is reported as a missing column.
 */
case class UnresolvedFieldName(name: Seq[String]) extends FieldName {
  // Always unresolved: this node must be replaced during analysis.
  override lazy val resolved: Boolean = false
}

/**
* Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to
* [[ResolvedFunc]] during analysis.
Expand Down Expand Up @@ -138,6 +150,8 @@ case class ResolvedPartitionSpec(
ident: InternalRow,
location: Option[String] = None) extends PartitionSpec

/**
 * A field name that has been verified to exist in the table schema, with its name path
 * normalized to the schema's exact casing.
 */
case class ResolvedFieldName(name: Seq[String]) extends FieldName

/**
* A plan containing resolved (temp) views.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3661,7 +3661,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}

/**
* Parse a [[AlterTableDropColumnsStatement]] command.
* Parse a [[AlterTableDropColumns]] command.
*
* For example:
* {{{
Expand All @@ -3672,9 +3672,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
/**
 * Builds an [[AlterTableDropColumns]] plan for an ALTER TABLE ... DROP COLUMN(S) statement.
 * Each dropped column is wrapped in an [[UnresolvedFieldName]] to be resolved by the analyzer.
 */
override def visitDropTableColumns(
    ctx: DropTableColumnsContext): LogicalPlan = withOrigin(ctx) {
  val droppedFields = ctx.columns.multipartIdentifier.asScala
    .map(typedVisit[Seq[String]])
    .map(UnresolvedFieldName(_))
    .toSeq
  AlterTableDropColumns(
    createUnresolvedTable(ctx.multipartIdentifier, "ALTER TABLE ... DROP COLUMNS"),
    droppedFields)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,6 @@ case class AlterTableRenameColumnStatement(
column: Seq[String],
newName: String) extends LeafParsedStatement

/**
* ALTER TABLE ... DROP COLUMNS command, as parsed from SQL.
*/
case class AlterTableDropColumnsStatement(
tableName: Seq[String],
columnsToDrop: Seq[Seq[String]]) extends LeafParsedStatement

/**
* An INSERT INTO statement, as parsed from SQL.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
import org.apache.spark.sql.catalyst.analysis.{FieldName, NamedRelation, PartitionSpec, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
Expand Down Expand Up @@ -1098,3 +1098,29 @@ case class UnsetTableProperties(
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(table = newChild)
}

/**
 * The base trait for ALTER TABLE command plans. An implementation exposes the target table
 * plan, a short operation verb used in error messages, and the catalog [[TableChange]]s the
 * command translates to once its field names are resolved.
 */
trait AlterTableCommand extends UnaryCommand {
  /** The plan for the target table; a ResolvedTable once analysis succeeds. */
  def table: LogicalPlan
  /** Operation verb used in error messages, e.g. "delete". */
  def operation: String
  /** The table changes to apply; only valid after all field names are resolved. */
  def changes: Seq[TableChange]
  override def child: LogicalPlan = table
}

/**
 * The logical plan of the ALTER TABLE ... DROP COLUMNS command.
 */
case class AlterTableDropColumns(
    table: LogicalPlan,
    columnsToDrop: Seq[FieldName]) extends AlterTableCommand {
  override def operation: String = "delete"

  /** Converts each (already resolved) column into a delete-column table change. */
  override def changes: Seq[TableChange] = {
    columnsToDrop.map { field =>
      // Field names must be resolved by the analyzer before conversion.
      require(field.resolved, "FieldName should be resolved before it's converted to TableChange.")
      TableChange.deleteColumn(field.name.toArray)
    }
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
    copy(table = newChild)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -992,17 +992,21 @@ class DDLParserSuite extends AnalysisTest {
test("alter table: drop column") {
  // A single nested column parses to one UnresolvedFieldName with the full name path.
  val expected = AlterTableDropColumns(
    UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
    Seq(UnresolvedFieldName(Seq("a", "b", "c"))))
  comparePlans(parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"), expected)
}

test("alter table: drop multiple columns") {
  val sql = "ALTER TABLE table_name DROP COLUMN x, y, a.b.c"
  // Both the COLUMN and COLUMNS spellings must produce the same plan.
  val expected = AlterTableDropColumns(
    UnresolvedTable(Seq("table_name"), "ALTER TABLE ... DROP COLUMNS", None),
    Seq(
      UnresolvedFieldName(Seq("x")),
      UnresolvedFieldName(Seq("y")),
      UnresolvedFieldName(Seq("a", "b", "c"))))
  Seq(sql, sql.replace("COLUMN", "COLUMNS")).foreach { drop =>
    comparePlans(parsePlan(drop), expected)
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
createAlterTable(nameParts, catalog, tbl, changes)
}

case AlterTableDropColumnsStatement(
nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
loadTable(catalog, tbl.asIdentifier).collect {
case v1Table: V1Table =>
throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError
}.getOrElse {
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
createAlterTable(nameParts, catalog, tbl, changes)
}
case AlterTableDropColumns(ResolvedV1TableIdentifier(_), _) =>
throw QueryCompilationErrors.dropColumnOnlySupportedWithV2TableError

case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
val changes = keys.map(key => TableChange.removeProperty(key))
AlterTableExec(table.catalog, table.identifier, changes) :: Nil

case a: AlterTableCommand if a.table.resolved =>
val table = a.table.asInstanceOf[ResolvedTable]
AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil

case _ => Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName DROP COLUMN bad_column")
}.getMessage
assert(msg.contains("Cannot delete missing field bad_column in test.alt_table schema"))
assert(msg.contains("Cannot delete missing field bad_column in h2.test.alt_table schema"))
}
// Drop a column to not existing table and namespace
Seq("h2.test.not_existing_table", "h2.bad_test.not_existing_table").foreach { table =>
Expand Down Expand Up @@ -362,7 +362,7 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $tableName DROP COLUMN C3")
}.getMessage
assert(msg.contains("Cannot delete missing field C3 in test.alt_table schema"))
assert(msg.contains("Cannot delete missing field C3 in h2.test.alt_table schema"))
}

withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
Expand Down