Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ abstract class BenchmarkBase {
if (regenerateBenchmarkFiles) {
val version = System.getProperty("java.version").split("\\D+")(0).toInt
val jdkString = if (version > 8) s"-jdk$version" else ""
val resultFileName = s"${this.getClass.getSimpleName.replace("$", "")}$jdkString-results.txt"
val resultFileName =
s"${this.getClass.getSimpleName.replace("$", "")}$jdkString$suffix-results.txt"
val file = new File(s"benchmarks/$resultFileName")
if (!file.exists()) {
file.createNewFile()
Expand All @@ -65,6 +66,8 @@ abstract class BenchmarkBase {
afterAll()
}

/**
 * Extra text inserted into the generated result file name, right before "-results.txt".
 * Empty by default; a benchmark can override it to write to a differently named file.
 */
def suffix: String = ""

/**
* Any shutdown code to ensure a clean shutdown
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
insert hive table benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
INSERT INTO DYNAMIC 6812 7043 328 0.0 665204.8 1.0X
INSERT INTO HYBRID 817 852 32 0.0 79783.6 8.3X
INSERT INTO STATIC 231 246 21 0.0 22568.2 29.5X
INSERT OVERWRITE DYNAMIC 25947 26671 1024 0.0 2533910.2 0.3X
INSERT OVERWRITE HYBRID 2846 2884 54 0.0 277908.7 2.4X
INSERT OVERWRITE STATIC 232 247 26 0.0 22659.9 29.4X

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
insert hive table benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
INSERT INTO DYNAMIC 4326 4373 66 0.0 422486.0 1.0X
INSERT INTO HYBRID 726 741 21 0.0 70877.2 6.0X
INSERT INTO STATIC 256 270 12 0.0 25015.7 16.9X
INSERT OVERWRITE DYNAMIC 4115 4150 49 0.0 401828.8 1.1X
INSERT OVERWRITE HYBRID 690 699 8 0.0 67370.5 6.3X
INSERT OVERWRITE STATIC 277 283 5 0.0 27097.9 15.6X

Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
insert hive table benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
INSERT INTO DYNAMIC 5083 5412 466 0.0 496384.5 1.0X
INSERT INTO HYBRID 822 864 43 0.0 80283.6 6.2X
INSERT INTO STATIC 335 342 5 0.0 32694.1 15.2X
INSERT OVERWRITE DYNAMIC 4941 5068 179 0.0 482534.5 1.0X
INSERT OVERWRITE HYBRID 722 745 27 0.0 70502.7 7.0X
INSERT OVERWRITE STATIC 295 314 12 0.0 28846.8 17.2X

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,20 @@ package object client {
private[hive] sealed abstract class HiveVersion(
val fullVersion: String,
val extraDeps: Seq[String] = Nil,
val exclusions: Seq[String] = Nil)
val exclusions: Seq[String] = Nil) extends Ordered[HiveVersion] {
/**
 * Orders Hive versions by comparing the dot-separated numeric components of
 * `fullVersion` from left to right.
 *
 * @param that the version to compare against
 * @return the difference between the first pair of components that differ (negative when
 *         this version is lower, positive when it is higher), or 0 when all are equal
 * @throws AssertionError if the two versions have a different number of components
 * @throws NumberFormatException if a component of either version is not an integer
 */
override def compare(that: HiveVersion): Int = {
  val thisVersionParts = fullVersion.split('.').map(_.toInt)
  val thatVersionParts = that.fullVersion.split('.').map(_.toInt)
  assert(thisVersionParts.length == thatVersionParts.length,
    s"Cannot compare Hive versions $fullVersion and ${that.fullVersion}: " +
      "different number of components")
  // Take the first differing component; this avoids a non-local `return` from a closure.
  thisVersionParts.zip(thatVersionParts)
    .collectFirst { case (l, r) if l != r => l - r }
    .getOrElse(0)
}
}

// scalastyle:off
private[hive] object hive {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.client.hive._


/**
Expand Down Expand Up @@ -285,7 +287,21 @@ case class InsertIntoHiveTable(
// Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
// version and we may not want to catch up new Hive version every time. We delete the
// Hive partition first and then load data file into the Hive partition.
if (partitionPath.nonEmpty && overwrite) {
val hiveVersion = externalCatalog.asInstanceOf[ExternalCatalogWithListener]
.unwrapped.asInstanceOf[HiveExternalCatalog]
.client
.version
// SPARK-31684:
// For Hive 2.0.0 and onwards, as https://issues.apache.org/jira/browse/HIVE-11940
// has been fixed, and there is no performance issue anymore. We should leave the
// overwrite logic to hive to avoid failure in `FileSystem#checkPath` when the table
// and partition locations do not belong to the same `FileSystem`
// TODO(SPARK-31675): For Hive 2.2.0 and earlier, if the table and partition locations
// do not belong to the same `FileSystem`, we will still get the same error, thrown by the
// Hive encryption check. See https://issues.apache.org/jira/browse/HIVE-14380.
// For Hive 1.x we therefore keep deleting the partition ourselves instead of leaving the
// overwrite to Hive, for better performance, because the partition and table are on the
// same cluster in most cases.
if (partitionPath.nonEmpty && overwrite && hiveVersion < v2_0) {
partitionPath.foreach { path =>
val fs = path.getFileSystem(hadoopConf)
if (fs.exists(path)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.benchmark

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.TestHive

/**
* Benchmark to measure hive table write performance.
* To run this benchmark:
* {{{
* 1. without sbt: bin/spark-submit --class <this class>
* --jars <spark catalyst test jar>,<spark core test jar>,<spark hive jar>
* --packages org.spark-project.hive:hive-exec:1.2.1.spark2
* <spark hive test jar>
* 2. build/sbt "hive/test:runMain <this class>" -Phive-1.2 or
* build/sbt "hive/test:runMain <this class>" -Phive-2.3
* 3. generate result:
* SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain <this class>"
* Results will be written to "benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt".
* 4. -Phive-1.2 does not work for JDK 11
* }}}
*/
object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {

  /** Runs the benchmark on the shared `TestHive` session. */
  override def getSparkSession: SparkSession = TestHive.sparkSession

  /** Name of the temporary view that every INSERT statement selects from. */
  val tempView = "temp"

  /** Number of rows in the temporary view; also used as the dynamic-partition limit below. */
  val numRows = 1024 * 10

  /** Shorthand for `spark.sql`. */
  val sql = spark.sql _

  // scalastyle:off hadoopconfiguration
  private val hadoopConf = spark.sparkContext.hadoopConfiguration
  // scalastyle:on hadoopconfiguration
  // Hive dynamic-partition settings applied once, when this object is initialized.
  hadoopConf.set("hive.exec.dynamic.partition", "true")
  hadoopConf.set("hive.exec.dynamic.partition.mode", "nonstrict")
  hadoopConf.set("hive.exec.max.dynamic.partitions", numRows.toString)

  /**
   * Creates one partitioned text table per name, runs `f`, and drops the tables afterwards.
   *
   * Creation happens inside the `try` and only tables whose CREATE statement succeeded are
   * dropped, so a failure while creating a later table neither leaks the earlier ones nor
   * drops a table this call did not create.
   *
   * @param tableNames names of the tables to create
   * @param f code to run while the tables exist
   */
  def withTable(tableNames: String*)(f: => Unit): Unit = {
    val created = scala.collection.mutable.ArrayBuffer.empty[String]
    try {
      tableNames.foreach { name =>
        sql(s"CREATE TABLE $name(a INT) STORED AS TEXTFILE PARTITIONED BY (b INT, c INT)")
        created += name
      }
      f
    } finally {
      created.foreach { name =>
        sql(s"DROP TABLE IF EXISTS $name")
      }
    }
  }

  /** Adds a case that overwrites `table` with both partition columns taken from the query. */
  def insertOverwriteDynamic(table: String, benchmark: Benchmark): Unit = {
    benchmark.addCase("INSERT OVERWRITE DYNAMIC") { _ =>
      sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS a," +
        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView")
    }
  }

  /** Adds a case that overwrites `table` with `b` fixed to 1 and `c` taken from the query. */
  def insertOverwriteHybrid(table: String, benchmark: Benchmark): Unit = {
    benchmark.addCase("INSERT OVERWRITE HYBRID") { _ =>
      sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," +
        s" CAST(id % 10 AS INT) AS c FROM $tempView")
    }
  }

  /** Adds a case that overwrites the single partition `b=1, c=10` of `table`. */
  def insertOverwriteStatic(table: String, benchmark: Benchmark): Unit = {
    benchmark.addCase("INSERT OVERWRITE STATIC") { _ =>
      sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" +
        s" FROM $tempView")
    }
  }

  /** Adds a case that appends to `table` with both partition columns taken from the query. */
  def insertIntoDynamic(table: String, benchmark: Benchmark): Unit = {
    benchmark.addCase("INSERT INTO DYNAMIC") { _ =>
      sql(s"INSERT INTO TABLE $table SELECT CAST(id AS INT) AS a," +
        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView")
    }
  }

  /** Adds a case that appends to `table` with `b` fixed to 1 and `c` taken from the query. */
  def insertIntoHybrid(table: String, benchmark: Benchmark): Unit = {
    benchmark.addCase("INSERT INTO HYBRID") { _ =>
      sql(s"INSERT INTO TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," +
        s" CAST(id % 10 AS INT) AS c FROM $tempView")
    }
  }

  /** Adds a case that appends to the single partition `b=1, c=10` of `table`. */
  def insertIntoStatic(table: String, benchmark: Benchmark): Unit = {
    benchmark.addCase("INSERT INTO STATIC") { _ =>
      sql(s"INSERT INTO TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" +
        s" FROM $tempView")
    }
  }

  /**
   * Registers `numRows` rows as the temporary view, runs the six insert cases (each against
   * its own table), and drops the view afterwards.
   *
   * @param mainArgs command-line arguments; not used by this benchmark
   */
  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    spark.range(numRows).createOrReplaceTempView(tempView)

    try {
      val t1 = "t1"
      val t2 = "t2"
      val t3 = "t3"
      val t4 = "t4"
      val t5 = "t5"
      val t6 = "t6"

      val benchmark = new Benchmark("insert hive table benchmark", numRows, output = output)

      withTable(t1, t2, t3, t4, t5, t6) {

        insertIntoDynamic(t1, benchmark)
        insertIntoHybrid(t2, benchmark)
        insertIntoStatic(t3, benchmark)

        insertOverwriteDynamic(t4, benchmark)
        insertOverwriteHybrid(t5, benchmark)
        insertOverwriteStatic(t6, benchmark)

        benchmark.run()
      }
    } finally {
      spark.catalog.dropTempView(tempView)
    }
  }

  /** Result-file suffix: "-hive2.3" when `HiveUtils.isHive23` is true, "-hive1.2" otherwise. */
  override def suffix: String = if (HiveUtils.isHive23) "-hive2.3" else "-hive1.2"
}