Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.types.DataType;

import java.util.Arrays;
import java.util.Objects;

/**
* TableChange subclasses represent requested changes to a table. These are passed to
* {@link TableCatalog#alterTable}. For example,
Expand Down Expand Up @@ -210,6 +213,20 @@ public String property() {
public String value() {
return value;
}

@Override
public boolean equals(Object o) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these equal/hashCode are useful in general and we need them in tests. These are generated by IDE, we can also implement these classes in Scala case class to get equal/hashCode for free.

if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SetProperty that = (SetProperty) o;
return property.equals(that.property) &&
value.equals(that.value);
}

@Override
public int hashCode() {
return Objects.hash(property, value);
}
}

/**
Expand All @@ -227,6 +244,19 @@ private RemoveProperty(String property) {
public String property() {
return property;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RemoveProperty that = (RemoveProperty) o;
return property.equals(that.property);
}

@Override
public int hashCode() {
return Objects.hash(property);
}
}

interface ColumnChange extends TableChange {
Expand Down Expand Up @@ -269,6 +299,24 @@ public boolean isNullable() {
public String comment() {
return comment;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AddColumn addColumn = (AddColumn) o;
return isNullable == addColumn.isNullable &&
Arrays.equals(fieldNames, addColumn.fieldNames) &&
dataType.equals(addColumn.dataType) &&
comment.equals(addColumn.comment);
}

@Override
public int hashCode() {
int result = Objects.hash(dataType, isNullable, comment);
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
}

/**
Expand Down Expand Up @@ -296,6 +344,22 @@ public String[] fieldNames() {
public String newName() {
return newName;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RenameColumn that = (RenameColumn) o;
return Arrays.equals(fieldNames, that.fieldNames) &&
newName.equals(that.newName);
}

@Override
public int hashCode() {
int result = Objects.hash(newName);
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
}

/**
Expand Down Expand Up @@ -328,6 +392,23 @@ public DataType newDataType() {
public boolean isNullable() {
return isNullable;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateColumnType that = (UpdateColumnType) o;
return isNullable == that.isNullable &&
Arrays.equals(fieldNames, that.fieldNames) &&
newDataType.equals(that.newDataType);
}

@Override
public int hashCode() {
int result = Objects.hash(newDataType, isNullable);
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
}

/**
Expand All @@ -354,6 +435,22 @@ public String[] fieldNames() {
public String newComment() {
return newComment;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UpdateColumnComment that = (UpdateColumnComment) o;
return Arrays.equals(fieldNames, that.fieldNames) &&
newComment.equals(that.newComment);
}

@Override
public int hashCode() {
int result = Objects.hash(newComment);
result = 31 * result + Arrays.hashCode(fieldNames);
return result;
}
}

/**
Expand All @@ -372,6 +469,19 @@ private DeleteColumn(String[] fieldNames) {
public String[] fieldNames() {
return fieldNames;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteColumn that = (DeleteColumn) o;
return Arrays.equals(fieldNames, that.fieldNames);
}

@Override
public int hashCode() {
return Arrays.hashCode(fieldNames);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ class Analyzer(

private val catalog: SessionCatalog = catalogManager.v1SessionCatalog

override def isView(nameParts: Seq[String]): Boolean = catalog.isView(nameParts)

// Only for tests.
def this(catalog: SessionCatalog, conf: SQLConf) = {
this(
Expand Down Expand Up @@ -195,8 +197,7 @@ class Analyzer(
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveAlterTable ::
ResolveDescribeTable ::
new ResolveCatalogs(catalogManager) ::
ResolveInsertInto ::
ResolveTables ::
ResolveRelations ::
Expand Down Expand Up @@ -680,6 +681,11 @@ class Analyzer(
lookupV2Relation(u.multipartIdentifier)
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)

case u: UnresolvedV2Relation =>
CatalogV2Util.loadTable(u.catalog, u.tableName).map { table =>
DataSourceV2Relation.create(table)
}.getOrElse(u)
}
}

Expand Down Expand Up @@ -910,82 +916,6 @@ class Analyzer(
}
}

/**
* Resolve ALTER TABLE statements that use a DSv2 catalog.
*
* This rule converts unresolved ALTER TABLE statements to v2 when a v2 catalog is responsible
* for the table identifier. A v2 catalog is responsible for an identifier when the identifier
* has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and
* the table identifier does not include a catalog.
*/
object ResolveAlterTable extends Rule[LogicalPlan] {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case alter @ AlterTableAddColumnsStatement(tableName, cols) =>
val changes = cols.map { col =>
TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
}
resolveV2Alter(tableName, changes).getOrElse(alter)

case alter @ AlterTableAlterColumnStatement(tableName, colName, dataType, comment) =>
val typeChange = dataType.map { newDataType =>
TableChange.updateColumnType(colName.toArray, newDataType, true)
}

val commentChange = comment.map { newComment =>
TableChange.updateColumnComment(colName.toArray, newComment)
}

resolveV2Alter(tableName, typeChange.toSeq ++ commentChange.toSeq).getOrElse(alter)

case alter @ AlterTableRenameColumnStatement(tableName, col, newName) =>
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
resolveV2Alter(tableName, changes).getOrElse(alter)

case alter @ AlterTableDropColumnsStatement(tableName, cols) =>
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
resolveV2Alter(tableName, changes).getOrElse(alter)

case alter @ AlterTableSetPropertiesStatement(tableName, props) =>
val changes = props.map { case (key, value) =>
TableChange.setProperty(key, value)
}

resolveV2Alter(tableName, changes.toSeq).getOrElse(alter)

case alter @ AlterTableUnsetPropertiesStatement(tableName, keys, _) =>
resolveV2Alter(tableName, keys.map(key => TableChange.removeProperty(key))).getOrElse(alter)

case alter @ AlterTableSetLocationStatement(tableName, newLoc) =>
resolveV2Alter(tableName, Seq(TableChange.setProperty("location", newLoc))).getOrElse(alter)
}

