Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/sql-keywords.md
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ Below is a list of all the keywords in Spark SQL.
<tr><td>OVERLAPS</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>OVERLAY</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>OVERWRITE</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>OWNER</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>PARTITION</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>PARTITIONED</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>PARTITIONS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ statement
SET (DBPROPERTIES | PROPERTIES) tablePropertyList #setNamespaceProperties
| ALTER namespace multipartIdentifier
SET locationSpec #setNamespaceLocation
| ALTER namespace multipartIdentifier
SET OWNER ownerType=(USER | ROLE | GROUP) identifier #setNamespaceOwner
| DROP namespace (IF EXISTS)? multipartIdentifier
(RESTRICT | CASCADE)? #dropNamespace
| SHOW (DATABASES | NAMESPACES) ((FROM | IN) multipartIdentifier)?
Expand Down Expand Up @@ -1095,6 +1097,7 @@ ansiNonReserved
| OVER
| OVERLAY
| OVERWRITE
| OWNER
| PARTITION
| PARTITIONED
| PARTITIONS
Expand Down Expand Up @@ -1347,6 +1350,7 @@ nonReserved
| OVERLAPS
| OVERLAY
| OVERWRITE
| OWNER
| PARTITION
| PARTITIONED
| PARTITIONS
Expand Down Expand Up @@ -1605,6 +1609,7 @@ OVER: 'OVER';
OVERLAPS: 'OVERLAPS';
OVERLAY: 'OVERLAY';
OVERWRITE: 'OVERWRITE';
OWNER: 'OWNER';
PARTITION: 'PARTITION';
PARTITIONED: 'PARTITIONED';
PARTITIONS: 'PARTITIONS';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,16 @@ public interface SupportsNamespaces extends CatalogPlugin {
String PROP_OWNER_TYPE = "ownerType";

/**
* The list of reserved namespace properties.
* The list of reserved namespace properties, which can not be removed or changed directly by
* the syntax:
* {{
* ALTER NAMESPACE ... SET PROPERTIES ...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we support UNSET?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have no ALTER NAMESPACE UNSET PROPERTIES yet

* }}
*
* They need specific syntax to modify
*/
List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION);
List<String> RESERVED_PROPERTIES =
Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_OWNER_NAME, PROP_OWNER_TYPE);

/**
* Return a default namespace for the catalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2532,6 +2532,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
case (PROP_COMMENT, _) =>
throw new ParseException(s"$PROP_COMMENT is a reserved namespace property, please use" +
s" the COMMENT clause to specify it.", ctx)
case (ownership, _) if ownership == PROP_OWNER_NAME || ownership == PROP_OWNER_TYPE =>
throw new ParseException(s"$ownership is a reserved namespace property , please use" +
" ALTER NAMESPACE ... SET OWNER ... to specify it.", ctx)
case _ =>
}
properties
Expand Down Expand Up @@ -3527,4 +3530,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
CommentOnTable(UnresolvedTable(nameParts), comment)
}

/**
* Create an [[AlterNamespaceSetOwner]] logical plan.
*
* For example:
* {{{
* ALTER (DATABASE|SCHEMA|NAMESPACE) namespace SET OWNER (USER|ROLE|GROUP) identityName;
* }}}
*/
override def visitSetNamespaceOwner(ctx: SetNamespaceOwnerContext): LogicalPlan = {
withOrigin(ctx) {
val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
AlterNamespaceSetOwner(
UnresolvedNamespace(nameParts),
ctx.identifier.getText,
ctx.ownerType.getText)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,16 @@ case class AlterNamespaceSetLocation(
override def children: Seq[LogicalPlan] = Seq(namespace)
}

/**
* ALTER (DATABASE|SCHEMA|NAMESPACE) ... SET OWNER command, as parsed from SQL.
*/
case class AlterNamespaceSetOwner(
child: LogicalPlan,
ownerName: String,
ownerType: String) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
* The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,19 @@ class DDLParserSuite extends AnalysisTest {
UnresolvedNamespace(Seq("a", "b", "c")), "/home/user/db"))
}

test("set namespace owner") {
comparePlans(
parsePlan("ALTER DATABASE a.b.c SET OWNER USER user1"),
AlterNamespaceSetOwner(UnresolvedNamespace(Seq("a", "b", "c")), "user1", "USER"))

comparePlans(
parsePlan("ALTER DATABASE a.b.c SET OWNER ROLE role1"),
AlterNamespaceSetOwner(UnresolvedNamespace(Seq("a", "b", "c")), "role1", "ROLE"))
comparePlans(
parsePlan("ALTER DATABASE a.b.c SET OWNER GROUP group1"),
AlterNamespaceSetOwner(UnresolvedNamespace(Seq("a", "b", "c")), "group1", "GROUP"))
}

