Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,7 @@ class InMemoryCatalog(
val db = tableDefinition.identifier.database.get
requireDbExists(db)
val table = tableDefinition.identifier.table
if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new TableAlreadyExistsException(db = db, table = table)
}
} else {
if (!tableExists(db, table)) {
// Set the default table location if this is a managed table and its location is not
// specified.
// Ideally we should not create a managed table with location, but Hive serde table can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,37 @@ class SessionCatalog(
/**
* Create a metastore table in the database specified in `tableDefinition`.
* If no such database is specified, create it in the current database.
* suggestIgnoreIfPathExists suggest whether should check if the path of table
* exists when ignoreIfExists is false, if it is false, it will only check the
* path of a managed table, if it is true, it will not do the check for all
* type tables.
*/
def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
def createTable(
tableDefinition: CatalogTable,
ignoreIfExists: Boolean,
suggestIgnoreIfPathExists: Boolean = false): Unit = {
val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableDefinition.identifier.table)
validateName(table)
val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
requireDbExists(db)

if (!ignoreIfExists) {
if (tableExists(newTableDefinition.identifier)) {
throw new TableAlreadyExistsException(db = db, table = table)
}
// As discussed in SPARK-19583, the default location of a managed table should not exists
if (!suggestIgnoreIfPathExists && tableDefinition.tableType == CatalogTableType.MANAGED) {
val tblLocationPath =
new Path(defaultTablePath(tableDefinition.identifier))
val fs = tblLocationPath.getFileSystem(hadoopConf)
if (fs.exists(tblLocationPath)) {
throw new AnalysisException(s"the location('$tblLocationPath') of table" +
s"('${newTableDefinition.identifier}') already exists.")
}
}
}

externalCatalog.createTable(newTableDefinition, ignoreIfExists)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(actual.tableType === CatalogTableType.EXTERNAL)
}

test("create table when the table already exists") {
val catalog = newBasicCatalog()
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
val table = newTable("tbl1", "db2")
intercept[TableAlreadyExistsException] {
catalog.createTable(table, ignoreIfExists = false)
}
}

test("drop table") {
val catalog = newBasicCatalog()
assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ case class CreateDataSourceTableAsSelectCommand(
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
schema = result.schema)
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
// the path of the table has been created above before create table, we should not
// check if the path existes, so suggestIgnoreIfPathExists set to true .
sessionState.catalog.createTable(newTable, ignoreIfExists = false,
suggestIgnoreIfPathExists = true)

result match {
case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1952,4 +1952,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
}

test("CTAS for managed data source table with a created default location throw an exception") {
withTable("t", "t1", "t2") {
val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")
val tFile = new File(warehousePath, "t")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are lots of non-existent default location test cases, so here we just add existed default location test cases

tFile.mkdirs()
assert(tFile.exists)

val e = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t
|USING parquet
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e.contains(s"path file:${tFile.getAbsolutePath} already exists."))

// partition table(table path exists)
val tFile1 = new File(warehousePath, "t1")
tFile1.mkdirs()
assert(tFile1.exists)
val e1 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t1
|USING parquet
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e1.contains(s"path file:${tFile1.getAbsolutePath} already exists."))

// partition table(partition path exists)
val tFile2 = new File(warehousePath, "t2")
val tPartFile = new File(tFile2, "a=3/b=4")
tPartFile.mkdirs()
assert(tPartFile.exists)
val e2 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t2
|USING parquet
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e2.contains(s"path file:${tFile2.getAbsolutePath} already exists."))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
requireDbExists(db)
verifyTableProperties(tableDefinition)

if (tableExists(db, table) && !ignoreIfExists) {
throw new TableAlreadyExistsException(db = db, table = table)
}

if (tableDefinition.tableType == VIEW) {
client.createTable(tableDefinition, ignoreIfExists)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1289,7 +1289,7 @@ class HiveDDLSuite
import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX
import org.apache.spark.sql.hive.HiveExternalCatalog.STATISTICS_PREFIX

withTable("tbl") {
withTable("tbl", "tbl1") {
sql("CREATE TABLE tbl(a INT) STORED AS parquet")

Seq(DATASOURCE_PREFIX, STATISTICS_PREFIX).foreach { forbiddenPrefix =>
Expand All @@ -1304,7 +1304,7 @@ class HiveDDLSuite
assert(e2.getMessage.contains(forbiddenPrefix + "foo"))

val e3 = intercept[AnalysisException] {
sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
sql(s"CREATE TABLE tbl1 (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
}
assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
}
Expand Down Expand Up @@ -1587,4 +1587,173 @@ class HiveDDLSuite
}
}
}

test("CTAS for managed datasource table with a created default location throw an exception") {
withTable("t", "t1", "t2") {
val warehousePath = spark.sharedState.warehousePath
val tFile = new File(warehousePath, "t")
tFile.mkdirs()
assert(tFile.exists)

val e = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t
|USING parquet
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e.contains(s"path file:${tFile.getAbsolutePath} already exists."))

// partition table(table path exists)
val tFile1 = new File(warehousePath, "t1")
tFile1.mkdirs()
assert(tFile1.exists)
val e1 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t1
|USING parquet
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e1.contains(s"path file:${tFile1.getAbsolutePath} already exists."))

// partition table(partition path exists)
val tFile2 = new File(warehousePath, "t2")
val tPartFile = new File(tFile2, "a=3/b=4")
tPartFile.mkdirs()
assert(tPartFile.exists)
val e2 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t2
|USING parquet
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e2.contains(s"path file:${tFile2.getAbsolutePath} already exists."))
}
}

