Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
d956f10
initial checkin
imback82 Sep 6, 2019
826c148
add rules
imback82 Sep 7, 2019
eda2d9d
Merge branch 'master' into use_namespace + more feature work
imback82 Sep 11, 2019
3faebcf
more changes
imback82 Sep 11, 2019
b1b59fc
adding more unit tests
imback82 Sep 12, 2019
f8b3058
minor cleanups
imback82 Sep 12, 2019
081fdb3
Merge branch 'master' into use_namespace
imback82 Sep 12, 2019
0d5246d
update docs/sql-keywords.md
imback82 Sep 12, 2019
680b16d
Merge branch 'master' into use_namespace
imback82 Sep 16, 2019
12685e0
Address PR comments (introduce BaseSessionCatalog)
imback82 Sep 17, 2019
ad08096
Fix tests
imback82 Sep 17, 2019
1fb5751
Fix java style check failures.
imback82 Sep 17, 2019
45d4cf1
Address PR comments
imback82 Sep 18, 2019
55d7d46
Merge branch 'master' into use_namespace
imback82 Sep 18, 2019
4d2865c
Address PR comments
imback82 Sep 18, 2019
467a8fa
Update CATALOG keyword info.
imback82 Sep 18, 2019
2dbf689
Pass in CatalogManager to Optmimizer instead of SessionCatalog: fixes…
imback82 Sep 19, 2019
86b6c79
Address PR comments. Fix hive-thriftserver tests.
imback82 Sep 21, 2019
f87247e
Add a test for session catalog.
imback82 Sep 21, 2019
8c5a337
Merge branch 'master' into use_namespace
imback82 Sep 21, 2019
e6e71a9
Fix build error + check namespace existence.
imback82 Sep 21, 2019
401f275
Fix failing tests
imback82 Sep 22, 2019
f252bec
Merge branch 'master' into use_namespace
imback82 Sep 26, 2019
30a0f7a
Fix hive tests
imback82 Sep 26, 2019
110f6bf
Fix GetCurrentDatabase to use namespace.quoted
imback82 Sep 26, 2019
341d8d2
Address PR comments
imback82 Sep 27, 2019
a1b6d72
Fix formatting
imback82 Sep 27, 2019
d42d72a
Fix compile errors
imback82 Sep 27, 2019
c192710
Merge branch 'master' into use_namespace
imback82 Sep 27, 2019
1579f97
Fix unit tests
imback82 Sep 27, 2019
b7c1e79
Fix sql tests
imback82 Sep 27, 2019
0bceddb
address PR comments
imback82 Sep 28, 2019
b33060b
Simplify USE parsing.
imback82 Sep 28, 2019
2534baf
Retain existing formatting
imback82 Sep 28, 2019
3f2e1c1
Address PR comments
imback82 Sep 30, 2019
57f05fd
Merge branch 'master' into use_namespace
imback82 Oct 2, 2019
88872ea
formatting
imback82 Oct 2, 2019
8485619
Reverting back GlobalTempView test since namespace check is not requi…
imback82 Oct 2, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/sql-keywords.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ Below is a list of all the keywords in Spark SQL.
<tr><td>MONTH</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>MONTHS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>MSCK</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>NAMESPACE</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>NAMESPACES</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>NATURAL</td><td>reserved</td><td>strict-non-reserved</td><td>reserved</td></tr>
<tr><td>NO</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ singleTableSchema
statement
: query #statementDefault
| ctes? dmlStatementNoWith #dmlStatement
| USE db=errorCapturingIdentifier #use
| USE NAMESPACE? multipartIdentifier #use
| CREATE database (IF NOT EXISTS)? db=errorCapturingIdentifier
((COMMENT comment=STRING) |
locationSpec |
Expand Down Expand Up @@ -1019,6 +1019,7 @@ ansiNonReserved
| MINUTES
| MONTHS
| MSCK
| NAMESPACE
| NAMESPACES
| NO
| NULLS
Expand Down Expand Up @@ -1270,6 +1271,7 @@ nonReserved
| MONTH
| MONTHS
| MSCK
| NAMESPACE
| NAMESPACES
| NO
| NOT
Expand Down Expand Up @@ -1532,6 +1534,7 @@ MINUTES: 'MINUTES';
MONTH: 'MONTH';
MONTHS: 'MONTHS';
MSCK: 'MSCK';
NAMESPACE: 'NAMESPACE';
NAMESPACES: 'NAMESPACES';
NATURAL: 'NATURAL';
NO: 'NO';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@

/**
* An API to extend the Spark built-in session catalog. Implementation can get the built-in session
* catalog from {@link #setDelegateCatalog(TableCatalog)}, implement catalog functions with
* catalog from {@link #setDelegateCatalog(CatalogPlugin)}, implement catalog functions with
* some custom logic and call the built-in session catalog at the end. For example, they can
* implement {@code createTable}, do something else before calling {@code createTable} of the
* built-in session catalog.
*/
@Experimental
public interface CatalogExtension extends TableCatalog {
public interface CatalogExtension extends TableCatalog, SupportsNamespaces {

/**
* This will be called only once by Spark to pass in the Spark built-in session catalog, after
* {@link #initialize(String, CaseInsensitiveStringMap)} is called.
*/
void setDelegateCatalog(TableCatalog delegate);
void setDelegateCatalog(CatalogPlugin delegate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
Expand All @@ -36,9 +37,9 @@
@Experimental
public abstract class DelegatingCatalogExtension implements CatalogExtension {

private TableCatalog delegate;
private CatalogPlugin delegate;

public final void setDelegateCatalog(TableCatalog delegate) {
public final void setDelegateCatalog(CatalogPlugin delegate) {
this.delegate = delegate;
}

Expand All @@ -52,22 +53,22 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {}

@Override
public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
return delegate.listTables(namespace);
return asTableCatalog().listTables(namespace);
}

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
return delegate.loadTable(ident);
return asTableCatalog().loadTable(ident);
}

@Override
public void invalidateTable(Identifier ident) {
delegate.invalidateTable(ident);
asTableCatalog().invalidateTable(ident);
}

@Override
public boolean tableExists(Identifier ident) {
return delegate.tableExists(ident);
return asTableCatalog().tableExists(ident);
}

@Override
Expand All @@ -76,25 +77,78 @@ public Table createTable(
StructType schema,
Transform[] partitions,
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
return delegate.createTable(ident, schema, partitions, properties);
return asTableCatalog().createTable(ident, schema, partitions, properties);
}

@Override
public Table alterTable(
Identifier ident,
TableChange... changes) throws NoSuchTableException {
return delegate.alterTable(ident, changes);
return asTableCatalog().alterTable(ident, changes);
}

@Override
public boolean dropTable(Identifier ident) {
return delegate.dropTable(ident);
return asTableCatalog().dropTable(ident);
}

@Override
public void renameTable(
Identifier oldIdent,
Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException {
delegate.renameTable(oldIdent, newIdent);
asTableCatalog().renameTable(oldIdent, newIdent);
}

@Override
public String[] defaultNamespace() {
return asNamespaceCatalog().defaultNamespace();
}

@Override
public String[][] listNamespaces() throws NoSuchNamespaceException {
return asNamespaceCatalog().listNamespaces();
}

@Override
public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
return asNamespaceCatalog().listNamespaces(namespace);
}

@Override
public boolean namespaceExists(String[] namespace) {
return asNamespaceCatalog().namespaceExists(namespace);
}

@Override
public Map<String, String> loadNamespaceMetadata(
String[] namespace) throws NoSuchNamespaceException {
return asNamespaceCatalog().loadNamespaceMetadata(namespace);
}

@Override
public void createNamespace(
String[] namespace,
Map<String, String> metadata) throws NamespaceAlreadyExistsException {
asNamespaceCatalog().createNamespace(namespace, metadata);
}

@Override
public void alterNamespace(
String[] namespace,
NamespaceChange... changes) throws NoSuchNamespaceException {
asNamespaceCatalog().alterNamespace(namespace, changes);
}

@Override
public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
return asNamespaceCatalog().dropNamespace(namespace);
}

private TableCatalog asTableCatalog() {
return (TableCatalog)delegate;
}

private SupportsNamespaces asNamespaceCatalog() {
return (SupportsNamespaces)delegate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* to resolve attribute references.
*/
object SimpleAnalyzer extends Analyzer(
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
},
new CatalogManager(
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true),
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
}),
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))

