ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

@@ -162,6 +163,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
assert(actual.tableType === CatalogTableType.EXTERNAL)
}

test("create table when the table already exists") {
val catalog = newBasicCatalog()
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
val table = newTable("tbl1", "db2")
intercept[TableAlreadyExistsException] {
catalog.createTable(table, ignoreIfExists = false)
}
}
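
A companion test for the ignoreIfExists = true path (a hypothetical addition reusing the suite's newBasicCatalog and newTable helpers, not part of this diff) might look like:

```scala
test("create table when the table already exists with ignoreIfExists = true") {
  val catalog = newBasicCatalog()
  val table = newTable("tbl1", "db2")
  // Should be a no-op: no exception, and the existing tables are untouched.
  catalog.createTable(table, ignoreIfExists = true)
  assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
}
```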

test("drop table") {
val catalog = newBasicCatalog()
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
HiveExternalCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
@@ -171,9 +172,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
ignoreIfExists: Boolean): Unit = withClient {
assert(tableDefinition.identifier.database.isDefined)
val db = tableDefinition.identifier.database.get
val table = tableDefinition.identifier.table
requireDbExists(db)
verifyTableProperties(tableDefinition)

if (tableExists(db, table) && !ignoreIfExists) {
throw new TableAlreadyExistsException(db = db, table = table)
}
@gatorsmile (Member, Author) commented on Nov 26, 2016:

After a few months, I found that the above code looks weird. We should follow the same logic as in InMemoryCatalog.scala. Will improve it.
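
For reference, a paraphrased sketch of the InMemoryCatalog-style structure the comment alludes to (an approximation, not the exact source):

```scala
// Approximate shape of InMemoryCatalog.createTable's existence handling.
override def createTable(
    tableDefinition: CatalogTable,
    ignoreIfExists: Boolean): Unit = synchronized {
  assert(tableDefinition.identifier.database.isDefined)
  val db = tableDefinition.identifier.database.get
  requireDbExists(db)
  val table = tableDefinition.identifier.table
  if (tableExists(db, table)) {
    if (!ignoreIfExists) {
      throw new TableAlreadyExistsException(db = db, table = table)
    }
  } else {
    // ... create the table directory and register the metadata ...
  }
}
```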

// Before saving data source table metadata into Hive metastore, we should:
// 1. Put table schema, partition column names and bucket specification in table properties.
// 2. Check if this table is hive compatible
@@ -450,7 +455,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
}

override def tableExists(db: String, table: String): Boolean = withClient {
-  client.getTableOption(db, table).isDefined
+  client.tableExists(db, table)
}

override def listTables(db: String): Seq[String] = withClient {
HiveClient.scala
@@ -68,6 +68,9 @@ private[hive] trait HiveClient {
/** List the names of all the databases that match the specified pattern. */
def listDatabases(pattern: String): Seq[String]

/** Return whether a table/view with the specified name exists. */
def tableExists(dbName: String, tableName: String): Boolean

/** Returns the specified table, or throws [[NoSuchTableException]]. */
final def getTable(dbName: String, tableName: String): CatalogTable = {
getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName))
HiveClientImpl.scala
@@ -347,6 +347,10 @@ private[hive] class HiveClientImpl(
client.getDatabasesByPattern(pattern).asScala
}

override def tableExists(dbName: String, tableName: String): Boolean = withHiveState {
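// Hive's getTable(db, table, throwException = false) returns null rather than
// throwing when the table is absent, so this call never throws for a missing table.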
Option(client.getTable(dbName, tableName, false /* do not throw exception */)).nonEmpty
}
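
A sketch of the call-site difference this enables, assuming a HiveClient instance named client (hypothetical snippet, not part of this diff):

```scala
// New: a direct existence probe; no CatalogTable is materialized.
val exists: Boolean = client.tableExists("default", "src")
// Old pattern: fetches and converts the full table metadata just to test existence.
val existsOld: Boolean = client.getTableOption("default", "src").isDefined
```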

override def getTableOption(
dbName: String,
tableName: String): Option[CatalogTable] = withHiveState {
VersionsSuite.scala
@@ -218,6 +218,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
holdDDLTime = false)
}

test(s"$version: tableExists") {
// No exception should be thrown
Contributor commented:

hive's tableExists may throw exception?

assert(client.tableExists("default", "src"))
assert(!client.tableExists("default", "nonexistent"))
}

test(s"$version: getTable") {
// No exception should be thrown
client.getTable("default", "src")
HiveDDLSuite.scala
@@ -24,8 +24,10 @@ import org.scalatest.BeforeAndAfterEach

import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -637,6 +639,37 @@ class HiveDDLSuite
}
}

test("create table with the same name as an index table") {
val tabName = "tab1"
val indexName = tabName + "_index"
withTable(tabName) {
// Spark SQL does not support creating indexes. Thus, we have to use the Hive client.
val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
sql(s"CREATE TABLE $tabName(a int)")

try {
client.runSqlHive(
s"CREATE INDEX $indexName ON TABLE $tabName (a) AS 'COMPACT' WITH DEFERRED REBUILD")
val indexTabName =
spark.sessionState.catalog.listTables("default", s"*$indexName*").head.table
intercept[TableAlreadyExistsException] {
sql(s"CREATE TABLE $indexTabName(b int)")
}
intercept[TableAlreadyExistsException] {
sql(s"ALTER TABLE $tabName RENAME TO $indexTabName")
}

// When tableExists is not invoked, we can still get an AnalysisException
val e = intercept[AnalysisException] {
sql(s"DESCRIBE $indexTabName")
}.getMessage
assert(e.contains("Hive index table is not supported."))
} finally {
client.runSqlHive(s"DROP INDEX IF EXISTS $indexName ON $tabName")
}
}
}

test("desc table for data source table - no user-defined schema") {
Seq("parquet", "json", "orc").foreach { fileFormat =>
withTable("t1") {