Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement cost-based optimizer to avoid regressions #591

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,14 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_CBO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.cbo.enabled")
.doc(
"Cost-based optimizer to avoid performance regressions where Comet plan may " +
"be slower than Spark plan.")
.booleanConf
.createWithDefault(false)

}

object ConfigHelpers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,197 +5,198 @@
"query_path": "../../tpch/queries",
"spark_conf": {
"spark.comet.explainFallback.enabled": "true",
"spark.eventLog.enabled": "true",
"spark.jars": "file:///home/andy/git/apache/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.comet.cast.allowIncompatible": "true",
"spark.app.startTime": "1719691158901",
"spark.executor.extraClassPath": "/home/andy/git/apache/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.executor.memory": "8G",
"spark.comet.exec.shuffle.enabled": "true",
"spark.app.name": "DataFusion Comet Benchmark derived from TPC-H / TPC-DS",
"spark.driver.port": "36573",
"spark.sql.adaptive.coalescePartitions.enabled": "false",
"spark.app.startTime": "1716923498046",
"spark.app.id": "app-20240629135919-0008",
"spark.comet.batchSize": "8192",
"spark.app.id": "app-20240528131138-0043",
"spark.serializer.objectStreamReset": "100",
"spark.app.initial.jar.urls": "spark://woody.lan:36573/jars/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.driver.host": "10.0.0.118",
"spark.submit.deployMode": "client",
"spark.sql.autoBroadcastJoinThreshold": "-1",
"spark.comet.exec.all.enabled": "true",
"spark.eventLog.enabled": "false",
"spark.driver.host": "woody.lan",
"spark.executor.cores": "8",
"spark.driver.extraJavaOptions": "-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false",
"spark.comet.shuffle.enforceMode.enabled": "true",
"spark.sql.warehouse.dir": "file:/home/andy/git/apache/datafusion-benchmarks/runners/datafusion-comet/spark-warehouse",
"spark.shuffle.manager": "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager",
"spark.comet.exec.enabled": "true",
"spark.app.submitTime": "1719691158623",
"spark.repl.local.jars": "file:///home/andy/git/apache/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.comet.exec.enabled": "true",
"spark.executor.id": "driver",
"spark.master": "spark://woody:7077",
"spark.executor.instances": "8",
"spark.comet.exec.shuffle.mode": "auto",
"spark.driver.port": "34629",
"spark.sql.extensions": "org.apache.comet.CometSparkSessionExtensions",
"spark.driver.memory": "8G",
"spark.driver.extraClassPath": "/home/andy/git/apache/datafusion-comet/spark/target/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.executor.memory": "32G",
"spark.rdd.compress": "True",
"spark.executor.extraJavaOptions": "-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false",
"spark.executor.instances": "1",
"spark.cores.max": "8",
"spark.comet.enabled": "true",
"spark.app.submitTime": "1716923497738",
"spark.submit.pyFiles": "",
"spark.executor.cores": "1",
"spark.comet.parquet.io.enabled": "false"
"spark.app.initial.jar.urls": "spark://10.0.0.118:34629/jars/comet-spark-spark3.4_2.12-0.1.0-SNAPSHOT.jar",
"spark.comet.cbo.enabled": "false"
},
"1": [
32.121661901474,
27.997092485427856,
27.756758451461792,
28.55236315727234,
28.332542181015015
28.735982179641724,
27.904003858566284,
27.98918342590332,
27.998026847839355,
27.7985897064209
],
"2": [
18.269107580184937,
16.200955629348755,
16.194639682769775,
16.745808839797974,
16.59864115715027
15.840301513671875,
15.137918710708618,
15.086657047271729,
15.252221584320068,
15.093742370605469
],
"3": [
17.265466690063477,
17.069786310195923,
17.12887978553772,
19.33678102493286,
18.182055234909058
18.124080181121826,
18.498253345489502,
18.420130252838135,
18.309802055358887,
18.46897006034851
],
"4": [
8.367004156112671,
8.172023296356201,
8.023266077041626,
8.350765228271484,
8.258736610412598
9.55617070198059,
9.518851518630981,
9.514896392822266,
9.583910465240479,
9.444581985473633
],
"5": [
34.10048794746399,
32.69314408302307,
33.21383595466614,
36.391114473342896,
39.00048065185547
33.23771286010742,
33.053247690200806,
32.84638738632202,
32.790276765823364,
32.90981197357178
],
"6": [
3.1693499088287354,
3.044705390930176,
3.047694206237793,
3.2817511558532715,
3.274174928665161
3.34500789642334,
2.9966821670532227,
3.0137181282043457,
2.9657068252563477,
2.919524908065796
],
"7": [
25.369214296340942,
24.020941257476807,
24.0787034034729,
28.47402787208557,
28.23443365097046
20.84096646308899,
20.373249053955078,
20.337918519973755,
20.32623314857483,
20.321190357208252
],
"8": [
40.06126809120178,
39.828824281692505,
45.250510454177856,
44.406742572784424,
48.98451232910156
36.99943470954895,
36.097434520721436,
36.08603119850159,
36.26709461212158,
36.22776746749878
],
"9": [
62.822797775268555,
61.26328158378601,
64.95581865310669,
69.51708793640137,
73.52380013465881
58.00954031944275,
56.75375247001648,
57.23253607749939,
57.04572892189026,
57.06179666519165
],
"10": [
20.55334782600403,
20.546096324920654,
20.57452392578125,
22.84211039543152,
23.724371671676636
19.51328682899475,
19.17092227935791,
19.110991716384888,
19.05888819694519,
19.292072534561157
],
"11": [
11.068235158920288,
10.715423822402954,
11.353424310684204,
11.37632942199707,
11.530814170837402
12.222111463546753,
12.186187267303467,
12.177972316741943,
12.100908517837524,
12.061741828918457
],
"12": [
10.264788389205933,
8.67864990234375,
8.845952033996582,
8.593009233474731,
8.540803909301758
7.657347679138184,
7.598176002502441,
7.568347930908203,
7.4833292961120605,
7.551736116409302
],
"13": [
9.603406190872192,
9.648627042770386,
13.040799140930176,
10.154011249542236,
9.716034412384033
9.64631199836731,
9.536576509475708,
9.564186096191406,
9.570204496383667,
9.662892580032349
],
"14": [
6.20926308631897,
6.0385496616363525,
7.674488544464111,
10.53052043914795,
7.661675691604614
6.022975921630859,
5.84771203994751,
6.049532175064087,
5.998222827911377,
5.899066925048828
],
"15": [
11.466301918029785,
11.473632097244263,
11.279382228851318,
13.291078329086304,
12.81026816368103
10.946545600891113,
10.68128228187561,
10.473867416381836,
10.72830843925476,
10.45834231376648
],
"16": [
8.096073865890503,
7.73410701751709,
7.742897272109985,
8.477537631988525,
7.821273326873779
7.951048851013184,
6.773421049118042,
6.630566120147705,
6.826274633407593,
6.515024185180664
],
"17": [
43.69264578819275,
43.33040428161621,
46.291987657547,
54.654345989227295,
54.37124800682068
46.03706979751587,
42.801599740982056,
42.59856081008911,
42.84500861167908,
42.899412870407104
],
"18": [
27.205485105514526,
26.785916090011597,
27.331408262252808,
29.946768760681152,
28.037617444992065
34.244925022125244,
31.239882469177246,
31.353251695632935,
31.224499940872192,
31.53875970840454
],
"19": [
8.100102186203003,
7.845783472061157,
8.52329158782959,
8.907397985458374,
9.13755488395691
7.07506251335144,
6.813824892044067,
6.79759407043457,
6.941055059432983,
6.83566427230835
],
"20": [
13.09695029258728,
12.683861255645752,
15.612725019454956,
13.361177206039429,
16.614356517791748
10.964829683303833,
10.757019996643066,
10.806366205215454,
10.990953922271729,
10.887315273284912
],
"21": [
43.69623780250549,
43.26758122444153,
46.91650056838989,
47.875754833221436,
57.9763662815094
44.07762622833252,
44.03535461425781,
43.978052377700806,
43.928617000579834,
43.93204379081726
],
"22": [
4.5090577602386475,
4.420571804046631,
4.639787673950195,
5.118046998977661,
5.017346143722534
4.871158599853516,
4.810696601867676,
4.873842239379883,
4.817774534225464,
4.927582740783691
]
}
11 changes: 6 additions & 5 deletions docs/source/contributor-guide/benchmarking.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ $SPARK_HOME/bin/spark-submit \
$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
--conf spark.driver.memory=8G \
--conf spark.executor.memory=64G \
--conf spark.executor.instances=1 \
--conf spark.executor.memory=32G \
--conf spark.executor.cores=8 \
--conf spark.cores.max=8 \
--conf spark.sql.autoBroadcastJoinThreshold=-1 \
Expand All @@ -68,8 +69,8 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.comet.batchSize=8192 \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
--conf spark.comet.shuffle.enforceMode.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
--conf spark.sql.adaptive.coalescePartitions.enabled=false \
tpcbench.py \
--benchmark tpch \
--data /mnt/bigdata/tpch/sf100/ \
Expand Down Expand Up @@ -99,7 +100,7 @@ is an ongoing task, and we welcome contributions from the community to help achi

The raw results of these benchmarks in JSON format is available here:

- [Spark](./benchmark-results/2024-05-30/spark-8-exec-5-runs.json)
- [Comet](./benchmark-results/2024-05-30/comet-8-exec-5-runs.json)
- [DataFusion](./benchmark-results/2024-05-30/datafusion-python-8-cores.json)
- [Spark](./benchmark-results/2024-06-29/spark-8-exec-5-runs.json)
- [Comet](./benchmark-results/2024-06-29/comet-8-exec-5-runs.json)
- [DataFusion](./benchmark-results/2024-06-29/datafusion-python-8-cores.json)

1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Comet provides the following configuration settings.
|--------|-------------|---------------|
| spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
| spark.comet.cbo.enabled | Cost-based optimizer to avoid performance regressions where Comet plan may be slower than Spark plan. | false |
| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config is false. | false |
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 |
| spark.comet.columnar.shuffle.async.thread.num | Number of threads used for Comet async columnar shuffle per shuffle task. By default, this config is 3. Note that more threads means more memory requirement to buffer shuffle data before flushing to disk. Also, more threads may not always improve performance, and should be set based on the number of cores available. | 3 |
Expand Down
Loading
Loading