chore: Enable shuffle by default #881

Merged on Aug 29, 2024 (12 commits)
5 changes: 3 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -183,13 +183,13 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
.doc(
- "Whether to enable Comet native shuffle. By default, this config is false. " +
+ "Whether to enable Comet native shuffle. " +
"Note that this requires setting 'spark.shuffle.manager' to " +
"'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. " +
"'spark.shuffle.manager' must be set before starting the Spark application and " +
"cannot be changed during the application.")
.booleanConf
- .createWithDefault(false)
+ .createWithDefault(true)
Contributor: We also need to fix the default value in the description.

Member Author: We should probably automate adding the text stating the default value. Thanks for catching that.
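
One way the automation suggested above could look, as a minimal sketch only. `DocDefaultSketch`, `ConfigBuilder`, and `ConfigEntry` here are simplified, hypothetical stand-ins and not Comet's actual config classes. The idea is that `createWithDefault` appends the default to the doc string, using the "Default value is ..." phrasing that already appears for `spark.comet.exec.memoryFraction`, so the description cannot drift from the real default.

```scala
// Hypothetical, simplified stand-ins for illustration; NOT Comet's real config classes.
object DocDefaultSketch {
  final case class ConfigEntry[T](key: String, doc: String, defaultValue: T)

  final class ConfigBuilder(key: String, docText: String = "") {
    // Returns a new builder carrying the human-written description.
    def doc(text: String): ConfigBuilder = new ConfigBuilder(key, text)

    // Appends the default value to the description so the two cannot drift apart.
    def createWithDefault[T](defaultValue: T): ConfigEntry[T] = {
      val base = docText.trim
      val sentence = if (base.isEmpty || base.endsWith(".")) base else base + "."
      val fullDoc = s"$sentence Default value is $defaultValue.".trim
      ConfigEntry(key, fullDoc, defaultValue)
    }
  }

  def conf(key: String): ConfigBuilder = new ConfigBuilder(key)

  def main(args: Array[String]): Unit = {
    val entry = conf("spark.comet.exec.shuffle.enabled")
      .doc("Whether to enable Comet native shuffle.")
      .createWithDefault(true)
    println(entry.doc)
  }
}
```

With something along these lines, changing `createWithDefault(false)` to `createWithDefault(true)` would update the generated description without a separate edit to the doc text.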


val COMET_SHUFFLE_MODE: ConfigEntry[String] = conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.mode")
.doc("The mode of Comet shuffle. This config is only effective if Comet shuffle " +
@@ -198,6 +198,7 @@ object CometConf extends ShimCometConf {
"'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. " +
"'auto' is for Comet to choose the best shuffle mode based on the query plan. " +
"By default, this config is 'auto'.")
+ .internal()
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set("native", "jvm", "auto"))
3 changes: 1 addition & 2 deletions docs/source/user-guide/configs.md
@@ -50,8 +50,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
- | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
- | spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'auto'. | auto |
+ | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
@@ -1706,6 +1706,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false",
EXTENDED_EXPLAIN_PROVIDERS_KEY -> "org.apache.comet.ExtendedExplainInfo") {
val table = "test"
withTable(table) {
@@ -427,11 +427,13 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
(-0.0.asInstanceOf[Float], 2),
(0.0.asInstanceOf[Float], 3),
(Float.NaN, 4))
- withParquetTable(data, "tbl", dictionaryEnabled) {
- checkSparkAnswer("SELECT SUM(_2), MIN(_2), MAX(_2), _1 FROM tbl GROUP BY _1")
- checkSparkAnswer("SELECT MIN(_1), MAX(_1), MIN(_2), MAX(_2) FROM tbl")
- checkSparkAnswer("SELECT AVG(_2), _1 FROM tbl GROUP BY _1")
- checkSparkAnswer("SELECT AVG(_1), AVG(_2) FROM tbl")
+ withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
+ withParquetTable(data, "tbl", dictionaryEnabled) {
+ checkSparkAnswer("SELECT SUM(_2), MIN(_2), MAX(_2), _1 FROM tbl GROUP BY _1")
+ checkSparkAnswer("SELECT MIN(_1), MAX(_1), MIN(_2), MAX(_2) FROM tbl")
+ checkSparkAnswer("SELECT AVG(_2), _1 FROM tbl GROUP BY _1")
+ checkSparkAnswer("SELECT AVG(_1), AVG(_2) FROM tbl")
+ }
}
}
}
@@ -582,15 +584,23 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withParquetTable(path.toUri.toString, "tbl") {
withView("v") {
sql("CREATE TEMP VIEW v AS SELECT _g1, _g2, _3 FROM tbl ORDER BY _3")
- checkSparkAnswer("SELECT _g1, _g2, FIRST(_3) FROM v GROUP BY _g1, _g2")
- checkSparkAnswer("SELECT _g1, _g2, LAST(_3) FROM v GROUP BY _g1, _g2")
+ checkSparkAnswer(
+ "SELECT _g1, _g2, FIRST(_3) FROM v GROUP BY _g1, _g2 ORDER BY _g1, _g2")
+ checkSparkAnswer(
+ "SELECT _g1, _g2, LAST(_3) FROM v GROUP BY _g1, _g2 ORDER BY _g1, _g2")
}
- checkSparkAnswer("SELECT _g1, _g2, SUM(_3) FROM tbl GROUP BY _g1, _g2")
- checkSparkAnswer("SELECT _g1, _g2, COUNT(_3) FROM tbl GROUP BY _g1, _g2")
- checkSparkAnswer("SELECT _g1, _g2, SUM(DISTINCT _3) FROM tbl GROUP BY _g1, _g2")
- checkSparkAnswer("SELECT _g1, _g2, COUNT(DISTINCT _3) FROM tbl GROUP BY _g1, _g2")
- checkSparkAnswer("SELECT _g1, _g2, MIN(_3), MAX(_3) FROM tbl GROUP BY _g1, _g2")
- checkSparkAnswer("SELECT _g1, _g2, AVG(_3) FROM tbl GROUP BY _g1, _g2")
+ checkSparkAnswer(
+ "SELECT _g1, _g2, SUM(_3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
+ checkSparkAnswer(
+ "SELECT _g1, _g2, COUNT(_3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
+ checkSparkAnswer(
+ "SELECT _g1, _g2, SUM(DISTINCT _3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
+ checkSparkAnswer(
+ "SELECT _g1, _g2, COUNT(DISTINCT _3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
+ checkSparkAnswer(
+ "SELECT _g1, _g2, MIN(_3), MAX(_3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
+ checkSparkAnswer(
+ "SELECT _g1, _g2, AVG(_3) FROM tbl GROUP BY _g1, _g2 ORDER BY _g1, _g2")
}
}
}
@@ -603,7 +613,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withTable("t") {
sql("CREATE TABLE t(v VARCHAR(3), i INT) USING PARQUET")
sql("INSERT INTO t VALUES ('c', 1)")
- checkSparkAnswerAndNumOfAggregates("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v", 1)
+ withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
+ checkSparkAnswerAndNumOfAggregates("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v", 1)
+ }
}
}

@@ -623,19 +635,22 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
withView("v") {
sql("CREATE TEMP VIEW v AS SELECT _g3, _g4, _3, _4 FROM tbl ORDER BY _3, _4")
checkSparkAnswer(
- "SELECT _g3, _g4, FIRST(_3), FIRST(_4) FROM v GROUP BY _g3, _g4")
- checkSparkAnswer("SELECT _g3, _g4, LAST(_3), LAST(_4) FROM v GROUP BY _g3, _g4")
+ "SELECT _g3, _g4, FIRST(_3), FIRST(_4) FROM v GROUP BY _g3, _g4 ORDER BY _g3, _g4")
+ checkSparkAnswer(
+ "SELECT _g3, _g4, LAST(_3), LAST(_4) FROM v GROUP BY _g3, _g4 ORDER BY _g3, _g4")
}
- checkSparkAnswer("SELECT _g3, _g4, SUM(_3), SUM(_4) FROM tbl GROUP BY _g3, _g4")
checkSparkAnswer(
- "SELECT _g3, _g4, SUM(DISTINCT _3), SUM(DISTINCT _4) FROM tbl GROUP BY _g3, _g4")
+ "SELECT _g3, _g4, SUM(_3), SUM(_4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
+ checkSparkAnswer(
+ "SELECT _g3, _g4, SUM(DISTINCT _3), SUM(DISTINCT _4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
- "SELECT _g3, _g4, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g3, _g4")
+ "SELECT _g3, _g4, COUNT(_3), COUNT(_4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
- "SELECT _g3, _g4, COUNT(DISTINCT _3), COUNT(DISTINCT _4) FROM tbl GROUP BY _g3, _g4")
+ "SELECT _g3, _g4, COUNT(DISTINCT _3), COUNT(DISTINCT _4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
checkSparkAnswer(
- "SELECT _g3, _g4, MIN(_3), MAX(_3), MIN(_4), MAX(_4) FROM tbl GROUP BY _g3, _g4")
- checkSparkAnswer("SELECT _g3, _g4, AVG(_3), AVG(_4) FROM tbl GROUP BY _g3, _g4")
+ "SELECT _g3, _g4, MIN(_3), MAX(_3), MIN(_4), MAX(_4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
+ checkSparkAnswer(
+ "SELECT _g3, _g4, AVG(_3), AVG(_4) FROM tbl GROUP BY _g3, _g4 ORDER BY _g3, _g4")
}
}
}
@@ -654,21 +669,24 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
makeParquetFile(path, numValues, numGroups, dictionaryEnabled)
withParquetTable(path.toUri.toString, "tbl") {
Seq(128, numValues + 100).foreach { batchSize =>
- withSQLConf(CometConf.COMET_BATCH_SIZE.key -> batchSize.toString) {
+ withSQLConf(
+ CometConf.COMET_BATCH_SIZE.key -> batchSize.toString,
+ CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {

// Test all combinations of different aggregation & group-by types
(1 to 14).foreach { gCol =>
withView("v") {
sql(s"CREATE TEMP VIEW v AS SELECT _g$gCol, _1, _2, _3, _4 " +
"FROM tbl ORDER BY _1, _2, _3, _4")
checkSparkAnswer(s"SELECT _g$gCol, FIRST(_1), FIRST(_2), FIRST(_3), " +
- s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol")
+ s"FIRST(_4), LAST(_1), LAST(_2), LAST(_3), LAST(_4) FROM v GROUP BY _g$gCol ORDER BY _g$gCol")
}
checkSparkAnswer(s"SELECT _g$gCol, SUM(_1), SUM(_2), COUNT(_3), COUNT(_4), " +
- s"MIN(_1), MAX(_4), AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol")
- checkSparkAnswer(s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol")
+ s"MIN(_1), MAX(_4), AVG(_2), AVG(_4) FROM tbl GROUP BY _g$gCol ORDER BY _g$gCol")
+ checkSparkAnswer(
+ s"SELECT _g$gCol, SUM(DISTINCT _3) FROM tbl GROUP BY _g$gCol ORDER BY _g$gCol")
checkSparkAnswer(
- s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol")
+ s"SELECT _g$gCol, COUNT(DISTINCT _1) FROM tbl GROUP BY _g$gCol ORDER BY _g$gCol")
}
}
}
@@ -837,7 +855,9 @@ class CometAggregateSuite extends CometTestBase with AdaptiveSparkPlanHelper {
(0 until 5).map(i => (i.toDouble, i.toDouble % 2)),
"tbl",
dictionaryEnabled) {
- checkSparkAnswerAndNumOfAggregates("SELECT _2 , AVG(_1) FROM tbl GROUP BY _2", 1)
+ withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
+ checkSparkAnswerAndNumOfAggregates("SELECT _2 , AVG(_1) FROM tbl GROUP BY _2", 1)
+ }
}
}
}
2 changes: 2 additions & 0 deletions spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -75,9 +75,11 @@ abstract class CometTestBase
conf.set(MEMORY_OFFHEAP_SIZE.key, "2g")
conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
conf.set(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key, "1g")
+ // TODO we should no longer be disabling COALESCE_PARTITIONS_ENABLED
conf.set(SQLConf.COALESCE_PARTITIONS_ENABLED.key, "false")
Comment on lines +78 to 79
Member Author: Unrelated to this PR, but we should fix this in a separate PR.

Member: I removed it in https://github.com/apache/datafusion-comet/pull/553/files#r1730694991 in order to trigger a test case that fails with the current code.

conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
+ conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
conf