@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn}
 import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction}
 import org.apache.spark.sql.connector.catalog.functions.ScalarFunction.MAGIC_METHOD_NAME
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
@@ -299,7 +299,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     Batch("Post-Hoc Resolution", Once,
       Seq(ResolveCommandsWithIfExists) ++
       postHocResolutionRules: _*),
-    Batch("Normalize Alter Table Field Names", Once, ResolveFieldNames),
+    Batch("Normalize Alter Table Commands", Once, ResolveAlterTableCommands),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
       new ResolveHints.RemoveAllHints),
@@ -3527,13 +3527,35 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Rule to mostly resolve, normalize and rewrite column names based on case sensitivity
    * for alter table commands.
    */
-  object ResolveFieldNames extends Rule[LogicalPlan] {
+  object ResolveAlterTableCommands extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case a: AlterTableCommand if a.table.resolved =>
-        a.transformExpressions {
+        val table = a.table.asInstanceOf[ResolvedTable]
+        val transformed = a.transformExpressions {
           case u: UnresolvedFieldName =>
-            val table = a.table.asInstanceOf[ResolvedTable]
-            resolveFieldNames(table.schema, u.name).map(ResolvedFieldName(_)).getOrElse(u)
+            resolveFieldNames(table.schema, u.name).getOrElse(u)
+          case u: UnresolvedFieldPosition => u.position match {
+            case after: After =>
+              resolveFieldNames(table.schema, u.fieldName.init :+ after.column())
[Review comment — Contributor] Do we need to put fieldName in UnresolvedFieldPosition? We can easily get it via AlterTableAlterColumn.column.name
+                .map { resolved =>
+                  ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
+                }.getOrElse(u)
+            case _ => ResolvedFieldPosition(u.position)
+          }
         }
+
+        transformed match {
+          case alter @ AlterTableAlterColumn(
+              _: ResolvedTable, ResolvedFieldName(_, field), Some(dataType), _, _, _) =>
+            // Hive style syntax provides the column type, even if it may not have changed.
+            val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
+            if (dt == dataType) {
+              // The user didn't want the field to change, so remove this change.
+              alter.copy(dataType = None)
+            } else {
+              alter
+            }
+          case other => other
+        }
     }
 
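[Note] A minimal standalone sketch (plain Scala, no Spark dependencies; names are illustrative, not Spark's) of the no-op normalization the new `transformed match` performs: when the Hive-style CHANGE COLUMN syntax repeats the column's current type, the rule clears `dataType` so downstream checks see no type change.

```scala
object NoOpTypeChangeSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object LongType extends DataType

  // Stand-in for AlterTableAlterColumn: the optional new type requested by the user.
  case class AlterColumn(column: String, newDataType: Option[DataType])

  def normalize(current: DataType, alter: AlterColumn): AlterColumn =
    alter.newDataType match {
      // The requested type equals the current one: drop the change entirely.
      case Some(dt) if dt == current => alter.copy(newDataType = None)
      case _ => alter
    }

  def main(args: Array[String]): Unit = {
    println(normalize(IntType, AlterColumn("c", Some(IntType))))  // newDataType = None
    println(normalize(IntType, AlterColumn("c", Some(LongType)))) // kept: a real change
  }
}
```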
@@ -3543,10 +3565,10 @@ class Analyzer(override val catalogManager: CatalogManager)
      */
     private def resolveFieldNames(
         schema: StructType,
-        fieldNames: Seq[String]): Option[Seq[String]] = {
+        fieldNames: Seq[String]): Option[ResolvedFieldName] = {
       val fieldOpt = schema.findNestedField(
         fieldNames, includeCollections = true, conf.resolver)
-      fieldOpt.map { case (path, field) => path :+ field.name }
+      fieldOpt.map { case (path, field) => ResolvedFieldName(path, field) }
     }
   }
 
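[Note] The rule leans on `findNestedField` returning the normalized parent path plus the resolved `StructField`, which is now wrapped in `ResolvedFieldName` instead of being flattened to a name sequence. A simplified standalone model of that contract (plain Scala; a case-insensitive-only resolver is assumed, unlike Spark's configurable one):

```scala
object FindNestedFieldSketch {
  case class StructField(name: String, children: Seq[StructField] = Nil)

  // Resolve "a.b.c" against a schema, comparing names case-insensitively, and
  // return (normalized parent path, resolved field), like findNestedField does.
  def findNestedField(
      schema: Seq[StructField],
      names: Seq[String],
      path: Seq[String] = Nil): Option[(Seq[String], StructField)] =
    names match {
      case Seq() => None
      case head +: rest =>
        schema.find(_.name.equalsIgnoreCase(head)).flatMap { f =>
          if (rest.isEmpty) Some((path, f))                      // found the leaf
          else findNestedField(f.children, rest, path :+ f.name) // descend, keep real casing
        }
    }

  def main(args: Array[String]): Unit = {
    val schema = Seq(StructField("Point", Seq(StructField("X"), StructField("Y"))))
    // The user wrote lower case; the result carries the schema's casing: (List(Point), X)
    println(findNestedField(schema, Seq("point", "x")))
  }
}
```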
@@ -3598,68 +3620,6 @@ class Analyzer(override val catalogManager: CatalogManager)
           Some(addColumn(schema, "root", Nil))
         }
 
-        case typeChange: UpdateColumnType =>
-          // Hive style syntax provides the column type, even if it may not have changed
-          val fieldOpt = schema.findNestedField(
-            typeChange.fieldNames(), includeCollections = true, conf.resolver)
-
-          if (fieldOpt.isEmpty) {
-            // We couldn't resolve the field. Leave it to CheckAnalysis
-            Some(typeChange)
-          } else {
-            val (fieldNames, field) = fieldOpt.get
-            val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
-            if (dt == typeChange.newDataType()) {
-              // The user didn't want the field to change, so remove this change
-              None
-            } else {
-              Some(TableChange.updateColumnType(
-                (fieldNames :+ field.name).toArray, typeChange.newDataType()))
-            }
-          }
-        case n: UpdateColumnNullability =>
-          // Need to resolve column
-          resolveFieldNames(
-            schema,
-            n.fieldNames(),
-            TableChange.updateColumnNullability(_, n.nullable())).orElse(Some(n))
-
-        case position: UpdateColumnPosition =>
-          position.position() match {
-            case after: After =>
-              // Need to resolve column as well as position reference
-              val fieldOpt = schema.findNestedField(
-                position.fieldNames(), includeCollections = true, conf.resolver)
-
-              if (fieldOpt.isEmpty) {
-                Some(position)
-              } else {
-                val (normalizedPath, field) = fieldOpt.get
-                val targetCol = schema.findNestedField(
-                  normalizedPath :+ after.column(), includeCollections = true, conf.resolver)
-                if (targetCol.isEmpty) {
-                  // Leave unchanged to CheckAnalysis
-                  Some(position)
-                } else {
-                  Some(TableChange.updateColumnPosition(
-                    (normalizedPath :+ field.name).toArray,
-                    ColumnPosition.after(targetCol.get._2.name)))
-                }
-              }
-            case _ =>
-              // Need to resolve column
-              resolveFieldNames(
-                schema,
-                position.fieldNames(),
-                TableChange.updateColumnPosition(_, position.position())).orElse(Some(position))
-          }
-
-        case comment: UpdateColumnComment =>
-          resolveFieldNames(
-            schema,
-            comment.fieldNames(),
-            TableChange.updateColumnComment(_, comment.newComment())).orElse(Some(comment))
-
         case delete: DeleteColumn =>
           resolveFieldNames(schema, delete.fieldNames(), TableChange.deleteColumn)
             .orElse(Some(delete))

@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
 import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -444,12 +444,42 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
         write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
 
       case alter: AlterTableCommand if alter.table.resolved =>
+        val table = alter.table.asInstanceOf[ResolvedTable]
+        def findField(fieldName: Seq[String]): StructField = {
+          // Include collections because structs nested in maps and arrays may be altered.
+          val field = table.schema.findNestedField(fieldName, includeCollections = true)
+          if (field.isEmpty) {
+            alter.failAnalysis(s"Cannot ${alter.operation} missing field ${fieldName.quoted} " +
+              s"in ${table.name} schema: ${table.schema.treeString}")
+          }
+          field.get._2
+        }
+        def findParentStruct(fieldNames: Seq[String]): StructType = {
[Review comment — Contributor (author)] findField and findParentStruct are existing functions in CheckAnalysis with a slight modification (e.g., not passing the operation argument). The existing functions will be removed once all alter table commands are migrated.
+          val parent = fieldNames.init
+          val field = if (parent.nonEmpty) {
+            findField(parent).dataType
+          } else {
+            table.schema
+          }
+          field match {
+            case s: StructType => s
+            case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldNames.quoted}, " +
+              s"because its parent is not a StructType. Found $o")
+          }
+        }
         alter.transformExpressions {
-          case u: UnresolvedFieldName =>
-            val table = alter.table.asInstanceOf[ResolvedTable]
-            alter.failAnalysis(
-              s"Cannot ${alter.operation} missing field ${u.name.quoted} in ${table.name} " +
-                s"schema: ${table.schema.treeString}")
+          case UnresolvedFieldName(name) =>
+            alter.failAnalysis(s"Cannot ${alter.operation} missing field ${name.quoted} in " +
+              s"${table.name} schema: ${table.schema.treeString}")
+          case UnresolvedFieldPosition(fieldName, position: After) =>
+            val parent = findParentStruct(fieldName)
+            val allFields = parent match {
+              case s: StructType => s.fieldNames
+              case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldName.quoted}, " +
+                s"because its parent is not a StructType. Found $o")
+            }
+            alter.failAnalysis(s"Couldn't resolve positional argument $position amongst " +
+              s"${allFields.mkString("[", ", ", "]")}")
         }
         checkAlterTableCommand(alter)
 
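[Note] A compact standalone model (illustrative types, not Spark's) of what `findField`/`findParentStruct` above enforce: the parent path of a field must resolve, and it must resolve to a struct; an empty parent path means the root schema itself.

```scala
object FindParentStructSketch {
  sealed trait DataType
  case class StructType(fields: Map[String, DataType]) extends DataType
  case object IntType extends DataType

  // Walk the path, failing on missing fields or non-struct intermediate nodes.
  def findField(schema: StructType, path: Seq[String]): DataType =
    path.foldLeft[DataType](schema) {
      case (StructType(fields), name) =>
        fields.getOrElse(name, sys.error(s"Cannot alter missing field ${path.mkString(".")}"))
      case (other, _) => sys.error(s"parent is not a StructType. Found $other")
    }

  // The parent of `a.b.c` is `a.b`; an empty parent path resolves to the root schema.
  def findParentStruct(schema: StructType, path: Seq[String]): StructType =
    findField(schema, path.init) match {
      case s: StructType => s
      case o => sys.error(s"Cannot alter ${path.mkString(".")}: parent is not a struct, found $o")
    }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Map("point" -> StructType(Map("x" -> IntType))))
    println(findParentStruct(schema, Seq("point", "x"))) // the struct holding x
  }
}
```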
@@ -522,66 +552,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
             positionArgumentExists(add.position(), parent, fieldsAdded)
             TypeUtils.failWithIntervalType(add.dataType())
             colsToAdd(parentName) = fieldsAdded :+ add.fieldNames().last
-          case update: UpdateColumnType =>
-            val field = {
-              val f = findField("update", update.fieldNames)
-              CharVarcharUtils.getRawType(f.metadata)
-                .map(dt => f.copy(dataType = dt))
-                .getOrElse(f)
-            }
-            val fieldName = update.fieldNames.quoted
-            update.newDataType match {
-              case _: StructType =>
-                alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
-                  s"update a struct by updating its fields")
-              case _: MapType =>
-                alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
-                  s"update a map by updating $fieldName.key or $fieldName.value")
-              case _: ArrayType =>
-                alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
-                  s"update the element by updating $fieldName.element")
-              case u: UserDefinedType[_] =>
-                alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
-                  s"update a UserDefinedType[${u.sql}] by updating its fields")
-              case _: CalendarIntervalType | _: YearMonthIntervalType |
-                  _: DayTimeIntervalType =>
-                alter.failAnalysis(s"Cannot update ${table.name} field $fieldName to " +
-                  s"interval type")
-              case _ =>
-                // update is okay
-            }
-
-            // We don't need to handle nested types here which shall fail before
[Review comment — imback82 (Contributor, author), Jun 28, 2021, on lines -533 to -554] These changes are moved below to checkAlterTableCommand.
-            def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
-              case (CharType(l1), CharType(l2)) => l1 == l2
-              case (CharType(l1), VarcharType(l2)) => l1 <= l2
-              case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
-              case _ => Cast.canUpCast(from, to)
-            }
-
-            if (!canAlterColumnType(field.dataType, update.newDataType)) {
-              alter.failAnalysis(
-                s"Cannot update ${table.name} field $fieldName: " +
-                  s"${field.dataType.simpleString} cannot be cast to " +
-                  s"${update.newDataType.simpleString}")
-            }
-          case update: UpdateColumnNullability =>
-            val field = findField("update", update.fieldNames)
-            val fieldName = update.fieldNames.quoted
-            if (!update.nullable && field.nullable) {
-              alter.failAnalysis(
-                s"Cannot change nullable column to non-nullable: $fieldName")
-            }
-          case updatePos: UpdateColumnPosition =>
-            findField("update", updatePos.fieldNames)
-            val parent = findParentStruct("update", updatePos.fieldNames())
-            val parentName = updatePos.fieldNames().init
-            positionArgumentExists(
-              updatePos.position(),
-              parent,
-              colsToAdd.getOrElse(parentName, Nil))
[Review comment — Contributor (author)] I don't think we should consider colsToAdd since the CHANGE COLUMN command doesn't add a new column. So, in this PR, I didn't consider new columns added. Please let me know if I understood this wrong.
-          case update: UpdateColumnComment =>
-            findField("update", update.fieldNames)
           case delete: DeleteColumn =>
             findField("delete", delete.fieldNames)
             // REPLACE COLUMNS has deletes followed by adds. Remember the deleted columns
@@ -1088,8 +1058,51 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     }
 
     alter match {
-      case AlterTableRenameColumn(table: ResolvedTable, ResolvedFieldName(name), newName) =>
-        checkColumnNotExists(name.init :+ newName, table.schema)
+      case AlterTableRenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
+        checkColumnNotExists(col.path :+ newName, table.schema)
+      case a @ AlterTableAlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
+        val fieldName = col.name.quoted
+        if (a.dataType.isDefined) {
+          val field = CharVarcharUtils.getRawType(col.field.metadata)
+            .map(dt => col.field.copy(dataType = dt))
+            .getOrElse(col.field)
+          val newDataType = a.dataType.get
+          newDataType match {
[Review comment — Contributor] Not related to this PR: I think there is a small bug here. If the data type is not changed, we shouldn't fail even if the new data type is struct/array/map. @imback82 can you help to fix it?
[Review comment — Contributor] Actually, this is OK. In the analyzer, we will set the newDataType to None if it is the same as the existing data type in the table.
+            case _: StructType =>
+              alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
+                "update a struct by updating its fields")
+            case _: MapType =>
+              alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
+                s"update a map by updating $fieldName.key or $fieldName.value")
+            case _: ArrayType =>
+              alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
+                s"update the element by updating $fieldName.element")
+            case u: UserDefinedType[_] =>
+              alter.failAnalysis(s"Cannot update ${table.name} field $fieldName type: " +
+                s"update a UserDefinedType[${u.sql}] by updating its fields")
+            case _: CalendarIntervalType | _: YearMonthIntervalType | _: DayTimeIntervalType =>
+              alter.failAnalysis(s"Cannot update ${table.name} field $fieldName to interval type")
+            case _ => // update is okay
+          }
+
+          // We don't need to handle nested types here which shall fail before.
+          def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
+            case (CharType(l1), CharType(l2)) => l1 == l2
+            case (CharType(l1), VarcharType(l2)) => l1 <= l2
+            case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
+            case _ => Cast.canUpCast(from, to)
+          }
+
+          if (!canAlterColumnType(field.dataType, newDataType)) {
+            alter.failAnalysis(s"Cannot update ${table.name} field $fieldName: " +
+              s"${field.dataType.simpleString} cannot be cast to ${newDataType.simpleString}")
+          }
+        }
+        if (a.nullable.isDefined) {
+          if (!a.nullable.get && col.field.nullable) {
+            alter.failAnalysis(s"Cannot change nullable column to non-nullable: $fieldName")
+          }
+        }
       case _ =>
     }
   }
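[Note] A standalone sketch of the char/varchar compatibility rules `canAlterColumnType` encodes: CHAR must keep its exact length, CHAR and VARCHAR may only widen into VARCHAR, and everything else falls back to a safe upcast check (here replaced by a tiny hypothetical widening table, since `Cast.canUpCast` is not reproduced):

```scala
object CanAlterColumnTypeSketch {
  sealed trait DataType
  case class CharType(length: Int) extends DataType
  case class VarcharType(length: Int) extends DataType
  case object IntType extends DataType
  case object LongType extends DataType

  // Stand-in for Cast.canUpCast: a minimal, hypothetical widening table.
  private def canUpCast(from: DataType, to: DataType): Boolean =
    (from, to) match {
      case (IntType, LongType) => true
      case _ => from == to
    }

  def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
    case (CharType(l1), CharType(l2)) => l1 == l2        // CHAR keeps its exact length
    case (CharType(l1), VarcharType(l2)) => l1 <= l2     // CHAR may widen into VARCHAR
    case (VarcharType(l1), VarcharType(l2)) => l1 <= l2  // VARCHAR may only grow
    case _ => canUpCast(from, to)
  }

  def main(args: Array[String]): Unit = {
    assert(canAlterColumnType(CharType(5), VarcharType(10)))     // widen: ok
    assert(!canAlterColumnType(VarcharType(10), VarcharType(5))) // shrink: rejected
    assert(canAlterColumnType(IntType, LongType))                // safe upcast: ok
  }
}
```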

@@ -66,28 +66,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       }
       createAlterTable(nameParts, catalog, tbl, changes)
 
-    case a @ AlterTableAlterColumnStatement(
-        nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
-      a.dataType.foreach(failNullType)
-      val colName = a.column.toArray
-      val typeChange = a.dataType.map { newDataType =>
-        TableChange.updateColumnType(colName, newDataType)
-      }
-      val nullabilityChange = a.nullable.map { nullable =>
-        TableChange.updateColumnNullability(colName, nullable)
-      }
-      val commentChange = a.comment.map { newComment =>
-        TableChange.updateColumnComment(colName, newComment)
-      }
-      val positionChange = a.position.map { newPosition =>
-        TableChange.updateColumnPosition(colName, newPosition)
-      }
-      createAlterTable(
-        nameParts,
-        catalog,
-        tbl,
-        typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
-
     case c @ CreateTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
       assertNoNullTypeInSchema(c.tableSchema)