diff --git a/docs/sql-keywords.md b/docs/sql-keywords.md index 4ea1754618cd..7a0e3efee8ff 100644 --- a/docs/sql-keywords.md +++ b/docs/sql-keywords.md @@ -179,6 +179,7 @@ Below is a list of all the keywords in Spark SQL. MONTHreservednon-reservedreserved MONTHSnon-reservednon-reservednon-reserved MSCKnon-reservednon-reservednon-reserved + NAMESPACEnon-reservednon-reservednon-reserved NAMESPACESnon-reservednon-reservednon-reserved NATURALreservedstrict-non-reservedreserved NOnon-reservednon-reservedreserved diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index dedfa846d854..aa96e079070f 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -82,7 +82,7 @@ singleTableSchema statement : query #statementDefault | ctes? dmlStatementNoWith #dmlStatement - | USE db=errorCapturingIdentifier #use + | USE NAMESPACE? multipartIdentifier #use | CREATE database (IF NOT EXISTS)? db=errorCapturingIdentifier ((COMMENT comment=STRING) | locationSpec | @@ -1019,6 +1019,7 @@ ansiNonReserved | MINUTES | MONTHS | MSCK + | NAMESPACE | NAMESPACES | NO | NULLS @@ -1270,6 +1271,7 @@ nonReserved | MONTH | MONTHS | MSCK + | NAMESPACE | NAMESPACES | NO | NOT @@ -1532,6 +1534,7 @@ MINUTES: 'MINUTES'; MONTH: 'MONTH'; MONTHS: 'MONTHS'; MSCK: 'MSCK'; +NAMESPACE: 'NAMESPACE'; NAMESPACES: 'NAMESPACES'; NATURAL: 'NATURAL'; NO: 'NO'; diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java index 32ba24ff0256..65e0b6be00ef 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/CatalogExtension.java @@ -22,17 +22,17 @@ /** * An API to extend the Spark built-in session catalog. Implementation can get the built-in session - * catalog from {@link #setDelegateCatalog(TableCatalog)}, implement catalog functions with + * catalog from {@link #setDelegateCatalog(CatalogPlugin)}, implement catalog functions with * some custom logic and call the built-in session catalog at the end. For example, they can * implement {@code createTable}, do something else before calling {@code createTable} of the * built-in session catalog. */ @Experimental -public interface CatalogExtension extends TableCatalog { +public interface CatalogExtension extends TableCatalog, SupportsNamespaces { /** * This will be called only once by Spark to pass in the Spark built-in session catalog, after * {@link #initialize(String, CaseInsensitiveStringMap)} is called. 
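For illustration, a minimal sketch (class name hypothetical) of how an extension might wrap the delegate that Spark injects: it extends the `DelegatingCatalogExtension` updated below, runs custom logic, and then falls through to the built-in session catalog.

```scala
import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier, Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType

// Hypothetical extension: audit table creation, then delegate to the
// built-in session catalog passed in via setDelegateCatalog().
class AuditingSessionCatalog extends DelegatingCatalogExtension {
  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: java.util.Map[String, String]): Table = {
    println(s"audit: creating table $ident")                   // custom logic first
    super.createTable(ident, schema, partitions, properties)   // then the delegate
  }
}
```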
*/ - void setDelegateCatalog(TableCatalog delegate); + void setDelegateCatalog(CatalogPlugin delegate); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java index d7f0ee15f840..b93acdc777e9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; @@ -36,9 +37,9 @@ @Experimental public abstract class DelegatingCatalogExtension implements CatalogExtension { - private TableCatalog delegate; + private CatalogPlugin delegate; - public final void setDelegateCatalog(TableCatalog delegate) { + public final void setDelegateCatalog(CatalogPlugin delegate) { this.delegate = delegate; } @@ -52,22 +53,22 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {} @Override public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException { - return delegate.listTables(namespace); + return asTableCatalog().listTables(namespace); } @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - return delegate.loadTable(ident); + return asTableCatalog().loadTable(ident); } @Override public void invalidateTable(Identifier ident) { - delegate.invalidateTable(ident); + asTableCatalog().invalidateTable(ident); } @Override public boolean tableExists(Identifier ident) { - return delegate.tableExists(ident); + return asTableCatalog().tableExists(ident); } @Override @@ -76,25 +77,78 @@ public Table createTable( StructType schema, Transform[] partitions, Map properties) throws TableAlreadyExistsException, NoSuchNamespaceException { - return delegate.createTable(ident, schema, partitions, properties); + return asTableCatalog().createTable(ident, schema, partitions, properties); } @Override public Table alterTable( Identifier ident, TableChange... 
changes) throws NoSuchTableException { - return delegate.alterTable(ident, changes); + return asTableCatalog().alterTable(ident, changes); } @Override public boolean dropTable(Identifier ident) { - return delegate.dropTable(ident); + return asTableCatalog().dropTable(ident); } @Override public void renameTable( Identifier oldIdent, Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException { - delegate.renameTable(oldIdent, newIdent); + asTableCatalog().renameTable(oldIdent, newIdent); + } + + @Override + public String[] defaultNamespace() { + return asNamespaceCatalog().defaultNamespace(); + } + + @Override + public String[][] listNamespaces() throws NoSuchNamespaceException { + return asNamespaceCatalog().listNamespaces(); + } + + @Override + public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException { + return asNamespaceCatalog().listNamespaces(namespace); + } + + @Override + public boolean namespaceExists(String[] namespace) { + return asNamespaceCatalog().namespaceExists(namespace); + } + + @Override + public Map loadNamespaceMetadata( + String[] namespace) throws NoSuchNamespaceException { + return asNamespaceCatalog().loadNamespaceMetadata(namespace); + } + + @Override + public void createNamespace( + String[] namespace, + Map metadata) throws NamespaceAlreadyExistsException { + asNamespaceCatalog().createNamespace(namespace, metadata); + } + + @Override + public void alterNamespace( + String[] namespace, + NamespaceChange... changes) throws NoSuchNamespaceException { + asNamespaceCatalog().alterNamespace(namespace, changes); + } + + @Override + public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException { + return asNamespaceCatalog().dropNamespace(namespace); + } + + private TableCatalog asTableCatalog() { + return (TableCatalog)delegate; + } + + private SupportsNamespaces asNamespaceCatalog() { + return (SupportsNamespaces)delegate; } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b7e33b1d2f6f..5e22d4ecde2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -52,12 +52,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * to resolve attribute references. */ object SimpleAnalyzer extends Analyzer( - new SessionCatalog( - new InMemoryCatalog, - EmptyFunctionRegistry, - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) { - override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {} - }, + new CatalogManager( + new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true), + FakeV2SessionCatalog, + new SessionCatalog( + new InMemoryCatalog, + EmptyFunctionRegistry, + new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) { + override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {} + }), new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) object FakeV2SessionCatalog extends TableCatalog { @@ -118,23 +121,25 @@ object AnalysisContext { * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]]. 
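A minimal wiring sketch, mirroring the test-only constructor added below: the v1 `SessionCatalog` is no longer handed to the `Analyzer` directly but travels inside a `CatalogManager` together with a v2 session catalog (here the `FakeV2SessionCatalog` used in catalyst tests).

```scala
import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, FakeV2SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)
val v1SessionCatalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)

// The CatalogManager now owns both the v2 session catalog and the v1 catalog.
val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog, v1SessionCatalog)
val analyzer = new Analyzer(catalogManager, conf)
```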
*/ class Analyzer( - catalog: SessionCatalog, - v2SessionCatalog: TableCatalog, + override val catalogManager: CatalogManager, conf: SQLConf, maxIterations: Int) extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog { + private val catalog: SessionCatalog = catalogManager.v1SessionCatalog + // Only for tests. def this(catalog: SessionCatalog, conf: SQLConf) = { - this(catalog, FakeV2SessionCatalog, conf, conf.optimizerMaxIterations) + this( + new CatalogManager(conf, FakeV2SessionCatalog, catalog), + conf, + conf.optimizerMaxIterations) } - def this(catalog: SessionCatalog, v2SessionCatalog: TableCatalog, conf: SQLConf) = { - this(catalog, v2SessionCatalog, conf, conf.optimizerMaxIterations) + def this(catalogManager: CatalogManager, conf: SQLConf) = { + this(catalogManager, conf, conf.optimizerMaxIterations) } - override val catalogManager: CatalogManager = new CatalogManager(conf, v2SessionCatalog, catalog) - def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { AnalysisHelper.markInAnalyzer { val analyzed = executeAndTrack(plan, tracker) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b992ab130bbc..eab4c3efe4f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -35,7 +36,7 @@ import org.apache.spark.util.Utils * Abstract class all optimizers should inherit of, contains the standard batches (extending * Optimizers can override this. */ -abstract class Optimizer(sessionCatalog: SessionCatalog) +abstract class Optimizer(catalogManager: CatalogManager) extends RuleExecutor[LogicalPlan] { // Check for structural integrity of the plan in test mode. 
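The same `CatalogManager` replaces the `SessionCatalog` in the optimizer. As a hypothetical migration sketch for code that previously extended `Optimizer(sessionCatalog)` (mirroring the `SimpleTestOptimizer` and test-suite changes further down):

```scala
import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf

// Hypothetical custom optimizer: the constructor argument is now a
// CatalogManager rather than a SessionCatalog.
object MyTestOptimizer extends Optimizer(
  new CatalogManager(
    new SQLConf(),
    FakeV2SessionCatalog,
    new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf())))
```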
@@ -129,7 +130,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) EliminateView, ReplaceExpressions, ComputeCurrentTime, - GetCurrentDatabase(sessionCatalog), + GetCurrentDatabase(catalogManager), RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate) :: ////////////////////////////////////////////////////////////////////////////////////////// @@ -212,7 +213,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) EliminateView.ruleName :: ReplaceExpressions.ruleName :: ComputeCurrentTime.ruleName :: - GetCurrentDatabase(sessionCatalog).ruleName :: + GetCurrentDatabase(catalogManager).ruleName :: RewriteDistinctAggregates.ruleName :: ReplaceDeduplicateWithAggregate.ruleName :: ReplaceIntersectWithSemiJoin.ruleName :: @@ -318,10 +319,10 @@ object EliminateDistinct extends Rule[LogicalPlan] { object SimpleTestOptimizer extends SimpleTestOptimizer class SimpleTestOptimizer extends Optimizer( - new SessionCatalog( - new InMemoryCatalog, - EmptyFunctionRegistry, - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))) + new CatalogManager( + new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true), + FakeV2SessionCatalog, + new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf()))) /** * Remove redundant aliases from a query plan. A redundant alias is an alias that does not change diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala index 69ba76827c78..9d7564175314 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/finishAnalysis.scala @@ -21,12 +21,12 @@ import java.time.LocalDate import scala.collection.mutable -import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.types._ @@ -78,11 +78,14 @@ object ComputeCurrentTime extends Rule[LogicalPlan] { /** Replaces the expression of CurrentDatabase with the current database name. 
*/ -case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[LogicalPlan] { +case class GetCurrentDatabase(catalogManager: CatalogManager) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val currentNamespace = catalogManager.currentNamespace.quoted + plan transformAllExpressions { case CurrentDatabase() => - Literal.create(sessionCatalog.getCurrentDatabase, StringType) + Literal.create(currentNamespace, StringType) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index ae036a831e8e..f3c603abb9a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last} import org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement, UseStatement} import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp} import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform} import org.apache.spark.sql.internal.SQLConf @@ -2462,6 +2462,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging ctx.EXISTS != null) } + /** + * Create a [[UseStatement]] logical plan. + */ + override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) { + val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier) + UseStatement(ctx.NAMESPACE != null, nameParts) + } + /** * Create a [[ShowTablesStatement]] command. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 9356d7707104..c0d53104874a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog, TableChange} import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange} import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types._ @@ -659,6 +659,14 @@ case class ShowTables( AttributeReference("tableName", StringType, nullable = false)()) } +/** + * The logical plan of the USE/USE NAMESPACE command that works for v2 catalogs. + */ +case class SetCatalogAndNamespace( + catalogManager: CatalogManager, + catalogName: Option[String], + namespace: Option[Seq[String]]) extends Command + /** * Insert query result into a directory. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/UseStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/UseStatement.scala new file mode 100644 index 000000000000..1fd76a252d82 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/UseStatement.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical.sql + +/** + * A USE statement, as parsed from SQL. 
+ */ +case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends ParsedStatement diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index ebb947febb6c..be14b1770127 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -39,8 +39,8 @@ import org.apache.spark.sql.internal.SQLConf private[sql] class CatalogManager( conf: SQLConf, - defaultSessionCatalog: TableCatalog, - v1SessionCatalog: SessionCatalog) extends Logging { + defaultSessionCatalog: CatalogPlugin, + val v1SessionCatalog: SessionCatalog) extends Logging { import CatalogManager.SESSION_CATALOG_NAME private val catalogs = mutable.HashMap.empty[String, CatalogPlugin] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala index baea8204d9be..f99ea61527b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala @@ -38,6 +38,11 @@ private[sql] trait LookupCatalog extends Logging { */ def defaultCatalog: Option[CatalogPlugin] = catalogManager.defaultCatalog + /** + * Returns the current catalog set. + */ + def currentCatalog: CatalogPlugin = catalogManager.currentCatalog + /** * This catalog is a v2 catalog that delegates to the v1 session catalog. it is used when the * session catalog is responsible for an identifier, but the source requires the v2 catalog API. @@ -80,14 +85,14 @@ private[sql] trait LookupCatalog extends Logging { } } - type CatalogNamespace = (Option[CatalogPlugin], Seq[String]) + type DefaultCatalogAndNamespace = (Option[CatalogPlugin], Seq[String]) /** * Extract catalog and namespace from a multi-part identifier with the default catalog if needed. * Catalog name takes precedence over namespaces. */ - object CatalogNamespace { - def unapply(parts: Seq[String]): Some[CatalogNamespace] = parts match { + object DefaultCatalogAndNamespace { + def unapply(parts: Seq[String]): Some[DefaultCatalogAndNamespace] = parts match { case Seq(catalogName, tail @ _*) => try { Some((Some(catalogManager.catalog(catalogName)), tail)) @@ -98,6 +103,24 @@ private[sql] trait LookupCatalog extends Logging { } } + type CurrentCatalogAndNamespace = (CatalogPlugin, Seq[String]) + + /** + * Extract catalog and namespace from a multi-part identifier with the current catalog if needed. + * Catalog name takes precedence over namespaces. + */ + object CurrentCatalogAndNamespace { + def unapply(parts: Seq[String]): Some[CurrentCatalogAndNamespace] = parts match { + case Seq(catalogName, tail @ _*) => + try { + Some((catalogManager.catalog(catalogName), tail)) + } catch { + case _: CatalogNotFoundException => + Some((currentCatalog, parts)) + } + } + } + /** * Extract legacy table identifier from a multi-part identifier. 
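To make the renamed extractors concrete, a hedged resolution sketch (names are the fixtures used in the tests below; this assumes code where `LookupCatalog` is mixed in): the first name part is tried as a catalog, and if no catalog with that name is registered, the whole identifier is treated as a namespace in the current catalog.

```scala
// "testcat.ns1.ns1_1": `testcat` is a registered catalog, so the namespace is ns1.ns1_1.
Seq("testcat", "ns1", "ns1_1") match {
  case CurrentCatalogAndNamespace(catalog, ns) =>
    assert(catalog.name() == "testcat" && ns == Seq("ns1", "ns1_1"))
}

// "ns3.ns3_3": no catalog named `ns3`, so the current catalog is kept and the
// full identifier becomes the namespace.
Seq("ns3", "ns3_3") match {
  case CurrentCatalogAndNamespace(catalog, ns) =>
    assert(catalog == currentCatalog && ns == Seq("ns3", "ns3_3"))
}
```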
* diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index 5e0d2041fac5..5998437f11f4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpressio import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, LogicalPlan, OneRowRelation, Project} import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.internal.SQLConf @@ -43,10 +44,10 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { } object Optimize extends Optimizer( - new SessionCatalog( - new InMemoryCatalog, - EmptyFunctionRegistry, - new SQLConf())) { + new CatalogManager( + new SQLConf(), + FakeV2SessionCatalog, + new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf()))) { val newBatch = Batch("OptimizeRuleBreakSI", Once, OptimizeRuleBreakSI) override def defaultBatches: Seq[Batch] = Seq(newBatch) ++ super.defaultBatches } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 98d6be0374da..4a8b56faddf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -20,15 +20,17 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.ExperimentalMethods import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer._ +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.dynamicpruning.{CleanupDynamicPruningFilters, PartitionPruning} import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions import org.apache.spark.sql.execution.datasources.SchemaPruning import org.apache.spark.sql.execution.python.{ExtractGroupingPythonUDFFromAggregate, ExtractPythonUDFFromAggregate, ExtractPythonUDFs} class SparkOptimizer( + catalogManager: CatalogManager, catalog: SessionCatalog, experimentalMethods: ExperimentalMethods) - extends Optimizer(catalog) { + extends Optimizer(catalogManager) { override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+ Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 790eba241bdb..2346545876b2 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -138,13 +138,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } } - /** - * Create a [[SetDatabaseCommand]] logical plan. - */ - override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) { - SetDatabaseCommand(ctx.db.getText) - } - /** * Create a [[ShowTablesCommand]] logical plan. * Example SQL : diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala index dfa5a6866128..33e1ecd359ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils} -import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowNamespaces, ShowTables, SubqueryAlias} -import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement} +import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement, UseStatement} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog, TableCatalog} import org.apache.spark.sql.connector.expressions.Transform @@ -178,7 +178,7 @@ case class DataSourceResolution( } case ShowNamespacesStatement(Some(namespace), pattern) => - val CatalogNamespace(maybeCatalog, ns) = namespace + val DefaultCatalogAndNamespace(maybeCatalog, ns) = namespace maybeCatalog match { case Some(catalog) => ShowNamespaces(catalog.asNamespaceCatalog, Some(ns), pattern) @@ -202,7 +202,7 @@ case class DataSourceResolution( } case ShowTablesStatement(Some(namespace), pattern) => - val CatalogNamespace(maybeCatalog, ns) = namespace + val 
DefaultCatalogAndNamespace(maybeCatalog, ns) = namespace maybeCatalog match { case Some(catalog) => ShowTables(catalog.asTableCatalog, ns, pattern) @@ -213,6 +213,15 @@ case class DataSourceResolution( } ShowTablesCommand(Some(namespace.quoted), pattern) } + + case UseStatement(isNamespaceSet, nameParts) => + if (isNamespaceSet) { + SetCatalogAndNamespace(catalogManager, None, Some(nameParts)) + } else { + val CurrentCatalogAndNamespace(catalog, namespace) = nameParts + val ns = if (namespace.isEmpty) { None } else { Some(namespace) } + SetCatalogAndNamespace(catalogManager, Some(catalog.name()), ns) + } } object V1Provider { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 6387333d0688..c8d29520bcfc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression} import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, ShowNamespaces, ShowTables} +import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowNamespaces, ShowTables} import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability} import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownRequiredColumns} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream} @@ -295,6 +295,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { case r : ShowTables => ShowTablesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil + case SetCatalogAndNamespace(catalogManager, catalogName, namespace) => + SetCatalogAndNamespaceExec(catalogManager, catalogName, namespace) :: Nil + case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala new file mode 100644 index 000000000000..93e0c89cf648 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.execution.LeafExecNode + +/** + * Physical plan node for setting the current catalog and/or namespace. + */ +case class SetCatalogAndNamespaceExec( + catalogManager: CatalogManager, + catalogName: Option[String], + namespace: Option[Seq[String]]) + extends LeafExecNode { + override protected def doExecute(): RDD[InternalRow] = { + // The catalog is updated first because CatalogManager resets the current namespace + // when the current catalog is set. + catalogName.map(catalogManager.setCurrentCatalog) + namespace.map(ns => catalogManager.setCurrentNamespace(ns.toArray)) + + sqlContext.sparkContext.parallelize(Seq.empty, 1) + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 525d9c4e541e..dffb9cb67b5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -52,7 +52,10 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) override def listTables(namespace: Array[String]): Array[Identifier] = { namespace match { case Array(db) => - catalog.listTables(db).map(ident => Identifier.of(Array(db), ident.table)).toArray + catalog + .listTables(db) + .map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table)) + .toArray case _ => throw new NoSuchNamespaceException(namespace) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 20b0143f098c..1e1e86be1502 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.execution.{ColumnarRule, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser} import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.datasources._ @@ -153,6 +154,8 @@ abstract class BaseSessionStateBuilder( protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf) + protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog) + /** * Interface exposed to the user 
for registering user-defined functions. * @@ -166,7 +169,7 @@ abstract class BaseSessionStateBuilder( * * Note: this depends on the `conf` and `catalog` fields. */ - protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) { + protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: @@ -225,7 +228,7 @@ abstract class BaseSessionStateBuilder( * Note: this depends on `catalog` and `experimentalMethods` fields. */ protected def optimizer: Optimizer = { - new SparkOptimizer(catalog, experimentalMethods) { + new SparkOptimizer(catalogManager, catalog, experimentalMethods) { override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 7fde00fe9a41..ff3d828ff312 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -830,6 +830,67 @@ class DataSourceV2SQLSuite assert(df.collect().map(_.getAs[String](0)).sorted === expected.sorted) } + test("Use: basic tests with USE statements") { + val catalogManager = spark.sessionState.catalogManager + + // Validate the initial current catalog and namespace. + assert(catalogManager.currentCatalog.name() == "session") + assert(catalogManager.currentNamespace === Array("default")) + + // The following implicitly creates namespaces. + sql("CREATE TABLE testcat.ns1.ns1_1.table (id bigint) USING foo") + sql("CREATE TABLE testcat2.ns2.ns2_2.table (id bigint) USING foo") + sql("CREATE TABLE testcat2.ns3.ns3_3.table (id bigint) USING foo") + sql("CREATE TABLE testcat2.testcat.table (id bigint) USING foo") + + // Catalog is resolved to 'testcat'. + sql("USE testcat.ns1.ns1_1") + assert(catalogManager.currentCatalog.name() == "testcat") + assert(catalogManager.currentNamespace === Array("ns1", "ns1_1")) + + // Catalog is resolved to 'testcat2'. + sql("USE testcat2.ns2.ns2_2") + assert(catalogManager.currentCatalog.name() == "testcat2") + assert(catalogManager.currentNamespace === Array("ns2", "ns2_2")) + + // Only the namespace is changed. + sql("USE ns3.ns3_3") + assert(catalogManager.currentCatalog.name() == "testcat2") + assert(catalogManager.currentNamespace === Array("ns3", "ns3_3")) + + // Only the namespace is changed (explicit). + sql("USE NAMESPACE testcat") + assert(catalogManager.currentCatalog.name() == "testcat2") + assert(catalogManager.currentNamespace === Array("testcat")) + + // Catalog is resolved to `testcat`. 
+ sql("USE testcat") + assert(catalogManager.currentCatalog.name() == "testcat") + assert(catalogManager.currentNamespace === Array()) + } + + test("Use: set v2 catalog as a current catalog") { + val catalogManager = spark.sessionState.catalogManager + assert(catalogManager.currentCatalog.name() == "session") + + sql("USE testcat") + assert(catalogManager.currentCatalog.name() == "testcat") + } + + test("Use: v2 session catalog is used and namespace does not exist") { + val exception = intercept[NoSuchDatabaseException] { + sql("USE ns1") + } + assert(exception.getMessage.contains("Database 'ns1' not found")) + } + + test("Use: v2 catalog is used and namespace does not exist") { + // Namespaces are not required to exist for v2 catalogs. + sql("USE testcat.ns1.ns2") + val catalogManager = spark.sessionState.catalogManager + assert(catalogManager.currentNamespace === Array("ns1", "ns2")) + } + test("tableCreation: partition column case insensitive resolution") { val testCatalog = catalog("testcat").asTableCatalog val sessionCatalog = catalog("session").asTableCatalog diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 3fc7908e9c4c..75b21a4458fd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -67,7 +67,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session /** * A logical query plan `Analyzer` with rules specific to Hive. */ - override protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) { + override protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new ResolveHiveSerdeTable(session) +: new FindDataSourceTable(session) +: