InsertIntoHiveTable.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, ExternalCatalog}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
@@ -199,7 +199,7 @@ case class InsertIntoHiveTable(
attr.withName(name.toLowerCase(Locale.ROOT))
}

saveAsHiveFile(
val writtenParts = saveAsHiveFile(
sparkSession = sparkSession,
plan = child,
hadoopConf = hadoopConf,
@@ -209,6 +209,42 @@

if (partition.nonEmpty) {
if (numDynamicPartitions > 0) {
if (overwrite && table.tableType == CatalogTableType.EXTERNAL) {
// SPARK-29295: When doing an insert overwrite to a Hive external table partition, if
// the partition does not exist, Hive will not check whether the external partition
// directory exists before copying files. So if users drop the partition and then
// insert overwrite into the same partition, it will contain both old and new data.
// We construct the partition path ourselves and, if it exists, delete it manually.
writtenParts.foreach { partPath =>
val dpMap = partPath.split("/").map { part =>
val splitPart = part.split("=")
assert(splitPart.size == 2, s"Invalid written partition path: $part")
ExternalCatalogUtils.unescapePathName(splitPart(0)) ->
ExternalCatalogUtils.unescapePathName(splitPart(1))
}.toMap

val updatedPartitionSpec = partition.map {
case (key, Some(value)) => key -> value
case (key, None) if dpMap.contains(key) => key -> dpMap(key)
case (key, _) =>
throw new SparkException(s"Dynamic partition key $key is not among " +
"written partition paths.")
}
val partitionColumnNames = table.partitionColumnNames
val tablePath = new Path(table.location)
val partitionPath = ExternalCatalogUtils.generatePartitionPath(updatedPartitionSpec,
partitionColumnNames, tablePath)

val fs = partitionPath.getFileSystem(hadoopConf)
if (fs.exists(partitionPath)) {
if (!fs.delete(partitionPath, true)) {
throw new RuntimeException(
"Cannot remove partition directory '" + partitionPath.toString)
}
}
}
}

externalCatalog.loadDynamicPartitions(
db = table.database,
table = table.identifier.table,
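
For reference, a minimal standalone sketch of the dpMap parsing above. UnescapeSketch and its simplified unescapePathName are ours for illustration only; the real ExternalCatalogUtils.unescapePathName handles the full Hive escape table and malformed input more defensively.

object UnescapeSketch {
  // Simplified Hive-style unescape: turns "%2F" back into "/". A sketch only;
  // the real utility also tolerates malformed escape sequences.
  def unescapePathName(path: String): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < path.length) {
      val c = path.charAt(i)
      if (c == '%' && i + 2 < path.length) {
        sb.append(Integer.parseInt(path.substring(i + 1, i + 3), 16).toChar)
        i += 3
      } else {
        sb.append(c)
        i += 1
      }
    }
    sb.toString
  }

  // Mirrors the dpMap construction in the patch: split a written partition
  // path into (column -> value) pairs, unescaping both sides.
  def parseWrittenPartPath(partPath: String): Map[String, String] =
    partPath.split("/").map { part =>
      val splitPart = part.split("=")
      assert(splitPart.length == 2, s"Invalid written partition path: $part")
      unescapePathName(splitPart(0)) -> unescapePathName(splitPart(1))
    }.toMap

  def main(args: Array[String]): Unit = {
    // A path written for the dynamic value "/":
    println(parseWrittenPartPath("p1=n1/p2=%2F")) // Map(p1 -> n1, p2 -> /)
  }
}

Running it prints Map(p1 -> n1, p2 -> /), which is how a written path like p1=n1/p2=%2F maps back onto the dynamic partition columns before the directory is deleted.
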
@@ -230,18 +266,32 @@ case class InsertIntoHiveTable(
var doHiveOverwrite = overwrite

if (oldPart.isEmpty || !ifPartitionNotExists) {
// SPARK-29295: When doing an insert overwrite to a Hive external table partition, if
// the partition does not exist, Hive will not check whether the external partition
// directory exists before copying files. So if users drop the partition and then
// insert overwrite into the same partition, it will contain both old and new data.
// We construct the partition path ourselves and, if it exists, delete it manually.
val partitionPath = if (oldPart.isEmpty && overwrite
&& table.tableType == CatalogTableType.EXTERNAL) {
val partitionColumnNames = table.partitionColumnNames
val tablePath = new Path(table.location)
Some(ExternalCatalogUtils.generatePartitionPath(partitionSpec,
partitionColumnNames, tablePath))
} else {
oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
}

// SPARK-18107: Insert overwrite runs much slower than hive-client.
// Newer Hive versions have largely improved insert overwrite performance, but Spark
// uses an older Hive version and we may not want to catch up with every new Hive
// release. Instead, we delete the Hive partition first and then load the data files
// into the Hive partition.
if (oldPart.nonEmpty && overwrite) {
oldPart.get.storage.locationUri.foreach { uri =>
val partitionPath = new Path(uri)
val fs = partitionPath.getFileSystem(hadoopConf)
if (fs.exists(partitionPath)) {
if (!fs.delete(partitionPath, true)) {
if (partitionPath.nonEmpty && overwrite) {
partitionPath.foreach { path =>
val fs = path.getFileSystem(hadoopConf)
if (fs.exists(path)) {
if (!fs.delete(path, true)) {
throw new RuntimeException(
"Cannot remove partition directory '" + partitionPath.toString)
"Cannot remove partition directory '" + path.toString)
}
// Don't let Hive do overwrite operation since it is slower.
doHiveOverwrite = false
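
A condensed sketch of the effective deletion rule in the static-partition branch above, ignoring the ifPartitionNotExists guard. Part and partitionPathToClear are illustrative names of ours, not Spark's.

object PartitionClearSketch {
  // Minimal stand-in for CatalogTablePartition.storage.locationUri.
  final case class Part(locationUri: Option[String])

  def partitionPathToClear(
      oldPart: Option[Part],
      overwrite: Boolean,
      isExternal: Boolean,
      tablePath: String,
      partDir: String): Option[String] = {
    if (!overwrite) {
      // Only INSERT OVERWRITE clears existing data.
      None
    } else if (oldPart.isEmpty && isExternal) {
      // The metastore no longer knows the partition, but its directory may
      // still exist on disk (e.g. after ALTER TABLE ... DROP PARTITION on an
      // external table), so construct the expected path ourselves.
      Some(s"$tablePath/$partDir")
    } else {
      // Otherwise fall back to the location recorded in the metastore, if any.
      oldPart.flatMap(_.locationUri)
    }
  }

  def main(args: Array[String]): Unit = {
    // Dropped external partition + INSERT OVERWRITE: the stale directory is cleared.
    assert(partitionPathToClear(None, overwrite = true, isExternal = true,
      "/warehouse/test", "name=n1").contains("/warehouse/test/name=n1"))
    // Plain append (no overwrite): nothing is deleted.
    assert(partitionPathToClear(None, overwrite = false, isExternal = true,
      "/warehouse/test", "name=n1").isEmpty)
  }
}
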
SQLQuerySuite.scala
@@ -2412,4 +2412,84 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
}

test("SPARK-29295: insert overwrite external partition should not have old data") {
Seq("true", "false").foreach { convertParquet =>
withTable("test") {
withTempDir { f =>
sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (name string) STORED AS " +
s"PARQUET LOCATION '${f.getAbsolutePath}'")

withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet) {
sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 1")
sql("ALTER TABLE test DROP PARTITION(name='n1')")
sql("INSERT OVERWRITE TABLE test PARTITION(name='n1') SELECT 2")
checkAnswer(sql("SELECT id FROM test WHERE name = 'n1' ORDER BY id"),
Array(Row(2)))
}
}
}
}
}

test("SPARK-29295: dynamic insert overwrite external partition should not have old data") {
Seq("true", "false").foreach { convertParquet =>
withTable("test") {
withTempDir { f =>
sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (p1 string, p2 string) " +
s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'")

withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet,
"hive.exec.dynamic.partition.mode" -> "nonstrict") {
sql(
"""
|INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2)
|SELECT * FROM VALUES (1, 'n2'), (2, 'n3') AS t(id, p2)
""".stripMargin)
checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n2' ORDER BY id"),
Array(Row(1)))
checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n3' ORDER BY id"),
Array(Row(2)))

sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 4, 'n4'")
checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n4' ORDER BY id"),
Array(Row(4)))

sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='n2')")
sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='n3')")

sql(
"""
|INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2)
|SELECT * FROM VALUES (5, 'n2'), (6, 'n3') AS t(id, p2)
""".stripMargin)
checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n2' ORDER BY id"),
Array(Row(5)))
checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n3' ORDER BY id"),
Array(Row(6)))
// Partition not overwritten should not be deleted.
checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = 'n4' ORDER BY id"),
Array(Row(4)))
}
}
}

withTable("test") {
withTempDir { f =>
sql("CREATE EXTERNAL TABLE test(id int) PARTITIONED BY (p1 string, p2 string) " +
s"STORED AS PARQUET LOCATION '${f.getAbsolutePath}'")

withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertParquet,
"hive.exec.dynamic.partition.mode" -> "nonstrict") {
// Partition values should be unescaped when matching written paths.
sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 1, '/'")
sql("ALTER TABLE test DROP PARTITION(p1='n1',p2='/')")
sql("INSERT OVERWRITE TABLE test PARTITION(p1='n1', p2) SELECT 2, '/'")
checkAnswer(sql("SELECT id FROM test WHERE p1 = 'n1' and p2 = '/' ORDER BY id"),
Array(Row(2)))
}
}
}
}
}
}
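
To see why the last test needs unescaping, here is a minimal sketch of the Hive-style escaping applied when the partition directory is written. EscapeSketch is ours and only covers the characters exercised above; Spark's ExternalCatalogUtils.escapePathName escapes the full set of special characters.

object EscapeSketch {
  // Simplified: escape only the characters exercised by the test above.
  private val toEscape = Set('/', '=', '%')

  def escapePathName(value: String): String =
    value.flatMap { c =>
      if (toEscape.contains(c)) f"%%${c.toInt}%02X" else c.toString
    }

  def main(args: Array[String]): Unit = {
    // The partition directory created for p2='/' in the last test:
    println(s"p2=${escapePathName("/")}") // p2=%2F
  }
}

Because the value '/' is stored on disk as %2F, the fix must unescape written path segments before comparing them against the user-supplied partition spec; otherwise the dropped directory for p2='/' would never be matched and deleted.
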