Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN bad_column DROP NOT NULL")
}.getMessage
assert(msg.contains("Cannot update missing field bad_column"))
assert(msg.contains("Missing field bad_column"))
}

def testRenameColumn(tbl: String): Unit = {
Expand Down Expand Up @@ -103,8 +103,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val msg = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.alt_table DROP COLUMN bad_column")
}.getMessage
assert(
msg.contains(s"Cannot delete missing field bad_column in $catalogName.alt_table schema"))
assert(msg.contains(s"Missing field bad_column in table $catalogName.alt_table"))
}
// Drop a column from a not existing table
val msg = intercept[AnalysisException] {
Expand All @@ -120,7 +119,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val msg2 = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.alt_table ALTER COLUMN bad_column TYPE DOUBLE")
}.getMessage
assert(msg2.contains("Cannot update missing field bad_column"))
assert(msg2.contains("Missing field bad_column"))
}
// Update column type in not existing table
val msg = intercept[AnalysisException] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
ResolveAlterTableCommands ::
AddMetadataColumns ::
DeduplicateRelations ::
ResolveReferences ::
Expand Down Expand Up @@ -310,7 +311,6 @@ class Analyzer(override val catalogManager: CatalogManager)
Batch("Post-Hoc Resolution", Once,
Seq(ResolveCommandsWithIfExists) ++
postHocResolutionRules: _*),
Batch("Normalize Alter Table Commands", Once, ResolveAlterTableCommands),
Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
Batch("Remove Unresolved Hints", Once,
new ResolveHints.RemoveAllHints),
Expand Down Expand Up @@ -3577,46 +3577,49 @@ class Analyzer(override val catalogManager: CatalogManager)
*/
object ResolveAlterTableCommands extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case a: AlterTableCommand if a.table.resolved =>
case a: AlterTableCommand if a.table.resolved && hasUnresolvedFieldName(a) =>
val table = a.table.asInstanceOf[ResolvedTable]
val transformed = a.transformExpressions {
case u: UnresolvedFieldName =>
resolveFieldNames(table.schema, u.name).getOrElse(u)
case u: UnresolvedFieldPosition => u.position match {
case after: After =>
resolveFieldNames(table.schema, u.fieldName.init :+ after.column())
.map { resolved =>
ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
}.getOrElse(u)
case _ => ResolvedFieldPosition(u.position)
}
a.transformExpressions {
case u: UnresolvedFieldName => resolveFieldNames(table, u.name, u)
}

transformed match {
case alter @ AlterTableAlterColumn(
_: ResolvedTable, ResolvedFieldName(_, field), Some(dataType), _, _, _) =>
// Hive style syntax provides the column type, even if it may not have changed.
val dt = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
if (dt == dataType) {
// The user didn't want the field to change, so remove this change.
alter.copy(dataType = None)
} else {
alter
}
case a @ AlterTableAlterColumn(
table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
val newDataType = dataType.flatMap { dt =>
// Hive style syntax provides the column type, even if it may not have changed.
val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
if (existing == dt) None else Some(dt)
}
val newPosition = position map {
case u @ UnresolvedFieldPosition(after: After) =>
// TODO: since the field name is already resolved, it's more efficient if
// `ResolvedFieldName` carries the parent struct and we resolve column position
// based on the parent struct, instead of re-resolving the entire column path.
val resolved = resolveFieldNames(table, path :+ after.column(), u)
ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
case u: UnresolvedFieldPosition => ResolvedFieldPosition(u.position)
case other => other
}
val resolved = a.copy(dataType = newDataType, position = newPosition)
resolved.copyTagsFrom(a)
resolved
}

/**
* Returns the resolved field name if the field can be resolved, returns None if the column is
* not found. An error will be thrown in CheckAnalysis for columns that can't be resolved.
*/
private def resolveFieldNames(
schema: StructType,
fieldNames: Seq[String]): Option[ResolvedFieldName] = {
val fieldOpt = schema.findNestedField(
fieldNames, includeCollections = true, conf.resolver)
fieldOpt.map { case (path, field) => ResolvedFieldName(path, field) }
table: ResolvedTable,
fieldName: Seq[String],
context: Expression): ResolvedFieldName = {
table.schema.findNestedField(fieldName, includeCollections = true, conf.resolver).map {
case (path, field) => ResolvedFieldName(path, field)
}.getOrElse(throw QueryCompilationErrors.missingFieldError(fieldName, table, context))
}

/**
 * Tells whether any expression of the given command still contains an
 * [[UnresolvedFieldName]] somewhere in its tree.
 */
private def hasUnresolvedFieldName(a: AlterTableCommand): Boolean = {
  a.expressions.exists { expr =>
    val firstUnresolved = expr.find {
      case _: UnresolvedFieldName => true
      case _ => false
    }
    firstUnresolved.nonEmpty
  }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,43 +450,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))

case alter: AlterTableCommand if alter.table.resolved =>
val table = alter.table.asInstanceOf[ResolvedTable]
def findField(fieldName: Seq[String]): StructField = {
// Include collections because structs nested in maps and arrays may be altered.
val field = table.schema.findNestedField(fieldName, includeCollections = true)
if (field.isEmpty) {
alter.failAnalysis(s"Cannot ${alter.operation} missing field ${fieldName.quoted} " +
s"in ${table.name} schema: ${table.schema.treeString}")
}
field.get._2
}
def findParentStruct(fieldNames: Seq[String]): StructType = {
val parent = fieldNames.init
val field = if (parent.nonEmpty) {
findField(parent).dataType
} else {
table.schema
}
field match {
case s: StructType => s
case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldNames.quoted}, " +
s"because its parent is not a StructType. Found $o")
}
}
alter.transformExpressions {
case UnresolvedFieldName(name) =>
alter.failAnalysis(s"Cannot ${alter.operation} missing field ${name.quoted} in " +
s"${table.name} schema: ${table.schema.treeString}")
case UnresolvedFieldPosition(fieldName, position: After) =>
val parent = findParentStruct(fieldName)
val allFields = parent match {
case s: StructType => s.fieldNames
case o => alter.failAnalysis(s"Cannot ${alter.operation} ${fieldName.quoted}, " +
s"because its parent is not a StructType. Found $o")
}
alter.failAnalysis(s"Couldn't resolve positional argument $position amongst " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnresolvedFieldPosition was carrying fieldName to reconstruct this message (no need to match by commands). If this is not needed anymore, we can drop fieldName. Thanks for cleaning this up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message can refer to a position in the SQL string, which should provide sufficient context. The missing top-level column also uses a simple error message.

s"${allFields.mkString("[", ", ", "]")}")
}
checkAlterTableCommand(alter)