test("show databases: basic") {
comparePlans(
parsePlan("SHOW DATABASES"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ case class DescribeDatabaseCommand(
Row("Owner Type", allDbProperties.getOrElse(PROP_OWNER_TYPE, "")) :: Nil

if (extended) {
val properties = allDbProperties -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)
val properties = allDbProperties -- RESERVED_PROPERTIES.asScala
val propertiesStr =
if (properties.isEmpty) {
""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
import org.apache.spark.util.Utils

/**
* Physical plan node for creating a namespace.
Expand All @@ -35,11 +36,14 @@ case class CreateNamespaceExec(
extends V2CommandExec {
override protected def run(): Seq[InternalRow] = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._

val ns = namespace.toArray
if (!catalog.namespaceExists(ns)) {
try {
catalog.createNamespace(ns, properties.asJava)
val ownership =
Map(PROP_OWNER_NAME -> Utils.getCurrentUserName(), PROP_OWNER_TYPE -> "USER")
catalog.createNamespace(ns, (properties ++ ownership).asJava)
} catch {
case _: NamespaceAlreadyExistsException if ifNotExists =>
logWarning(s"Namespace ${namespace.quoted} was created concurrently. Ignoring.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetLocation, AlterNamespaceSetProperties, AlterTable, AppendData, CommentOnNamespace, CommentOnTable, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetLocation, AlterNamespaceSetOwner, AlterNamespaceSetProperties, AlterTable, AppendData, CommentOnNamespace, CommentOnTable, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
Expand Down Expand Up @@ -249,6 +249,11 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case r @ ShowTableProperties(DataSourceV2Relation(table, _, _), propertyKey) =>
ShowTablePropertiesExec(r.output, table, propertyKey) :: Nil

case AlterNamespaceSetOwner(ResolvedNamespace(catalog, namespace), name, typ) =>
val properties =
Map(SupportsNamespaces.PROP_OWNER_NAME -> name, SupportsNamespaces.PROP_OWNER_TYPE -> typ)
AlterNamespaceSetPropertiesExec(catalog, namespace, properties) :: Nil

case _ => Nil
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,31 @@ case class DescribeNamespaceExec(
namespace: Seq[String],
isExtended: Boolean) extends V2CommandExec {
private val encoder = RowEncoder(StructType.fromAttributes(output)).resolveAndBind()
import SupportsNamespaces._

override protected def run(): Seq[InternalRow] = {
val rows = new ArrayBuffer[InternalRow]()
val ns = namespace.toArray
val metadata = catalog.loadNamespaceMetadata(ns)

rows += toCatalystRow("Namespace Name", ns.last)
rows += toCatalystRow("Description", metadata.get(SupportsNamespaces.PROP_COMMENT))
rows += toCatalystRow("Location", metadata.get(SupportsNamespaces.PROP_LOCATION))
Option(metadata.get(PROP_COMMENT)).foreach {
rows += toCatalystRow("Description", _)
}
Option(metadata.get(PROP_LOCATION)).foreach {
rows += toCatalystRow("Location", _)
}
Option(metadata.get(PROP_OWNER_NAME)).foreach {
rows += toCatalystRow("Owner Name", _)
}
Option(metadata.get(PROP_OWNER_TYPE)).foreach {
rows += toCatalystRow("Owner Type", _)
}

if (isExtended) {
val properties =
metadata.asScala.toSeq.filter(p =>
!SupportsNamespaces.RESERVED_PROPERTIES.contains(p._1))
val properties = metadata.asScala -- RESERVED_PROPERTIES.asScala
if (properties.nonEmpty) {
rows += toCatalystRow("Properties", properties.mkString("(", ",", ")"))
rows += toCatalystRow("Properties", properties.toSeq.mkString("(", ",", ")"))
}
}
rows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ private[sql] object V2SessionCatalog {
.map(CatalogUtils.stringToURI)
.orElse(defaultLocation)
.getOrElse(throw new IllegalArgumentException("Missing database location")),
properties = metadata.asScala.toMap -- SupportsNamespaces.RESERVED_PROPERTIES.asScala)
properties = metadata.asScala.toMap --
Seq(SupportsNamespaces.PROP_COMMENT, SupportsNamespaces.PROP_LOCATION))
}

private implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.SimpleScanSource
import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils

class DataSourceV2SQLSuite
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = true)
Expand Down Expand Up @@ -884,7 +885,8 @@ class DataSourceV2SQLSuite
.isEmpty, s"$key is a reserved namespace property and ignored")
val meta =
catalog("testcat").asNamespaceCatalog.loadNamespaceMetadata(Array("reservedTest"))
assert(meta.get(key) === null, "reserved properties should not have side effects")
assert(meta.get(key) == null || !meta.get(key).contains("foo"),
"reserved properties should not have side effects")
}
}
}
Expand Down Expand Up @@ -967,7 +969,9 @@ class DataSourceV2SQLSuite
assert(description === Seq(
Row("Namespace Name", "ns2"),
Row("Description", "test namespace"),
Row("Location", "/tmp/ns_test")
Row("Location", "/tmp/ns_test"),
Row("Owner Name", Utils.getCurrentUserName()),
Row("Owner Type", "USER")
))
}
}
Expand All @@ -982,6 +986,8 @@ class DataSourceV2SQLSuite
Row("Namespace Name", "ns2"),
Row("Description", "test namespace"),
Row("Location", "/tmp/ns_test"),
Row("Owner Name", Utils.getCurrentUserName()),
Row("Owner Type", "USER"),
Row("Properties", "((a,b),(b,a),(c,c))")
))
}
Expand Down Expand Up @@ -1010,7 +1016,8 @@ class DataSourceV2SQLSuite
.isEmpty, s"$key is a reserved namespace property and ignored")
val meta =
catalog("testcat").asNamespaceCatalog.loadNamespaceMetadata(Array("reservedTest"))
assert(meta.get(key) === null, "reserved properties should not have side effects")
assert(meta.get(key) == null || !meta.get(key).contains("foo"),
"reserved properties should not have side effects")
}
}
}
Expand All @@ -1025,7 +1032,25 @@ class DataSourceV2SQLSuite
assert(descriptionDf.collect() === Seq(
Row("Namespace Name", "ns2"),
Row("Description", "test namespace"),
Row("Location", "/tmp/ns_test_2")
Row("Location", "/tmp/ns_test_2"),
Row("Owner Name", Utils.getCurrentUserName()),
Row("Owner Type", "USER")
))
}
}

test("AlterNamespaceSetOwner using v2 catalog") {
withNamespace("testcat.ns1.ns2") {
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 COMMENT " +
"'test namespace' LOCATION '/tmp/ns_test_3'")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related to this PR, does ANSI SQL define a OWNER clause for CREATE TABLE/NAMESPACE?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked the Standard and did not find such a clause for CREATE TABLE/NAMESPACE syntaxes

sql("ALTER NAMESPACE testcat.ns1.ns2 SET OWNER ROLE adminRole")
val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2")
assert(descriptionDf.collect() === Seq(
Row("Namespace Name", "ns2"),
Row("Description", "test namespace"),
Row("Location", "/tmp/ns_test_3"),
Row("Owner Name", "adminRole"),
Row("Owner Type", "ROLE")
))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1010,31 +1010,18 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(exc.getMessage.contains(testNs.quoted))
}

test("alterNamespace: fail to remove location") {
test("alterNamespace: fail to remove reserved properties") {
val catalog = newCatalog()

catalog.createNamespace(testNs, emptyProps)

val exc = intercept[UnsupportedOperationException] {
catalog.alterNamespace(testNs, NamespaceChange.removeProperty("location"))
}

assert(exc.getMessage.contains("Cannot remove reserved property: location"))

catalog.dropNamespace(testNs)
}
SupportsNamespaces.RESERVED_PROPERTIES.asScala.foreach { p =>
val exc = intercept[UnsupportedOperationException] {
catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
}
assert(exc.getMessage.contains(s"Cannot remove reserved property: $p"))

test("alterNamespace: fail to remove comment") {
val catalog = newCatalog()

catalog.createNamespace(testNs, Map("comment" -> "test db").asJava)

val exc = intercept[UnsupportedOperationException] {
catalog.alterNamespace(testNs, NamespaceChange.removeProperty("comment"))
}

assert(exc.getMessage.contains("Cannot remove reserved property: comment"))

catalog.dropNamespace(testNs)
}
}
Loading