From 2a2f88141f6b0b0ac48d88b3fe868ce32c1e8592 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Fri, 21 Jan 2022 18:39:31 +0800
Subject: [PATCH 1/5] [SPARK-37929][SQL] Support cascade mode for JDBC V2

---
 .../sql/jdbc/v2/V2JDBCNamespaceTest.scala     | 34 ++++++++++++++++++-
 .../datasources/jdbc/JdbcUtils.scala          | 10 ++++--
 .../v2/jdbc/JDBCTableCatalog.scala            |  2 +-
 3 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index 0c6b2701c92b0..d55d9e3929a9d 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -17,21 +17,30 @@
 
 package org.apache.spark.sql.jdbc.v2
 
+import java.util
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 
 import org.apache.logging.log4j.Level
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.connector.catalog.NamespaceChange
+import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.tags.DockerTest
 
 @DockerTest
 private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerIntegrationFunSuite {
   val catalog = new JDBCTableCatalog()
 
+  private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
+  private val schema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add("data", StringType)
+
   def builtinNamespaces: Array[Array[String]]
 
   test("listNamespaces: basic behavior") {
@@ -60,4 +69,27 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
     }.getMessage
     assert(msg.contains("Namespace 'foo' not found"))
   }
+
+  test("Drop namespace") {
+    val ident1 = Identifier.of(Array("foo"), "tab")
+    // Drop empty namespace without cascade
+    catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
+    assert(catalog.namespaceExists(Array("foo")) === true)
+    catalog.dropNamespace(Array("foo"), cascade = false)
+    assert(catalog.namespaceExists(Array("foo")) === false)
+
+    // Drop non empty namespace without cascade
+    catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
+    assert(catalog.namespaceExists(Array("foo")) === true)
+    catalog.createTable(ident1, schema, Array.empty, emptyProps)
+    val msg = intercept[IllegalStateException] {
+      catalog.dropNamespace(Array("foo"), cascade = false)
+    }.getMessage
+    assert(msg.contains("Namespace foo is not empty"))
+
+    // Drop non empty namespace with cascade
+    assert(catalog.namespaceExists(Array("foo")) === true)
+    catalog.dropNamespace(Array("foo"), cascade = false)
+    assert(catalog.namespaceExists(Array("foo")) === false)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 7f68a73f8950a..cc40d19693b4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1014,9 +1014,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
   /**
    * Drops a namespace from the JDBC database.
    */
-  def dropNamespace(conn: Connection, options: JDBCOptions, namespace: String): Unit = {
+  def dropNamespace(
+      conn: Connection, options: JDBCOptions, namespace: String, cascade: Boolean): Unit = {
     val dialect = JdbcDialects.get(options.url)
-    executeStatement(conn, options, s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}")
+    val dropCmd = if (cascade) {
+      s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)} CASCADE"
+    } else {
+      s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}"
+    }
+    executeStatement(conn, options, dropCmd)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 1658f0dce7fbe..8a8ec2c61a5ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -287,7 +287,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
       }
       JdbcUtils.withConnection(options) { conn =>
         JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
-          JdbcUtils.dropNamespace(conn, options, db)
+          JdbcUtils.dropNamespace(conn, options, db, cascade)
           true
         }
       }
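
The core of this first patch is the new cascade flag threaded from JDBCTableCatalog.dropNamespace into JdbcUtils.dropNamespace, which appends CASCADE to the generated DROP SCHEMA statement. A minimal standalone sketch (not part of the patch; quoteIdentifier below stands in for the dialect's quoting and the schema name "foo" is just an example) of how the flag maps to SQL:

    object DropSchemaSqlSketch {
      // Stand-in for the dialect's quoteIdentifier: wrap the name in double quotes.
      private def quoteIdentifier(name: String): String = "\"" + name + "\""

      // Mirrors the conditional added to JdbcUtils.dropNamespace in this patch.
      def dropSchemaSql(namespace: String, cascade: Boolean): String =
        if (cascade) {
          s"DROP SCHEMA ${quoteIdentifier(namespace)} CASCADE"
        } else {
          s"DROP SCHEMA ${quoteIdentifier(namespace)}"
        }

      def main(args: Array[String]): Unit = {
        println(dropSchemaSql("foo", cascade = false)) // DROP SCHEMA "foo"
        println(dropSchemaSql("foo", cascade = true))  // DROP SCHEMA "foo" CASCADE
      }
    }
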
From 146ac41d3c7a47afa58fd368a0cfb51b7fcf92fd Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Sat, 22 Jan 2022 09:03:49 +0800
Subject: [PATCH 2/5] Update code

---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index d55d9e3929a9d..c2f71582d9a89 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -89,7 +89,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
 
     // Drop non empty namespace with cascade
     assert(catalog.namespaceExists(Array("foo")) === true)
-    catalog.dropNamespace(Array("foo"), cascade = false)
+    catalog.dropNamespace(Array("foo"), cascade = true)
     assert(catalog.namespaceExists(Array("foo")) === false)
   }
 }

From 4ebd3f3c79748ce1223396b822a7fd3561bcd28b Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Sat, 22 Jan 2022 18:02:32 +0800
Subject: [PATCH 3/5] Update code

---
 .../sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 8a8ec2c61a5ec..7b0f1c815bc2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -282,7 +282,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
       namespace: Array[String],
       cascade: Boolean): Boolean = namespace match {
     case Array(db) if namespaceExists(namespace) =>
-      if (listTables(Array(db)).nonEmpty) {
+      if (!cascade && listTables(Array(db)).nonEmpty) {
         throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
       }
       JdbcUtils.withConnection(options) { conn =>
From 0744168ab678b0d0297c95ad96c8d5b477201a25 Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Tue, 25 Jan 2022 14:03:16 +0800
Subject: [PATCH 4/5] Update code

---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala | 7 +++----
 .../execution/datasources/v2/jdbc/JDBCTableCatalog.scala   | 3 ---
 .../scala/org/apache/spark/sql/jdbc/JdbcDialects.scala     | 3 +++
 .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala  | 1 +
 4 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index c2f71582d9a89..79831fc87b4fc 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -27,7 +27,7 @@ import org.apache.logging.log4j.Level
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
-import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
+import org.apache.spark.sql.jdbc.{DockerIntegrationFunSuite, NamespaceNotEmptyException}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.tags.DockerTest
@@ -82,10 +82,9 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
     catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
     assert(catalog.namespaceExists(Array("foo")) === true)
     catalog.createTable(ident1, schema, Array.empty, emptyProps)
-    val msg = intercept[IllegalStateException] {
+    val msg = intercept[NamespaceNotEmptyException] {
       catalog.dropNamespace(Array("foo"), cascade = false)
-    }.getMessage
-    assert(msg.contains("Namespace foo is not empty"))
+    }
 
     // Drop non empty namespace with cascade
     assert(catalog.namespaceExists(Array("foo")) === true)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 7b0f1c815bc2d..d06a28d952b38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -282,9 +282,6 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
       namespace: Array[String],
       cascade: Boolean): Boolean = namespace match {
     case Array(db) if namespaceExists(namespace) =>
-      if (!cascade && listTables(Array(db)).nonEmpty) {
-        throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
-      }
       JdbcUtils.withConnection(options) { conn =>
         JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
           JdbcUtils.dropNamespace(conn, options, db, cascade)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 7b8b362e64c6d..9b2275416b646 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -51,6 +51,9 @@ import org.apache.spark.sql.types._
 @DeveloperApi
 case class JdbcType(databaseTypeDefinition : String, jdbcNullType : Int)
 
+class NamespaceNotEmptyException(message: String, cause: Option[Throwable] = None)
+  extends AnalysisException(message, cause = cause)
+
 /**
  * :: DeveloperApi ::
  * Encapsulates everything (extensions, workarounds, quirks) to handle the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 356cb4ddbd008..11f580f91daf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -215,6 +215,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
           // https://www.postgresql.org/docs/14/errcodes-appendix.html
           case "42P07" => throw new IndexAlreadyExistsException(message, cause = Some(e))
           case "42704" => throw new NoSuchIndexException(message, cause = Some(e))
+          case "2BP01" => throw new NamespaceNotEmptyException(message, cause = Some(e))
          case _ => super.classifyException(message, e)
        }
     case unsupported: UnsupportedOperationException => throw unsupported
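
This revision drops the listTables pre-check and lets the database itself reject the drop: PostgreSQL reports SQLSTATE 2BP01 (dependent_objects_still_exist, per the errcodes appendix linked in the hunk above) when a schema that still contains objects is dropped without CASCADE, and the dialect's classifyException turns that into a namespace-not-empty error. Relying on the server error also avoids a window where a table could be created between a client-side check and the drop. A plain-JDBC sketch of what the dialect sees, assuming a local PostgreSQL instance, placeholder credentials, and the PostgreSQL JDBC driver on the classpath:

    import java.sql.{DriverManager, SQLException}

    object DropSchemaSqlStateSketch {
      def main(args: Array[String]): Unit = {
        // Placeholder connection details; point this at a real PostgreSQL instance.
        val conn = DriverManager.getConnection(
          "jdbc:postgresql://localhost:5432/testdb", "user", "password")
        try {
          // Without CASCADE this fails if schema "foo" still contains tables.
          conn.createStatement().executeUpdate("DROP SCHEMA \"foo\"")
        } catch {
          case e: SQLException if e.getSQLState == "2BP01" =>
            // The SQL state the dialect maps to a namespace-not-empty exception.
            println(s"non-empty schema: ${e.getMessage}")
        } finally {
          conn.close()
        }
      }
    }
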
From 6bc248d24e446ec871b5c4e11570551636bac1df Mon Sep 17 00:00:00 2001
From: Jiaan Geng
Date: Wed, 26 Jan 2022 10:54:04 +0800
Subject: [PATCH 5/5] Update code

---
 .../org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala  | 5 +++--
 .../main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala | 3 ---
 .../scala/org/apache/spark/sql/jdbc/PostgresDialect.scala   | 4 ++--
 3 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index 79831fc87b4fc..4f56f1f4ea1e7 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -25,9 +25,10 @@ import scala.collection.JavaConverters._
 import org.apache.logging.log4j.Level
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
-import org.apache.spark.sql.jdbc.{DockerIntegrationFunSuite, NamespaceNotEmptyException}
+import org.apache.spark.sql.jdbc.DockerIntegrationFunSuite
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 import org.apache.spark.tags.DockerTest
@@ -82,7 +83,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
     catalog.createNamespace(Array("foo"), Map("comment" -> "test comment").asJava)
     assert(catalog.namespaceExists(Array("foo")) === true)
     catalog.createTable(ident1, schema, Array.empty, emptyProps)
-    val msg = intercept[NamespaceNotEmptyException] {
+    intercept[NonEmptyNamespaceException] {
       catalog.dropNamespace(Array("foo"), cascade = false)
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 9b2275416b646..7b8b362e64c6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -51,9 +51,6 @@ import org.apache.spark.sql.types._
 @DeveloperApi
 case class JdbcType(databaseTypeDefinition : String, jdbcNullType : Int)
 
-class NamespaceNotEmptyException(message: String, cause: Option[Throwable] = None)
-  extends AnalysisException(message, cause = cause)
-
 /**
  * :: DeveloperApi ::
  * Encapsulates everything (extensions, workarounds, quirks) to handle the
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 11f580f91daf4..fc33a57da7282 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -23,7 +23,7 @@ import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
+import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
 import org.apache.spark.sql.connector.expressions.NamedReference
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
@@ -215,7 +215,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper {
           // https://www.postgresql.org/docs/14/errcodes-appendix.html
           case "42P07" => throw new IndexAlreadyExistsException(message, cause = Some(e))
           case "42704" => throw new NoSuchIndexException(message, cause = Some(e))
-          case "2BP01" => throw new NamespaceNotEmptyException(message, cause = Some(e))
+          case "2BP01" => throw NonEmptyNamespaceException(message, cause = Some(e))
          case _ => super.classifyException(message, e)
        }
     case unsupported: UnsupportedOperationException => throw unsupported
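
End to end, with a JDBC V2 catalog registered, DROP NAMESPACE ... CASCADE reaches JDBCTableCatalog.dropNamespace with cascade = true, while a plain DROP NAMESPACE on a non-empty schema surfaces the NonEmptyNamespaceException mapped above. A usage sketch; the catalog name, URL, and credentials are illustrative assumptions, not part of the patch:

    import org.apache.spark.sql.SparkSession

    object DropNamespaceCascadeExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("jdbc-drop-namespace-cascade")
          // Register a JDBC V2 catalog named "postgres"; options are placeholders.
          .config("spark.sql.catalog.postgres",
            "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
          .config("spark.sql.catalog.postgres.url", "jdbc:postgresql://localhost:5432/testdb")
          .config("spark.sql.catalog.postgres.driver", "org.postgresql.Driver")
          .config("spark.sql.catalog.postgres.user", "user")
          .config("spark.sql.catalog.postgres.password", "password")
          .getOrCreate()

        // Fails with a non-empty-namespace error if schema "foo" still contains tables:
        // spark.sql("DROP NAMESPACE postgres.foo")

        // Drops the schema together with everything it contains:
        spark.sql("DROP NAMESPACE postgres.foo CASCADE")

        spark.stop()
      }
    }
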