diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index b51b41869bf0..73ca1376bbe6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -88,7 +88,7 @@ object PartitioningUtils {
    *           path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
    * }}}
    */
-  private[datasources] def parsePartitions(
+  private[spark] def parsePartitions(
       paths: Seq[Path],
       defaultPartitionName: String,
       typeInference: Boolean,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index e333fc7febc2..f051c21f6828 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -24,7 +24,7 @@ import java.util.{Date, Locale, Random}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.common.{FileUtils, HiveStatsUtils}
 import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
@@ -35,6 +35,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
@@ -257,12 +259,53 @@ case class InsertIntoHiveTable(
     val holdDDLTime = false
     if (partition.nonEmpty) {
       if (numDynamicPartitions > 0) {
+        // SPARK-18107: Insert overwrite runs much slower than hive-client.
+        // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
+        // version and we may not want to catch up new Hive version every time. We delete the
+        // Hive partition first and then load data file into the Hive partition.
+        var doHiveOverwrite = overwrite
+        if (overwrite) {
+          val fs = outputPath.getFileSystem(hadoopConf)
+          // Extracts the partition paths from output path.
+          // E.g., if the dynamic partition columns are "a" and "b", we would get the paths like
+          // "/output/a=1/b=2", "/output/a=2/b=3".
+          val partitionPaths =
+            HiveStatsUtils.getFileStatusRecurse(outputPath, numDynamicPartitions, fs)
+              .map(_.getPath())
+          val partitionSpecInOutputPath =
+            PartitioningUtils.parsePartitions(
+              partitionPaths,
+              PartitioningUtils.DEFAULT_PARTITION_NAME,
+              true,
+              Set(outputPath))
+
+          val schema = partitionSpecInOutputPath.partitionColumns
+          val columnNames = schema.fieldNames
+
+          partitionSpecInOutputPath.partitions.flatMap { partition =>
+            // Construct partition spec from parsed dynamic partition column names and values.
+            val spec = columnNames.zip(partition.values.toSeq(schema).map(_.toString)).toMap
+            // Using static partition spec and dynamic partition spec to get partition metadata.
+            externalCatalog.getPartitionOption(
+              table.catalogTable.database,
+              table.catalogTable.identifier.table,
+              partitionSpec ++ spec)
+          }.foreach { part =>
+            part.storage.locationUri.map { uri =>
+              if (removePartitionPath(new Path(uri), hadoopConf)) {
+                // Don't let Hive do overwrite operation since it is slower.
+                doHiveOverwrite = false
+              }
+            }
+          }
+        }
+
         externalCatalog.loadDynamicPartitions(
           db = table.catalogTable.database,
           table = table.catalogTable.identifier.table,
           outputPath.toString,
           partitionSpec,
-          overwrite,
+          doHiveOverwrite,
           numDynamicPartitions,
           holdDDLTime = holdDDLTime)
       } else {
@@ -284,14 +327,8 @@ case class InsertIntoHiveTable(
           // version and we may not want to catch up new Hive version every time. We delete the
           // Hive partition first and then load data file into the Hive partition.
           if (oldPart.nonEmpty && overwrite) {
-            oldPart.get.storage.locationUri.foreach { uri =>
-              val partitionPath = new Path(uri)
-              val fs = partitionPath.getFileSystem(hadoopConf)
-              if (fs.exists(partitionPath)) {
-                if (!fs.delete(partitionPath, true)) {
-                  throw new RuntimeException(
-                    "Cannot remove partition directory '" + partitionPath.toString)
-                }
+            oldPart.get.storage.locationUri.map { uri =>
+              if (removePartitionPath(new Path(uri), hadoopConf)) {
                 // Don't let Hive do overwrite operation since it is slower.
                 doHiveOverwrite = false
               }
@@ -331,6 +368,22 @@ case class InsertIntoHiveTable(
     Seq.empty[InternalRow]
   }
 
+  // Deletes a partition path. Returns true if the path exists and is successfully deleted.
+  // Returns false if the path doesn't exist. Throws RuntimeException if error happens when
+  // deleting the path.
+  private def removePartitionPath(partitionPath: Path, hadoopConf: Configuration): Boolean = {
+    val fs = partitionPath.getFileSystem(hadoopConf)
+    if (fs.exists(partitionPath)) {
+      if (!fs.delete(partitionPath, true)) {
+        throw new RuntimeException(
+          "Cannot remove partition directory '" + partitionPath.toString)
+      }
+      true
+    } else {
+      false
+    }
+  }
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ad70835d06d9..100257a683b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2010,6 +2010,42 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("Insert overwrite with partition: dynamic partition") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("tableWithPartition") {
+        sql(
+          """
+            |CREATE TABLE tableWithPartition (key int, value STRING)
+            |PARTITIONED BY (part1 STRING, part2 INT, part3 LONG)
+          """.stripMargin)
+        sql(
+          """
+            |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part1 = '1', part2 = 2, part3)
+            |SELECT key, value, 3 AS part3 FROM default.src
+          """.stripMargin)
+        checkAnswer(
+          sql("SELECT part1, part2, part3, key, value FROM tableWithPartition"),
+          sql("SELECT '1' AS part1, 2 AS part2, 3 AS part3, key, value FROM default.src")
+        )
+
+        sql(
+          """
+            |INSERT OVERWRITE TABLE tableWithPartition PARTITION (part1 = '1', part2 = 2, part3)
+            |SELECT key, value, part3 FROM
+            |VALUES (1, "one", 3), (2, "two", 3), (3, null, 3) AS data(key, value, part3)
+          """.stripMargin)
+        checkAnswer(
+          sql("SELECT part1, part2, part3, key, value FROM tableWithPartition"),
+          sql(
+            """
+              |SELECT '1' AS part1, 2 AS part2, 3 AS part3, key, value FROM VALUES
+              |(1, "one"), (2, "two"), (3, null) AS data(key, value)
+            """.stripMargin)
+        )
+      }
+    }
+  }
+
   def testCommandAvailable(command: String): Boolean = {
     val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
     attempt.isSuccess && attempt.get == 0