case alter: AlterTable if alter.table.resolved =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,7 @@ sealed trait FieldPosition extends LeafExpression with Unevaluable {
"FieldPosition.nullable should not be called.")
}

case class UnresolvedFieldPosition(
fieldName: Seq[String],
position: ColumnPosition) extends FieldPosition {
case class UnresolvedFieldPosition(position: ColumnPosition) extends FieldPosition {
override lazy val resolved = false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3553,7 +3553,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
ctx: RenameTableColumnContext): LogicalPlan = withOrigin(ctx) {
AlterTableRenameColumn(
createUnresolvedTable(ctx.table, "ALTER TABLE ... RENAME COLUMN"),
UnresolvedFieldName(ctx.from.parts.asScala.map(_.getText).toSeq),
UnresolvedFieldName(typedVisit[Seq[String]](ctx.from)),
ctx.to.getText)
}

Expand All @@ -3579,7 +3579,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
s"ALTER TABLE table $verb COLUMN requires a TYPE, a SET/DROP, a COMMENT, or a FIRST/AFTER",
ctx)
}
val columnNameParts = typedVisit[Seq[String]](ctx.column)
val dataType = if (action.dataType != null) {
Some(typedVisit[DataType](action.dataType))
} else {
Expand All @@ -3599,7 +3598,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
None
}
val position = if (action.colPosition != null) {
Some(UnresolvedFieldPosition(columnNameParts, typedVisit[ColumnPosition](action.colPosition)))
Some(UnresolvedFieldPosition(typedVisit[ColumnPosition](action.colPosition)))
} else {
None
}
Expand All @@ -3608,7 +3607,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg

AlterTableAlterColumn(
createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
UnresolvedFieldName(columnNameParts),
UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)),
dataType = dataType,
nullable = nullable,
comment = comment,
Expand Down Expand Up @@ -3647,7 +3646,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
nullable = None,
comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec),
position = Option(ctx.colPosition).map(
pos => UnresolvedFieldPosition(columnNameParts, typedVisit[ColumnPosition](pos))))
pos => UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))))
}

override def visitHiveReplaceColumns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1983,4 +1983,18 @@ private[spark] object QueryCompilationErrors {
new AnalysisException(
s"class $className doesn't implement interface UserDefinedAggregateFunction")
}

/**
 * Builds the [[AnalysisException]] reported when `fieldName` cannot be found in `table`.
 *
 * The exception is returned rather than thrown, matching the declared `Throwable` return
 * type; call sites raise it with `throw QueryCompilationErrors.missingFieldError(...)`.
 *
 * @param fieldName name parts of the field that was looked up
 * @param table the resolved table whose name and schema tree are included in the message
 * @param context the expression whose origin supplies the line and start position
 * @return the exception describing the missing field
 */
def missingFieldError(
    fieldName: Seq[String], table: ResolvedTable, context: Expression): Throwable = {
  new AnalysisException(
    s"Missing field ${fieldName.quoted} in table ${table.name} with schema:\n" +
      table.schema.treeString,
    context.origin.line,
    context.origin.startPosition)
}

/**
 * Builds the [[AnalysisException]] for a field name whose path goes through a non-struct.
 *
 * @param fieldName the full field name that was requested
 * @param path the part of the path that was expected to be a struct
 * @return the exception to be thrown by the caller
 */
def invalidFieldName(fieldName: Seq[String], path: Seq[String]): Throwable = {
  val message =
    s"Field name ${fieldName.quoted} is invalid, ${path.quoted} is not a struct."
  new AnalysisException(message)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
findField(struct, names, normalizedPath ++ Seq(field.name, "element"))

case _ =>
None
throw QueryCompilationErrors.invalidFieldName(fieldNames, normalizedPath)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ class DDLParserSuite extends AnalysisTest {
None,
None,
None,
Some(UnresolvedFieldPosition(Seq("a", "b", "c"), first()))))
Some(UnresolvedFieldPosition(first()))))
}

test("alter table: multiple property changes are not allowed") {
Expand Down Expand Up @@ -1043,7 +1043,7 @@ class DDLParserSuite extends AnalysisTest {
Some(IntegerType),
None,
None,
Some(UnresolvedFieldPosition(Seq("a", "b", "c"), after("other_col")))))
Some(UnresolvedFieldPosition(after("other_col")))))

// renaming column not supported in hive style ALTER COLUMN.
intercept("ALTER TABLE table_name CHANGE COLUMN a.b.c new_name INT",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,12 @@ ALTER TABLE test_change CHANGE invalid_col TYPE INT
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Can't find column `invalid_col` given table data columns [`a`, `b`, `c`]
Missing field invalid_col in table spark_catalog.default.test_change with schema:
root
|-- a: integer (nullable = true)
|-- b: string (nullable = true)
|-- c: integer (nullable = true)
; line 1 pos 0


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -616,8 +616,7 @@ trait AlterTableTests extends SharedSparkSession {
sql(s"ALTER TABLE $t ALTER COLUMN data TYPE string")
}

assert(exc.getMessage.contains("data"))
assert(exc.getMessage.contains("missing field"))
assert(exc.getMessage.contains("Missing field data"))
}
}

Expand All @@ -630,8 +629,7 @@ trait AlterTableTests extends SharedSparkSession {
sql(s"ALTER TABLE $t ALTER COLUMN point.x TYPE double")
}

assert(exc.getMessage.contains("point.x"))
assert(exc.getMessage.contains("missing field"))
assert(exc.getMessage.contains("Missing field point.x"))
}
}

Expand Down Expand Up @@ -689,7 +687,7 @@ trait AlterTableTests extends SharedSparkSession {

val e1 = intercept[AnalysisException](
sql(s"ALTER TABLE $t ALTER COLUMN b AFTER non_exist"))
assert(e1.getMessage.contains("Couldn't resolve positional argument"))
assert(e1.getMessage.contains("Missing field non_exist"))

sql(s"ALTER TABLE $t ALTER COLUMN point.y FIRST")
assert(getTableMetadata(tableName).schema == new StructType()
Expand All @@ -711,7 +709,7 @@ trait AlterTableTests extends SharedSparkSession {

val e2 = intercept[AnalysisException](
sql(s"ALTER TABLE $t ALTER COLUMN point.y AFTER non_exist"))
assert(e2.getMessage.contains("Couldn't resolve positional argument"))
assert(e2.getMessage.contains("Missing field point.non_exist"))

// `AlterTable.resolved` checks column existence.
intercept[AnalysisException](
Expand Down Expand Up @@ -802,8 +800,7 @@ trait AlterTableTests extends SharedSparkSession {
sql(s"ALTER TABLE $t ALTER COLUMN data COMMENT 'doc'")
}

assert(exc.getMessage.contains("data"))
assert(exc.getMessage.contains("missing field"))
assert(exc.getMessage.contains("Missing field data"))
}
}

Expand All @@ -816,8 +813,7 @@ trait AlterTableTests extends SharedSparkSession {
sql(s"ALTER TABLE $t ALTER COLUMN point.x COMMENT 'doc'")
}

assert(exc.getMessage.contains("point.x"))
assert(exc.getMessage.contains("missing field"))
assert(exc.getMessage.contains("Missing field point.x"))
}
}

Expand Down Expand Up @@ -918,8 +914,7 @@ trait AlterTableTests extends SharedSparkSession {
sql(s"ALTER TABLE $t RENAME COLUMN data TO some_string")
}

assert(exc.getMessage.contains("data"))
assert(exc.getMessage.contains("missing field"))
assert(exc.getMessage.contains("Missing field data"))
}
}

Expand All @@ -932,8 +927,7 @@ trait AlterTableTests extends SharedSparkSession {
sql(s"ALTER TABLE $t RENAME COLUMN point.x TO z")
}

assert(exc.getMessage.contains("point.x"))
assert(exc.getMessage.contains("missing field"))
assert(exc.getMessage.contains("Missing field point.x"))
}
}

Expand Down Expand Up @@ -1063,8 +1057,7 @@ trait AlterTableTests extends SharedSparkSession {
sql(s"ALTER TABLE $t DROP COLUMN data")
}

assert(exc.getMessage.contains("data"))
assert(exc.getMessage.contains("missing field"))
assert(exc.getMessage.contains("Missing field data"))
}
}

Expand All @@ -1077,8 +1070,7 @@ trait AlterTableTests extends SharedSparkSession {
sql(s"ALTER TABLE $t DROP COLUMN point.x")
}

assert(exc.getMessage.contains("point.x"))
assert(exc.getMessage.contains("missing field"))
assert(exc.getMessage.contains("Missing field point.x"))
}
}

Expand Down
Loading