From 5e5c6ef66952d5b17a4b58efeaa19060bb410726 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 30 Sep 2019 08:54:42 -0700
Subject: [PATCH 1/8] Insert overwrite external table partition should not have old data.

---
 .../hive/execution/InsertIntoHiveTable.scala | 24 +++++++++++++++---
 .../sql/hive/execution/SQLQuerySuite.scala   | 25 +++++++++++++++++++
 2 files changed, 45 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index ee1734b1f232c..f316e20f5b59c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -26,11 +26,11 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, CommandUtils}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl
 
@@ -230,12 +230,28 @@ case class InsertIntoHiveTable(
         var doHiveOverwrite = overwrite
 
         if (oldPart.isEmpty || !ifPartitionNotExists) {
+          // SPARK-29295: When doing an insert overwrite to a Hive external table partition, if
+          // the partition does not exist, Hive will not check if the external partition directory
+          // exists before copying files. So if users drop the partition and then do an insert
+          // overwrite to the same partition, the partition will end up with both old and new
+          // data.
+          val updatedPart = if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+            AlterTableAddPartitionCommand(
+              table.identifier, Seq((partitionSpec, None)), ifNotExists = true).run(sparkSession)
+            externalCatalog.getPartitionOption(
+              table.database,
+              table.identifier.table,
+              partitionSpec)
+          } else {
+            oldPart
+          }
+
           // SPARK-18107: Insert overwrite runs much slower than hive-client.
           // Newer Hive largely improves insert overwrite performance. Since Spark uses an
           // older Hive version and may not want to catch up with every new Hive release, we
           // delete the Hive partition first and then load data files into the Hive partition.
-          if (oldPart.nonEmpty && overwrite) {
-            oldPart.get.storage.locationUri.foreach { uri =>
+          if (updatedPart.nonEmpty && overwrite) {
+            updatedPart.get.storage.locationUri.foreach { uri =>
               val partitionPath = new Path(uri)
               val fs = partitionPath.getFileSystem(hadoopConf)
               if (fs.exists(partitionPath)) {
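The flow this hunk introduces is easiest to see outside the command. Below is a minimal sketch, assuming a SparkSession, an ExternalCatalog, and the table/spec values that are in scope inside InsertIntoHiveTable; the method name cleanExternalPartitionDir is hypothetical, not part of the patch:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.AlterTableAddPartitionCommand

// Sketch of the fix for a static partition of an external table:
// 1. re-register the (possibly dropped) partition so the metastore has an entry for it,
// 2. look up its resolved location, 3. delete that directory before loading new data.
def cleanExternalPartitionDir(
    spark: SparkSession,
    catalog: ExternalCatalog,
    table: CatalogTable,
    spec: TablePartitionSpec,
    hadoopConf: Configuration): Unit = {
  if (table.tableType == CatalogTableType.EXTERNAL) {
    AlterTableAddPartitionCommand(
      table.identifier, Seq((spec, None)), ifNotExists = true).run(spark)
    val part = catalog.getPartitionOption(table.database, table.identifier.table, spec)
    part.flatMap(_.storage.locationUri).foreach { uri =>
      val path = new Path(uri)
      val fs = path.getFileSystem(hadoopConf)
      if (fs.exists(path)) fs.delete(path, true)
    }
  }
}

Re-adding the partition first matters because a dropped partition has no catalog entry, so there would otherwise be no location to clean up.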
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index cfa535eb1e5db..3994d439528ab 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2412,4 +2412,29 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       }
     }
   }
+
+  test("SPARK-29295: insert overwrite external partition should not have old data") {
+    withTable("test") {
+      withTempDir { f =>
+        sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (name string) STORED AS " +
+          s"PARQUET LOCATION '${f.getAbsolutePath}'")
+
+        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
+          sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
+          sql("ALTER TABLE test DROP PARTITION(name='n1')")
+          sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
+          checkAnswer(sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id"),
+            Array(Row(2)))
+        }
+
+        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
+          sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
+          sql("ALTER TABLE test DROP PARTITION(name='n1')")
+          sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
+          checkAnswer(sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id"),
+            Array(Row(2)))
+        }
+      }
+    }
+  }
 }

From 4df8a791ccb10d1f3a0c2c7920abad90450aed2c Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 7 Oct 2019 22:43:53 -0700
Subject: [PATCH 2/8] Fix dynamic partition.

---
 .../hive/execution/InsertIntoHiveTable.scala | 27 ++++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala   | 48 ++++++++++++-------
 2 files changed, 57 insertions(+), 18 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f316e20f5b59c..3bcefe8f1e88e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -199,7 +199,7 @@ case class InsertIntoHiveTable(
       attr.withName(name.toLowerCase(Locale.ROOT))
     }
 
-    saveAsHiveFile(
+    val writtenParts = saveAsHiveFile(
       sparkSession = sparkSession,
       plan = child,
       hadoopConf = hadoopConf,
@@ -209,6 +209,31 @@ case class InsertIntoHiveTable(
 
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+          // SPARK-29295: When doing an insert overwrite to a Hive external table partition, if
+          // the partition does not exist, Hive will not check if the external partition directory
+          // exists before copying files. So if users drop the partition and then do an insert
+          // overwrite to the same partition, the partition will end up with both old and new
+          // data.
+          val dpMap = writtenParts.map { part =>
+            val splitPart = part.split("=")
+            assert(splitPart.size == 2, s"Invalid written partition path: $part")
+            splitPart(0) -> splitPart(1)
+          }.toMap
+
+          val updatedPartitionSpec = partition.map {
+            case (key, Some(value)) => key -> value
+            case (key, None) if dpMap.contains(key) => key -> dpMap(key)
+            case (key, _) =>
+              throw new SparkException(s"Dynamic partition key $key is not among " +
+                "written partition paths.")
+          }
+
+          AlterTableAddPartitionCommand(
+            table.identifier, Seq((updatedPartitionSpec, None)), ifNotExists = true)
+            .run(sparkSession)
+        }
+
         externalCatalog.loadDynamicPartitions(
           db = table.database,
           table = table.identifier.table,
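What the new dynamic-partition block computes can be shown in isolation. A small sketch with hypothetical inputs; in the command itself, the partition map comes from the plan and writtenParts comes from saveAsHiveFile:

import org.apache.spark.SparkException

// PARTITION(p1='n1', p2) arrives as a map: static values are Some(...), dynamic ones None.
val partition: Map[String, Option[String]] = Map("p1" -> Some("n1"), "p2" -> None)

// saveAsHiveFile reports which dynamic partition it wrote, e.g. "p2=n2".
val writtenParts: Set[String] = Set("p2=n2")

val dpMap = writtenParts.map { part =>
  val Array(key, value) = part.split("=")
  key -> value
}.toMap

// Merge both sources into the fully resolved spec Map("p1" -> "n1", "p2" -> "n2"),
// which is what gets re-registered via AlterTableAddPartitionCommand.
val resolvedSpec: Map[String, String] = partition.map {
  case (key, Some(value)) => key -> value
  case (key, None) if dpMap.contains(key) => key -> dpMap(key)
  case (key, _) => throw new SparkException(s"Dynamic partition key $key was not written.")
}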
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3994d439528ab..28572dc08e727 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2414,25 +2414,39 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("SPARK-29295: insert overwrite external partition should not have old data") {
-    withTable("test") {
-      withTempDir { f =>
-        sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (name string) STORED AS " +
-          s"PARQUET LOCATION '${f.getAbsolutePath}'")
-
-        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
-          sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
-          sql("ALTER TABLE test DROP PARTITION(name='n1')")
-          sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
-          checkAnswer(sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id"),
-            Array(Row(2)))
+    Seq("true", "false").foreach { convertParquet =>
+      withTable("test") {
+        withTempDir { f =>
+          sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (name string) STORED AS " +
+            s"PARQUET LOCATION '${f.getAbsolutePath}'")
+
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet) {
+            sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
+            sql("ALTER TABLE test DROP PARTITION(name='n1')")
+            sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
+            checkAnswer(sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id"),
+              Array(Row(2)))
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-29295: dynamic insert overwrite external partition should not have old data") {
+    Seq("true", "false").foreach { convertParquet =>
+      withTable("test") {
+        withTempDir { f =>
+          sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (p1 string, p2 string) " +
+            s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'")
 
-        withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
-          sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
-          sql("ALTER TABLE test DROP PARTITION(name='n1')")
-          sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
-          checkAnswer(sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id"),
-            Array(Row(2)))
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet,
+            "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 1, 'n2'")
+            sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='n2')")
+            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 2, 'n2'")
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n2' ORDER BY id"),
+              Array(Row(2)))
+          }
         }
       }
     }
From 1618e56614192db8aa7f0585ca1b26f1956b3cd3 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 8 Oct 2019 08:40:11 -0700
Subject: [PATCH 3/8] Address comment.

---
 .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3bcefe8f1e88e..037f7ef48d66c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -260,7 +260,8 @@ case class InsertIntoHiveTable(
           // exists before copying files. So if users drop the partition and then do an insert
           // overwrite to the same partition, the partition will end up with both old and new
           // data.
-          val updatedPart = if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
+          val updatedPart = if (oldPart.isEmpty && overwrite
+              && table.tableType == CatalogTableType.EXTERNAL) {
             AlterTableAddPartitionCommand(
               table.identifier, Seq((partitionSpec, None)), ifNotExists = true).run(sparkSession)
             externalCatalog.getPartitionOption(

From 78beb8aeb3759af5721804fa6936d8328a5279a2 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 8 Oct 2019 10:56:42 -0700
Subject: [PATCH 4/8] Fix partition directory parsing.

---
 .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 037f7ef48d66c..42b6f5902eb9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -215,7 +215,7 @@ case class InsertIntoHiveTable(
          // exists before copying files. So if users drop the partition and then do an insert
          // overwrite to the same partition, the partition will end up with both old and new
          // data.
-          val dpMap = writtenParts.map { part =>
+          val dpMap = writtenParts.flatMap(_.split("/")).map { part =>
            val splitPart = part.split("=")
            assert(splitPart.size == 2, s"Invalid written partition path: $part")
            splitPart(0) -> splitPart(1)
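The one-line change above matters once there is more than one dynamic partition column: the written partition path is a nested directory fragment such as p1=n1/p2=n2, so the string has to be broken into key=value segments before splitting on "=". A standalone illustration with a hypothetical value:

val writtenPath = "p1=n1/p2=n2"

// Splitting on "=" directly yields Array("p1", "n1/p2", "n2") and would trip the
// size == 2 assertion. Splitting on "/" first gives one key=value pair per column.
val spec: Map[String, String] = writtenPath.split("/").map { segment =>
  val Array(key, value) = segment.split("=")
  key -> value
}.toMap
// spec == Map("p1" -> "n1", "p2" -> "n2")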
From 965dc4ae80016a2e6054181e70516f453d207fd2 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 8 Oct 2019 11:34:22 -0700
Subject: [PATCH 5/8] Add test for escaped partition path.

---
 .../sql/hive/execution/InsertIntoHiveTable.scala |  5 +++--
 .../spark/sql/hive/execution/SQLQuerySuite.scala | 16 ++++++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 42b6f5902eb9b..4cf032ec4d831 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
@@ -218,7 +218,8 @@ case class InsertIntoHiveTable(
           val dpMap = writtenParts.flatMap(_.split("/")).map { part =>
             val splitPart = part.split("=")
             assert(splitPart.size == 2, s"Invalid written partition path: $part")
-            splitPart(0) -> splitPart(1)
+            ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
+              ExternalCatalogUtils.unescapePathName(splitPart(1))
           }.toMap

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 28572dc08e727..20105eda994dd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2449,6 +2449,22 @@
           }
         }
       }
+
+      withTable("test") {
+        withTempDir { f =>
+          sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (p1 string, p2 string) " +
+            s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'")
+
+          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet,
+            "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 1, '/'")
+            sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='/')")
+            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 2, '/'")
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = '/' ORDER BY id"),
+              Array(Row(2)))
+          }
+        }
+      }
     }
   }
 }
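The reason for the unescaping added above: partition values are percent-encoded when they become directory names, so a raw path segment does not match the catalog value for characters such as '/'. A sketch against Spark's own ExternalCatalogUtils, which the patch itself uses:

import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// p2='/' cannot appear verbatim in a directory name; the written path segment
// for it is percent-encoded, e.g. "p2=%2F".
val escaped = ExternalCatalogUtils.escapePathName("/")        // "%2F"
val restored = ExternalCatalogUtils.unescapePathName(escaped) // "/"
assert(restored == "/")

Without the unescape step, the spec handed to AlterTableAddPartitionCommand would contain "%2F" instead of "/", registering a partition that never matches the one the user queries or drops.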
From d15576fb0dc802adb597798ffd0980bcbb09a673 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 16 Oct 2019 23:45:29 -0700
Subject: [PATCH 6/8] Construct partition directory.

---
 .../hive/execution/InsertIntoHiveTable.scala | 47 ++++++++++---------
 1 file changed, 26 insertions(+), 21 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 4cf032ec4d831..03c4a2a9856d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -214,7 +214,7 @@ case class InsertIntoHiveTable(
          // the partition does not exist, Hive will not check if the external partition directory
          // exists before copying files. So if users drop the partition and then do an insert
          // overwrite to the same partition, the partition will end up with both old and new
-          // data.
+          // data. We construct the partition path; if the path exists, we delete it manually.
          val dpMap = writtenParts.flatMap(_.split("/")).map { part =>
            val splitPart = part.split("=")
            assert(splitPart.size == 2, s"Invalid written partition path: $part")
@@ -229,10 +229,18 @@ case class InsertIntoHiveTable(
            case (key, _) =>
              throw new SparkException(s"Dynamic partition key $key is not among " +
                "written partition paths.")
          }
-
-          AlterTableAddPartitionCommand(
-            table.identifier, Seq((updatedPartitionSpec, None)), ifNotExists = true)
-            .run(sparkSession)
+          val partitionColumnNames = table.partitionColumnNames
+          val tablePath = new Path(table.location)
+          val partitionPath = ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
+            partitionColumnNames, tablePath)
+
+          val fs = partitionPath.getFileSystem(hadoopConf)
+          if (fs.exists(partitionPath)) {
+            if (!fs.delete(partitionPath, true)) {
+              throw new RuntimeException(
+                "Cannot remove partition directory '" + partitionPath.toString)
+            }
+          }
        }
 
        externalCatalog.loadDynamicPartitions(
@@ -260,31 +268,28 @@ case class InsertIntoHiveTable(
          // the partition does not exist, Hive will not check if the external partition directory
          // exists before copying files. So if users drop the partition and then do an insert
          // overwrite to the same partition, the partition will end up with both old and new
-          // data.
-          val updatedPart = if (oldPart.isEmpty && overwrite
+          // data. We construct the partition path; if the path exists, we delete it manually.
+          val partitionPath = if (oldPart.isEmpty && overwrite
              && table.tableType == CatalogTableType.EXTERNAL) {
-            AlterTableAddPartitionCommand(
-              table.identifier, Seq((partitionSpec, None)), ifNotExists = true).run(sparkSession)
-            externalCatalog.getPartitionOption(
-              table.database,
-              table.identifier.table,
-              partitionSpec)
+            val partitionColumnNames = table.partitionColumnNames
+            val tablePath = new Path(table.location)
+            Some(ExternalCatalogUtils.generatePartitionPath(partitionSpec,
+              partitionColumnNames, tablePath))
          } else {
-            oldPart
+            oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
          }
 
          // SPARK-18107: Insert overwrite runs much slower than hive-client.
          // Newer Hive largely improves insert overwrite performance. Since Spark uses an
          // older Hive version and may not want to catch up with every new Hive release, we
          // delete the Hive partition first and then load data files into the Hive partition.
-          if (updatedPart.nonEmpty && overwrite) {
-            updatedPart.get.storage.locationUri.foreach { uri =>
-              val partitionPath = new Path(uri)
-              val fs = partitionPath.getFileSystem(hadoopConf)
-              if (fs.exists(partitionPath)) {
-                if (!fs.delete(partitionPath, true)) {
+          if (partitionPath.nonEmpty && overwrite) {
+            partitionPath.foreach { path =>
+              val fs = path.getFileSystem(hadoopConf)
+              if (fs.exists(path)) {
+                if (!fs.delete(path, true)) {
                  throw new RuntimeException(
-                    "Cannot remove partition directory '" + partitionPath.toString)
+                    "Cannot remove partition directory '" + path.toString)
                }
                // Don't let Hive do overwrite operation since it is slower.
                doHiveOverwrite = false
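This revision drops the add-partition round trip through the metastore and instead computes the target directory directly: ExternalCatalogUtils.generatePartitionPath builds the escaped Hive-style layout under the table root, so the stale directory can be deleted even though the dropped partition has no catalog entry. Roughly, with hypothetical paths:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

val tablePath = new Path("/warehouse/test")  // stands in for table.location
val spec = Map("p1" -> "n1", "p2" -> "/")
val partitionColumnNames = Seq("p1", "p2")

// Columns follow the table definition order and values are escaped, giving
// /warehouse/test/p1=n1/p2=%2F
val partitionPath: Path =
  ExternalCatalogUtils.generatePartitionPath(spec, partitionColumnNames, tablePath)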
From e395facdaa16f661e76993077aca1efe8c927500 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Wed, 16 Oct 2019 23:47:37 -0700
Subject: [PATCH 7/8] Remove unused import.

---
 .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 03c4a2a9856d7..238c1b191d387 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, Ex
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, CommandUtils}
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive.client.HiveClientImpl

From 8337870e62c9550235ebc4357a4711fe1197812a Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Thu, 17 Oct 2019 00:33:00 -0700
Subject: [PATCH 8/8] Dynamic insert overwrite will update multiple partitions.

---
 .../hive/execution/InsertIntoHiveTable.scala | 50 ++++++++++---------
 .../sql/hive/execution/SQLQuerySuite.scala   | 31 ++++++++++--
 2 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 238c1b191d387..801be64702519 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -215,30 +215,32 @@ case class InsertIntoHiveTable(
          // exists before copying files. So if users drop the partition and then do an insert
          // overwrite to the same partition, the partition will end up with both old and new
          // data. We construct the partition path; if the path exists, we delete it manually.
-          val dpMap = writtenParts.flatMap(_.split("/")).map { part =>
-            val splitPart = part.split("=")
-            assert(splitPart.size == 2, s"Invalid written partition path: $part")
-            ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
-              ExternalCatalogUtils.unescapePathName(splitPart(1))
-          }.toMap
-
-          val updatedPartitionSpec = partition.map {
-            case (key, Some(value)) => key -> value
-            case (key, None) if dpMap.contains(key) => key -> dpMap(key)
-            case (key, _) =>
-              throw new SparkException(s"Dynamic partition key $key is not among " +
-                "written partition paths.")
-          }
-          val partitionColumnNames = table.partitionColumnNames
-          val tablePath = new Path(table.location)
-          val partitionPath = ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
-            partitionColumnNames, tablePath)
-
-          val fs = partitionPath.getFileSystem(hadoopConf)
-          if (fs.exists(partitionPath)) {
-            if (!fs.delete(partitionPath, true)) {
-              throw new RuntimeException(
-                "Cannot remove partition directory '" + partitionPath.toString)
+          writtenParts.foreach { partPath =>
+            val dpMap = partPath.split("/").map { part =>
+              val splitPart = part.split("=")
+              assert(splitPart.size == 2, s"Invalid written partition path: $part")
+              ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
+                ExternalCatalogUtils.unescapePathName(splitPart(1))
+            }.toMap
+
+            val updatedPartitionSpec = partition.map {
+              case (key, Some(value)) => key -> value
+              case (key, None) if dpMap.contains(key) => key -> dpMap(key)
+              case (key, _) =>
+                throw new SparkException(s"Dynamic partition key $key is not among " +
+                  "written partition paths.")
+            }
+            val partitionColumnNames = table.partitionColumnNames
+            val tablePath = new Path(table.location)
+            val partitionPath = ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
+              partitionColumnNames, tablePath)
+
+            val fs = partitionPath.getFileSystem(hadoopConf)
+            if (fs.exists(partitionPath)) {
+              if (!fs.delete(partitionPath, true)) {
+                throw new RuntimeException(
+                  "Cannot remove partition directory '" + partitionPath.toString)
+              }
            }
          }
        }
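The motivation for the foreach: one dynamic INSERT OVERWRITE can write several partitions at once, and the previous code merged them all into a single dpMap, where later values overwrite earlier ones per key, so only one directory was ever cleaned. A sketch of the difference with hypothetical inputs:

// One statement can write several dynamic partitions.
val writtenParts = Set("p1=n1/p2=n2", "p1=n1/p2=n3")

// Before: a single merged map keeps only one value per key, so either p2=n2
// or p2=n3 is cleaned, never both.
val merged = writtenParts.flatMap(_.split("/")).map { s =>
  val Array(k, v) = s.split("=")
  k -> v
}.toMap
// merged("p2") is "n2" or "n3" -- the other partition directory keeps its old data.

// After: resolve and clean one spec per written path.
writtenParts.foreach { partPath =>
  val spec = partPath.split("/").map { s =>
    val Array(k, v) = s.split("=")
    k -> v
  }.toMap
  println(s"would clean the directory for: $spec") // stand-in for the actual delete
}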
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 20105eda994dd..1fd4f015ec7b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2441,11 +2441,35 @@
 
          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet,
            "hive.exec.dynamic.partition.mode" -> "nonstrict") {
-            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 1, 'n2'")
-            sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='n2')")
-            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 2, 'n2'")
+            sql(
+              """
+                |INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2)
+                |SELECT * FROM VALUES (1, 'n2'), (2, 'n3') AS t(id, p2)
+              """.stripMargin)
            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n2' ORDER BY id"),
+              Array(Row(1)))
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n3' ORDER BY id"),
              Array(Row(2)))
+
+            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 4, 'n4'")
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n4' ORDER BY id"),
+              Array(Row(4)))
+
+            sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='n2')")
+            sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='n3')")
+
+            sql(
+              """
+                |INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2)
+                |SELECT * FROM VALUES (5, 'n2'), (6, 'n3') AS t(id, p2)
+              """.stripMargin)
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n2' ORDER BY id"),
+              Array(Row(5)))
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n3' ORDER BY id"),
+              Array(Row(6)))
+            // Partitions that were not overwritten should not be deleted.
+            checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n4' ORDER BY id"),
+              Array(Row(4)))
          }
        }
      }
@@ -2457,6 +2481,7 @@
 
          withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet,
            "hive.exec.dynamic.partition.mode" -> "nonstrict") {
+            // We should unescape the partition value parsed back from the written path.
            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 1, '/'")
            sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='/')")
            sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 2, '/'")