Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.client.HiveClientImpl


Expand Down Expand Up @@ -207,34 +206,7 @@ case class InsertIntoHiveTable(
table.database,
table.identifier.table,
partitionSpec)

var doHiveOverwrite = overwrite

if (oldPart.isEmpty || !ifPartitionNotExists) {
// SPARK-29295: When insert overwrite to a Hive external table partition, if the
// partition does not exist, Hive will not check if the external partition directory
// exists or not before copying files. So if users drop the partition, and then do
// insert overwrite to the same partition, the partition will have both old and new
// data. We construct partition path. If the path exists, we delete it manually.
val partitionPath = if (oldPart.isEmpty && overwrite
&& table.tableType == CatalogTableType.EXTERNAL) {
val partitionColumnNames = table.partitionColumnNames
val tablePath = new Path(table.location)
Some(ExternalCatalogUtils.generatePartitionPath(partitionSpec,
partitionColumnNames, tablePath))
} else {
oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
}

// SPARK-18107: Insert overwrite runs much slower than hive-client.
// Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
// version and we may not want to catch up new Hive version every time. We delete the
// Hive partition first and then load data file into the Hive partition.
val hiveVersion = externalCatalog.asInstanceOf[ExternalCatalogWithListener]
.unwrapped.asInstanceOf[HiveExternalCatalog]
.client
.version

// inheritTableSpecs is set to true. It should be set to false for an IMPORT query
// which is currently considered as a Hive native command.
val inheritTableSpecs = true
Expand All @@ -243,7 +215,7 @@ case class InsertIntoHiveTable(
table.identifier.table,
tmpLocation.toString,
partitionSpec,
isOverwrite = doHiveOverwrite,
isOverwrite = overwrite,
inheritTableSpecs = inheritTableSpecs,
isSrcLocal = false)
}
Expand Down