chore: add more aggregate functions to benchmark test (apache#706)
* chore: add more aggregate functions to benchmark test

* delete benchmark result

(cherry picked from commit 0895aa6)
huaxingao committed Aug 7, 2024
1 parent 63f8b20 commit a320fba
Showing 1 changed file with 58 additions and 41 deletions.
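
The change threads an aggregateFunction argument through every benchmark method and wraps the driver in a loop over the functions under test. As a rough orientation before the full diff, here is a condensed sketch of the new driver loop, assembled from the diff below; runBenchmarkWithTable and singleGroupAndAggregate are the helpers defined in this benchmark file, and only one of the four benchmark groups is shown:

    // Condensed sketch of the new driver loop (see the full diff below).
    // runBenchmarkWithTable and singleGroupAndAggregate come from the
    // benchmark harness; only the single-group-key case is shown here.
    val total = 1024 * 1024 * 10
    val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups
    val aggregateFunctions = List("SUM", "MIN", "MAX", "COUNT")

    aggregateFunctions.foreach { aggFunc =>
      runBenchmarkWithTable(
        s"Grouped Aggregate (single group key + single aggregate $aggFunc)",
        total) { v =>
        for (card <- combinations) {
          singleGroupAndAggregate(v, card, aggFunc)
        }
      }
    }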
@@ -41,10 +41,14 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
     session
   }
 
-  def singleGroupAndAggregate(values: Int, groupingKeyCardinality: Int): Unit = {
+  def singleGroupAndAggregate(
+      values: Int,
+      groupingKeyCardinality: Int,
+      aggregateFunction: String): Unit = {
     val benchmark =
       new Benchmark(
-        s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), single aggregate",
+        s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
+          s"single aggregate $aggregateFunction",
         values,
         output = output)
 
@@ -54,19 +58,19 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
           dir,
           spark.sql(s"SELECT value, floor(rand() * $groupingKeyCardinality) as key FROM $tbl"))
 
-        val query = "SELECT key, SUM(value) FROM parquetV1Table GROUP BY key"
+        val query = s"SELECT key, $aggregateFunction(value) FROM parquetV1Table GROUP BY key"
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
+        benchmark.addCase(s"SQL Parquet - Spark ($aggregateFunction)") { _ =>
           spark.sql(query).noop()
         }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
           withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
             spark.sql(query).noop()
           }
         }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -83,10 +87,12 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
   def singleGroupAndAggregateDecimal(
       values: Int,
       dataType: DecimalType,
-      groupingKeyCardinality: Int): Unit = {
+      groupingKeyCardinality: Int,
+      aggregateFunction: String): Unit = {
     val benchmark =
       new Benchmark(
-        s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), single aggregate on decimal",
+        s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCardinality), " +
+          s"single aggregate $aggregateFunction on decimal",
         values,
         output = output)
 
@@ -100,19 +106,19 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
           spark.sql(
             s"SELECT dec as value, floor(rand() * $groupingKeyCardinality) as key FROM $tbl"))
 
-        val query = "SELECT key, SUM(value) FROM parquetV1Table GROUP BY key"
+        val query = s"SELECT key, $aggregateFunction(value) FROM parquetV1Table GROUP BY key"
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
+        benchmark.addCase(s"SQL Parquet - Spark ($aggregateFunction)") { _ =>
           spark.sql(query).noop()
         }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
           withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
             spark.sql(query).noop()
           }
         }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -126,10 +132,11 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
     }
   }
 
-  def multiGroupKeys(values: Int, groupingKeyCard: Int): Unit = {
+  def multiGroupKeys(values: Int, groupingKeyCard: Int, aggregateFunction: String): Unit = {
     val benchmark =
       new Benchmark(
-        s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), single aggregate",
+        s"Grouped HashAgg Exec: multiple group keys (cardinality $groupingKeyCard), " +
+          s"single aggregate $aggregateFunction",
         values,
         output = output)
 
@@ -141,21 +148,22 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
           s"SELECT value, floor(rand() * $groupingKeyCard) as key1, " +
             s"floor(rand() * $groupingKeyCard) as key2 FROM $tbl"))
 
-        val query = "SELECT key1, key2, SUM(value) FROM parquetV1Table GROUP BY key1, key2"
+        val query =
+          s"SELECT key1, key2, $aggregateFunction(value) FROM parquetV1Table GROUP BY key1, key2"
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
+        benchmark.addCase(s"SQL Parquet - Spark ($aggregateFunction)") { _ =>
           spark.sql(query).noop()
         }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_MEMORY_OVERHEAD.key -> "1G") {
            spark.sql(query).noop()
          }
        }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -170,10 +178,11 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
     }
   }
 
-  def multiAggregates(values: Int, groupingKeyCard: Int): Unit = {
+  def multiAggregates(values: Int, groupingKeyCard: Int, aggregateFunction: String): Unit = {
     val benchmark =
       new Benchmark(
-        s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), multiple aggregates",
+        s"Grouped HashAgg Exec: single group key (cardinality $groupingKeyCard), " +
+          s"multiple aggregates $aggregateFunction",
         values,
         output = output)
 
@@ -185,19 +194,20 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
           s"SELECT value as value1, value as value2, floor(rand() * $groupingKeyCard) as key " +
             s"FROM $tbl"))
 
-        val query = "SELECT key, SUM(value1), SUM(value2) FROM parquetV1Table GROUP BY key"
+        val query = s"SELECT key, $aggregateFunction(value1), $aggregateFunction(value2) " +
+          "FROM parquetV1Table GROUP BY key"
 
-        benchmark.addCase("SQL Parquet - Spark") { _ =>
+        benchmark.addCase(s"SQL Parquet - Spark ($aggregateFunction)") { _ =>
           spark.sql(query).noop()
         }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet (Scan) ($aggregateFunction)") { _ =>
           withSQLConf(CometConf.COMET_ENABLED.key -> "true") {
             spark.sql(query).noop()
           }
         }
 
-        benchmark.addCase("SQL Parquet - Comet (Scan, Exec)") { _ =>
+        benchmark.addCase(s"SQL Parquet - Comet (Scan, Exec) ($aggregateFunction)") { _ =>
           withSQLConf(
             CometConf.COMET_ENABLED.key -> "true",
             CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -214,32 +224,39 @@ object CometAggregateBenchmark extends CometBenchmarkBase {
   override def runCometBenchmark(mainArgs: Array[String]): Unit = {
     val total = 1024 * 1024 * 10
     val combinations = List(100, 1024, 1024 * 1024) // number of distinct groups
+    val aggregateFunctions = List("SUM", "MIN", "MAX", "COUNT")
 
-    runBenchmarkWithTable("Grouped Aggregate (single group key + single aggregate)", total) { v =>
-      for (card <- combinations) {
-        singleGroupAndAggregate(v, card)
-      }
-    }
+    aggregateFunctions.foreach { aggFunc =>
+      runBenchmarkWithTable(
+        s"Grouped Aggregate (single group key + single aggregate $aggFunc)",
+        total) { v =>
+        for (card <- combinations) {
+          singleGroupAndAggregate(v, card, aggFunc)
+        }
+      }
 
-    runBenchmarkWithTable("Grouped Aggregate (multiple group keys + single aggregate)", total) {
-      v =>
-        for (card <- combinations) {
-          multiGroupKeys(v, card)
-        }
-    }
+      runBenchmarkWithTable(
+        s"Grouped Aggregate (multiple group keys + single aggregate $aggFunc)",
+        total) { v =>
+        for (card <- combinations) {
+          multiGroupKeys(v, card, aggFunc)
+        }
+      }
 
-    runBenchmarkWithTable("Grouped Aggregate (single group key + multiple aggregates)", total) {
-      v =>
-        for (card <- combinations) {
-          multiAggregates(v, card)
-        }
-    }
+      runBenchmarkWithTable(
+        s"Grouped Aggregate (single group key + multiple aggregates $aggFunc)",
+        total) { v =>
+        for (card <- combinations) {
+          multiAggregates(v, card, aggFunc)
+        }
+      }
 
-    runBenchmarkWithTable(
-      "Grouped Aggregate (single group key + single aggregate on decimal)",
-      total) { v =>
-      for (card <- combinations) {
-        singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card)
-      }
-    }
+      runBenchmarkWithTable(
+        s"Grouped Aggregate (single group key + single aggregate $aggFunc on decimal)",
+        total) { v =>
+        for (card <- combinations) {
+          singleGroupAndAggregateDecimal(v, DecimalType(18, 10), card, aggFunc)
+        }
+      }
+    }
   }
