Skip to content

Commit 275e044

Browse files
committed
[SPARK-29039][SQL] centralize the catalog and table lookup logic
### What changes were proposed in this pull request? Currently we deal with different `ParsedStatement`s in many places and write duplicated catalog/table lookup logic. In general the lookup logic is: 1. Try to look up the catalog by name. If there is no such catalog and the default catalog is not set, convert the `ParsedStatement` to a v1 command like `ShowDatabasesCommand`. Otherwise, convert the `ParsedStatement` to a v2 command like `ShowNamespaces`. 2. Try to look up the table by name. If there is no such table, fail. If the table is a `V1Table`, convert the `ParsedStatement` to a v1 command like `CreateTable`. Otherwise, convert the `ParsedStatement` to a v2 command like `CreateV2Table`. However, since the code is duplicated, we don't apply this lookup logic consistently. For example, we forget to consider the v2 session catalog in several places. This PR centralizes the catalog/table lookup logic in 3 rules: 1. `ResolveCatalogs` (in catalyst). This rule resolves the v2 catalog from the multipart identifier in SQL statements, and converts the statement to a v2 command if the resolved catalog is not the session catalog. If the command needs to resolve the table (e.g. ALTER TABLE), it puts an `UnresolvedV2Relation` in the command. 2. `ResolveTables` (in catalyst). It resolves `UnresolvedV2Relation` to `DataSourceV2Relation`. 3. `ResolveSessionCatalog` (in sql/core). This rule is only effective if the resolved catalog is the session catalog. For commands that don't need to resolve the table, this rule converts the statement to a v1 command directly. Otherwise, it converts the statement to a v1 command if the resolved table is a v1 table, and converts it to a v2 command if the resolved table is a v2 table. Hopefully we can remove this rule eventually when the v1 fallback is not needed anymore. ### Why are the changes needed? Reduce duplicated code and make the catalog/table lookup logic consistent. ### Does this PR introduce any user-facing change? no ### How was this patch tested? existing tests Closes #25747 from cloud-fan/lookup.
Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 93289b5 commit 275e044

File tree

19 files changed

+1048
-566
lines changed

19 files changed

+1048
-566
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.apache.spark.annotation.Experimental;
2121
import org.apache.spark.sql.types.DataType;
2222

