[SPARK-25664][SQL][TEST] Refactor JoinBenchmark to use main method #22661
Changes from all commits
4339b1c
4859a9f
2baaf35
00c4950
3be13b1
28f9b9a
cd8b664
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| ================================================================================================ | ||
| Join Benchmark | ||
| ================================================================================================ | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| Join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| Join w long wholestage off 4464 / 4483 4.7 212.9 1.0X | ||
| Join w long wholestage on 289 / 339 72.6 13.8 15.5X | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| Join w long duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| Join w long duplicated wholestage off 5662 / 5678 3.7 270.0 1.0X | ||
| Join w long duplicated wholestage on 332 / 345 63.1 15.8 17.0X | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| Join w 2 ints wholestage off 173174 / 173183 0.1 8257.6 1.0X | ||
| Join w 2 ints wholestage on 166350 / 198362 0.1 7932.2 1.0X | ||
|
Contributor
It surprises me that whole-stage codegen doesn't help here. We should investigate it later.
Member
+1. |
||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| Join w 2 longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| Join w 2 longs wholestage off 7055 / 7214 3.0 336.4 1.0X | ||
| Join w 2 longs wholestage on 1869 / 1985 11.2 89.1 3.8X | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| Join w 2 longs duplicated wholestage off 19256 / 20283 1.1 918.2 1.0X | ||
| Join w 2 longs duplicated wholestage on 2467 / 2544 8.5 117.7 7.8X | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| outer join w long wholestage off 3756 / 3761 5.6 179.1 1.0X | ||
| outer join w long wholestage on 218 / 250 96.2 10.4 17.2X | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| semi join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| semi join w long wholestage off 2393 / 2416 8.8 114.1 1.0X | ||
| semi join w long wholestage on 214 / 218 97.9 10.2 11.2X | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| sort merge join wholestage off 2318 / 2392 0.9 1105.3 1.0X | ||
| sort merge join wholestage on 1669 / 1811 1.3 795.9 1.4X | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| sort merge join with duplicates: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| sort merge join with duplicates wholestage off 2966 / 2976 0.7 1414.5 1.0X | ||
| sort merge join with duplicates wholestage on 2413 / 2641 0.9 1150.5 1.2X | ||
|
|
||
| OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 | ||
| Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz | ||
| shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------ | ||
| shuffle hash join wholestage off 1475 / 1479 2.8 351.7 1.0X | ||
| shuffle hash join wholestage on 1209 / 1238 3.5 288.3 1.2X | ||
|
|
||
|
|
||
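The derived columns in the result tables above can be cross-checked from the row count and the best time. The sketch below assumes that `Rate(M/s)` is rows divided by the best time, that `Per Row(ns)` is its reciprocal, and that `Relative` is the baseline best time divided by the case's best time; the object and method names are hypothetical and are not part of the PR. Because the tables print times rounded to whole milliseconds, recomputed values can differ from the printed ones in the last digit.

```scala
// Hypothetical helper for sanity-checking a benchmark row; not part of the PR.
object BenchmarkColumns {
  final case class Derived(rateMPerSec: Double, perRowNs: Double)

  // numRows: cardinality passed to the benchmark (N in the source).
  // bestMs:  best wall-clock time in milliseconds, as printed in the table.
  def derive(numRows: Long, bestMs: Double): Derived = {
    // rows per microsecond is numerically the same as million rows per second
    val rate = numRows / (bestMs * 1000.0)
    Derived(rateMPerSec = rate, perRowNs = 1000.0 / rate)
  }

  // Speed-up of a case relative to the first (baseline) case.
  def relative(baselineBestMs: Double, bestMs: Double): Double =
    baselineBestMs / bestMs

  def main(args: Array[String]): Unit = {
    val n = 20L << 20 // same N as the broadcast hash join benchmarks
    val off = derive(n, 4464) // "Join w long wholestage off"
    val on = derive(n, 289)   // "Join w long wholestage on"
    println(f"off: ${off.rateMPerSec}%.1f M/s, ${off.perRowNs}%.1f ns/row")
    println(f"on:  ${on.rateMPerSec}%.1f M/s, ${on.perRowNs}%.1f ns/row")
    println(f"relative: ${relative(4464, 289)}%.2fX")
  }
}
```

Read this way, the "Join w 2 ints" rows stand out: both cases take roughly 8000 ns per row, which is what the review comment on that table is reacting to.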
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,229 +19,163 @@ package org.apache.spark.sql.execution.benchmark | |
|
|
||
| import org.apache.spark.sql.execution.joins._ | ||
| import org.apache.spark.sql.functions._ | ||
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.types.IntegerType | ||
|
|
||
| /** | ||
| * Benchmark to measure performance for aggregate primitives. | ||
| * To run this: | ||
| * build/sbt "sql/test-only *benchmark.JoinBenchmark" | ||
| * | ||
| * Benchmarks in this file are skipped in normal builds. | ||
| * Benchmark to measure performance for joins. | ||
| * To run this benchmark: | ||
| * {{{ | ||
| * 1. without sbt: | ||
| * bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar> | ||
| * 2. build/sbt "sql/test:runMain <this class>" | ||
| * 3. generate result: | ||
| * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" | ||
| * Results will be written to "benchmarks/JoinBenchmark-results.txt". | ||
| * }}} | ||
| */ | ||
| class JoinBenchmark extends BenchmarkWithCodegen { | ||
| object JoinBenchmark extends SqlBasedBenchmark { | ||
|
|
||
| ignore("broadcast hash join, long key") { | ||
| def broadcastHashJoinLongKey(): Unit = { | ||
| val N = 20 << 20 | ||
| val M = 1 << 16 | ||
|
|
||
| val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v")) | ||
| runBenchmark("Join w long", N) { | ||
| val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k")) | ||
| val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v")) | ||
| codegenBenchmark("Join w long", N) { | ||
| val df = spark.range(N).join(dim, (col("id") % M) === col("k")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 | ||
| Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| Join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------- | ||
| Join w long codegen=false 3002 / 3262 7.0 143.2 1.0X | ||
| Join w long codegen=true 321 / 371 65.3 15.3 9.3X | ||
| */ | ||
| } | ||
|
|
||
| ignore("broadcast hash join, long key with duplicates") { | ||
| def broadcastHashJoinLongKeyWithDuplicates(): Unit = { | ||
| val N = 20 << 20 | ||
| val M = 1 << 16 | ||
|
|
||
| val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v")) | ||
|
Member
So, this is a removal of the redundant one, right?
Member
Author
Yes |
||
| runBenchmark("Join w long duplicated", N) { | ||
| val dim = broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k")) | ||
| val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k")) | ||
| val dim = broadcast(spark.range(M).selectExpr("cast(id/10 as long) as k")) | ||
|
Member
For this change, we need to rerun the benchmark to get a new result. |
||
| codegenBenchmark("Join w long duplicated", N) { | ||
| val df = spark.range(N).join(dim, (col("id") % M) === col("k")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 | ||
| *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| *Join w long duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *Join w long duplicated codegen=false 3446 / 3478 6.1 164.3 1.0X | ||
| *Join w long duplicated codegen=true 322 / 351 65.2 15.3 10.7X | ||
| */ | ||
| } | ||
|
|
||
| ignore("broadcast hash join, two int key") { | ||
| def broadcastHashJoinTwoIntKey(): Unit = { | ||
| val N = 20 << 20 | ||
| val M = 1 << 16 | ||
| val dim2 = broadcast(sparkSession.range(M) | ||
| val dim2 = broadcast(spark.range(M) | ||
| .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id as string) as v")) | ||
|
|
||
| runBenchmark("Join w 2 ints", N) { | ||
| val df = sparkSession.range(N).join(dim2, | ||
| codegenBenchmark("Join w 2 ints", N) { | ||
| val df = spark.range(N).join(dim2, | ||
| (col("id") % M).cast(IntegerType) === col("k1") | ||
| && (col("id") % M).cast(IntegerType) === col("k2")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 | ||
| *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| *Join w 2 ints: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *Join w 2 ints codegen=false 4426 / 4501 4.7 211.1 1.0X | ||
| *Join w 2 ints codegen=true 791 / 818 26.5 37.7 5.6X | ||
| */ | ||
|
Member
Hi, @cloud-fan, @gatorsmile, @davies, @rxin. We are hitting some performance slowdown in this benchmark. However, this is not a regression because it's consistent across 2.0.2 ~ 2.4.0-rc3. According to the original performance number, it seems to be a result when Did we really want to measure the difference in
Member
Any advice is welcome and thank you in advance, @cloud-fan, @gatorsmile, @davies, @rxin.
Contributor
This seems to be caused by the bug fix #15390, so the performance is reasonable.
Member
Thank you for the confirmation, @cloud-fan! |
||
| } | ||
|
|
||
| ignore("broadcast hash join, two long key") { | ||
| def broadcastHashJoinTwoLongKey(): Unit = { | ||
| val N = 20 << 20 | ||
| val M = 1 << 16 | ||
| val dim3 = broadcast(sparkSession.range(M) | ||
| val dim3 = broadcast(spark.range(M) | ||
| .selectExpr("id as k1", "id as k2", "cast(id as string) as v")) | ||
|
|
||
| runBenchmark("Join w 2 longs", N) { | ||
| val df = sparkSession.range(N).join(dim3, | ||
| codegenBenchmark("Join w 2 longs", N) { | ||
| val df = spark.range(N).join(dim3, | ||
| (col("id") % M) === col("k1") && (col("id") % M) === col("k2")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 | ||
| *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| *Join w 2 longs: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *Join w 2 longs codegen=false 5905 / 6123 3.6 281.6 1.0X | ||
| *Join w 2 longs codegen=true 2230 / 2529 9.4 106.3 2.6X | ||
| */ | ||
| } | ||
|
|
||
| ignore("broadcast hash join, two long key with duplicates") { | ||
| def broadcastHashJoinTwoLongKeyWithDuplicates(): Unit = { | ||
| val N = 20 << 20 | ||
| val M = 1 << 16 | ||
| val dim4 = broadcast(sparkSession.range(M) | ||
| val dim4 = broadcast(spark.range(M) | ||
| .selectExpr("cast(id/10 as long) as k1", "cast(id/10 as long) as k2")) | ||
|
|
||
| runBenchmark("Join w 2 longs duplicated", N) { | ||
| val df = sparkSession.range(N).join(dim4, | ||
| codegenBenchmark("Join w 2 longs duplicated", N) { | ||
| val df = spark.range(N).join(dim4, | ||
| (col("id") bitwiseAND M) === col("k1") && (col("id") bitwiseAND M) === col("k2")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| *Join w 2 longs duplicated: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *Join w 2 longs duplicated codegen=false 6420 / 6587 3.3 306.1 1.0X | ||
| *Join w 2 longs duplicated codegen=true 2080 / 2139 10.1 99.2 3.1X | ||
| */ | ||
| } | ||
|
|
||
| ignore("broadcast hash join, outer join long key") { | ||
| def broadcastHashJoinOuterJoinLongKey(): Unit = { | ||
| val N = 20 << 20 | ||
| val M = 1 << 16 | ||
| val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v")) | ||
| runBenchmark("outer join w long", N) { | ||
| val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "left") | ||
| val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v")) | ||
| codegenBenchmark("outer join w long", N) { | ||
| val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "left") | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 | ||
| *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| *outer join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *outer join w long codegen=false 3055 / 3189 6.9 145.7 1.0X | ||
| *outer join w long codegen=true 261 / 276 80.5 12.4 11.7X | ||
| */ | ||
| } | ||
|
|
||
| ignore("broadcast hash join, semi join long key") { | ||
| def broadcastHashJoinSemiJoinLongKey(): Unit = { | ||
| val N = 20 << 20 | ||
| val M = 1 << 16 | ||
| val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v")) | ||
| runBenchmark("semi join w long", N) { | ||
| val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi") | ||
| val dim = broadcast(spark.range(M).selectExpr("id as k", "cast(id as string) as v")) | ||
| codegenBenchmark("semi join w long", N) { | ||
| val df = spark.range(N).join(dim, (col("id") % M) === col("k"), "leftsemi") | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| *Java HotSpot(TM) 64-Bit Server VM 1.7.0_60-b19 on Mac OS X 10.9.5 | ||
| *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| *semi join w long: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *semi join w long codegen=false 1912 / 1990 11.0 91.2 1.0X | ||
| *semi join w long codegen=true 237 / 244 88.3 11.3 8.1X | ||
| */ | ||
| } | ||
|
|
||
| ignore("sort merge join") { | ||
| def sortMergeJoin(): Unit = { | ||
| val N = 2 << 20 | ||
| runBenchmark("merge join", N) { | ||
| val df1 = sparkSession.range(N).selectExpr(s"id * 2 as k1") | ||
| val df2 = sparkSession.range(N).selectExpr(s"id * 3 as k2") | ||
| codegenBenchmark("sort merge join", N) { | ||
| val df1 = spark.range(N).selectExpr(s"id * 2 as k1") | ||
| val df2 = spark.range(N).selectExpr(s"id * 3 as k2") | ||
| val df = df1.join(df2, col("k1") === col("k2")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| *merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *merge join codegen=false 1588 / 1880 1.3 757.1 1.0X | ||
| *merge join codegen=true 1477 / 1531 1.4 704.2 1.1X | ||
| */ | ||
| } | ||
|
|
||
| ignore("sort merge join with duplicates") { | ||
| def sortMergeJoinWithDuplicates(): Unit = { | ||
| val N = 2 << 20 | ||
| runBenchmark("sort merge join", N) { | ||
| val df1 = sparkSession.range(N) | ||
| codegenBenchmark("sort merge join with duplicates", N) { | ||
| val df1 = spark.range(N) | ||
| .selectExpr(s"(id * 15485863) % ${N*10} as k1") | ||
| val df2 = sparkSession.range(N) | ||
| val df2 = spark.range(N) | ||
| .selectExpr(s"(id * 15485867) % ${N*10} as k2") | ||
| val df = df1.join(df2, col("k1") === col("k2")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
|
|
||
| /* | ||
| *Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| *sort merge join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *sort merge join codegen=false 3626 / 3667 0.6 1728.9 1.0X | ||
| *sort merge join codegen=true 3405 / 3438 0.6 1623.8 1.1X | ||
| */ | ||
| } | ||
|
|
||
| ignore("shuffle hash join") { | ||
| val N = 4 << 20 | ||
| sparkSession.conf.set("spark.sql.shuffle.partitions", "2") | ||
| sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "10000000") | ||
| sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false") | ||
| runBenchmark("shuffle hash join", N) { | ||
| val df1 = sparkSession.range(N).selectExpr(s"id as k1") | ||
| val df2 = sparkSession.range(N / 3).selectExpr(s"id * 3 as k2") | ||
| val df = df1.join(df2, col("k1") === col("k2")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined) | ||
| df.count() | ||
| def shuffleHashJoin(): Unit = { | ||
| val N: Long = 4 << 20 | ||
| withSQLConf( | ||
| SQLConf.SHUFFLE_PARTITIONS.key -> "2", | ||
| SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000", | ||
| SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { | ||
| codegenBenchmark("shuffle hash join", N) { | ||
| val df1 = spark.range(N).selectExpr(s"id as k1") | ||
| val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2") | ||
| val df = df1.join(df2, col("k1") === col("k2")) | ||
| assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined) | ||
| df.count() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /* | ||
| *Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1 | ||
| *Intel64 Family 6 Model 94 Stepping 3, GenuineIntel | ||
| *shuffle hash join: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
| *------------------------------------------------------------------------------------------- | ||
| *shuffle hash join codegen=false 2005 / 2010 2.1 478.0 1.0X | ||
| *shuffle hash join codegen=true 1773 / 1792 2.4 422.7 1.1X | ||
| */ | ||
| override def runBenchmarkSuite(): Unit = { | ||
|
Member
Could you wrap the following (lines 168~177) with something like |
||
| runBenchmark("Join Benchmark") { | ||
| broadcastHashJoinLongKey() | ||
| broadcastHashJoinLongKeyWithDuplicates() | ||
| broadcastHashJoinTwoIntKey() | ||
| broadcastHashJoinTwoLongKey() | ||
| broadcastHashJoinTwoLongKeyWithDuplicates() | ||
| broadcastHashJoinOuterJoinLongKey() | ||
| broadcastHashJoinSemiJoinLongKey() | ||
| sortMergeJoin() | ||
| sortMergeJoinWithDuplicates() | ||
| shuffleHashJoin() | ||
| } | ||
| } | ||
| } | ||
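In `shuffleHashJoin()` above, the three `sparkSession.conf.set(...)` calls were replaced by a `withSQLConf(...) { ... }` block, so the settings apply only while that benchmark runs. The set, run, restore pattern behind such a wrapper can be sketched in plain Scala as follows. This is an illustration under stated assumptions, not Spark's implementation: `ScopedConf`, `conf`, and `withConf` are hypothetical names, and a mutable map stands in for the session configuration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a session's runtime configuration; not Spark's API.
object ScopedConf {
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Apply the given settings, run the body, then restore the previous state,
  // even if the body throws.
  def withConf[T](pairs: (String, String)*)(body: => T): T = {
    // Remember the old value (or its absence) for every key we are about to set.
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None)      => conf.remove(k)
    }
  }

  def main(args: Array[String]): Unit = {
    conf("spark.sql.shuffle.partitions") = "200"
    val inside = withConf("spark.sql.shuffle.partitions" -> "2") {
      conf("spark.sql.shuffle.partitions")
    }
    println(s"inside block: $inside")
    println(s"after block: ${conf("spark.sql.shuffle.partitions")}")
  }
}
```

Scoping the settings this way matters once all benchmarks run from a single `runBenchmarkSuite()`, since a plain `conf.set` in one method would otherwise stay in effect for any benchmark that runs after it.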
Because the Mac has one more line than Linux:
28f9b9a#diff-45c96c65f7c46bc2d84843a7cb92f22fL7
Er.. I'm not a fan of piggy-backing. Okay.