object FakeV2SessionCatalog extends TableCatalog {
Expand Down Expand Up @@ -118,23 +121,25 @@ object AnalysisContext {
* [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
*/
class Analyzer(
catalog: SessionCatalog,
v2SessionCatalog: TableCatalog,
override val catalogManager: CatalogManager,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {

private val catalog: SessionCatalog = catalogManager.v1SessionCatalog

// Only for tests.
def this(catalog: SessionCatalog, conf: SQLConf) = {
this(catalog, FakeV2SessionCatalog, conf, conf.optimizerMaxIterations)
this(
new CatalogManager(conf, FakeV2SessionCatalog, catalog),
conf,
conf.optimizerMaxIterations)
}

def this(catalog: SessionCatalog, v2SessionCatalog: TableCatalog, conf: SQLConf) = {
this(catalog, v2SessionCatalog, conf, conf.optimizerMaxIterations)
def this(catalogManager: CatalogManager, conf: SQLConf) = {
this(catalogManager, conf, conf.optimizerMaxIterations)
}

override val catalogManager: CatalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)

def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
AnalysisHelper.markInAnalyzer {
val analyzed = executeAndTrack(plan, tracker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
Expand All @@ -35,7 +36,7 @@ import org.apache.spark.util.Utils
* Abstract class all optimizers should inherit of, contains the standard batches (extending
* Optimizers can override this.
*/
abstract class Optimizer(sessionCatalog: SessionCatalog)
abstract class Optimizer(catalogManager: CatalogManager)
extends RuleExecutor[LogicalPlan] {

// Check for structural integrity of the plan in test mode.
Expand Down Expand Up @@ -129,7 +130,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
EliminateView,
ReplaceExpressions,
ComputeCurrentTime,
GetCurrentDatabase(sessionCatalog),
GetCurrentDatabase(catalogManager),
RewriteDistinctAggregates,
ReplaceDeduplicateWithAggregate) ::
//////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -212,7 +213,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
EliminateView.ruleName ::
ReplaceExpressions.ruleName ::
ComputeCurrentTime.ruleName ::
GetCurrentDatabase(sessionCatalog).ruleName ::
GetCurrentDatabase(catalogManager).ruleName ::
RewriteDistinctAggregates.ruleName ::
ReplaceDeduplicateWithAggregate.ruleName ::
ReplaceIntersectWithSemiJoin.ruleName ::
Expand Down Expand Up @@ -318,10 +319,10 @@ object EliminateDistinct extends Rule[LogicalPlan] {
object SimpleTestOptimizer extends SimpleTestOptimizer

class SimpleTestOptimizer extends Optimizer(
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)))
new CatalogManager(
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true),
FakeV2SessionCatalog,
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf())))

/**
* Remove redundant aliases from a query plan. A redundant alias is an alias that does not change
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import java.time.LocalDate

import scala.collection.mutable

import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.types._


Expand Down Expand Up @@ -78,11 +78,14 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {


/** Replaces the expression of CurrentDatabase with the current database name. */
case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[LogicalPlan] {
case class GetCurrentDatabase(catalogManager: CatalogManager) extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val currentNamespace = catalogManager.currentNamespace.quoted

plan transformAllExpressions {
case CurrentDatabase() =>
Literal.create(sessionCatalog.getCurrentDatabase, StringType)
Literal.create(currentNamespace, StringType)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement, UseStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -2462,6 +2462,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
ctx.EXISTS != null)
}

/**
* Create a [[UseStatement]] logical plan.
*/
override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
UseStatement(ctx.NAMESPACE != null, nameParts)
}

/**
* Create a [[ShowTablesStatement]] command.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -659,6 +659,14 @@ case class ShowTables(
AttributeReference("tableName", StringType, nullable = false)())
}

/**
* The logical plan of the USE/USE NAMESPACE command that works for v2 catalogs.
*/
case class SetCatalogAndNamespace(
catalogManager: CatalogManager,
catalogName: Option[String],
namespace: Option[Seq[String]]) extends Command

/**
* Insert query result into a directory.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.sql

/**
* A USE statement, as parsed from SQL.
*/
case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends ParsedStatement
Loading