Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1036,14 +1036,6 @@ class Analyzer(override val catalogManager: CatalogManager)
}.getOrElse(write)
case _ => write
}

case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => alter.copy(table = rel))
.getOrElse(alter)

case u: UnresolvedV2Relation =>
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
}

/**
Expand Down Expand Up @@ -3516,15 +3508,16 @@ class Analyzer(override val catalogManager: CatalogManager)
/** Rule to mostly resolve, normalize and rewrite column names based on case sensitivity. */
object ResolveAlterTableChanges extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case a @ AlterTable(_, _, t: NamedRelation, changes) if t.resolved =>
case a: AlterTable if a.table.resolved =>
// 'colsToAdd' keeps track of new columns being added. It stores a mapping from a
// normalized parent name of fields to field names that belong to the parent.
// For example, if we add columns "a.b.c", "a.b.d", and "a.c", 'colsToAdd' will become
// Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]
val schema = t.schema
val normalizedChanges = changes.flatMap {
val schema = a.table.schema
val normalizedChanges = a.changes.flatMap {
case add: AddColumn =>
CatalogV2Util.failNullType(add.dataType)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is moved from ResolveSessionCatalog.scala.

def addColumn(
parentSchema: StructType,
parentName: String,
Expand Down Expand Up @@ -3562,6 +3555,7 @@ class Analyzer(override val catalogManager: CatalogManager)
}

case typeChange: UpdateColumnType =>
CatalogV2Util.failNullType(typeChange.newDataType)
// Hive style syntax provides the column type, even if it may not have changed
val fieldOpt = schema.findNestedField(
typeChange.fieldNames(), includeCollections = true, conf.resolver)
Expand Down Expand Up @@ -3639,7 +3633,7 @@ class Analyzer(override val catalogManager: CatalogManager)
case other => Some(other)
}

a.copy(changes = normalizedChanges)
a.withNewChanges(normalizedChanges)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
write.table.failAnalysis(s"Table or view not found: ${tblName.quoted}")

case u: UnresolvedV2Relation if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case u: UnresolvedV2Relation =>
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case AlterTable(_, _, u: UnresolvedV2Relation, _) if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case command: V2PartitionCommand =>
command.table match {
case r @ ResolvedTable(_, _, table, _) => table match {
Expand Down Expand Up @@ -441,7 +427,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))

case alter: AlterTable if alter.table.resolved =>
val table = alter.table
val table = alter.table.asInstanceOf[ResolvedTable]
def findField(operation: String, fieldName: Array[String]): StructField = {
// include collections because structs nested in maps and arrays may be altered
val field = table.schema.findNestedField(fieldName, includeCollections = true)
Expand Down Expand Up @@ -496,11 +482,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
// Map(Seq("a", "b") -> Seq("c", "d"), Seq("a") -> Seq("c")).
val colsToAdd = mutable.Map.empty[Seq[String], Seq[String]]

val isReplacingCols = alter.isInstanceOf[AlterTableReplaceColumns]
alter.changes.foreach {
case add: AddColumn =>
// If a column to add is a part of columns to delete, we don't need to check
// if column already exists - applies to REPLACE COLUMNS scenario.
if (!colsToDelete.contains(add.fieldNames())) {
// REPLACE COLUMNS deletes all the existing columns, thus we don't need
// to check if a column already exists if we are replacing columns.
// Note that columns to delete are added in DataSourceV2Strategy.
if (!isReplacingCols) {
checkColumnNotExists("add", add.fieldNames(), table.schema)
}
val parent = findParentStruct("add", add.fieldNames())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}

/**
* Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
Expand All @@ -31,73 +31,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
import org.apache.spark.sql.connector.catalog.CatalogV2Util._

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case AlterTableAddColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
}
createAlterTable(nameParts, catalog, tbl, changes)

case AlterTableReplaceColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
cols.foreach(c => failNullType(c.dataType))
val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
case Some(table) =>
// REPLACE COLUMNS deletes all the existing columns and adds new columns specified.
val deleteChanges = table.schema.fieldNames.map { name =>
TableChange.deleteColumn(Array(name))
}
val addChanges = cols.map { col =>
TableChange.addColumn(
col.name.toArray,
col.dataType,
col.nullable,
col.comment.orNull,
col.position.orNull)
}
deleteChanges ++ addChanges
case None => Seq()
}
createAlterTable(nameParts, catalog, tbl, changes)

case a @ AlterTableAlterColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
a.dataType.foreach(failNullType)
val colName = a.column.toArray
val typeChange = a.dataType.map { newDataType =>
TableChange.updateColumnType(colName, newDataType)
}
val nullabilityChange = a.nullable.map { nullable =>
TableChange.updateColumnNullability(colName, nullable)
}
val commentChange = a.comment.map { newComment =>
TableChange.updateColumnComment(colName, newComment)
}
val positionChange = a.position.map { newPosition =>
TableChange.updateColumnPosition(colName, newPosition)
}
createAlterTable(
nameParts,
catalog,
tbl,
typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)

case AlterTableRenameColumnStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
createAlterTable(nameParts, catalog, tbl, changes)

case AlterTableDropColumnsStatement(
nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
createAlterTable(nameParts, catalog, tbl, changes)

case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
assertNoNullTypeInSchema(c.tableSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
Expand Down Expand Up @@ -72,28 +71,6 @@ object UnresolvedRelation {
UnresolvedRelation(tableIdentifier.database.toSeq :+ tableIdentifier.table)
}

/**
 * A variant of [[UnresolvedRelation]] which can only be resolved to a v2 relation
 * (`DataSourceV2Relation`), not v1 relation or temp view.
 *
 * @param originalNameParts the original table identifier name parts before catalog is resolved.
 * @param catalog The catalog which the table should be looked up from.
 * @param tableName The name of the table to look up.
 */
case class UnresolvedV2Relation(
    originalNameParts: Seq[String],
    catalog: TableCatalog,
    tableName: Identifier)
  extends LeafNode with NamedRelation {
  // Brings in the `.quoted` syntax on multi-part identifiers used by `name` below.
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

  // Display name uses the original, pre-catalog-resolution identifier so error
  // messages show exactly what the user typed.
  override def name: String = originalNameParts.quoted

  // An unresolved relation exposes no attributes; output becomes available only
  // after the analyzer replaces this node with an actual v2 relation.
  override def output: Seq[Attribute] = Nil

  // Always unresolved: the analyzer must substitute this node during resolution,
  // otherwise analysis checking fails with a "table not found" style error.
  override lazy val resolved = false
}

/**
* An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into
* a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]].
Expand Down
Loading