diff --git a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala index 55e34b32fe0d..e97b9d5d6bea 100644 --- a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala +++ b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala @@ -46,7 +46,8 @@ abstract class BenchmarkBase { if (regenerateBenchmarkFiles) { val version = System.getProperty("java.version").split("\\D+")(0).toInt val jdkString = if (version > 8) s"-jdk$version" else "" - val resultFileName = s"${this.getClass.getSimpleName.replace("$", "")}$jdkString-results.txt" + val resultFileName = + s"${this.getClass.getSimpleName.replace("$", "")}$jdkString$suffix-results.txt" val file = new File(s"benchmarks/$resultFileName") if (!file.exists()) { file.createNewFile() @@ -65,6 +66,8 @@ abstract class BenchmarkBase { afterAll() } + def suffix: String = "" + /** * Any shutdown code to ensure a clean shutdown */ diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt new file mode 100644 index 000000000000..85884a1aaf73 --- /dev/null +++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt @@ -0,0 +1,11 @@ +Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4 +Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz +insert hive table benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +INSERT INTO DYNAMIC 6812 7043 328 0.0 665204.8 1.0X +INSERT INTO HYBRID 817 852 32 0.0 79783.6 8.3X +INSERT INTO STATIC 231 246 21 0.0 22568.2 29.5X +INSERT OVERWRITE DYNAMIC 25947 26671 1024 0.0 2533910.2 0.3X +INSERT OVERWRITE HYBRID 2846 2884 54 0.0 277908.7 2.4X +INSERT OVERWRITE STATIC 232 247 26 0.0 22659.9 29.4X + diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt new file mode 100644 index 000000000000..ea8e6057ea61 --- /dev/null +++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt @@ -0,0 +1,11 @@ +Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4 +Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz +insert hive table benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +INSERT INTO DYNAMIC 4326 4373 66 0.0 422486.0 1.0X +INSERT INTO HYBRID 726 741 21 0.0 70877.2 6.0X +INSERT INTO STATIC 256 270 12 0.0 25015.7 16.9X +INSERT OVERWRITE DYNAMIC 4115 4150 49 0.0 401828.8 1.1X +INSERT OVERWRITE HYBRID 690 699 8 0.0 67370.5 6.3X +INSERT OVERWRITE STATIC 277 283 5 0.0 27097.9 15.6X + diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt new file mode 100644 index 000000000000..c7a642aad527 --- /dev/null +++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt @@ -0,0 +1,11 @@ +Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4 +Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz +insert hive table benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +INSERT INTO DYNAMIC 5083 5412 466 0.0 496384.5 1.0X +INSERT INTO HYBRID 822 864 43 0.0 80283.6 6.2X +INSERT INTO STATIC 335 342 5 0.0 32694.1 15.2X +INSERT OVERWRITE DYNAMIC 4941 5068 179 0.0 482534.5 1.0X +INSERT OVERWRITE HYBRID 722 745 27 0.0 70502.7 7.0X +INSERT OVERWRITE STATIC 295 314 12 0.0 28846.8 17.2X + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala index 8526d8645460..27ba3eca8194 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala @@ -22,7 +22,20 @@ package object client { private[hive] sealed abstract class HiveVersion( val fullVersion: String, val extraDeps: Seq[String] = Nil, - val exclusions: Seq[String] = Nil) + val exclusions: Seq[String] = Nil) extends Ordered[HiveVersion] { + override def compare(that: HiveVersion): Int = { + val thisVersionParts = fullVersion.split('.').map(_.toInt) + val thatVersionParts = that.fullVersion.split('.').map(_.toInt) + assert(thisVersionParts.length == thatVersionParts.length) + thisVersionParts.zip(thatVersionParts).foreach { case (l, r) => + val candidate = l - r + if (candidate != 0) { + return candidate + } + } + 0 + } + } // scalastyle:off private[hive] object hive { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 19f439598142..9f83f2ab9609 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -26,13 +26,15 @@ import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.CommandUtils +import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.client.hive._ /** @@ -285,7 +287,21 @@ case class InsertIntoHiveTable( // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive // version and we may not want to catch up new Hive version every time. We delete the // Hive partition first and then load data file into the Hive partition. - if (partitionPath.nonEmpty && overwrite) { + val hiveVersion = externalCatalog.asInstanceOf[ExternalCatalogWithListener] + .unwrapped.asInstanceOf[HiveExternalCatalog] + .client + .version + // SPARK-31684: + // For Hive 2.0.0 and onwards, as https://issues.apache.org/jira/browse/HIVE-11940 + // has been fixed, and there is no performance issue anymore. We should leave the + // overwrite logic to hive to avoid failure in `FileSystem#checkPath` when the table + // and partition locations do not belong to the same `FileSystem` + // TODO(SPARK-31675): For Hive 2.2.0 and earlier, if the table and partition locations + // do not belong together, we will still get the same error thrown by hive encryption + // check. see https://issues.apache.org/jira/browse/HIVE-14380. + // So we still disable for Hive overwrite for Hive 1.x for better performance because + // the partition and table are on the same cluster in most cases. + if (partitionPath.nonEmpty && overwrite && hiveVersion < v2_0) { partitionPath.foreach { path => val fs = path.getFileSystem(hadoopConf) if (fs.exists(path)) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala new file mode 100644 index 000000000000..81eb5e2591f1 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hive.HiveUtils +import org.apache.spark.sql.hive.test.TestHive + +/** + * Benchmark to measure hive table write performance. + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class + * --jars ,, + * --packages org.spark-project.hive:hive-exec:1.2.1.spark2 + * + * 2. build/sbt "hive/test:runMain " -Phive-1.2 or + * build/sbt "hive/test:runMain " -Phive-2.3 + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain " + * Results will be written to "benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt". + * 4. -Phive-1.2 does not work for JDK 11 + * }}} + */ +object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark { + + override def getSparkSession: SparkSession = TestHive.sparkSession + + val tempView = "temp" + val numRows = 1024 * 10 + val sql = spark.sql _ + + // scalastyle:off hadoopconfiguration + private val hadoopConf = spark.sparkContext.hadoopConfiguration + // scalastyle:on hadoopconfiguration + hadoopConf.set("hive.exec.dynamic.partition", "true") + hadoopConf.set("hive.exec.dynamic.partition.mode", "nonstrict") + hadoopConf.set("hive.exec.max.dynamic.partitions", numRows.toString) + + def withTable(tableNames: String*)(f: => Unit): Unit = { + tableNames.foreach { name => + sql(s"CREATE TABLE $name(a INT) STORED AS TEXTFILE PARTITIONED BY (b INT, c INT)") + } + try f finally { + tableNames.foreach { name => + spark.sql(s"DROP TABLE IF EXISTS $name") + } + } + } + + def insertOverwriteDynamic(table: String, benchmark: Benchmark): Unit = { + benchmark.addCase("INSERT OVERWRITE DYNAMIC") { _ => + sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS a," + + s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView") + } + } + + def insertOverwriteHybrid(table: String, benchmark: Benchmark): Unit = { + benchmark.addCase("INSERT OVERWRITE HYBRID") { _ => + sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," + + s" CAST(id % 10 AS INT) AS c FROM $tempView") + } + } + + def insertOverwriteStatic(table: String, benchmark: Benchmark): Unit = { + benchmark.addCase("INSERT OVERWRITE STATIC") { _ => + sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" + + s" FROM $tempView") + } + } + + def insertIntoDynamic(table: String, benchmark: Benchmark): Unit = { + benchmark.addCase("INSERT INTO DYNAMIC") { _ => + sql(s"INSERT INTO TABLE $table SELECT CAST(id AS INT) AS a," + + s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView") + } + } + + def insertIntoHybrid(table: String, benchmark: Benchmark): Unit = { + benchmark.addCase("INSERT INTO HYBRID") { _ => + sql(s"INSERT INTO TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," + + s" CAST(id % 10 AS INT) AS c FROM $tempView") + } + } + + def insertIntoStatic(table: String, benchmark: Benchmark): Unit = { + benchmark.addCase("INSERT INTO STATIC") { _ => + sql(s"INSERT INTO TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" + + s" FROM $tempView") + } + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + spark.range(numRows).createOrReplaceTempView(tempView) + + try { + val t1 = "t1" + val t2 = "t2" + val t3 = "t3" + val t4 = "t4" + val t5 = "t5" + val t6 = "t6" + + val benchmark = new Benchmark(s"insert hive table benchmark", numRows, output = output) + + withTable(t1, t2, t3, t4, t5, t6) { + + insertIntoDynamic(t1, benchmark) + insertIntoHybrid(t2, benchmark) + insertIntoStatic(t3, benchmark) + + insertOverwriteDynamic(t4, benchmark) + insertOverwriteHybrid(t5, benchmark) + insertOverwriteStatic(t6, benchmark) + + benchmark.run() + } + } finally { + spark.catalog.dropTempView(tempView) + } + } + + override def suffix: String = if (HiveUtils.isHive23) "-hive2.3" else "-hive1.2" +}