From ff71732f8292d46d264bbfe880503b334499fb5a Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 12 May 2020 14:28:49 +0800
Subject: [PATCH 1/9] [SPARK-31684][SQL] Overwrite partition failed with
 'WRONG FS' when the target partition does not belong to the same filesystem
 as the table

---
 .../hive/execution/InsertIntoHiveTable.scala  | 65 +++++++++++++++++--
 1 file changed, 61 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 19f439598142..96223b71c7eb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,22 +17,25 @@
 package org.apache.spark.sql.hive.execution
 
+import java.net.URI
 import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils, ExternalCatalogWithListener}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.client.HiveClientImpl
+import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
+import org.apache.spark.sql.hive.client.hive._
 
 
 /**
@@ -281,11 +284,27 @@ case class InsertIntoHiveTable(
         oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
       }
 
+      val hiveVersion = externalCatalog.asInstanceOf[ExternalCatalogWithListener]
+        .unwrapped.asInstanceOf[HiveExternalCatalog]
+        .client
+        .version
+      // https://issues.apache.org/jira/browse/SPARK-31684,
+      // For Hive 2.0.0 and onwards, as https://issues.apache.org/jira/browse/HIVE-11940
+      // has been fixed, there is no performance issue anymore.
+      // We should leave the overwrite logic to hive to avoid failure in `FileSystem#checkPath`.
+      // For Hive versions before 2.0.0, we leave the replace work to hive only when the table
+      // and partition locations do not belong to the same FileSystem.
+      // TODO(SPARK-31675): For Hive 2.2.0 and earlier, if the table and partition locations
+      // do not belong to the same FileSystem, we will still get the same error thrown by hive
+      // encryption check.
+      val hiveVersDoHiveOverwrite: Set[HiveVersion] = Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
+      val canDisable = !hiveVersDoHiveOverwrite.contains(hiveVersion) &&
+        canDisableHiveOverwrite(table.location, partitionPath.map(_.toUri).orNull, hadoopConf)
       // SPARK-18107: Insert overwrite runs much slower than hive-client.
       // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
       // version and we may not want to catch up new Hive version every time. We delete the
       // Hive partition first and then load data file into the Hive partition.
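      // As an illustration of the failure mode this gate addresses (a sketch with
      // hypothetical HDFS namespaces; it assumes a Hive-enabled SparkSession and two
      // reachable filesystems):
      //   sql("CREATE TABLE t (a INT) PARTITIONED BY (b INT) LOCATION 'hdfs://ns1/warehouse/t'")
      //   sql("ALTER TABLE t ADD PARTITION (b = 1) LOCATION 'hdfs://ns2/external/t/b=1'")
      //   sql("INSERT OVERWRITE TABLE t PARTITION (b = 1) SELECT 1")
      // Without the canDisable check below, the last statement failed inside
      // FileSystem#checkPath with an error like
      // "java.lang.IllegalArgumentException: Wrong FS: hdfs://ns2/external/t/b=1, expected: hdfs://ns1".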
-      if (partitionPath.nonEmpty && overwrite) {
+      if (partitionPath.nonEmpty && overwrite && canDisable) {
         partitionPath.foreach { path =>
           val fs = path.getFileSystem(hadoopConf)
           if (fs.exists(path)) {
@@ -321,4 +340,42 @@ case class InsertIntoHiveTable(
       isSrcLocal = false)
     }
   }
+
+  // scalastyle:off line.size.limit
+  /**
+   * If the table location and partition location do not belong to the same [[FileSystem]], we
+   * should not disable hive overwrite. Otherwise, hive will use the [[FileSystem]] instance
+   * belonging to the table location to copy files, which will fail in [[FileSystem#checkPath]].
+   * See https://github.com/apache/hive/blob/rel/release-2.3.7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L1648-L1659
+   */
+  // scalastyle:on line.size.limit
+  private def canDisableHiveOverwrite(
+      tableLocation: URI,
+      partitionLocation: URI,
+      hadoopConf: Configuration): Boolean = {
+    if (tableLocation == null || partitionLocation == null) return true
+    val partScheme = partitionLocation.getScheme
+    if (partScheme == null) return true // relative path
+
+    val tblScheme = tableLocation.getScheme
+    // authority and scheme are not case sensitive
+    if (partScheme.equalsIgnoreCase(tblScheme)) {
+      val partAuthority = partitionLocation.getAuthority
+      val tblAuthority = tableLocation.getAuthority
+      if (partAuthority != null && tblAuthority != null) {
+        tblAuthority.equalsIgnoreCase(partAuthority)
+      } else {
+        val defaultUri = FileSystem.getDefaultUri(hadoopConf)
+        if (tblAuthority != null) {
+          tblAuthority.equalsIgnoreCase(defaultUri.getAuthority)
+        } else if (partAuthority != null) {
+          partAuthority.equalsIgnoreCase(defaultUri.getAuthority)
+        } else {
+          true
+        }
+      }
+    } else {
+      false
+    }
+  }
 }

From e4bfc7b35672b1e87ad459bdd6d10cbd5a1db319 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Fri, 15 May 2020 23:28:04 +0800
Subject: [PATCH 2/9] add benchmark & check only version

---
 .../spark/benchmark/BenchmarkBase.scala       |   6 +-
 ...IntoHiveTableBenchmark-hive1.2-results.txt |  11 ++
 ...rtIntoHiveTableBenchmark-jdk11-results.txt |  11 ++
 .../InsertIntoHiveTableBenchmark-results.txt  |  11 ++
 .../hive/execution/InsertIntoHiveTable.scala  |  54 +------
 .../InsertIntoHiveTableBenchmark.scala        | 144 ++++++++++++++++++
 6 files changed, 190 insertions(+), 47 deletions(-)
 create mode 100644 sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt
 create mode 100644 sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-results.txt
 create mode 100644 sql/hive/benchmarks/InsertIntoHiveTableBenchmark-results.txt
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala

diff --git a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
index 55e34b32fe0d..bd15a34fffdb 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
@@ -46,7 +46,9 @@ abstract class BenchmarkBase {
     if (regenerateBenchmarkFiles) {
       val version = System.getProperty("java.version").split("\\D+")(0).toInt
       val jdkString = if (version > 8) s"-jdk$version" else ""
-      val resultFileName = s"${this.getClass.getSimpleName.replace("$", "")}$jdkString-results.txt"
+      val suffixWithHyphen = if (suffix == null || suffix.isEmpty) "" else "-" + suffix
+      val resultFileName =
+        s"${this.getClass.getSimpleName.replace("$", "")}$jdkString$suffixWithHyphen-results.txt"
       val file = new File(s"benchmarks/$resultFileName")
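      // For instance, a concrete benchmark with suffix = "hive1.2" run on JDK 8 resolves
      // resultFileName to "InsertIntoHiveTableBenchmark-hive1.2-results.txt" (the file
      // added below), while an empty suffix keeps the old "<ClassName>-results.txt" form.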
       if (!file.exists()) {
         file.createNewFile()
@@ -65,6 +67,8 @@ abstract class BenchmarkBase {
     afterAll()
   }
 
+  def suffix: String = ""
+
   /**
    * Any shutdown code to ensure a clean shutdown
    */

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt
new file mode 100644
index 000000000000..c17abc469e0d
--- /dev/null
+++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt
@@ -0,0 +1,11 @@
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4
+Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
+insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+INSERT INTO DYNAMIC                               19474          20570        1550          0.0     1901725.8       1.0X
+INSERT INTO HYBRID                                 2292           2334          60          0.0      223806.6       8.5X
+INSERT INTO STATIC                                  409            436          32          0.0       39989.6      47.6X
+INSERT OVERWRITE DYNAMIC                          69798          69918         169          0.0     6816229.6       0.3X
+INSERT OVERWRITE HYBRID                            7224           7227           4          0.0      705486.3       2.7X
+INSERT OVERWRITE STATIC                             405            429          20          0.0       39542.5      48.1X
+

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-results.txt
new file mode 100644
index 000000000000..ec7bf39a4e2d
--- /dev/null
+++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-results.txt
@@ -0,0 +1,11 @@
+Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4
+Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
+insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+INSERT INTO DYNAMIC                                9387          10298        1288          0.0      916724.5       1.0X
+INSERT INTO HYBRID                                 1160           1219          84          0.0      113257.1       8.1X
+INSERT INTO STATIC                                  370            378           4          0.0       36168.4      25.3X
+INSERT OVERWRITE DYNAMIC                           8547           8729         258          0.0      834634.3       1.1X
+INSERT OVERWRITE HYBRID                            1248           1298          72          0.0      121845.0       7.5X
+INSERT OVERWRITE STATIC                             432            443           7          0.0       42209.9      21.7X
+

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-results.txt
new file mode 100644
index 000000000000..c91b1a915dbd
--- /dev/null
+++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-results.txt
@@ -0,0 +1,11 @@
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4
+Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
+insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+INSERT INTO DYNAMIC                                7346           7470         175          0.0      717423.0       1.0X
+INSERT INTO HYBRID                                 1179           1188          13          0.0      115184.2       6.2X
+INSERT INTO STATIC                                  344            367          48          0.0       33585.1      21.4X
+INSERT OVERWRITE DYNAMIC                           7656           7714          82          0.0      747622.7       1.0X
+INSERT OVERWRITE HYBRID                            1179           1183           6          0.0      115163.3       6.2X
+INSERT OVERWRITE STATIC                             400            408          10          0.0       39014.2      18.4X
+

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 96223b71c7eb..7607e711554f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -290,21 +290,21 @@ case class InsertIntoHiveTable(
         .version
       // https://issues.apache.org/jira/browse/SPARK-31684,
       // For Hive 2.0.0 and onwards, as https://issues.apache.org/jira/browse/HIVE-11940
-      // has been fixed, there is no performance issue anymore.
-      // We should leave the overwrite logic to hive to avoid failure in `FileSystem#checkPath`.
-      // For Hive versions before 2.0.0, we leave the replace work to hive only when the table
-      // and partition locations do not belong to the same FileSystem.
+      // has been fixed, there is no performance issue anymore. We should leave the
+      // overwrite logic to hive to avoid failure in `FileSystem#checkPath` when the table
+      // and partition locations do not belong to the same `FileSystem`.
       // TODO(SPARK-31675): For Hive 2.2.0 and earlier, if the table and partition locations
       // do not belong to the same FileSystem, we will still get the same error thrown by hive
-      // encryption check.
+      // encryption check. See https://issues.apache.org/jira/browse/HIVE-14380.
+      // So we still disable Hive overwrite for Hive 1.x for better performance because
+      // the partition and table are on the same cluster in most cases.
       val hiveVersDoHiveOverwrite: Set[HiveVersion] = Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-      val canDisable = !hiveVersDoHiveOverwrite.contains(hiveVersion) &&
-        canDisableHiveOverwrite(table.location, partitionPath.map(_.toUri).orNull, hadoopConf)
       // SPARK-18107: Insert overwrite runs much slower than hive-client.
       // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
       // version and we may not want to catch up new Hive version every time. We delete the
       // Hive partition first and then load data file into the Hive partition.
-      if (partitionPath.nonEmpty && overwrite && canDisable) {
+      if (partitionPath.nonEmpty && overwrite &&
+          !hiveVersDoHiveOverwrite.contains(hiveVersion)) {
         partitionPath.foreach { path =>
           val fs = path.getFileSystem(hadoopConf)
           if (fs.exists(path)) {
@@ -340,42 +340,4 @@ case class InsertIntoHiveTable(
       isSrcLocal = false)
     }
   }
-
-  // scalastyle:off line.size.limit
-  /**
-   * If the table location and partition location do not belong to the same [[FileSystem]], we
-   * should not disable hive overwrite. Otherwise, hive will use the [[FileSystem]] instance
-   * belonging to the table location to copy files, which will fail in [[FileSystem#checkPath]].
-   * See https://github.com/apache/hive/blob/rel/release-2.3.7/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L1648-L1659
-   */
-  // scalastyle:on line.size.limit
-  private def canDisableHiveOverwrite(
-      tableLocation: URI,
-      partitionLocation: URI,
-      hadoopConf: Configuration): Boolean = {
-    if (tableLocation == null || partitionLocation == null) return true
-    val partScheme = partitionLocation.getScheme
-    if (partScheme == null) return true // relative path
-
-    val tblScheme = tableLocation.getScheme
-    // authority and scheme are not case sensitive
-    if (partScheme.equalsIgnoreCase(tblScheme)) {
-      val partAuthority = partitionLocation.getAuthority
-      val tblAuthority = tableLocation.getAuthority
-      if (partAuthority != null && tblAuthority != null) {
-        tblAuthority.equalsIgnoreCase(partAuthority)
-      } else {
-        val defaultUri = FileSystem.getDefaultUri(hadoopConf)
-        if (tblAuthority != null) {
-          tblAuthority.equalsIgnoreCase(defaultUri.getAuthority)
-        } else if (partAuthority != null) {
-          partAuthority.equalsIgnoreCase(defaultUri.getAuthority)
-        } else {
-          true
-        }
-      }
-    } else {
-      false
-    }
-  }
 }

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
new file mode 100644
index 000000000000..2efcbad33f61
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHive
+
+/**
+ * Benchmark to measure hive table write performance.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class>
+ *        --jars <spark core test jar>,<spark catalyst test jar>,<spark sql test jar>
+ *        --packages org.spark-project.hive:hive-exec:1.2.1.spark2
+ *        <spark hive test jar>
+ *   2. build/sbt "hive/test:runMain <this class>" -Phive-1.2 or
+ *      build/sbt "hive/test:runMain <this class>" -Phive-2.3
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain <this class>"
+ *      Results will be written to "benchmarks/InsertIntoHiveTableBenchmark-results.txt".
+ *   4. -Phive-1.2 does not work for JDK 11
+ * }}}
+ */
+object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
+
+  override def getSparkSession: SparkSession = TestHive.sparkSession
+
+  val tempTable = "temp"
+  val numRows = 1024 * 10
+  val sql = spark.sql _
+
+  // scalastyle:off hadoopconfiguration
+  private val hadoopConf = spark.sparkContext.hadoopConfiguration
+  // scalastyle:on hadoopconfiguration
+  hadoopConf.set("hive.exec.dynamic.partition", "true")
+  hadoopConf.set("hive.exec.dynamic.partition.mode", "nonstrict")
+  hadoopConf.set("hive.exec.max.dynamic.partitions", numRows.toString)
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+    val ds = spark.range(numRows)
+    tableNames.foreach { name =>
+      ds.createOrReplaceTempView(name)
+    }
+    try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withTable(tableNames: String*)(f: => Unit): Unit = {
+    tableNames.foreach { name =>
+      sql(s"CREATE TABLE $name(a INT) STORED AS TEXTFILE PARTITIONED BY (b INT, c INT)")
+    }
+    try f finally {
+      tableNames.foreach { name =>
+        spark.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+  }
+
+  def insertOverwriteDynamic(table: String, benchmark: Benchmark): Unit = {
+    benchmark.addCase("INSERT OVERWRITE DYNAMIC") { _ =>
+      sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS a," +
+        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
+    }
+  }
+
+  def insertOverwriteHybrid(table: String, benchmark: Benchmark): Unit = {
+    benchmark.addCase("INSERT OVERWRITE HYBRID") { _ =>
+      sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," +
+        s" CAST(id % 10 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
+    }
+  }
+
+  def insertOverwriteStatic(table: String, benchmark: Benchmark): Unit = {
+    benchmark.addCase("INSERT OVERWRITE STATIC") { _ =>
+      sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" +
+        s" FROM $tempTable DISTRIBUTE BY a")
+    }
+  }
+
+  def insertIntoDynamic(table: String, benchmark: Benchmark): Unit = {
+    benchmark.addCase("INSERT INTO DYNAMIC") { _ =>
+      sql(s"INSERT INTO TABLE $table SELECT CAST(id AS INT) AS a," +
+        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
+    }
+  }
+
+  def insertIntoHybrid(table: String, benchmark: Benchmark): Unit = {
+    benchmark.addCase("INSERT INTO HYBRID") { _ =>
+      sql(s"INSERT INTO TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," +
+        s" CAST(id % 10 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
+    }
+  }
+
+  def insertIntoStatic(table: String, benchmark: Benchmark): Unit = {
+    benchmark.addCase("INSERT INTO STATIC") { _ =>
+      sql(s"INSERT INTO TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" +
+        s" FROM $tempTable DISTRIBUTE BY a")
+    }
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    withTempTable(tempTable) {
+      val t1 = "t1"
+      val t2 = "t2"
+      val t3 = "t3"
+      val t4 = "t4"
+      val t5 = "t5"
+      val t6 = "t6"
+
+      val benchmark = new Benchmark(s"insert hive table benchmark", numRows, output = output)
+
+      withTable(t1, t2, t3, t4, t5, t6) {
+
+        insertIntoDynamic(t1, benchmark)
+        insertIntoHybrid(t2, benchmark)
+        insertIntoStatic(t3, benchmark)
+
+        insertOverwriteDynamic(t4, benchmark)
+        insertOverwriteHybrid(t5, benchmark)
+        insertOverwriteStatic(t6, benchmark)
+
+        benchmark.run()
+      }
+    }
+  }
+
+  override def suffix: String = if (HiveUtils.isHive23) "" else "hive1.2"
+}
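The DYNAMIC, HYBRID, and STATIC case families above differ only in how the partition
columns b and c of the tables created by withTable are specified. A rough sketch of the
three SQL shapes being measured (literal values are arbitrary):

{{{
sql("INSERT OVERWRITE TABLE t PARTITION (b = 1, c = 10) SELECT 1")  // STATIC: all partition values fixed
sql("INSERT OVERWRITE TABLE t PARTITION (b = 1, c) SELECT 1, 5")    // HYBRID: static prefix, dynamic tail
sql("INSERT OVERWRITE TABLE t SELECT 1, 2, 5")                      // DYNAMIC: b and c taken from the data
}}}

Dynamic partition values are resolved per row from the trailing SELECT columns, which is
why the benchmark sets hive.exec.dynamic.partition.mode=nonstrict and raises
hive.exec.max.dynamic.partitions above.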
From 85b27d6cbeccf48fd865190ae9258a536ed42e10 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Fri, 15 May 2020 23:45:21 +0800
Subject: [PATCH 3/9] nit

---
 .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 7607e711554f..4789e5312ba1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,17 +17,16 @@
 package org.apache.spark.sql.hive.execution
 
-import java.net.URI
 import java.util.Locale
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils, ExternalCatalogWithListener}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan

From f7c6b5126b601a87a62ac6eb50217f03f3c1b27a Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 18 May 2020 14:04:57 +0800
Subject: [PATCH 4/9] suffix

---
 .../InsertIntoHiveTableBenchmark-hive2.3-results.txt   | 11 +++++++++++
 ...rtIntoHiveTableBenchmark-jdk11-hive2.3-results.txt  | 11 +++++++++++
 .../InsertIntoHiveTableBenchmark-jdk11-results.txt     | 11 -----------
 .../InsertIntoHiveTableBenchmark-results.txt           | 11 -----------
 .../benchmark/InsertIntoHiveTableBenchmark.scala       |  4 ++--
 5 files changed, 24 insertions(+), 24 deletions(-)
 create mode 100644 sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt
 create mode 100644 sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt
 delete mode 100644 sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-results.txt
 delete mode 100644 sql/hive/benchmarks/InsertIntoHiveTableBenchmark-results.txt

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt
new file mode 100644
index 000000000000..477c058ddf5c
--- /dev/null
+++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt
@@ -0,0 +1,11 @@
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4
+Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
+insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+INSERT INTO DYNAMIC                                7742           7918         248          0.0      756044.0       1.0X
+INSERT INTO HYBRID                                 1289           1307          26          0.0      125866.3       6.0X
+INSERT INTO STATIC                                  371            393          38          0.0       36219.4      20.9X
+INSERT OVERWRITE DYNAMIC                           8456           8554         138          0.0      825790.3       0.9X
+INSERT OVERWRITE HYBRID                            1303           1311          12          0.0      127198.4       5.9X
+INSERT OVERWRITE STATIC                             434            447          13          0.0       42373.8      17.8X
+

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt
new file mode 100644
index 000000000000..2b5b21157a87
--- /dev/null
+++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt
@@ -0,0 +1,11 @@
+Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4
+Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
+insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+INSERT INTO DYNAMIC                                9607          10174         802          0.0      938164.7       1.0X
+INSERT INTO HYBRID                                 1365           1408          60          0.0      133312.0       7.0X
+INSERT INTO STATIC                                  444            454           8          0.0       43340.2      21.6X
+INSERT OVERWRITE DYNAMIC                           9916           9929          19          0.0      968354.8       1.0X
+INSERT OVERWRITE HYBRID                            1441           1447           8          0.0      140767.1       6.7X
+INSERT OVERWRITE STATIC                             524            533          14          0.0       51181.2      18.3X
+

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-results.txt
deleted file mode 100644
index ec7bf39a4e2d..000000000000
--- a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-results.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4
-Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
-insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
-INSERT INTO DYNAMIC                                9387          10298        1288          0.0      916724.5       1.0X
-INSERT INTO HYBRID                                 1160           1219          84          0.0      113257.1       8.1X
-INSERT INTO STATIC                                  370            378           4          0.0       36168.4      25.3X
-INSERT OVERWRITE DYNAMIC                           8547           8729         258          0.0      834634.3       1.1X
-INSERT OVERWRITE HYBRID                            1248           1298          72          0.0      121845.0       7.5X
-INSERT OVERWRITE STATIC                             432            443           7          0.0       42209.9      21.7X
-

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-results.txt
deleted file mode 100644
index c91b1a915dbd..000000000000
--- a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-results.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4
-Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
-insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------------------------------------
-INSERT INTO DYNAMIC                                7346           7470         175          0.0      717423.0       1.0X
-INSERT INTO HYBRID                                 1179           1188          13          0.0      115184.2       6.2X
-INSERT INTO STATIC                                  344            367          48          0.0       33585.1      21.4X
-INSERT OVERWRITE DYNAMIC                           7656           7714          82          0.0      747622.7       1.0X
-INSERT OVERWRITE HYBRID                            1179           1183           6          0.0      115163.3       6.2X
-INSERT OVERWRITE STATIC                             400            408          10          0.0       39014.2      18.4X
-

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
index 2efcbad33f61..976d1bb64bbc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.hive.test.TestHive
  *      build/sbt "hive/test:runMain <this class>" -Phive-2.3
  *   3. generate result:
  *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "hive/test:runMain <this class>"
- *      Results will be written to "benchmarks/InsertIntoHiveTableBenchmark-results.txt".
+ *      Results will be written to "benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt".
 *   4. -Phive-1.2 does not work for JDK 11
 * }}}
 */
@@ -140,5 +140,5 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  override def suffix: String = if (HiveUtils.isHive23) "" else "hive1.2"
+  override def suffix: String = if (HiveUtils.isHive23) "hive2.3" else "hive1.2"
 }

From 11456322c7e2385db7d3b271c56cbe782a8c0997 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 18 May 2020 20:20:06 +0800
Subject: [PATCH 5/9] make HiveVersion comparable

---
 .../org/apache/spark/benchmark/BenchmarkBase.scala |  3 +--
 .../org/apache/spark/sql/hive/client/package.scala | 14 +++++++++++++-
 .../sql/hive/execution/InsertIntoHiveTable.scala   | 11 +++++------
 .../benchmark/InsertIntoHiveTableBenchmark.scala   |  2 +-
 4 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
index bd15a34fffdb..e97b9d5d6bea 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/BenchmarkBase.scala
@@ -46,9 +46,8 @@ abstract class BenchmarkBase {
     if (regenerateBenchmarkFiles) {
       val version = System.getProperty("java.version").split("\\D+")(0).toInt
       val jdkString = if (version > 8) s"-jdk$version" else ""
-      val suffixWithHyphen = if (suffix == null || suffix.isEmpty) "" else "-" + suffix
       val resultFileName =
-        s"${this.getClass.getSimpleName.replace("$", "")}$jdkString$suffixWithHyphen-results.txt"
+        s"${this.getClass.getSimpleName.replace("$", "")}$jdkString$suffix-results.txt"
       val file = new File(s"benchmarks/$resultFileName")
       if (!file.exists()) {
         file.createNewFile()

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 8526d8645460..21e7e0d5e17f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -22,7 +22,19 @@ package object client {
   private[hive] sealed abstract class HiveVersion(
       val fullVersion: String,
       val extraDeps: Seq[String] = Nil,
-      val exclusions: Seq[String] = Nil)
+      val exclusions: Seq[String] = Nil) extends Ordered[HiveVersion] {
+    override def compare(that: HiveVersion): Int = {
+      val thisVersionParts = fullVersion.split('.').map(_.toInt)
+      val thatVersionParts = that.fullVersion.split('.').map(_.toInt)
+      thisVersionParts.zip(thatVersionParts).foreach { case (l, r) =>
+        val candidate = l - r
+        if (candidate != 0) {
+          return candidate
+        }
+      }
+      0
+    }
+  }
 
   // scalastyle:off
   private[hive] object hive {

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 4789e5312ba1..1a3244684df4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion}
+import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.client.hive._
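// A worked example of the Ordered[HiveVersion] comparison added above, assuming
// illustrative fullVersion strings such as "1.2.2" for v1_2 and "2.0.1" for v2_0:
//   Array(1, 2, 2) vs Array(2, 0, 1) -> the first differing pair is (1, 2), so compare
//   returns 1 - 2 = -1 and v1_2 < v2_0 holds.
// Equal part sequences (e.g. "2.3.7" vs "2.3.7") fall through the loop and return 0,
// which is what lets the `hiveVersion < v2_0` guard below match only Hive 1.x clients.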
@@ -287,7 +287,7 @@ case class InsertIntoHiveTable(
         .unwrapped.asInstanceOf[HiveExternalCatalog]
         .client
         .version
-      // https://issues.apache.org/jira/browse/SPARK-31684,
+      // SPARK-31684:
       // For Hive 2.0.0 and onwards, as https://issues.apache.org/jira/browse/HIVE-11940
       // has been fixed, there is no performance issue anymore. We should leave the
       // overwrite logic to hive to avoid failure in `FileSystem#checkPath` when the table
@@ -297,13 +297,12 @@ case class InsertIntoHiveTable(
       // encryption check. See https://issues.apache.org/jira/browse/HIVE-14380.
       // So we still disable Hive overwrite for Hive 1.x for better performance because
       // the partition and table are on the same cluster in most cases.
-      val hiveVersDoHiveOverwrite: Set[HiveVersion] = Set(v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
-      // SPARK-18107: Insert overwrite runs much slower than hive-client.
+      // SPARK-18107:
+      // Insert overwrite runs much slower than hive-client.
       // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
       // version and we may not want to catch up new Hive version every time. We delete the
       // Hive partition first and then load data file into the Hive partition.
-      if (partitionPath.nonEmpty && overwrite &&
-          !hiveVersDoHiveOverwrite.contains(hiveVersion)) {
+      if (partitionPath.nonEmpty && overwrite && hiveVersion < v2_0) {
         partitionPath.foreach { path =>
           val fs = path.getFileSystem(hadoopConf)
           if (fs.exists(path)) {

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
index 976d1bb64bbc..bb85a64de4fb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
@@ -140,5 +140,5 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
     }
   }
 
-  override def suffix: String = if (HiveUtils.isHive23) "hive2.3" else "hive1.2"
+  override def suffix: String = if (HiveUtils.isHive23) "-hive2.3" else "-hive1.2"
 }

From af4bfa9ad981c090a81793c5b793d0703013e40e Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 18 May 2020 20:30:14 +0800
Subject: [PATCH 6/9] address comments

---
 .../benchmark/InsertIntoHiveTableBenchmark.scala | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
index bb85a64de4fb..23b0a79aec11 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
@@ -53,14 +53,6 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
   hadoopConf.set("hive.exec.dynamic.partition.mode", "nonstrict")
   hadoopConf.set("hive.exec.max.dynamic.partitions", numRows.toString)
 
-  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
-    val ds = spark.range(numRows)
-    tableNames.foreach { name =>
-      ds.createOrReplaceTempView(name)
-    }
-    try f finally tableNames.foreach(spark.catalog.dropTempView)
-  }
-
   def withTable(tableNames: String*)(f: => Unit): Unit = {
     tableNames.foreach { name =>
       sql(s"CREATE TABLE $name(a INT) STORED AS TEXTFILE PARTITIONED BY (b INT, c INT)")
@@ -115,7 +107,9 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
   }
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
-    withTempTable(tempTable) {
+    spark.range(numRows).createOrReplaceTempView(tempTable)
+
+    try {
       val t1 = "t1"
       val t2 = "t2"
       val t3 = "t3"
@@ -137,6 +131,8 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
 
         benchmark.run()
       }
+    } finally {
+      spark.catalog.dropTempView(tempTable)
     }
   }

From ddeb3e2ed9b7e1b0fe5ce30c3a9a0dfe1a02cb44 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 18 May 2020 22:00:20 +0800
Subject: [PATCH 7/9] address comments

---
 .../scala/org/apache/spark/sql/hive/client/package.scala | 1 +
 .../spark/sql/hive/execution/InsertIntoHiveTable.scala   | 9 ++++-----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 21e7e0d5e17f..27ba3eca8194 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -26,6 +26,7 @@ package object client {
     override def compare(that: HiveVersion): Int = {
       val thisVersionParts = fullVersion.split('.').map(_.toInt)
       val thatVersionParts = that.fullVersion.split('.').map(_.toInt)
+      assert(thisVersionParts.length == thatVersionParts.length)
       thisVersionParts.zip(thatVersionParts).foreach { case (l, r) =>
         val candidate = l - r
         if (candidate != 0) {

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 1a3244684df4..9f83f2ab9609 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -283,6 +283,10 @@ case class InsertIntoHiveTable(
         oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
       }
 
+      // SPARK-18107: Insert overwrite runs much slower than hive-client.
+      // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
+      // version and we may not want to catch up new Hive version every time. We delete the
+      // Hive partition first and then load data file into the Hive partition.
       val hiveVersion = externalCatalog.asInstanceOf[ExternalCatalogWithListener]
         .unwrapped.asInstanceOf[HiveExternalCatalog]
         .client
@@ -297,11 +301,6 @@ case class InsertIntoHiveTable(
       // encryption check. See https://issues.apache.org/jira/browse/HIVE-14380.
       // So we still disable Hive overwrite for Hive 1.x for better performance because
       // the partition and table are on the same cluster in most cases.
-      // SPARK-18107:
-      // Insert overwrite runs much slower than hive-client.
-      // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
-      // version and we may not want to catch up new Hive version every time. We delete the
-      // Hive partition first and then load data file into the Hive partition.
       if (partitionPath.nonEmpty && overwrite && hiveVersion < v2_0) {
         partitionPath.foreach { path =>
           val fs = path.getFileSystem(hadoopConf)

From eea99f5b3199e3058f01128e5d74bafb4c9bebde Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Mon, 18 May 2020 22:04:30 +0800
Subject: [PATCH 8/9] naming

---
 .../InsertIntoHiveTableBenchmark.scala        | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
index 23b0a79aec11..a5207b561a05 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
@@ -42,7 +42,7 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
 
   override def getSparkSession: SparkSession = TestHive.sparkSession
 
-  val tempTable = "temp"
+  val tempView = "temp"
   val numRows = 1024 * 10
   val sql = spark.sql _
 
@@ -67,47 +67,47 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
   def insertOverwriteDynamic(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT OVERWRITE DYNAMIC") { _ =>
       sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS a," +
-        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
+        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
     }
   }
 
   def insertOverwriteHybrid(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT OVERWRITE HYBRID") { _ =>
       sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," +
-        s" CAST(id % 10 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
+        s" CAST(id % 10 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
     }
   }
 
   def insertOverwriteStatic(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT OVERWRITE STATIC") { _ =>
       sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" +
-        s" FROM $tempTable DISTRIBUTE BY a")
+        s" FROM $tempView DISTRIBUTE BY a")
     }
   }
 
   def insertIntoDynamic(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT INTO DYNAMIC") { _ =>
       sql(s"INSERT INTO TABLE $table SELECT CAST(id AS INT) AS a," +
-        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
+        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
     }
   }
 
   def insertIntoHybrid(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT INTO HYBRID") { _ =>
       sql(s"INSERT INTO TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," +
-        s" CAST(id % 10 AS INT) AS c FROM $tempTable DISTRIBUTE BY a")
+        s" CAST(id % 10 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
     }
   }
 
   def insertIntoStatic(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT INTO STATIC") { _ =>
       sql(s"INSERT INTO TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" +
-        s" FROM $tempTable DISTRIBUTE BY a")
+        s" FROM $tempView DISTRIBUTE BY a")
     }
   }
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
-    spark.range(numRows).createOrReplaceTempView(tempTable)
+    spark.range(numRows).createOrReplaceTempView(tempView)
 
     try {
       val t1 = "t1"
@@ -132,7 +132,7 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
         benchmark.run()
       }
     } finally {
-      spark.catalog.dropTempView(tempTable)
+      spark.catalog.dropTempView(tempView)
     }
   }

From 78e097284096b27839928668a3deaf6a49cab336 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 19 May 2020 13:57:28 +0800
Subject: [PATCH 9/9] rm distribute by

---
 .../InsertIntoHiveTableBenchmark-hive1.2-results.txt  | 12 ++++++------
 .../InsertIntoHiveTableBenchmark-hive2.3-results.txt  | 12 ++++++------
 ...tIntoHiveTableBenchmark-jdk11-hive2.3-results.txt  | 12 ++++++------
 .../benchmark/InsertIntoHiveTableBenchmark.scala      | 12 ++++++------
 4 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt
index c17abc469e0d..85884a1aaf73 100644
--- a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt
+++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive1.2-results.txt
@@ -2,10 +2,10 @@ Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4
 Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
 insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-INSERT INTO DYNAMIC                               19474          20570        1550          0.0     1901725.8       1.0X
-INSERT INTO HYBRID                                 2292           2334          60          0.0      223806.6       8.5X
-INSERT INTO STATIC                                  409            436          32          0.0       39989.6      47.6X
-INSERT OVERWRITE DYNAMIC                          69798          69918         169          0.0     6816229.6       0.3X
-INSERT OVERWRITE HYBRID                            7224           7227           4          0.0      705486.3       2.7X
-INSERT OVERWRITE STATIC                             405            429          20          0.0       39542.5      48.1X
+INSERT INTO DYNAMIC                                6812           7043         328          0.0      665204.8       1.0X
+INSERT INTO HYBRID                                  817            852          32          0.0       79783.6       8.3X
+INSERT INTO STATIC                                  231            246          21          0.0       22568.2      29.5X
+INSERT OVERWRITE DYNAMIC                          25947          26671        1024          0.0     2533910.2       0.3X
+INSERT OVERWRITE HYBRID                            2846           2884          54          0.0      277908.7       2.4X
+INSERT OVERWRITE STATIC                             232            247          26          0.0       22659.9      29.4X
 

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt
index 477c058ddf5c..ea8e6057ea61 100644
--- a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt
+++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-hive2.3-results.txt
@@ -2,10 +2,10 @@ Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.4
 Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
 insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-INSERT INTO DYNAMIC                                7742           7918         248          0.0      756044.0       1.0X
-INSERT INTO HYBRID                                 1289           1307          26          0.0      125866.3       6.0X
-INSERT INTO STATIC                                  371            393          38          0.0       36219.4      20.9X
-INSERT OVERWRITE DYNAMIC                           8456           8554         138          0.0      825790.3       0.9X
-INSERT OVERWRITE HYBRID                            1303           1311          12          0.0      127198.4       5.9X
-INSERT OVERWRITE STATIC                             434            447          13          0.0       42373.8      17.8X
+INSERT INTO DYNAMIC                                4326           4373          66          0.0      422486.0       1.0X
+INSERT INTO HYBRID                                  726            741          21          0.0       70877.2       6.0X
+INSERT INTO STATIC                                  256            270          12          0.0       25015.7      16.9X
+INSERT OVERWRITE DYNAMIC                           4115           4150          49          0.0      401828.8       1.1X
+INSERT OVERWRITE HYBRID                             690            699           8          0.0       67370.5       6.3X
+INSERT OVERWRITE STATIC                             277            283           5          0.0       27097.9      15.6X
 

diff --git a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt
index 2b5b21157a87..c7a642aad527 100644
--- a/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt
+++ b/sql/hive/benchmarks/InsertIntoHiveTableBenchmark-jdk11-hive2.3-results.txt
@@ -2,10 +2,10 @@ Java HotSpot(TM) 64-Bit Server VM 11.0.5+10-LTS on Mac OS X 10.15.4
 Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
 insert hive table benchmark:              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-INSERT INTO DYNAMIC                                9607          10174         802          0.0      938164.7       1.0X
-INSERT INTO HYBRID                                 1365           1408          60          0.0      133312.0       7.0X
-INSERT INTO STATIC                                  444            454           8          0.0       43340.2      21.6X
-INSERT OVERWRITE DYNAMIC                           9916           9929          19          0.0      968354.8       1.0X
-INSERT OVERWRITE HYBRID                            1441           1447           8          0.0      140767.1       6.7X
-INSERT OVERWRITE STATIC                             524            533          14          0.0       51181.2      18.3X
+INSERT INTO DYNAMIC                                5083           5412         466          0.0      496384.5       1.0X
+INSERT INTO HYBRID                                  822            864          43          0.0       80283.6       6.2X
+INSERT INTO STATIC                                  335            342           5          0.0       32694.1      15.2X
+INSERT OVERWRITE DYNAMIC                           4941           5068         179          0.0      482534.5       1.0X
+INSERT OVERWRITE HYBRID                             722            745          27          0.0       70502.7       7.0X
+INSERT OVERWRITE STATIC                             295            314          12          0.0       28846.8      17.2X
 

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
index a5207b561a05..81eb5e2591f1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/InsertIntoHiveTableBenchmark.scala
@@ -67,42 +67,42 @@ object InsertIntoHiveTableBenchmark extends SqlBasedBenchmark {
   def insertOverwriteDynamic(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT OVERWRITE DYNAMIC") { _ =>
       sql(s"INSERT OVERWRITE TABLE $table SELECT CAST(id AS INT) AS a," +
-        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
+        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView")
     }
   }
 
   def insertOverwriteHybrid(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT OVERWRITE HYBRID") { _ =>
       sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," +
-        s" CAST(id % 10 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
+        s" CAST(id % 10 AS INT) AS c FROM $tempView")
     }
   }
 
   def insertOverwriteStatic(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT OVERWRITE STATIC") { _ =>
       sql(s"INSERT OVERWRITE TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" +
-        s" FROM $tempView DISTRIBUTE BY a")
+        s" FROM $tempView")
     }
   }
 
   def insertIntoDynamic(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT INTO DYNAMIC") { _ =>
       sql(s"INSERT INTO TABLE $table SELECT CAST(id AS INT) AS a," +
-        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
+        s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView")
     }
   }
 
   def insertIntoHybrid(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT INTO HYBRID") { _ =>
       sql(s"INSERT INTO TABLE $table partition(b=1, c) SELECT CAST(id AS INT) AS a," +
-        s" CAST(id % 10 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
+        s" CAST(id % 10 AS INT) AS c FROM $tempView")
     }
   }
 
   def insertIntoStatic(table: String, benchmark: Benchmark): Unit = {
     benchmark.addCase("INSERT INTO STATIC") { _ =>
       sql(s"INSERT INTO TABLE $table partition(b=1, c=10) SELECT CAST(id AS INT) AS a" +
-        s" FROM $tempView DISTRIBUTE BY a")
+        s" FROM $tempView")
     }
   }
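A note on this final patch: DISTRIBUTE BY a hash-repartitions the 10240 input rows by a
before the write stage, so every measured query previously paid for an extra shuffle;
dropping the clause plausibly accounts for the much lower regenerated timings above. A
quick way to see the difference (a sketch; the exact plan text varies across Spark
versions):

{{{
sql(s"EXPLAIN INSERT INTO TABLE t1 SELECT CAST(id AS INT) AS a," +
  s" CAST(id % 10 AS INT) AS b, CAST(id % 100 AS INT) AS c FROM $tempView DISTRIBUTE BY a")
// The printed plan gains an Exchange hashpartitioning(a, ...) node that the
// clause-free query avoids.
}}}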