diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 24e60529d227..5fccce2678f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -36,12 +36,11 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils}
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -94,22 +93,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   /**
-   * Run some code involving `client` in a [[synchronized]] block and wrap non-fatal
+   * Run some code involving `client` in a [[synchronized]] block and wrap certain
    * exceptions thrown in the process in [[AnalysisException]].
    */
-  private def withClient[T](body: => T): T = withClientWrappingException {
-    body
-  } {
-    _ => None // Will fallback to default wrapping strategy in withClientWrappingException.
-  }
-
-  /**
-   * Run some code involving `client` in a [[synchronized]] block and wrap non-fatal
-   * exceptions thrown in the process in [[AnalysisException]] using the given
-   * `wrapException` function.
-   */
-  private def withClientWrappingException[T](body: => T)
-      (wrapException: Throwable => Option[AnalysisException]): T = synchronized {
+  private def withClient[T](body: => T): T = synchronized {
     try {
       body
     } catch {
@@ -120,11 +107,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         case i: InvocationTargetException => i.getCause
         case o => o
       }
-      wrapException(e) match {
-        case Some(wrapped) => throw wrapped
-        case None => throw new AnalysisException(
-          e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
-      }
+      throw new AnalysisException(
+        e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
     }
   }
 
@@ -204,32 +188,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
   override def createDatabase(
       dbDefinition: CatalogDatabase,
-      ignoreIfExists: Boolean): Unit = withClientWrappingException {
+      ignoreIfExists: Boolean): Unit = withClient {
     client.createDatabase(dbDefinition, ignoreIfExists)
-  } { exception =>
-    if (exception.getClass.getName.equals(
-        "org.apache.hadoop.hive.metastore.api.AlreadyExistsException")
-      && exception.getMessage.contains(
-        s"Database ${dbDefinition.name} already exists")) {
-      Some(new DatabaseAlreadyExistsException(dbDefinition.name))
-    } else {
-      None
-    }
   }
 
   override def dropDatabase(
       db: String,
       ignoreIfNotExists: Boolean,
       cascade: Boolean): Unit = withClient {
-    try {
-      client.dropDatabase(db, ignoreIfNotExists, cascade)
-    } catch {
-      case NonFatal(exception) =>
-        if (exception.getClass.getName.equals("org.apache.hadoop.hive.ql.metadata.HiveException")
-          && exception.getMessage.contains(s"Database $db is not empty.")) {
-          throw QueryCompilationErrors.cannotDropNonemptyDatabaseError(db)
-        } else throw exception
-    }
+    client.dropDatabase(db, ignoreIfNotExists, cascade)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 9c9a4fd2b374..3dddca844750 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -49,7 +49,8 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -332,14 +333,24 @@ private[hive] class HiveClientImpl(
       database: CatalogDatabase,
       ignoreIfExists: Boolean): Unit = withHiveState {
     val hiveDb = toHiveDatabase(database, Some(userName))
-    shim.createDatabase(client, hiveDb, ignoreIfExists)
+    try {
+      shim.createDatabase(client, hiveDb, ignoreIfExists)
+    } catch {
+      case _: AlreadyExistsException =>
+        throw new DatabaseAlreadyExistsException(database.name)
+    }
   }
 
   override def dropDatabase(
       name: String,
       ignoreIfNotExists: Boolean,
      cascade: Boolean): Unit = withHiveState {
-    shim.dropDatabase(client, name, true, ignoreIfNotExists, cascade)
+    try {
+      shim.dropDatabase(client, name, true, ignoreIfNotExists, cascade)
+    } catch {
+      case e: HiveException if e.getMessage.contains(s"Database $name is not empty") =>
+        throw QueryCompilationErrors.cannotDropNonemptyDatabaseError(name)
+    }
   }
 
   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 14b2a51bff8c..422a905f69b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
+import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPermanentFunctionException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -184,14 +184,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
         "temporary", description = "test create", tempDatabasePath, Map())
       client.createDatabase(tempDB, ignoreIfExists = true)
 
-      try {
+      intercept[DatabaseAlreadyExistsException] {
         client.createDatabase(tempDB, ignoreIfExists = false)
-        assert(false, "createDatabase should throw AlreadyExistsException")
-      } catch {
-        case ex: Throwable =>
-          assert(ex.getClass.getName.equals(
-            "org.apache.hadoop.hive.metastore.api.AlreadyExistsException"))
-          assert(ex.getMessage.contains(s"Database ${tempDB.name} already exists"))
       }
     }
 
@@ -275,6 +269,13 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
     test(s"$version: dropDatabase") {
       assert(client.databaseExists("temporary"))
+
+      client.createTable(table("temporary", tableName = "tbl"), ignoreIfExists = false)
+      val ex = intercept[AnalysisException] {
+        client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = false)
+      }
+      assert(ex.message.contains("Cannot drop a non-empty database: temporary."))
+
       client.dropDatabase("temporary", ignoreIfNotExists = false, cascade = true)
       assert(client.databaseExists("temporary") == false)
     }
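
The net effect of the patch is that database-level exception translation now lives in HiveClientImpl, next to the shim calls that raise the Hive errors, instead of being string-matched in HiveExternalCatalog's wrapper. Because DatabaseAlreadyExistsException and the AnalysisException built by QueryCompilationErrors.cannotDropNonemptyDatabaseError are Spark classes, withClient's isClientException guard lets them propagate untouched. The Scala sketch below is illustrative only and not part of the patch: it assumes a Hive-enabled Spark build on the classpath, and the object, database, and table names are invented for the example.

    import java.net.URI

    import org.apache.spark.sql.{AnalysisException, SparkSession}
    import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException
    import org.apache.spark.sql.catalyst.catalog.CatalogDatabase

    // Hypothetical driver exercising the new exception mapping end to end.
    object HiveExceptionMappingDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[1]")
          .enableHiveSupport() // requires the spark-hive module
          .getOrCreate()
        val catalog = spark.sessionState.catalog

        val dbDef = CatalogDatabase(
          "demo_db", "demo database", new URI("file:/tmp/demo_db"), Map.empty)
        catalog.createDatabase(dbDef, ignoreIfExists = true)

        // Creating the same database again now fails with the typed
        // DatabaseAlreadyExistsException rethrown by HiveClientImpl, rather
        // than a generic AnalysisException wrapped by withClient.
        try catalog.createDatabase(dbDef, ignoreIfExists = false)
        catch { case e: DatabaseAlreadyExistsException => println(e.getMessage) }

        spark.sql("CREATE TABLE demo_db.tbl (id INT) USING parquet")

        // Dropping a non-empty database without cascade surfaces Hive's
        // HiveException as the AnalysisException built by
        // QueryCompilationErrors.cannotDropNonemptyDatabaseError.
        try catalog.dropDatabase("demo_db", ignoreIfNotExists = false, cascade = false)
        catch { case e: AnalysisException => println(e.getMessage) }

        catalog.dropDatabase("demo_db", ignoreIfNotExists = false, cascade = true)
        spark.stop()
      }
    }

Keeping the translation in HiveClientImpl also lets the client match on the exception types directly (AlreadyExistsException, HiveException) instead of comparing class names as strings, which is what the removed wrapException hook had to do.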