diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 15aed5f9b1bd..bc0d7f434be4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -186,11 +186,7 @@ class InMemoryCatalog(
     val db = tableDefinition.identifier.database.get
     requireDbExists(db)
     val table = tableDefinition.identifier.table
-    if (tableExists(db, table)) {
-      if (!ignoreIfExists) {
-        throw new TableAlreadyExistsException(db = db, table = table)
-      }
-    } else {
+    if (!tableExists(db, table)) {
       // Set the default table location if this is a managed table and its location is not
       // specified.
       // Ideally we should not create a managed table with location, but Hive serde table can
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 73ef0e6a1869..317448002822 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -249,13 +249,37 @@ class SessionCatalog(
   /**
    * Create a metastore table in the database specified in `tableDefinition`.
    * If no such database is specified, create it in the current database.
+   * `suggestIgnoreIfPathExists` controls whether the table path is checked for
+   * existence when `ignoreIfExists` is false: if it is false, the default
+   * location of a managed table is checked; if it is true, the check is
+   * skipped for all table types.
   */
-  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+  def createTable(
+      tableDefinition: CatalogTable,
+      ignoreIfExists: Boolean,
+      suggestIgnoreIfPathExists: Boolean = false): Unit = {
     val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableDefinition.identifier.table)
     validateName(table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     requireDbExists(db)
+
+    if (!ignoreIfExists) {
+      if (tableExists(newTableDefinition.identifier)) {
+        throw new TableAlreadyExistsException(db = db, table = table)
+      }
+      // As discussed in SPARK-19583, the default location of a managed table should not exist
+      if (!suggestIgnoreIfPathExists && tableDefinition.tableType == CatalogTableType.MANAGED) {
+        val tblLocationPath =
+          new Path(defaultTablePath(tableDefinition.identifier))
+        val fs = tblLocationPath.getFileSystem(hadoopConf)
+        if (fs.exists(tblLocationPath)) {
+          throw new AnalysisException(s"the location('$tblLocationPath') of table" +
+            s"('${newTableDefinition.identifier}') already exists.")
+        }
+      }
+    }
+
     externalCatalog.createTable(newTableDefinition, ignoreIfExists)
   }
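Note: with this change, the TableAlreadyExistsException check moves out of the individual ExternalCatalog implementations and into SessionCatalog.createTable, next to the new default-location check. A minimal, self-contained sketch of the resulting control flow; tableExists, defaultTablePath, and the exception types are stand-ins for the real SessionCatalog members, and only the branching is meant to match the diff above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Condensed restatement of the new pre-create checks; not Spark source.
object CreateTableChecksSketch {
  def preCreateChecks(
      isManaged: Boolean,                 // tableType == CatalogTableType.MANAGED
      ignoreIfExists: Boolean,
      suggestIgnoreIfPathExists: Boolean,
      tableExists: Boolean,               // stand-in for tableExists(identifier)
      defaultTablePath: String,           // stand-in for defaultTablePath(identifier)
      hadoopConf: Configuration): Unit = {
    if (!ignoreIfExists) {
      if (tableExists) {
        throw new IllegalStateException("TableAlreadyExistsException")  // stand-in
      }
      // SPARK-19583: a managed table must not be created over an existing
      // default location, unless the caller (e.g. CTAS) created the path itself.
      if (!suggestIgnoreIfPathExists && isManaged) {
        val location = new Path(defaultTablePath)
        val fs = location.getFileSystem(hadoopConf)
        if (fs.exists(location)) {
          throw new IllegalStateException(s"the location('$location') already exists")  // stand-in
        }
      }
    }
  }
}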
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index a5d399a06558..484bcfe492f2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -162,15 +162,6 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
-  test("create table when the table already exists") {
-    val catalog = newBasicCatalog()
-    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    val table = newTable("tbl1", "db2")
-    intercept[TableAlreadyExistsException] {
-      catalog.createTable(table, ignoreIfExists = false)
-    }
-  }
-
   test("drop table") {
     val catalog = newBasicCatalog()
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 5abd57947650..36ff5e388824 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -158,7 +158,10 @@ case class CreateDataSourceTableAsSelectCommand(
       // the schema of df). It is important since the nullability may be changed by the relation
      // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
       schema = result.schema)
-    sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+    // The table path was already created above, before the table is registered, so we
+    // should not check whether the path exists; set suggestIgnoreIfPathExists to true.
+    sessionState.catalog.createTable(newTable, ignoreIfExists = false,
+      suggestIgnoreIfPathExists = true)
 
     result match {
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
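Note: CTAS is the one caller that opts out because the query result is materialized, and the table directory created, before the table is registered in the catalog, so the location check would always fire. A toy, runnable sketch of that ordering; every name except the suggestIgnoreIfPathExists flag is an illustrative stub, not Spark's API:

import java.io.File

// Toy model of the CTAS write-then-register ordering; stubs only, not Spark code.
object CtasOrderingSketch {
  val warehouse = new File(System.getProperty("java.io.tmpdir"), "warehouse")

  // Stand-in for materializing the query result: it creates the table path.
  def writeQueryResult(table: String): Unit = new File(warehouse, table).mkdirs()

  // Stand-in for SessionCatalog.createTable with the new flag.
  def createTable(
      table: String,
      ignoreIfExists: Boolean,
      suggestIgnoreIfPathExists: Boolean): Unit = {
    val path = new File(warehouse, table)
    if (!ignoreIfExists && !suggestIgnoreIfPathExists && path.exists()) {
      throw new IllegalStateException(s"the location('$path') already exists")
    }
  }

  def main(args: Array[String]): Unit = {
    writeQueryResult("t") // CTAS writes first, creating the path...
    // ...so registration must skip the location check or it would always fail:
    createTable("t", ignoreIfExists = false, suggestIgnoreIfPathExists = true)
  }
}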
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b44f20e367f0..30a096006b7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1952,4 +1952,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       }
     }
   }
+
+  test("CTAS for managed data source table with a created default location throws an exception") {
+    withTable("t", "t1", "t2") {
+      val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t
+             |USING parquet
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"path file:${tFile.getAbsolutePath} already exists."))
+
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"path file:${tFile1.getAbsolutePath} already exists."))
+
+      // partitioned table (partition path exists)
+      val tFile2 = new File(warehousePath, "t2")
+      val tPartFile = new File(tFile2, "a=3/b=4")
+      tPartFile.mkdirs()
+      assert(tPartFile.exists)
+      val e2 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t2
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e2.contains(s"path file:${tFile2.getAbsolutePath} already exists."))
+    }
+  }
 }
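Note: the DDLSuite additions pin down the user-visible behavior for data source tables. The same scenario as a standalone snippet; the local-mode setup and the warehouse-dir lookup via spark.conf are assumptions for a local run, and the quoted message is the one the suite asserts:

import java.io.File
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Standalone repro of what the new DDLSuite tests exercise.
object DefaultLocationCollisionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    val warehouse = spark.conf.get("spark.sql.warehouse.dir").stripPrefix("file:")
    new File(warehouse, "t").mkdirs() // occupy `t`'s default location up front
    try {
      spark.sql("CREATE TABLE t USING parquet AS SELECT 1 AS a")
    } catch {
      case e: AnalysisException =>
        println(e.getMessage) // expected: "path file:/.../t already exists."
    } finally {
      spark.stop()
    }
  }
}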
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ea4825614785..3cb9080bd9ce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -195,10 +195,6 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
 
-    if (tableExists(db, table) && !ignoreIfExists) {
-      throw new TableAlreadyExistsException(db = db, table = table)
-    }
-
     if (tableDefinition.tableType == VIEW) {
       client.createTable(tableDefinition, ignoreIfExists)
     } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 792ac1e25949..0ade2cec3389 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1289,7 +1289,7 @@ class HiveDDLSuite
     import org.apache.spark.sql.hive.HiveExternalCatalog.DATASOURCE_PREFIX
     import org.apache.spark.sql.hive.HiveExternalCatalog.STATISTICS_PREFIX
 
-    withTable("tbl") {
+    withTable("tbl", "tbl1") {
       sql("CREATE TABLE tbl(a INT) STORED AS parquet")
       Seq(DATASOURCE_PREFIX, STATISTICS_PREFIX).foreach { forbiddenPrefix =>
@@ -1304,7 +1304,7 @@ class HiveDDLSuite
         assert(e2.getMessage.contains(forbiddenPrefix + "foo"))
 
         val e3 = intercept[AnalysisException] {
-          sql(s"CREATE TABLE tbl (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
+          sql(s"CREATE TABLE tbl1 (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
         }
         assert(e3.getMessage.contains(forbiddenPrefix + "foo"))
       }
@@ -1587,4 +1587,173 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("CTAS for managed datasource table with a created default location throws an exception") {
+    withTable("t", "t1", "t2") {
+      val warehousePath = spark.sharedState.warehousePath
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t
+             |USING parquet
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"path file:${tFile.getAbsolutePath} already exists."))
+
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"path file:${tFile1.getAbsolutePath} already exists."))
+
+      // partitioned table (partition path exists)
+      val tFile2 = new File(warehousePath, "t2")
+      val tPartFile = new File(tFile2, "a=3/b=4")
+      tPartFile.mkdirs()
+      assert(tPartFile.exists)
+      val e2 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t2
+             |USING parquet
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e2.contains(s"path file:${tFile2.getAbsolutePath} already exists."))
+    }
+  }
+
+  test("CTAS for managed hive table with a created default location throws an exception") {
+    withTable("t", "t1", "t2") {
+      val warehousePath = spark.sharedState.warehousePath
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t
+             |USING hive
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
+        s"('`default`.`t`') already exists."))
+
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1
+             |USING hive
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
+        s"('`default`.`t1`') already exists."))
+
+      // partitioned table (partition path exists)
+      val tFile2 = new File(warehousePath, "t2")
+      val tPartFile = new File(tFile2, "a=3/b=4")
+      tPartFile.mkdirs()
+      assert(tPartFile.exists)
+      val e2 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t2
+             |USING hive
+             |PARTITIONED BY(a, b)
+             |AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
+           """.stripMargin)
+      }.getMessage
+      assert(e2.contains(s"the location('file:${tFile2.getAbsolutePath}') of table" +
+        s"('`default`.`t2`') already exists."))
+    }
+  }
+
+  test("create table for managed datasource table with a created location throws an exception") {
+    withTable("t", "t1") {
+      val warehousePath = spark.sharedState.warehousePath
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b string)
+             |USING parquet
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
+        s"('`default`.`t`') already exists."))
+
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1(a string, b string)
+             |USING parquet
+             |PARTITIONED BY(a)
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
+        s"('`default`.`t1`') already exists."))
+    }
+  }
+
+  test("create table for managed hive table with a created location throws an exception") {
+    withTable("t", "t1") {
+      val warehousePath = spark.sharedState.warehousePath
+      val tFile = new File(warehousePath, "t")
+      tFile.mkdirs()
+      assert(tFile.exists)
+
+      val e = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t(a string, b string)
+             |USING hive
+           """.stripMargin)
+      }.getMessage
+      assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
+        s"('`default`.`t`') already exists."))
+
+      // partitioned table (table path exists)
+      val tFile1 = new File(warehousePath, "t1")
+      tFile1.mkdirs()
+      assert(tFile1.exists)
+      val e1 = intercept[AnalysisException] {
+        spark.sql(
+          s"""
+             |CREATE TABLE t1(a string, b string)
+             |USING hive
+             |PARTITIONED BY(a)
+           """.stripMargin)
+      }.getMessage
+      assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
+        s"('`default`.`t1`') already exists."))
+    }
+  }
 }
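Note: the suites assert two distinct message shapes. Data source CTAS fails inside the write path with "path ... already exists.", while plain CREATE TABLE and Hive-serde CTAS hit the new SessionCatalog check with "the location('...') of table('...') already exists.". A hedged, standalone sketch of the Hive-serde case the last two tests cover; the local-mode setup and warehouse lookup are assumptions, and the quoted message is the one the suite asserts:

import java.io.File
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Standalone sketch of the Hive-serde scenario.
object HiveDefaultLocationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .enableHiveSupport()
      .getOrCreate()
    val warehouse = spark.conf.get("spark.sql.warehouse.dir").stripPrefix("file:")
    new File(warehouse, "t").mkdirs() // occupy `t`'s default location
    try {
      spark.sql("CREATE TABLE t(a STRING) USING hive")
    } catch {
      case e: AnalysisException =>
        // expected: the location('file:/...') of table('`default`.`t`') already exists.
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}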