test("CTAS for managed hive table with a created default location throw an exception") {
withTable("t", "t1", "t2") {
val warehousePath = spark.sharedState.warehousePath
val tFile = new File(warehousePath, "t")
tFile.mkdirs()
assert(tFile.exists)

val e = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t
|USING hive
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
s"('`default`.`t`') already exists."))

// partition table(table path exists)
val tFile1 = new File(warehousePath, "t1")
tFile1.mkdirs()
assert(tFile1.exists)
val e1 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t1
|USING hive
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
s"('`default`.`t1`') already exists."))

// partition table(partition path exists)
val tFile2 = new File(warehousePath, "t2")
val tPartFile = new File(tFile2, "a=3/b=4")
tPartFile.mkdirs()
assert(tPartFile.exists)
val e2 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t2
|USING hive
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
s"('`default`.`t1`') already exists."))
}
}

test("create table for managed datasource table with a created location throw an exception") {
withTable("t", "t1") {
val warehousePath = spark.sharedState.warehousePath
val tFile = new File(warehousePath, "t")
tFile.mkdirs()
assert(tFile.exists)

val e = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t(a string, b string)
|USING parquet
""".stripMargin)
}.getMessage
assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
s"('`default`.`t`') already exists."))
// partition table(table path exists)
val tFile1 = new File(warehousePath, "t1")
tFile1.mkdirs()
assert(tFile1.exists)
val e1 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t1(a string, b string)
|USING parquet
|PARTITIONED BY(a)
""".stripMargin)
}.getMessage
assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
s"('`default`.`t1`') already exists."))
}
}

test("create table for managed hive table with a created location throw an exception") {
withTable("t", "t1") {
val warehousePath = spark.sharedState.warehousePath
val tFile = new File(warehousePath, "t")
tFile.mkdirs()
assert(tFile.exists)

val e = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t(a string, b string)
|USING hive
""".stripMargin)
}.getMessage
assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
s"('`default`.`t`') already exists."))
// partition table(table path exists)
val tFile1 = new File(warehousePath, "t1")
tFile1.mkdirs()
assert(tFile1.exists)
val e1 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t1(a string, b string)
|USING hive
|PARTITIONED BY(a)
""".stripMargin)
}.getMessage
assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
s"('`default`.`t1`') already exists."))
}
}
}