[SPARK-29039][SQL] centralize the catalog and table lookup logic #25747
| val relation = UnresolvedRelation(delete.tableName) | ||
| val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation) | ||
| DeleteFromTable(aliased, delete.condition) | ||
| case ShowNamespacesStatement(None, pattern) if defaultCatalog.isEmpty => |
I found this problem while refactoring the catalog lookup logic. If no catalog is specified and the default catalog is not set, we should fall back to the v1 command, which is ShowDatabasesCommand in this case.
OK, I may have misunderstood this discussion: #25601 (comment) - I had an impression that SHOW NAMESPACES is supported only in v2.
I don't think this is correct. The SHOW DATABASES command was not changed, so using SHOW DATABASES is the v1 fallback. SHOW NAMESPACES always uses v2. I think that's fine.
|
also cc @imback82 . With this refactor, it should be much easier to refine the catalog/table lookup logic to fix https://issues.apache.org/jira/browse/SPARK-29014 |
|
Test build #110429 has finished for PR 25747 at commit
|
| object CatalogAndRestNameParts { | ||
| def unapply(nameParts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] = { | ||
| if (nameParts.isEmpty) { | ||
| Some((defaultCatalog, Nil)) |
Shouldn't we introduce currentCatalog in CatalogManager and use it instead of defaultCatalog, as @rdblue suggested? I was thinking of doing something like the following: master...imback82:current_ns
I'd like to make this PR a pure refactor and leave it to followups. For example, we should add more tests when changing this to currentCatalog.
Got it.
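For readers following this thread, here is a minimal sketch of how an always-matching extractor like CatalogAndRestNameParts can split a multipart name. It is not the code in this PR: `Catalog`, `Lookup`, and `describe` are hypothetical stand-ins, and the handling of single-part names is an assumption.

```scala
object CatalogLookupSketch {
  // Hypothetical stand-in for Spark's CatalogPlugin; only the name matters here.
  final case class Catalog(name: String)

  // Hypothetical holder for the registered catalogs and an optional default catalog.
  final class Lookup(catalogs: Map[String, Catalog], defaultCatalog: Option[Catalog]) {

    object CatalogAndRestNameParts {
      // The extractor always matches, so it returns Some rather than Option.
      def unapply(nameParts: Seq[String]): Some[(Option[Catalog], Seq[String])] =
        nameParts match {
          case Seq() => Some((defaultCatalog, Nil))
          // Assumption: a single-part name never names a catalog.
          case Seq(single) => Some((defaultCatalog, Seq(single)))
          case head +: rest =>
            catalogs.get(head) match {
              case Some(catalog) => Some((Some(catalog), rest))
              case None => Some((defaultCatalog, nameParts))
            }
        }
    }

    // Example use in a pattern match, the way an analyzer rule would use it.
    def describe(nameParts: Seq[String]): String = nameParts match {
      case CatalogAndRestNameParts(Some(catalog), rest) =>
        s"${catalog.name}:${rest.mkString(".")}"
      case CatalogAndRestNameParts(None, rest) =>
        s"<session>:${rest.mkString(".")}"
    }
  }
}
```

Switching from defaultCatalog to a currentCatalog, as proposed above, would only change what the fallback branches return.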
| } | ||
|
|
||
| implicit class NamePartsHelper(nameParts: Seq[String]) { | ||
| def toIdentifier: Identifier = Identifier.of(nameParts.init.toArray, nameParts.last) |
Why not add this to MultipartIdentifierHelper?
MultipartIdentifierHelper takes a parameter called namespace. I'm a little worried about semantic mismatch here so I created a new one. How about we merge these two and call it NamePartsHelper? nameParts is very general.
I think we could just change that one to use nameParts and combine the two.
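As an illustration of the helper being discussed, the sketch below shows an implicit class that turns name parts into an identifier. `Identifier` here is a simplified, hypothetical case class rather than the connector interface, and the `require` guard is an addition that the snippet under review does not have.

```scala
object NamePartsSketch {
  // Simplified stand-in for the connector Identifier: a namespace plus a name.
  final case class Identifier(namespace: Seq[String], name: String)

  implicit class NamePartsHelper(val nameParts: Seq[String]) extends AnyVal {
    // The last part is the name; everything before it is the namespace.
    def toIdentifier: Identifier = {
      // init and last both throw on an empty Seq, so fail with a clearer message.
      require(nameParts.nonEmpty, "an identifier needs at least one name part")
      Identifier(nameParts.init, nameParts.last)
    }
  }
}
```

With the implicit in scope, `Seq("ns1", "ns2", "t").toIdentifier` gives namespace `ns1.ns2` and name `t`.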
| catalog: TableCatalog, | ||
| ident: Identifier, | ||
| table: NamedRelation, | ||
| table: Table, |
+1
This was previously NamedRelation so that it could be converted from a statement when the catalog is resolved using UnresolvedRelation. Then the normal table resolution rule would run to resolve this to a DSv2 relation. Why change that?
As I think about this more, it seems like a bad idea.
This ties the SQL logical plans to a particular type of relation. There is no need to do that. What if we decide to add a v3 connector API? Then we would have to change all of these logical plans instead of using a new type of relation.
I think we should move this back to NamedRelation to keep logical plans and the connector API separate. That also makes rules cleaner, because catalog resolution and table resolution are done in independent rules.
| def loadV2Table(catalog: CatalogPlugin, ident: Identifier): Option[Table] = | ||
| try { | ||
| Option(catalog.asTableCatalog.loadTable(ident)) | ||
| Option(catalog.asTableCatalog.loadTable(ident)).filterNot(_.isInstanceOf[V1Table]) |
While this does seem to simplify the code a little, I think it's going to hamper the small, incremental work we can do to migrate from V1 to V2. Having this one centralized thing would cause PRs to have humongous changes when, for example, we want ALTER TABLE ADD COLUMNS behavior to go fully through V2SessionCatalog.
| } | ||
|
|
||
| if (catalogTable.tableType == CatalogTableType.VIEW) { | ||
| throw new NoSuchTableException(ident) |
Should this be a separate bug fix?
|
|
||
| test("DropTable: if exists") { | ||
| intercept[NoSuchTableException] { | ||
| intercept[AnalysisException] { |
Why did this change?
| case _ => plan | ||
| } | ||
|
|
||
| private def tryResolveV2Relation(u: UnresolvedRelation): Option[DataSourceV2Relation] = { |
I think this should continue to use extractors instead of inline logic. The rules are cleaner when written with extractors. We might need to update them, but I'd like to avoid methods like this one that do basically the same thing.
| case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved => | ||
| tryResolveV2Relation(u).map(v2Relation => i.copy(table = v2Relation)).getOrElse { | ||
| if (u.multipartIdentifier.length <= 2) { | ||
| val ident = u.multipartIdentifier match { |
This should get the identifier value from an extractor instead.
|
@cloud-fan, overall I like some parts of this PR quite a bit. It is really nice how the rules are collapsed in this PR. The problem is that I'm not sure that the trade-off is worth it. I don't think it is good for SQL statements to be responsible for converting themselves into DSv2 plans. We want to keep the statements that are parsed and the plan implementations separate. And it seems odd to move lots of rule logic into plan nodes themselves. I think that the right approach for keeping this logic in one place is to use extractors in rules. Recent changes have moved further away from that, but I think that's the cleanest way to keep this logic in one place. The reason we have changed this recently is the need to handle v1 tables. But I think we should take a different approach to support those. The only reason why catalog/identifier resolution and table resolution happen at the same time is to support fallback to v1. But we don't need fallback to v1 outside of the Here's what I would do:
With this approach, catalog/identifier resolution is separate from table resolution. Table resolution would happen in What do you think? |
|
Hi @rdblue , thanks for taking a look! What I want to get consensus on is the categories of catalog/table resolution I mentioned in the PR description:
If we agree on this, I don't mind using different extractors to implement it. I'll update this PR to use extractors. |
d08e930 to
14aff3e
Compare
|
I've updated this PR to use extractors and updated the PR description. |
|
Test build #110642 has finished for PR 25747 at commit
|
|
Test build #110724 has finished for PR 25747 at commit
|
|
Test build #110740 has finished for PR 25747 at commit
|
|
retest this please |
|
Test build #110752 has finished for PR 25747 at commit
|
| DescribeTableCommand(tblName.toV1Identifier, partitionSpec, isExtended) | ||
| } | ||
|
|
||
| private def assertTopLeveColumn(colName: Seq[String], command: String): Unit = { |
This method and the methods below were moved from the old DataSourceResolution.
|
Test build #111693 has finished for PR 25747 at commit
|
|
Test build #111695 has finished for PR 25747 at commit
|
|
There are a few problems with mixing identifier resolution with table lookup. First, there is only one case where we need to do them at the same time: for v1 fallback tables. But there are other cases where loading and inspecting a table has been suggested. For example, if we are looking up Second, by separating the resolution rules in the Analyzer, we won't need to rewrite them later when we remove v1 fallback. When we remove v1 tables, we want to simply remove the I see the code duplication argument, but we can separate conversion into a utility class that is used in both places. |
|
Test build #111732 has finished for PR 25747 at commit
|
|
Test build #111733 has finished for PR 25747 at commit
|
| DeleteFromTable(aliased, condition) | ||
|
|
||
| case update: UpdateTableStatement => | ||
| throw new AnalysisException(s"Update table is not supported temporarily.") |
Nit: in other cases, SQL keywords use all caps to distinguish from text. Should be "UPDATE TABLE is not ..."
| case DescribeTableStatement( | ||
| nameParts @ NonSessionCatalog(catalog, tableName), partitionSpec, isExtended) => | ||
| if (partitionSpec.nonEmpty) { | ||
| throw new AnalysisException("DESC TABLE does not support partition for v2 tables.") |
Nit: should we use the full keyword instead of an abbreviation? "DESCRIBE TABLE does not ..."
| writeOptions = c.options.filterKeys(_ != "path"), | ||
| orCreate = c.orCreate) | ||
|
|
||
| case DropTableStatement(NonSessionCatalog(catalog, tableName), ifExists, purge) => |
Nit: purge isn't used so it should be _.
| } | ||
| } | ||
|
|
||
| object NonSessionCatalog { |
+1 for this. We may want to move this into the lookup methods and use it as a common extractor pattern. Looks like we probably need to clean those up and simplify after all the planned updates are done.
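To make the extractor pattern concrete, here is a small self-contained sketch of paired SessionCatalog / NonSessionCatalog extractors used in a rule. The plan classes, the `Rules` wrapper, and the session catalog name `"session"` are all hypothetical simplifications for illustration, not the definitions in this PR.

```scala
object ExtractorSketch {
  final case class Catalog(name: String)

  // Hypothetical name for the built-in session catalog.
  val SessionCatalogName = "session"

  sealed trait Plan
  final case class DropTableStatement(nameParts: Seq[String], ifExists: Boolean) extends Plan
  final case class V2DropTable(catalog: Catalog, ident: Seq[String], ifExists: Boolean) extends Plan
  final case class V1DropTable(ident: Seq[String], ifExists: Boolean) extends Plan

  final class Rules(catalogs: Map[String, Catalog]) {
    // Shared lookup: the first name part is a catalog only if it is registered.
    private def split(nameParts: Seq[String]): (Catalog, Seq[String]) = nameParts match {
      case head +: rest if rest.nonEmpty && catalogs.contains(head) => (catalogs(head), rest)
      case _ => (Catalog(SessionCatalogName), nameParts)
    }

    object NonSessionCatalog {
      def unapply(nameParts: Seq[String]): Option[(Catalog, Seq[String])] = {
        val (catalog, rest) = split(nameParts)
        if (catalog.name != SessionCatalogName) Some((catalog, rest)) else None
      }
    }

    object SessionCatalog {
      def unapply(nameParts: Seq[String]): Option[(Catalog, Seq[String])] = {
        val (catalog, rest) = split(nameParts)
        if (catalog.name == SessionCatalogName) Some((catalog, rest)) else None
      }
    }

    // The rule body stays a flat list of cases; all lookup logic lives in the extractors.
    def resolve(plan: Plan): Plan = plan match {
      case DropTableStatement(NonSessionCatalog(catalog, tableName), ifExists) =>
        V2DropTable(catalog, tableName, ifExists)
      case DropTableStatement(SessionCatalog(_, tableName), ifExists) =>
        V1DropTable(tableName, ifExists)
      case other => other
    }
  }
}
```

Because both extractors share one lookup helper, a later change to the lookup (for example, consulting the current catalog) would be made in a single place.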
| val namespace = if (nameParts.isEmpty) None else Some(nameParts) | ||
| ShowNamespaces(catalog.asNamespaceCatalog, namespace, pattern) | ||
|
|
||
| // TODO (SPARK-29014): we should check if the current catalog is not session catalog here. |
Yes, this should use the current catalog. After ShowNamespaces and ShowDatabases are merged, we won't need the fallback.
| * @param catalog The catalog which the table should be looked up from. | ||
| * @param tableName The name of the table to look up. | ||
| */ | ||
| case class UnresolvedV2Table( |
Minor: I think we may want to use UnresolvedV2Relation instead. The only thing we know is that this uses a v2 catalog. It could be a v2 view in the future.
|
|
||
| assert(exception.getMessage.contains("No default v2 catalog is set")) | ||
| assert(exception.getMessage.contains( | ||
| "SHOW NAMESPACES is not supported with the session catalog")) |
Why is this not supported?
This is the existing behavior. We will support it once we unify SHOW NAMESPACE and SHOW DATABASE. AFAIK @imback82 already has a PR for it.
Yes, I will update this as soon as this PR is merged.
|
|
||
| private def assertViewNotSupported(query: String): Unit = { | ||
| val e = intercept[AnalysisException](sql(query)) | ||
| assert(e.message.contains("'testView' is a view not table")) |
Nit: should be "not a table".
| AlterTableAddColumnsCommand(tableName.asTableIdentifier, cols.map(convertToStructField)) | ||
| }.getOrElse { | ||
| val changes = cols.map { col => | ||
| TableChange.addColumn(col.name.toArray, col.dataType, true, col.comment.orNull) |
Nit: boolean arguments should be named.
Or is this not allowed since it's a Java method?
yes, the method parameter name is not kept in Java.
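A small sketch of the usual workaround when the callee is defined in Java: Scala rejects named arguments for Java-defined methods, so the boolean can be bound to a descriptively named local val instead. The example uses `java.lang.String.regionMatches` purely as a convenient Java method with a boolean parameter; `startsWithIgnoreCase` is a hypothetical helper, unrelated to this PR.

```scala
object NamedArgSketch {
  // regionMatches is a Java method, so `regionMatches(ignoreCase = true, ...)`
  // does not compile. Binding the flag to a local val documents its meaning instead.
  def startsWithIgnoreCase(text: String, prefix: String): Boolean = {
    val ignoreCase = true
    text.regionMatches(ignoreCase, 0, prefix, 0, prefix.length)
  }
}
```

The same approach would apply to the `true` passed to TableChange.addColumn above, by introducing a local val named for whatever that flag means.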
| nameParts @ SessionCatalog(catalog, tableName), colName, dataType, comment) => | ||
| loadTable(catalog, tableName.asIdentifier).collect { | ||
| case v1Table: V1Table => | ||
| // TODO: we should fallback to the v1 `AlterTableChangeColumnCommand`. |
Should we open an issue for this?
ticket created: https://issues.apache.org/jira/browse/SPARK-29353
| } | ||
| } | ||
|
|
||
| def createV2AlterTable( |
Is this used?
| |LOCATION '/user/external/page_view' | ||
| |TBLPROPERTIES ('p1'='v1', 'p2'='v2') | ||
| |AS SELECT * FROM src | ||
| |AS SELECT 1 |
Why did this change? To avoid needing to look up src?
These test cases originally came directly from the SparkSql parser tests. Not changing them was a way to ensure compatibility.
|
@cloud-fan, looks good. The only thing other than nits is that I think we should probably rename |
|
Test build #111763 has finished for PR 25747 at commit
|
|
thanks for the review, merging to master! |
| orCreate = c.orCreate) | ||
|
|
||
| case DropTableStatement(NonSessionCatalog(catalog, tableName), ifExists, purge) => | ||
| DropTable(catalog.asTableCatalog, tableName.asIdentifier, ifExists) |
Hi @cloud-fan, I got your message about TableCatalog#dropTable() not having purge as an option. It seems purge is not passed on from here (not passed into DropTable). Do I have that right?
Here we are converting to v2 DropTable. We can fail if purge = true
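One way the "fail if purge = true" suggestion could look, as a rough sketch only. The case classes, the `AnalysisError` exception, and the error message are hypothetical placeholders, not Spark's actual classes or wording.

```scala
object DropTablePurgeSketch {
  // Hypothetical stand-in for Spark's AnalysisException.
  final class AnalysisError(message: String) extends Exception(message)

  final case class DropTableStatement(tableName: Seq[String], ifExists: Boolean, purge: Boolean)
  final case class DropTable(ident: Seq[String], ifExists: Boolean)

  // The v2 plan has no purge flag, so reject the option instead of silently dropping it.
  def toV2(statement: DropTableStatement): DropTable = {
    if (statement.purge) {
      throw new AnalysisError("DROP TABLE ... PURGE is not supported for v2 tables.")
    }
    DropTable(statement.tableName, statement.ifExists)
  }
}
```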
| comparePlans(parsed1_table, expected1_table) | ||
| comparePlans(parsed2_table, expected2_table) | ||
| comparePlans(parsed3_table, expected3_table) | ||
| // For non-existing tables, we convert it to v2 command with `UnresolvedV2Table` |
Hi @cloud-fan, I'm curious why we need to convert a non-existing table to a v2 command. In my view, we can just handle this in the v1 command.
We use the v2 command for a v2 table and the v1 command for a v1 table. If the table does not exist, I think the v2 command is better, as we want to migrate everything to v2 commands in the future.
I do agree that the v2 command is better than v1, but it makes some behavior unexpected. When loading the table returns none, ResolveSessionCatalog checks whether v2 supports the command: it throws a "not supported" message if it doesn't, and "table not found" if it does. I'm not sure which message should take priority. Here is an example:
create table test(c int);
// throw Table not found
desc tes;
// throw Describing columns is not supported for v2 tables.;
desc tes c;
Shall we keep this consistent when table is non-existing ?
If we use the v1 command directly, things will be simpler. We don't even need to load the table (which affects performance); we can just use the v1 command when the catalog is the session catalog.
E.g.
case DescribeTableStatement(SessionCatalog(_, tableName), partitionSpec, isExtended) =>
DescribeTableCommand(tableName.asTableIdentifier, partitionSpec, isExtended)
case DescribeColumnStatement(SessionCatalog(_, tableName), colNameParts, isExtended) =>
DescribeColumnCommand(tableName.asTableIdentifier, colNameParts, isExtended)
I think it's fine. Failing earlier is better than looking up the table and reporting that the table is not found.
That's OK. Thanks.
What changes were proposed in this pull request?
Currently we deal with different ParsedStatement in many places and write duplicated catalog/table lookup logic. In general the lookup logic is
ParsedStatement to v1 command like ShowDatabasesCommand. Otherwise, convert ParsedStatement to v2 command like ShowNamespaces.
V1Table, convert ParsedStatement to v1 command like CreateTable. Otherwise, convert ParsedStatement to v2 command like CreateV2Table.
However, since the code is duplicated we don't apply this lookup logic consistently. For example, we forget to consider the v2 session catalog in several places.
This PR centralizes the catalog/table lookup logic by 3 rules.
ResolveCatalogs (in catalyst). This rule resolves v2 catalog from the multipart identifier in SQL statements, and convert the statement to v2 command if the resolved catalog is not session catalog. If the command needs to resolve the table (e.g. ALTER TABLE), put an UnresolvedV2Table in the command.
ResolveTables (in catalyst). It resolves UnresolvedV2Table to DataSourceV2Relation.
ResolveSessionCatalog (in sql/core). This rule is only effective if the resolved catalog is session catalog. For commands that don't need to resolve the table, this rule converts the statement to v1 command directly. Otherwise, it converts the statement to v1 command if the resolved table is v1 table, and convert to v2 command if the resolved table is v2 table. Hopefully we can remove this rule eventually when v1 fallback is not needed anymore.
Why are the changes needed?
Reduce duplicated code and make the catalog/table lookup logic consistent.
Does this PR introduce any user-facing change?
no
How was this patch tested?
existing tests
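The three-rule split in the description can be pictured with the toy model below. It is only a sketch under simplifying assumptions: plans are small case classes, catalogs are plain maps, the session catalog is named `"session"`, and the rules run once in a fixed order. None of these types are the real Spark classes, even where the names match.

```scala
object ThreeRulesSketch {
  sealed trait Table
  case object V1Table extends Table
  case object V2Table extends Table

  sealed trait Relation
  final case class UnresolvedV2Table(catalog: String, ident: Seq[String]) extends Relation
  final case class V2Relation(catalog: String, ident: Seq[String]) extends Relation

  sealed trait Plan
  final case class AlterTableStatement(nameParts: Seq[String]) extends Plan
  final case class AlterTableV2(table: Relation) extends Plan
  final case class AlterTableV1Command(ident: Seq[String]) extends Plan

  val Session = "session" // hypothetical session catalog name

  // catalogs maps a catalog name to the tables it contains.
  final class Analyzer(catalogs: Map[String, Map[Seq[String], Table]]) {
    private def sessionTables: Map[Seq[String], Table] =
      catalogs.getOrElse(Session, Map.empty[Seq[String], Table])

    // Rule 1: non-session catalog => v2 command holding an unresolved table.
    val resolveCatalogs: Plan => Plan = {
      case AlterTableStatement(head +: rest)
          if head != Session && rest.nonEmpty && catalogs.contains(head) =>
        AlterTableV2(UnresolvedV2Table(head, rest))
      case other => other
    }

    // Rule 2: resolve the unresolved v2 table if the catalog contains it.
    val resolveTables: Plan => Plan = {
      case AlterTableV2(UnresolvedV2Table(c, ident)) if catalogs.get(c).exists(_.contains(ident)) =>
        AlterTableV2(V2Relation(c, ident))
      case other => other
    }

    // Rule 3: session catalog => v1 command for v1 tables, v2 command otherwise.
    val resolveSessionCatalog: Plan => Plan = {
      case AlterTableStatement(nameParts) =>
        sessionTables.get(nameParts) match {
          case Some(V1Table) => AlterTableV1Command(nameParts)
          case Some(V2Table) => AlterTableV2(V2Relation(Session, nameParts))
          case None => AlterTableV2(UnresolvedV2Table(Session, nameParts))
        }
      case other => other
    }

    def analyze(plan: Plan): Plan =
      Seq(resolveCatalogs, resolveTables, resolveSessionCatalog).foldLeft(plan)((p, rule) => rule(p))
  }
}
```

In this model, removing v1 fallback would amount to deleting the third rule and letting the first rule handle the session catalog as well.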