diff --git a/sql/core/benchmarks/MiscBenchmark-results.txt b/sql/core/benchmarks/MiscBenchmark-results.txt new file mode 100644 index 0000000000000..85acd57893655 --- /dev/null +++ b/sql/core/benchmarks/MiscBenchmark-results.txt @@ -0,0 +1,120 @@ +================================================================================================ +filter & aggregate without group +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +range/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +range/filter/sum wholestage off 47752 / 48952 43.9 22.8 1.0X +range/filter/sum wholestage on 3123 / 3558 671.5 1.5 15.3X + + +================================================================================================ +range/limit/sum +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +range/limit/sum wholestage off 229 / 236 2288.9 0.4 1.0X +range/limit/sum wholestage on 257 / 267 2041.0 0.5 0.9X + + +================================================================================================ +sample +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +sample with replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +sample with replacement wholestage off 12908 / 13076 10.2 98.5 1.0X +sample with replacement wholestage on 7334 / 7346 17.9 56.0 1.8X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +sample without replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +sample without replacement wholestage off 3082 / 3095 42.5 23.5 1.0X +sample without replacement wholestage on 1125 / 1211 116.5 8.6 2.7X + + +================================================================================================ +collect +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +collect 1 million 291 / 311 3.6 277.3 1.0X +collect 2 millions 552 / 564 1.9 526.6 0.5X +collect 4 millions 1104 / 1108 0.9 1053.0 0.3X + + +================================================================================================ +collect limit +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +collect limit 1 million 311 / 340 3.4 296.2 1.0X +collect limit 2 millions 581 / 614 1.8 554.4 0.5X + + +================================================================================================ +generate explode +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +generate explode array wholestage off 15211 / 15368 1.1 906.6 1.0X +generate explode array wholestage on 10761 / 10776 1.6 641.4 1.4X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +generate explode map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +generate explode map wholestage off 22128 / 22578 0.8 1318.9 1.0X +generate explode map wholestage on 16421 / 16520 1.0 978.8 1.3X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +generate posexplode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +generate posexplode array wholestage off 17108 / 18019 1.0 1019.7 1.0X +generate posexplode array wholestage on 11715 / 11804 1.4 698.3 1.5X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +generate inline array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +generate inline array wholestage off 16358 / 16418 1.0 975.0 1.0X +generate inline array wholestage on 11152 / 11472 1.5 664.7 1.5X + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +generate big struct array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +generate big struct array wholestage off 708 / 776 0.1 11803.5 1.0X +generate big struct array wholestage on 535 / 589 0.1 8913.9 1.3X + + +================================================================================================ +generate regular generator +================================================================================================ + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +generate stack: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------ +generate stack wholestage off 29082 / 29393 0.6 1733.4 1.0X +generate stack wholestage on 21066 / 21128 0.8 1255.6 1.4X + + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala index f44da242e62b9..43380869fefe4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/MiscBenchmark.scala @@ -21,247 +21,140 @@ import org.apache.spark.benchmark.Benchmark /** * Benchmark to measure whole stage codegen performance. - * To run this: - * build/sbt "sql/test-only *benchmark.MiscBenchmark" - * - * Benchmarks in this file are skipped in normal builds. + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/MiscBenchmark-results.txt". + * }}} */ -class MiscBenchmark extends BenchmarkWithCodegen { +object MiscBenchmark extends SqlBasedBenchmark { - ignore("filter & aggregate without group") { - val N = 500L << 22 - runBenchmark("range/filter/sum", N) { - sparkSession.range(N).filter("(id & 1) = 1").groupBy().sum().collect() + def filterAndAggregateWithoutGroup(numRows: Long): Unit = { + runBenchmark("filter & aggregate without group") { + codegenBenchmark("range/filter/sum", numRows) { + spark.range(numRows).filter("(id & 1) = 1").groupBy().sum().collect() + } } - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - range/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - range/filter/sum codegen=false 30663 / 31216 68.4 14.6 1.0X - range/filter/sum codegen=true 2399 / 2409 874.1 1.1 12.8X - */ } - ignore("range/limit/sum") { - val N = 500L << 20 - runBenchmark("range/limit/sum", N) { - sparkSession.range(N).limit(1000000).groupBy().sum().collect() + def limitAndAggregateWithoutGroup(numRows: Long): Unit = { + runBenchmark("range/limit/sum") { + codegenBenchmark("range/limit/sum", numRows) { + spark.range(numRows).limit(1000000).groupBy().sum().collect() + } } - /* - Westmere E56xx/L56xx/X56xx (Nehalem-C) - range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - range/limit/sum codegen=false 609 / 672 861.6 1.2 1.0X - range/limit/sum codegen=true 561 / 621 935.3 1.1 1.1X - */ } - ignore("sample") { - val N = 500 << 18 - runBenchmark("sample with replacement", N) { - sparkSession.range(N).sample(withReplacement = true, 0.01).groupBy().sum().collect() - } - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz + def sample(numRows: Int): Unit = { + runBenchmark("sample") { + codegenBenchmark("sample with replacement", numRows) { + spark.range(numRows).sample(withReplacement = true, 0.01).groupBy().sum().collect() + } - sample with replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - sample with replacement codegen=false 7073 / 7227 18.5 54.0 1.0X - sample with replacement codegen=true 5199 / 5203 25.2 39.7 1.4X - */ - - runBenchmark("sample without replacement", N) { - sparkSession.range(N).sample(withReplacement = false, 0.01).groupBy().sum().collect() + codegenBenchmark("sample without replacement", numRows) { + spark.range(numRows).sample(withReplacement = false, 0.01).groupBy().sum().collect() + } } - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11 - Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz - - sample without replacement: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - sample without replacement codegen=false 1508 / 1529 86.9 11.5 1.0X - sample without replacement codegen=true 644 / 662 203.5 4.9 2.3X - */ } - ignore("collect") { - val N = 1 << 20 - - val benchmark = new Benchmark("collect", N) - benchmark.addCase("collect 1 million") { iter => - sparkSession.range(N).collect() - } - benchmark.addCase("collect 2 millions") { iter => - sparkSession.range(N * 2).collect() - } - benchmark.addCase("collect 4 millions") { iter => - sparkSession.range(N * 4).collect() + def collect(numRows: Int): Unit = { + runBenchmark("collect") { + val benchmark = new Benchmark("collect", numRows, output = output) + benchmark.addCase("collect 1 million") { iter => + spark.range(numRows).collect() + } + benchmark.addCase("collect 2 millions") { iter => + spark.range(numRows * 2).collect() + } + benchmark.addCase("collect 4 millions") { iter => + spark.range(numRows * 4).collect() + } + benchmark.run() } - benchmark.run() - - /* - Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - collect: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - collect 1 million 439 / 654 2.4 418.7 1.0X - collect 2 millions 961 / 1907 1.1 916.4 0.5X - collect 4 millions 3193 / 3895 0.3 3044.7 0.1X - */ } - ignore("collect limit") { - val N = 1 << 20 - - val benchmark = new Benchmark("collect limit", N) - benchmark.addCase("collect limit 1 million") { iter => - sparkSession.range(N * 4).limit(N).collect() + def collectLimit(numRows: Int): Unit = { + runBenchmark("collect limit") { + val benchmark = new Benchmark("collect limit", numRows, output = output) + benchmark.addCase("collect limit 1 million") { iter => + spark.range(numRows * 4).limit(numRows).collect() + } + benchmark.addCase("collect limit 2 millions") { iter => + spark.range(numRows * 4).limit(numRows * 2).collect() + } + benchmark.run() } - benchmark.addCase("collect limit 2 millions") { iter => - sparkSession.range(N * 4).limit(N * 2).collect() - } - benchmark.run() - - /* - model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) - collect limit: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------- - collect limit 1 million 833 / 1284 1.3 794.4 1.0X - collect limit 2 millions 3348 / 4005 0.3 3193.3 0.2X - */ } - ignore("generate explode") { - val N = 1 << 24 - runBenchmark("generate explode array", N) { - val df = sparkSession.range(N).selectExpr( - "id as key", - "array(rand(), rand(), rand(), rand(), rand()) as values") - df.selectExpr("key", "explode(values) value").count() - } - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 - Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz - - generate explode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - generate explode array wholestage off 6920 / 7129 2.4 412.5 1.0X - generate explode array wholestage on 623 / 646 26.9 37.1 11.1X - */ - - runBenchmark("generate explode map", N) { - val df = sparkSession.range(N).selectExpr( - "id as key", - "map('a', rand(), 'b', rand(), 'c', rand(), 'd', rand(), 'e', rand()) pairs") - df.selectExpr("key", "explode(pairs) as (k, v)").count() - } - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 - Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz - - generate explode map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - generate explode map wholestage off 11978 / 11993 1.4 714.0 1.0X - generate explode map wholestage on 866 / 919 19.4 51.6 13.8X - */ - - runBenchmark("generate posexplode array", N) { - val df = sparkSession.range(N).selectExpr( - "id as key", - "array(rand(), rand(), rand(), rand(), rand()) as values") - df.selectExpr("key", "posexplode(values) as (idx, value)").count() + def explode(numRows: Int): Unit = { + runBenchmark("generate explode") { + codegenBenchmark("generate explode array", numRows) { + val df = spark.range(numRows).selectExpr( + "id as key", + "array(rand(), rand(), rand(), rand(), rand()) as values") + df.selectExpr("key", "explode(values) value").count() + } + + codegenBenchmark("generate explode map", numRows) { + val df = spark.range(numRows).selectExpr( + "id as key", + "map('a', rand(), 'b', rand(), 'c', rand(), 'd', rand(), 'e', rand()) pairs") + df.selectExpr("key", "explode(pairs) as (k, v)").count() + } + + codegenBenchmark("generate posexplode array", numRows) { + val df = spark.range(numRows).selectExpr( + "id as key", + "array(rand(), rand(), rand(), rand(), rand()) as values") + df.selectExpr("key", "posexplode(values) as (idx, value)").count() + } + + codegenBenchmark("generate inline array", numRows) { + val df = spark.range(numRows).selectExpr( + "id as key", + "array((rand(), rand()), (rand(), rand()), (rand(), 0.0d)) as values") + df.selectExpr("key", "inline(values) as (r1, r2)").count() + } + + val M = 60000 + codegenBenchmark("generate big struct array", M) { + import spark.implicits._ + val df = spark.sparkContext.parallelize(Seq(("1", + Array.fill(M)({ + val i = math.random + (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString) + })))).toDF("col", "arr") + + df.selectExpr("*", "explode(arr) as arr_col") + .select("col", "arr_col.*").count + } } - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 - Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz - - generate posexplode array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - generate posexplode array wholestage off 7502 / 7513 2.2 447.1 1.0X - generate posexplode array wholestage on 617 / 623 27.2 36.8 12.2X - */ - - runBenchmark("generate inline array", N) { - val df = sparkSession.range(N).selectExpr( - "id as key", - "array((rand(), rand()), (rand(), rand()), (rand(), 0.0d)) as values") - df.selectExpr("key", "inline(values) as (r1, r2)").count() - } - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 - Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz - - generate inline array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - generate inline array wholestage off 6901 / 6928 2.4 411.3 1.0X - generate inline array wholestage on 1001 / 1010 16.8 59.7 6.9X - */ - - val M = 60000 - runBenchmark("generate big struct array", M) { - import sparkSession.implicits._ - val df = sparkSession.sparkContext.parallelize(Seq(("1", - Array.fill(M)({ - val i = math.random - (i.toString, (i + 1).toString, (i + 2).toString, (i + 3).toString) - })))).toDF("col", "arr") - - df.selectExpr("*", "expode(arr) as arr_col") - .select("col", "arr_col.*").count - } - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 - Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz - - test the impact of adding the optimization of Generate.unrequiredChildIndex, - we can see enormous improvement of x250 in this case! and it grows O(n^2). - - with Optimization ON: - - generate big struct array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - generate big struct array wholestage off 331 / 378 0.2 5524.9 1.0X - generate big struct array wholestage on 205 / 232 0.3 3413.1 1.6X - - with Optimization OFF: - - generate big struct array: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - generate big struct array wholestage off 49697 / 51496 0.0 828277.7 1.0X - generate big struct array wholestage on 50558 / 51434 0.0 842641.6 1.0X - */ - } - ignore("generate regular generator") { - val N = 1 << 24 - runBenchmark("generate stack", N) { - val df = sparkSession.range(N).selectExpr( - "id as key", - "id % 2 as t1", - "id % 3 as t2", - "id % 5 as t3", - "id % 7 as t4", - "id % 13 as t5") - df.selectExpr("key", "stack(4, t1, t2, t3, t4, t5)").count() + def stack(numRows: Int): Unit = { + runBenchmark("generate regular generator") { + codegenBenchmark("generate stack", numRows) { + val df = spark.range(numRows).selectExpr( + "id as key", + "id % 2 as t1", + "id % 3 as t2", + "id % 5 as t3", + "id % 7 as t4", + "id % 13 as t5") + df.selectExpr("key", "stack(4, t1, t2, t3, t4, t5)").count() + } } - - /* - Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.11.6 - Intel(R) Core(TM) i7-4980HQ CPU @ 2.80GHz - - generate stack: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative - ------------------------------------------------------------------------------------------------ - generate stack wholestage off 12953 / 13070 1.3 772.1 1.0X - generate stack wholestage on 836 / 847 20.1 49.8 15.5X - */ } + override def runBenchmarkSuite(): Unit = { + filterAndAggregateWithoutGroup(500L << 22) + limitAndAggregateWithoutGroup(500L << 20) + sample(500 << 18) + collect(1 << 20) + collectLimit(1 << 20) + explode(1 << 24) + stack(1 << 24) + } }