diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index ee1734b1f232c..801be64702519 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
@@ -199,7 +199,7 @@ case class InsertIntoHiveTable(
       attr.withName(name.toLowerCase(Locale.ROOT))
     }
 
-    saveAsHiveFile(
+    val writtenParts = saveAsHiveFile(
       sparkSession = sparkSession,
       plan = child,
       hadoopConf = hadoopConf,
@@ -209,6 +209,42 @@
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+          // SPARK-29295: When an INSERT OVERWRITE targets a Hive external table partition that
+          // does not exist, Hive does not check whether the external partition directory exists
+          // before copying files. So if users drop the partition and then insert overwrite into
+          // the same partition again, it ends up containing both old and new data. We construct
+          // the partition path here and delete it manually if it exists.
+          writtenParts.foreach { partPath =>
+            val dpMap = partPath.split("/").map { part =>
+              val splitPart = part.split("=")
+              assert(splitPart.size == 2, s"Invalid written partition path: $part")
+              ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
+                ExternalCatalogUtils.unescapePathName(splitPart(1))
+            }.toMap
+
+            val updatedPartitionSpec = partition.map {
+              case (key, Some(value)) => key -> value
+              case (key, None) if dpMap.contains(key) => key -> dpMap(key)
+              case (key, _) =>
+                throw new SparkException(s"Dynamic partition key $key is not among " +
+                  "the written partition paths.")
+            }
+            val partitionColumnNames = table.partitionColumnNames
+            val tablePath = new Path(table.location)
+            val partitionPath = ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
+              partitionColumnNames, tablePath)
+
+            val fs = partitionPath.getFileSystem(hadoopConf)
+            if (fs.exists(partitionPath)) {
+              if (!fs.delete(partitionPath, true)) {
+                throw new RuntimeException(
+                  "Cannot remove partition directory '" + partitionPath.toString + "'")
+              }
+            }
+          }
+        }
+
         externalCatalog.loadDynamicPartitions(
           db = table.database,
           table = table.identifier.table,
@@ -230,18 +266,32 @@ case class InsertIntoHiveTable(
         var doHiveOverwrite = overwrite
 
         if (oldPart.isEmpty || !ifPartitionNotExists) {
+          // SPARK-29295: When an INSERT OVERWRITE targets a Hive external table partition that
+          // does not exist, Hive does not check whether the external partition directory exists
+          // before copying files. So if users drop the partition and then insert overwrite into
+          // the same partition again, it ends up containing both old and new data. We construct
+          // the partition path here and delete it manually if it exists.
+          val partitionPath = if (oldPart.isEmpty && overwrite
+              && table.tableType == CatalogTableType.EXTERNAL) {
+            val partitionColumnNames = table.partitionColumnNames
+            val tablePath = new Path(table.location)
+            Some(ExternalCatalogUtils.generatePartitionPath(partitionSpec,
+              partitionColumnNames, tablePath))
+          } else {
+            oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
+          }
+
           // SPARK-18107: Insert overwrite runs much slower than hive-client.
           // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
           // version and we may not want to catch up new Hive version every time. We delete the
           // Hive partition first and then load data file into the Hive partition.
-          if (oldPart.nonEmpty && overwrite) {
-            oldPart.get.storage.locationUri.foreach { uri =>
-              val partitionPath = new Path(uri)
-              val fs = partitionPath.getFileSystem(hadoopConf)
-              if (fs.exists(partitionPath)) {
-                if (!fs.delete(partitionPath, true)) {
+          if (partitionPath.nonEmpty && overwrite) {
+            partitionPath.foreach { path =>
+              val fs = path.getFileSystem(hadoopConf)
+              if (fs.exists(path)) {
+                if (!fs.delete(path, true)) {
                   throw new RuntimeException(
-                    "Cannot remove partition directory '" + partitionPath.toString)
+                    "Cannot remove partition directory '" + path.toString + "'")
                 }
                 // Don't let Hive do overwrite operation since it is slower.
                 doHiveOverwrite = false
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index cfa535eb1e5db..1fd4f015ec7b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2412,4 +2412,84 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-29295: insert overwrite external partition should not have old data") {
+    Seq("true", "false").foreach { convertParquet =>
+      withTable("test") {
+        withTempDir { f =>
+          sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (name string) STORED AS " +
+            s"PARQUET LOCATION '${f.getAbsolutePath}'")
+
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet) {
+            sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
+            sql("ALTER TABLE test DROP PARTITION(name='n1')")
+            sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
+            checkAnswer(sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id"),
+              Array(Row(2)))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-29295: dynamic insert overwrite external partition should not have old data") {
+    Seq("true", "false").foreach { convertParquet =>
+      withTable("test") {
+        withTempDir { f =>
+          sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (p1 string, p2 string) " +
+            s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'")
+
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet,
+            "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+            sql(
+              """
+                |INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2)
+                |SELECT * FROM VALUES (1, 'n2'), (2, 'n3') AS t(id, p2)
+              """.stripMargin)
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' AND p2 = 'n2' ORDER BY id"),
+              Array(Row(1)))
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' AND p2 = 'n3' ORDER BY id"),
+              Array(Row(2)))
+
+            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 4, 'n4'")
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' AND p2 = 'n4' ORDER BY id"),
+              Array(Row(4)))
+
sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='n2')") + sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='n3')") + + sql( + """ + |INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) + |SELECT * FROM VALUES (5, 'n2'), (6, 'n3') AS t(id, p2) + """.stripMargin) + checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n2' ORDER BY id"), + Array(Row(5))) + checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n3' ORDER BY id"), + Array(Row(6))) + // Partition not overwritten should not be deleted. + checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n4' ORDER BY id"), + Array(Row(4))) + } + } + } + + withTable("test") { + withTempDir { f => + sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (p1 string, p2 string) " + + s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'") + + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet, + "hive.exec.dynamic.partition.mode" -> "nonstrict") { + // We should unescape partition value. + sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 1, '/'") + sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='/')") + sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 2, '/'") + checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = '/' ORDER BY id"), + Array(Row(2))) + } + } + } + } + } }