private def resolveV2Alter(
tableName: Seq[String],
changes: Seq[TableChange]): Option[AlterTable] = {
lookupV2RelationAndCatalog(tableName).map {
case (relation, catalog, ident) =>
AlterTable(catalog.asTableCatalog, ident, relation, changes)
}
}
}

/**
* Resolve DESCRIBE TABLE statements that use a DSv2 catalog.
*
* This rule converts unresolved DESCRIBE TABLE statements to v2 when a v2 catalog is responsible
* for the table identifier. A v2 catalog is responsible for an identifier when the identifier
* has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and
* the table identifier does not include a catalog.
*/
object ResolveDescribeTable extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case describe @ DescribeTableStatement(
CatalogObjectIdentifier(Some(v2Catalog), ident), _, isExtended) =>
DescribeTable(UnresolvedRelation(describe.tableName), isExtended)
}
}

/**
* Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
* a logical plan node's children.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableStatement, InsertIntoStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand All @@ -34,6 +34,8 @@ import org.apache.spark.sql.types._
*/
trait CheckAnalysis extends PredicateHelper {

protected def isView(nameParts: Seq[String]): Boolean

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

/**
Expand Down Expand Up @@ -96,6 +98,13 @@ trait CheckAnalysis extends PredicateHelper {
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

case u: UnresolvedV2Relation if isView(u.originalNameParts) =>
u.failAnalysis(
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")

case u: UnresolvedV2Relation =>
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")

case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,
Expand Down Expand Up @@ -357,9 +366,6 @@ trait CheckAnalysis extends PredicateHelper {
case _ =>
}

case alter: AlterTableStatement =>
alter.failAnalysis(s"Table or view not found: ${alter.tableName.quoted}")

case alter: AlterTable if alter.childrenResolved =>
val table = alter.table
def findField(operation: String, fieldName: Array[String]): StructField = {
Expand Down
Loading