23+
import java.util.Arrays;
24+
import java.util.Objects;
25+
2326
/**
2427
* TableChange subclasses represent requested changes to a table. These are passed to
2528
* {@link TableCatalog#alterTable}. For example,
@@ -210,6 +213,20 @@ public String property() {
210213
public String value() {
211214
return value;
212215
}
216+
217+
@Override
218+
public boolean equals(Object o) {
219+
if (this == o) return true;
220+
if (o == null || getClass() != o.getClass()) return false;
221+
SetProperty that = (SetProperty) o;
222+
return property.equals(that.property) &&
223+
value.equals(that.value);
224+
}
225+
226+
@Override
227+
public int hashCode() {
228+
return Objects.hash(property, value);
229+
}
213230
}
214231

215232
/**
@@ -227,6 +244,19 @@ private RemoveProperty(String property) {
227244
public String property() {
228245
return property;
229246
}
247+
248+
@Override
249+
public boolean equals(Object o) {
250+
if (this == o) return true;
251+
if (o == null || getClass() != o.getClass()) return false;
252+
RemoveProperty that = (RemoveProperty) o;
253+
return property.equals(that.property);
254+
}
255+
256+
@Override
257+
public int hashCode() {
258+
return Objects.hash(property);
259+
}
230260
}
231261

232262
interface ColumnChange extends TableChange {
@@ -269,6 +299,24 @@ public boolean isNullable() {
269299
public String comment() {
270300
return comment;
271301
}
302+
303+
@Override
304+
public boolean equals(Object o) {
305+
if (this == o) return true;
306+
if (o == null || getClass() != o.getClass()) return false;
307+
AddColumn addColumn = (AddColumn) o;
308+
return isNullable == addColumn.isNullable &&
309+
Arrays.equals(fieldNames, addColumn.fieldNames) &&
310+
dataType.equals(addColumn.dataType) &&
311+
comment.equals(addColumn.comment);
312+
}
313+
314+
@Override
315+
public int hashCode() {
316+
int result = Objects.hash(dataType, isNullable, comment);
317+
result = 31 * result + Arrays.hashCode(fieldNames);
318+
return result;
319+
}
272320
}
273321

274322
/**
@@ -296,6 +344,22 @@ public String[] fieldNames() {
296344
public String newName() {
297345
return newName;
298346
}
347+
348+
@Override
349+
public boolean equals(Object o) {
350+
if (this == o) return true;
351+
if (o == null || getClass() != o.getClass()) return false;
352+
RenameColumn that = (RenameColumn) o;
353+
return Arrays.equals(fieldNames, that.fieldNames) &&
354+
newName.equals(that.newName);
355+
}
356+
357+
@Override
358+
public int hashCode() {
359+
int result = Objects.hash(newName);
360+
result = 31 * result + Arrays.hashCode(fieldNames);
361+
return result;
362+
}
299363
}
300364

301365
/**
@@ -328,6 +392,23 @@ public DataType newDataType() {
328392
public boolean isNullable() {
329393
return isNullable;
330394
}
395+
396+
@Override
397+
public boolean equals(Object o) {
398+
if (this == o) return true;
399+
if (o == null || getClass() != o.getClass()) return false;
400+
UpdateColumnType that = (UpdateColumnType) o;
401+
return isNullable == that.isNullable &&
402+
Arrays.equals(fieldNames, that.fieldNames) &&
403+
newDataType.equals(that.newDataType);
404+
}
405+
406+
@Override
407+
public int hashCode() {
408+
int result = Objects.hash(newDataType, isNullable);
409+
result = 31 * result + Arrays.hashCode(fieldNames);
410+
return result;
411+
}
331412
}
332413

333414
/**
@@ -354,6 +435,22 @@ public String[] fieldNames() {
354435
public String newComment() {
355436
return newComment;
356437
}
438+
439+
@Override
440+
public boolean equals(Object o) {
441+
if (this == o) return true;
442+
if (o == null || getClass() != o.getClass()) return false;
443+
UpdateColumnComment that = (UpdateColumnComment) o;
444+
return Arrays.equals(fieldNames, that.fieldNames) &&
445+
newComment.equals(that.newComment);
446+
}
447+
448+
@Override
449+
public int hashCode() {
450+
int result = Objects.hash(newComment);
451+
result = 31 * result + Arrays.hashCode(fieldNames);
452+
return result;
453+
}
357454
}
358455

359456
/**
@@ -372,6 +469,19 @@ private DeleteColumn(String[] fieldNames) {
372469
public String[] fieldNames() {
373470
return fieldNames;
374471
}
472+
473+
@Override
474+
public boolean equals(Object o) {
475+
if (this == o) return true;
476+
if (o == null || getClass() != o.getClass()) return false;
477+
DeleteColumn that = (DeleteColumn) o;
478+
return Arrays.equals(fieldNames, that.fieldNames);
479+
}
480+
481+
@Override
482+
public int hashCode() {
483+
return Arrays.hashCode(fieldNames);
484+
}
375485
}
376486

377487
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 8 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ class Analyzer(
128128

129129
private val catalog: SessionCatalog = catalogManager.v1SessionCatalog
130130

131+
override def isView(nameParts: Seq[String]): Boolean = catalog.isView(nameParts)
132+
131133
// Only for tests.
132134
def this(catalog: SessionCatalog, conf: SQLConf) = {
133135
this(
@@ -195,8 +197,7 @@ class Analyzer(
195197
new SubstituteUnresolvedOrdinals(conf)),
196198
Batch("Resolution", fixedPoint,
197199
ResolveTableValuedFunctions ::
198-
ResolveAlterTable ::
199-
ResolveDescribeTable ::
200+
new ResolveCatalogs(catalogManager) ::
200201
ResolveInsertInto ::
201202
ResolveTables ::
202203
ResolveRelations ::
@@ -680,6 +681,11 @@ class Analyzer(
680681
lookupV2Relation(u.multipartIdentifier)
681682
.map(v2Relation => i.copy(table = v2Relation))
682683
.getOrElse(i)
684+
685+
case u: UnresolvedV2Relation =>
686+
CatalogV2Util.loadTable(u.catalog, u.tableName).map { table =>
687+
DataSourceV2Relation.create(table)
688+
}.getOrElse(u)
683689
}
684690
}
685691

@@ -910,82 +916,6 @@ class Analyzer(
910916
}
911917
}
912918

913-
/**
914-
* Resolve ALTER TABLE statements that use a DSv2 catalog.
915-
*
916-
* This rule converts unresolved ALTER TABLE statements to v2 when a v2 catalog is responsible
917-
* for the table identifier. A v2 catalog is responsible for an identifier when the identifier
918-
* has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and
919-
* the table identifier does not include a catalog.
920-
*/
921-
object ResolveAlterTable extends Rule[LogicalPlan] {
922-
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
923-
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
924-
case alter @ AlterTableAddColumnsStatement(tableName, cols) =>
925-
val changes = cols.map { col =>
926-
TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull)
927-
}
928-
resolveV2Alter(tableName, changes).getOrElse(alter)
929-
930-
case alter @ AlterTableAlterColumnStatement(tableName, colName, dataType, comment) =>
931-
val typeChange = dataType.map { newDataType =>
932-
TableChange.updateColumnType(colName.toArray, newDataType, true)
933-
}
934-
935-
val commentChange = comment.map { newComment =>
936-
TableChange.updateColumnComment(colName.toArray, newComment)
937-
}
938-
939-
resolveV2Alter(tableName, typeChange.toSeq ++ commentChange.toSeq).getOrElse(alter)
940-
941-
case alter @ AlterTableRenameColumnStatement(tableName, col, newName) =>
942-
val changes = Seq(TableChange.renameColumn(col.toArray, newName))
943-
resolveV2Alter(tableName, changes).getOrElse(alter)
944-
945-
case alter @ AlterTableDropColumnsStatement(tableName, cols) =>
946-
val changes = cols.map(col => TableChange.deleteColumn(col.toArray))
947-
resolveV2Alter(tableName, changes).getOrElse(alter)
948-
949-
case alter @ AlterTableSetPropertiesStatement(tableName, props) =>
950-
val changes = props.map { case (key, value) =>
951-
TableChange.setProperty(key, value)
952-
}
953-
954-
resolveV2Alter(tableName, changes.toSeq).getOrElse(alter)
955-
956-
case alter @ AlterTableUnsetPropertiesStatement(tableName, keys, _) =>
957-
resolveV2Alter(tableName, keys.map(key => TableChange.removeProperty(key))).getOrElse(alter)
958-
959-
case alter @ AlterTableSetLocationStatement(tableName, newLoc) =>
960-
resolveV2Alter(tableName, Seq(TableChange.setProperty("location", newLoc))).getOrElse(alter)
961-
}
962-
963-
private def resolveV2Alter(
964-
tableName: Seq[String],
965-
changes: Seq[TableChange]): Option[AlterTable] = {
966-
lookupV2RelationAndCatalog(tableName).map {
967-
case (relation, catalog, ident) =>
968-
AlterTable(catalog.asTableCatalog, ident, relation, changes)
969-
}
970-
}
971-
}
972-
973-
/**
974-
* Resolve DESCRIBE TABLE statements that use a DSv2 catalog.
975-
*
976-
* This rule converts unresolved DESCRIBE TABLE statements to v2 when a v2 catalog is responsible
977-
* for the table identifier. A v2 catalog is responsible for an identifier when the identifier
978-
* has a catalog specified, like prod_catalog.db.table, or when a default v2 catalog is set and
979-
* the table identifier does not include a catalog.
980-
*/
981-
object ResolveDescribeTable extends Rule[LogicalPlan] {
982-
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
983-
case describe @ DescribeTableStatement(
984-
CatalogObjectIdentifier(Some(v2Catalog), ident), _, isExtended) =>
985-
DescribeTable(UnresolvedRelation(describe.tableName), isExtended)
986-
}
987-
}
988-
989919
/**
990920
* Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
991921
* a logical plan node's children.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
2424
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
2525
import org.apache.spark.sql.catalyst.plans._
2626
import org.apache.spark.sql.catalyst.plans.logical._
27-
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableStatement, InsertIntoStatement}
27+
import org.apache.spark.sql.catalyst.plans.logical.sql.InsertIntoStatement
2828
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType}
2929
import org.apache.spark.sql.internal.SQLConf
3030
import org.apache.spark.sql.types._
@@ -34,6 +34,8 @@ import org.apache.spark.sql.types._
3434
*/
3535
trait CheckAnalysis extends PredicateHelper {
3636

37+
protected def isView(nameParts: Seq[String]): Boolean
38+
3739
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
3840

3941
/**
@@ -96,6 +98,13 @@ trait CheckAnalysis extends PredicateHelper {
9698
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
9799
failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
98100

101+
case u: UnresolvedV2Relation if isView(u.originalNameParts) =>
102+
u.failAnalysis(
103+
s"Invalid command: '${u.originalNameParts.quoted}' is a view not a table.")
104+
105+
case u: UnresolvedV2Relation =>
106+
u.failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
107+
99108
case operator: LogicalPlan =>
100109
// Check argument data types of higher-order functions downwards first.
101110
// If the arguments of the higher-order functions are resolved but the type check fails,
@@ -357,9 +366,6 @@ trait CheckAnalysis extends PredicateHelper {
357366
case _ =>
358367
}
359368

360-
case alter: AlterTableStatement =>
361-
alter.failAnalysis(s"Table or view not found: ${alter.tableName.quoted}")
362-
363369
case alter: AlterTable if alter.childrenResolved =>
364370
val table = alter.table
365371
def findField(operation: String, fieldName: Array[String]): StructField = {

0 commit comments

Comments
 (0)