Skip to content

Commit

Permalink
Adding TPCH benchmarks for Sort Merge Join (apache#10092)
Browse files Browse the repository at this point in the history
* Adding TPCH benchmarks for Sort Merge Join

* Update benchmarks/bench.sh

Co-authored-by: Andy Grove <andygrove73@gmail.com>

* fix benches

* fmt

* comments

---------

Co-authored-by: Andy Grove <andygrove73@gmail.com>
  • Loading branch information
comphead and andygrove authored Apr 16, 2024
1 parent b54adb3 commit 4ad4f90
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 2 deletions.
21 changes: 21 additions & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ main() {
tpch_mem10)
run_tpch_mem "10"
;;
tpch_smj)
run_tpch_smj "1"
;;
tpch_smj10)
run_tpch_smj "10"
;;
parquet)
run_parquet
;;
Expand Down Expand Up @@ -320,6 +326,21 @@ run_tpch() {
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --format parquet -o ${RESULTS_FILE}
}

# Runs the tpch benchmark with sort merge join instead of hash join.
#
# Arguments:
#   $1 - scale factor; selects the data directory
#        "${DATA_DIR}/tpch_sf<scale factor>" and is embedded in the
#        results file name.
#
# Uses DATA_DIR, RESULTS_DIR and CARGO_COMMAND, which are not set here
# (presumably defined elsewhere in this script).
# Exits with status 1 when no scale factor is passed.
run_tpch_smj() {
    SCALE_FACTOR=$1
    if [ -z "$SCALE_FACTOR" ] ; then
        echo "Internal error: Scale factor not specified"
        exit 1
    fi
    TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"

    RESULTS_FILE="${RESULTS_DIR}/tpch_smj_sf${SCALE_FACTOR}.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running tpch SMJ benchmark..."
    # `--prefer_hash_join false` asks the tpch binary to use sort merge join.
    # $CARGO_COMMAND is left unquoted on purpose so it can expand to a command
    # plus arguments (assumption - confirm against its definition), while the
    # results path is quoted so a directory containing spaces stays one argument.
    $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join false --format parquet -o "${RESULTS_FILE}"
}

# Runs the tpch in memory
run_tpch_mem() {
SCALE_FACTOR=$1
Expand Down
15 changes: 13 additions & 2 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
use log::info;
use structopt::StructOpt;

// Hack to avoid `default_value is meaningless for bool` errors: structopt
// rejects `default_value` on a field whose type is spelled `bool`.
// Declaring the field through this alias lets an option carry
// `default_value = "true"` while the field itself is still a plain `bool`.
// NOTE(review): presumably the derive inspects the type name syntactically,
// which is why an alias is enough — confirm against the structopt docs.
type BoolDefaultTrue = bool;

/// Run the tpch benchmark.
///
/// This benchmarks is derived from the [TPC-H][1] version
Expand Down Expand Up @@ -81,6 +84,11 @@ pub struct RunOpt {
/// Whether to disable collection of statistics (and cost based optimizations) or not.
#[structopt(short = "S", long = "disable-statistics")]
disable_statistics: bool,

/// If true then hash join used, if false then sort merge join
/// True by default.
#[structopt(short = "j", long = "prefer_hash_join", default_value = "true")]
prefer_hash_join: BoolDefaultTrue,
}

const TPCH_QUERY_START_ID: usize = 1;
Expand All @@ -107,10 +115,11 @@ impl RunOpt {
}

async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self
let mut config = self
.common
.config()
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
let ctx = SessionContext::new_with_config(config);

// register tables
Expand Down Expand Up @@ -304,7 +313,7 @@ mod tests {
use super::*;

use datafusion::common::exec_err;
use datafusion::error::{DataFusionError, Result};
use datafusion::error::Result;
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
physical_plan_to_bytes,
Expand Down Expand Up @@ -339,6 +348,7 @@ mod tests {
mem_table: false,
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
Expand Down Expand Up @@ -371,6 +381,7 @@ mod tests {
mem_table: false,
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
Expand Down

0 comments on commit 4ad4f90

Please sign in to comment.