@@ -88,7 +88,7 @@ object PartitioningUtils {
    * path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    * }}}
    */
-  private[datasources] def parsePartitions(
+  private[spark] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String,
       typeInference: Boolean,
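The visibility widening above (private[datasources] to private[spark]) lets the Hive write path in InsertIntoHiveTable, shown below, reuse parsePartitions to map dynamic-partition output directories such as .../a=2/b=world/c=6.28 back to partition column names and values. A minimal sketch of that mapping, assuming a simplified return shape (plain string maps rather than Spark's typed PartitionSpec) and a hypothetical helper name:

```scala
// Sketch only: parsePartitionValues is a hypothetical stand-in for what
// PartitioningUtils.parsePartitions recovers from leaf partition directories.
// Spark's real helper also infers column types and returns a PartitionSpec.
import org.apache.hadoop.fs.Path

object PartitionPathSketch {
  def parsePartitionValues(paths: Seq[Path], basePath: Path): Seq[Map[String, String]] =
    paths.map { path =>
      // Relative segment, e.g. "a=2/b=world/c=6.28" under ".../path/to/partition".
      val relative = path.toUri.getPath.stripPrefix(basePath.toUri.getPath).stripPrefix("/")
      relative.split("/").map { segment =>
        val Array(column, value) = segment.split("=", 2)
        column -> value
      }.toMap
    }
}

// PartitionPathSketch.parsePartitionValues(
//   Seq(new Path("hdfs://nn:8020/path/to/partition/a=2/b=world/c=6.28")),
//   new Path("hdfs://nn:8020/path/to/partition"))
// => Seq(Map("a" -> "2", "b" -> "world", "c" -> "6.28"))
```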
@@ -24,7 +24,7 @@ import java.util.{Date, Locale, Random}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.common.{FileUtils, HiveStatsUtils}
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
@@ -35,6 +35,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
@@ -257,12 +259,53 @@ case class InsertIntoHiveTable(
     val holdDDLTime = false
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        // SPARK-18107: Insert overwrite runs much slower than hive-client.
+        // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
+        // version and we may not want to catch up new Hive version every time. We delete the
+        // Hive partition first and then load data file into the Hive partition.
+        var doHiveOverwrite = overwrite
+        if (overwrite) {
+          val fs = outputPath.getFileSystem(hadoopConf)
+          // Extracts the partition paths from output path.
+          // E.g., if the dynamic partition columns are "a" and "b", we would get the paths like
+          // "/output/a=1/b=2", "/output/a=2/b=3".
+          val partitionPaths =
+            HiveStatsUtils.getFileStatusRecurse(outputPath, numDynamicPartitions, fs)
+              .map(_.getPath())
+          val partitionSpecInOutputPath =
+            PartitioningUtils.parsePartitions(
+              partitionPaths,
+              PartitioningUtils.DEFAULT_PARTITION_NAME,
+              true,
+              Set(outputPath))
+
+          val schema = partitionSpecInOutputPath.partitionColumns
+          val columnNames = schema.fieldNames
+
+          partitionSpecInOutputPath.partitions.flatMap { partition =>
+            // Construct partition spec from parsed dynamic partition column names and values.
+            val spec = columnNames.zip(partition.values.toSeq(schema).map(_.toString)).toMap
+            // Using static partition spec and dynamic partition spec to get partition metadata.
+            externalCatalog.getPartitionOption(
+              table.catalogTable.database,
+              table.catalogTable.identifier.table,
+              partitionSpec ++ spec)
+          }.foreach { part =>
+            part.storage.locationUri.map { uri =>
+              if (removePartitionPath(new Path(uri), hadoopConf)) {
+                // Don't let Hive do overwrite operation since it is slower.
+                doHiveOverwrite = false
+              }
+            }
+          }
+        }
+
         externalCatalog.loadDynamicPartitions(
           db = table.catalogTable.database,
           table = table.catalogTable.identifier.table,
           outputPath.toString,
           partitionSpec,
-          overwrite,
+          doHiveOverwrite,
           numDynamicPartitions,
           holdDDLTime = holdDDLTime)
       } else {
@@ -284,14 +327,8 @@ case class InsertIntoHiveTable(
           // version and we may not want to catch up new Hive version every time. We delete the
           // Hive partition first and then load data file into the Hive partition.
           if (oldPart.nonEmpty && overwrite) {
-            oldPart.get.storage.locationUri.foreach { uri =>
-              val partitionPath = new Path(uri)
-              val fs = partitionPath.getFileSystem(hadoopConf)
-              if (fs.exists(partitionPath)) {
-                if (!fs.delete(partitionPath, true)) {
-                  throw new RuntimeException(
-                    "Cannot remove partition directory '" + partitionPath.toString)
-                }
+            oldPart.get.storage.locationUri.map { uri =>
+              if (removePartitionPath(new Path(uri), hadoopConf)) {
                 // Don't let Hive do overwrite operation since it is slower.
                 doHiveOverwrite = false
               }
@@ -331,6 +368,22 @@ case class InsertIntoHiveTable(
     Seq.empty[InternalRow]
   }
 
+  // Deletes a partition path. Returns true if the path exists and is successfully deleted.
+  // Returns false if the path doesn't exist. Throws RuntimeException if error happens when
+  // deleting the path.
+  private def removePartitionPath(partitionPath: Path, hadoopConf: Configuration): Boolean = {
+    val fs = partitionPath.getFileSystem(hadoopConf)
+    if (fs.exists(partitionPath)) {
+      if (!fs.delete(partitionPath, true)) {
+        throw new RuntimeException(
+          "Cannot remove partition directory '" + partitionPath.toString)
+      }
+      true
+    } else {
+      false
+    }
+  }
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
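Taken together, the changes to InsertIntoHiveTable implement one strategy: when an INSERT OVERWRITE writes dynamic partitions, discover which partitions were produced under the output path, look up each one's current location in the catalog, delete that directory directly, and then call loadDynamicPartitions with Hive's own (slower) overwrite disabled. Below is a self-contained sketch of that flow under stated assumptions: ExternalCatalogLike and its methods are simplified stand-ins for the real ExternalCatalog calls (getPartitionOption, loadDynamicPartitions), and overwriteDynamicPartitions is a hypothetical helper, not the PR's code.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Simplified stand-in for the catalog operations the real code calls; not Spark's actual API.
trait ExternalCatalogLike {
  def partitionLocation(db: String, table: String, spec: Map[String, String]): Option[Path]
  def loadDynamicPartitions(
      db: String,
      table: String,
      outputPath: String,
      staticSpec: Map[String, String],
      overwrite: Boolean,
      numDynamicPartitions: Int): Unit
}

object OverwriteFlowSketch {
  // For an INSERT OVERWRITE with dynamic partitions: delete each existing partition
  // directory ourselves (cheap), then load the new files with Hive's overwrite path
  // disabled whenever at least one directory was already cleared.
  def overwriteDynamicPartitions(
      catalog: ExternalCatalogLike,
      db: String,
      table: String,
      outputPath: Path,
      staticSpec: Map[String, String],
      discoveredSpecs: Seq[Map[String, String]],
      numDynamicPartitions: Int,
      hadoopConf: Configuration): Unit = {
    var doHiveOverwrite = true
    discoveredSpecs.foreach { dynamicSpec =>
      // Combine the static and dynamic parts of the spec to find the existing partition.
      catalog.partitionLocation(db, table, staticSpec ++ dynamicSpec).foreach { location =>
        val fs = location.getFileSystem(hadoopConf)
        if (fs.exists(location)) {
          if (!fs.delete(location, true)) {
            throw new RuntimeException(s"Cannot remove partition directory '$location'")
          }
          doHiveOverwrite = false
        }
      }
    }
    catalog.loadDynamicPartitions(
      db, table, outputPath.toString, staticSpec,
      overwrite = doHiveOverwrite, numDynamicPartitions = numDynamicPartitions)
  }
}
```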
@@ -2010,6 +2010,42 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("Insert overwrite with partition: dynamic partition") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("tableWithPartition") {
+        sql(
+          """
+            |CREATE TABLE tableWithPartition (key int, value STRING)
+            |PARTITIONED BY (part1 STRING, part2 INT, part3 LONG)
+          """.stripMargin)
+        sql(
+          """
+            |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part1 = '1', part2 = 2, part3)
+            |SELECT key, value, 3 AS part3 FROM default.src
+          """.stripMargin)
+        checkAnswer(
+          sql("SELECT part1, part2, part3, key, value FROM tableWithPartition"),
+          sql("SELECT '1' AS part1, 2 AS part2, 3 AS part3, key, value FROM default.src")
+        )
+
+        sql(
+          """
+            |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part1 = '1', part2 = 2, part3)
+            |SELECT key, value, part3 FROM
+            |VALUES (1, "one", 3), (2, "two", 3), (3, null, 3) AS data(key, value, part3)
+          """.stripMargin)
+        checkAnswer(
+          sql("SELECT part1, part2, part3, key, value FROM tableWithPartition"),
+          sql(
+            """
+              |SELECT '1' AS part1, 2 AS part2, 3 AS part3, key, value FROM VALUES
+              |(1, "one"), (2, "two"), (3, null) AS data(key, value)
+            """.stripMargin)
+        )
+      }
+    }
+  }
+
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0