diff --git a/benchmarks/README.md b/benchmarks/README.md
index 8fed85fa02b8..13b73074e0d9 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -243,28 +243,11 @@ See the help for more details.
 You can enable `mimalloc` or `snmalloc` (to use either the mimalloc or snmalloc allocator) as features by passing them in as `--features`. For example:
 
 ```shell
-cargo run --release --features "mimalloc" --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
-```
-
-The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl`
-(generated by the `dbgen` utility) to CSV and Parquet.
-
-```bash
-cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-parquet --format parquet
+cargo run --release --features "mimalloc" --bin dfbench -- tpch --iterations 3 --path ./data --format tbl --query 1 --batch-size 4096
 ```
 
 Or if you want to verify and run all the queries in the benchmark, you can just run `cargo test`.
 
-#### Sorted Conversion
-
-The TPCH tables generated by the dbgen utility are sorted by their first column (their primary key for most tables, the `l_orderkey` column for the `lineitem` table.)
-
-To preserve this sorted order information during conversion (useful for benchmarking execution on pre-sorted data) include the `--sort` flag:
-
-```bash
-cargo run --release --bin tpch -- convert --input ./data --output /mnt/tpch-sorted-parquet --format parquet --sort
-```
-
 ### Comparing results between runs
 
 Any `dfbench` execution with `-o <dir>` argument will produce a
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index 6db4eed42c0f..8c0a92523240 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -189,8 +189,8 @@ main() {
             echo "***************************"
             case "$BENCHMARK" in
                 all)
-                    data_tpch "1"
-                    data_tpch "10"
+                    data_tpch "1" "parquet"
+                    data_tpch "10" "parquet"
                     data_h2o "SMALL"
                     data_h2o "MEDIUM"
                     data_h2o "BIG"
@@ -203,26 +203,22 @@ main() {
                    # nlj uses range() function, no data generation needed
                    ;;
                tpch)
-                    data_tpch "1"
+                    data_tpch "1" "parquet"
                    ;;
                tpch_mem)
-                    # same data as for tpch
-                    data_tpch "1"
+                    data_tpch "1" "parquet"
                    ;;
                tpch_csv)
-                    # same data as for tpch
-                    data_tpch "1"
+                    data_tpch "1" "csv"
                    ;;
                tpch10)
-                    data_tpch "10"
+                    data_tpch "10" "parquet"
                    ;;
                tpch_mem10)
-                    # same data as for tpch10
-                    data_tpch "10"
+                    data_tpch "10" "parquet"
                    ;;
                tpch_csv10)
-                    # same data as for tpch10
-                    data_tpch "10"
+                    data_tpch "10" "csv"
                    ;;
                clickbench_1)
                    data_clickbench_1
@@ -297,19 +293,19 @@ main() {
                    ;;
                external_aggr)
                    # same data as for tpch
-                    data_tpch "1"
+                    data_tpch "1" "parquet"
                    ;;
                sort_tpch)
                    # same data as for tpch
-                    data_tpch "1"
+                    data_tpch "1" "parquet"
                    ;;
                sort_tpch10)
                    # same data as for tpch10
-                    data_tpch "10"
+                    data_tpch "10" "parquet"
                    ;;
                topk_tpch)
                    # same data as for tpch
-                    data_tpch "1"
+                    data_tpch "1" "parquet"
                    ;;
                nlj)
                    # nlj uses range() function, no data generation needed
@@ -320,7 +316,7 @@ main() {
                    echo "HJ benchmark does not require data generation"
                    ;;
                compile_profile)
-                    data_tpch "1"
+                    data_tpch "1" "parquet"
                    ;;
                *)
                    echo "Error: unknown benchmark '$BENCHMARK' for data generation"
@@ -537,7 +533,7 @@ main() {
 # Creates TPCH data at a certain scale factor, if it doesn't already
 # exist
 #
-# call like: data_tpch($scale_factor)
+# call like: data_tpch($scale_factor, $format)
 #
 # Creates data in $DATA_DIR/tpch_sf1 for scale factor 1
 # Creates data in $DATA_DIR/tpch_sf10 for scale factor 10
@@ -548,20 +544,23 @@ data_tpch() {
         echo "Internal error: Scale factor not specified"
         exit 1
     fi
+    FORMAT=$2
+    if [ -z "$FORMAT" ] ; then
+        echo "Internal error: Format not specified"
+        exit 1
+    fi
 
     TPCH_DIR="${DATA_DIR}/tpch_sf${SCALE_FACTOR}"
-    echo "Creating tpch dataset at Scale Factor ${SCALE_FACTOR} in ${TPCH_DIR}..."
+    echo "Creating tpch $FORMAT dataset at Scale Factor ${SCALE_FACTOR} in ${TPCH_DIR}..."
 
     # Ensure the target data directory exists
     mkdir -p "${TPCH_DIR}"
 
-    # Create 'tbl' (CSV format) data into $DATA_DIR if it does not already exist
-    FILE="${TPCH_DIR}/supplier.tbl"
-    if test -f "${FILE}"; then
-        echo " tbl files exist ($FILE exists)."
-    else
-        echo " creating tbl files with tpch_dbgen..."
-        docker run -v "${TPCH_DIR}":/data -it --rm ghcr.io/scalytics/tpch-docker:main -vf -s "${SCALE_FACTOR}"
+    # check if tpchgen-cli is installed
+    if ! command -v tpchgen-cli &> /dev/null
+    then
+        echo "tpchgen-cli could not be found, please install it via 'cargo install tpchgen-cli'"
+        exit 1
     fi
 
     # Copy expected answers into the ./data/answers directory if it does not already exist
@@ -574,27 +573,32 @@ data_tpch() {
         docker run -v "${TPCH_DIR}":/data -it --entrypoint /bin/bash --rm ghcr.io/scalytics/tpch-docker:main -c "cp -f /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
     fi
 
-    # Create 'parquet' files from tbl
-    FILE="${TPCH_DIR}/supplier"
-    if test -d "${FILE}"; then
-        echo " parquet files exist ($FILE exists)."
-    else
-        echo " creating parquet files using benchmark binary ..."
-        pushd "${SCRIPT_DIR}" > /dev/null
-        $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}" --format parquet
-        popd > /dev/null
+    if [ "$FORMAT" = "parquet" ]; then
+        # Create 'parquet' files, one directory per file
+        FILE="${TPCH_DIR}/supplier"
+        if test -d "${FILE}"; then
+            echo " parquet files exist ($FILE exists)."
+        else
+            echo " creating parquet files using tpchgen-cli ..."
+            tpchgen-cli --scale-factor "${SCALE_FACTOR}" --format parquet --parquet-compression='ZSTD(1)' --parts=1 --output-dir "${TPCH_DIR}"
+        fi
+        return
     fi
 
-    # Create 'csv' files from tbl
-    FILE="${TPCH_DIR}/csv/supplier"
-    if test -d "${FILE}"; then
-        echo " csv files exist ($FILE exists)."
-    else
-        echo " creating csv files using benchmark binary ..."
-        pushd "${SCRIPT_DIR}" > /dev/null
-        $CARGO_COMMAND --bin tpch -- convert --input "${TPCH_DIR}" --output "${TPCH_DIR}/csv" --format csv
-        popd > /dev/null
+    # Create 'csv' files, one directory per file
+    if [ "$FORMAT" = "csv" ]; then
+        FILE="${TPCH_DIR}/csv/supplier"
+        if test -d "${FILE}"; then
+            echo " csv files exist ($FILE exists)."
+        else
+            echo " creating csv files using tpchgen-cli ..."
+            tpchgen-cli --scale-factor "${SCALE_FACTOR}" --format csv --parts=1 --output-dir "${TPCH_DIR}/csv"
+        fi
+        return
     fi
+
+    echo "Error: unknown format '$FORMAT' for tpch data generation, expected 'parquet' or 'csv'"
+    exit 1
 }
 
 # Runs the tpch benchmark
@@ -611,10 +615,10 @@ run_tpch() {
     echo "Running tpch benchmark..."
 
     FORMAT=$2
-    debug_run $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG}
+    debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format ${FORMAT} -o "${RESULTS_FILE}" ${QUERY_ARG}
 }
 
-# Runs the tpch in memory
+# Runs the tpch benchmark in memory (needs tpch parquet data)
 run_tpch_mem() {
     SCALE_FACTOR=$1
     if [ -z "$SCALE_FACTOR" ] ; then
@@ -627,7 +631,7 @@ run_tpch_mem() {
     echo "RESULTS_FILE: ${RESULTS_FILE}"
     echo "Running tpch_mem benchmark..."
     # -m means in memory
-    debug_run $CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
+    debug_run $CARGO_COMMAND --bin dfbench -- tpch --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" ${QUERY_ARG}
 }
 
 # Runs the compile profile benchmark helper
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
index 816cae0e3855..68dea188dc55 100644
--- a/benchmarks/src/bin/dfbench.rs
+++ b/benchmarks/src/bin/dfbench.rs
@@ -48,7 +48,6 @@ enum Options {
     Nlj(nlj::RunOpt),
     SortTpch(sort_tpch::RunOpt),
     Tpch(tpch::RunOpt),
-    TpchConvert(tpch::ConvertOpt),
 }
 
 // Main benchmark runner entrypoint
@@ -65,6 +64,5 @@ pub async fn main() -> Result<()> {
         Options::Nlj(opt) => opt.run().await,
         Options::SortTpch(opt) => opt.run().await,
         Options::Tpch(opt) => Box::pin(opt.run()).await,
-        Options::TpchConvert(opt) => opt.run().await,
     }
 }
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
deleted file mode 100644
index ca2bb8e57c0e..000000000000
--- a/benchmarks/src/bin/tpch.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! tpch binary only entrypoint
-
-use datafusion::error::Result;
-use datafusion_benchmarks::tpch;
-use structopt::StructOpt;
-
-#[cfg(all(feature = "snmalloc", feature = "mimalloc"))]
-compile_error!(
-    "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the same time"
-);
-
-#[cfg(feature = "snmalloc")]
-#[global_allocator]
-static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
-
-#[cfg(feature = "mimalloc")]
-#[global_allocator]
-static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
-
-#[derive(Debug, StructOpt)]
-#[structopt(about = "benchmark command")]
-enum BenchmarkSubCommandOpt {
-    #[structopt(name = "datafusion")]
-    DataFusionBenchmark(tpch::RunOpt),
-}
-
-#[derive(Debug, StructOpt)]
-#[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
-enum TpchOpt {
-    Benchmark(BenchmarkSubCommandOpt),
-    Convert(tpch::ConvertOpt),
-}
-
-/// 'tpch' entry point, with tortured command line arguments. Please
-/// use `dbbench` instead.
-///
-/// Note: this is kept to be backwards compatible with the benchmark names prior to
-///
-#[tokio::main]
-async fn main() -> Result<()> {
-    env_logger::init();
-    match TpchOpt::from_args() {
-        TpchOpt::Benchmark(BenchmarkSubCommandOpt::DataFusionBenchmark(opt)) => {
-            Box::pin(opt.run()).await
-        }
-        TpchOpt::Convert(opt) => opt.run().await,
-    }
-}
diff --git a/benchmarks/src/tpch/convert.rs b/benchmarks/src/tpch/convert.rs
deleted file mode 100644
index 5219e09cd305..000000000000
--- a/benchmarks/src/tpch/convert.rs
+++ /dev/null
@@ -1,162 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::logical_expr::select_expr::SelectExpr;
-use datafusion_common::instant::Instant;
-use std::fs;
-use std::path::{Path, PathBuf};
-
-use datafusion::common::not_impl_err;
-
-use super::get_tbl_tpch_table_schema;
-use super::TPCH_TABLES;
-use datafusion::error::Result;
-use datafusion::prelude::*;
-use parquet::basic::Compression;
-use parquet::file::properties::WriterProperties;
-use structopt::StructOpt;
-
-/// Convert tpch .slt files to .parquet or .csv files
-#[derive(Debug, StructOpt)]
-pub struct ConvertOpt {
-    /// Path to csv files
-    #[structopt(parse(from_os_str), required = true, short = "i", long = "input")]
-    input_path: PathBuf,
-
-    /// Output path
-    #[structopt(parse(from_os_str), required = true, short = "o", long = "output")]
-    output_path: PathBuf,
-
-    /// Output file format: `csv` or `parquet`
-    #[structopt(short = "f", long = "format")]
-    file_format: String,
-
-    /// Compression to use when writing Parquet files
-    #[structopt(short = "c", long = "compression", default_value = "zstd")]
-    compression: String,
-
-    /// Number of partitions to produce
-    #[structopt(short = "n", long = "partitions", default_value = "1")]
-    partitions: usize,
-
-    /// Batch size when reading CSV or Parquet files
-    #[structopt(short = "s", long = "batch-size", default_value = "8192")]
-    batch_size: usize,
-
-    /// Sort each table by its first column in ascending order.
-    #[structopt(short = "t", long = "sort")]
-    sort: bool,
-}
-
-impl ConvertOpt {
-    pub async fn run(self) -> Result<()> {
-        let compression = self.compression()?;
-
-        let input_path = self.input_path.to_str().unwrap();
-        let output_path = self.output_path.to_str().unwrap();
-
-        let output_root_path = Path::new(output_path);
-        for table in TPCH_TABLES {
-            let start = Instant::now();
-            let schema = get_tbl_tpch_table_schema(table);
-            let key_column_name = schema.fields()[0].name();
-
-            let input_path = format!("{input_path}/{table}.tbl");
-            let options = CsvReadOptions::new()
-                .schema(&schema)
-                .has_header(false)
-                .delimiter(b'|')
-                .file_extension(".tbl");
-            let options = if self.sort {
-                // indicated that the file is already sorted by its first column to speed up the conversion
-                options
-                    .file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
-            } else {
-                options
-            };
-
-            let config = SessionConfig::new().with_batch_size(self.batch_size);
-            let ctx = SessionContext::new_with_config(config);
-
-            // build plan to read the TBL file
-            let mut csv = ctx.read_csv(&input_path, options).await?;
-
-            // Select all apart from the padding column
-            let selection = csv
-                .schema()
-                .iter()
-                .take(schema.fields.len() - 1)
-                .map(Expr::from)
-                .map(SelectExpr::from)
-                .collect::<Vec<_>>();
-
-            csv = csv.select(selection)?;
-            // optionally, repartition the file
-            let partitions = self.partitions;
-            if partitions > 1 {
-                csv = csv.repartition(Partitioning::RoundRobinBatch(partitions))?
-            }
-            let csv = if self.sort {
-                csv.sort_by(vec![col(key_column_name)])?
-            } else {
-                csv
-            };
-
-            // create the physical plan
-            let csv = csv.create_physical_plan().await?;
-
-            let output_path = output_root_path.join(table);
-            let output_path = output_path.to_str().unwrap().to_owned();
-            fs::create_dir_all(&output_path)?;
-            println!(
-                "Converting '{}' to {} files in directory '{}'",
-                &input_path, self.file_format, &output_path
-            );
-            match self.file_format.as_str() {
-                "csv" => ctx.write_csv(csv, output_path).await?,
-                "parquet" => {
-                    let props = WriterProperties::builder()
-                        .set_compression(compression)
-                        .build();
-                    ctx.write_parquet(csv, output_path, Some(props)).await?
-                }
-                other => {
-                    return not_impl_err!("Invalid output format: {other}");
-                }
-            }
-            println!("Conversion completed in {} ms", start.elapsed().as_millis());
-        }
-
-        Ok(())
-    }
-
-    /// return the compression method to use when writing parquet
-    fn compression(&self) -> Result<Compression> {
-        Ok(match self.compression.as_str() {
-            "none" => Compression::UNCOMPRESSED,
-            "snappy" => Compression::SNAPPY,
-            "brotli" => Compression::BROTLI(Default::default()),
-            "gzip" => Compression::GZIP(Default::default()),
-            "lz4" => Compression::LZ4,
-            "lz0" => Compression::LZO,
-            "zstd" => Compression::ZSTD(Default::default()),
-            other => {
-                return not_impl_err!("Invalid compression format: {other}");
-            }
-        })
-    }
-}
diff --git a/benchmarks/src/tpch/mod.rs b/benchmarks/src/tpch/mod.rs
index 233ea94a05c1..681aa0a403ee 100644
--- a/benchmarks/src/tpch/mod.rs
+++ b/benchmarks/src/tpch/mod.rs
@@ -27,9 +27,6 @@ use std::fs;
 mod run;
 pub use run::RunOpt;
 
-mod convert;
-pub use convert::ConvertOpt;
-
 pub const TPCH_TABLES: &[&str] = &[
     "part", "supplier", "partsupp", "customer", "orders", "lineitem", "nation", "region",
 ];
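For anyone trying the change locally, here is a minimal sketch of the replacement workflow end to end: generate TPC-H data with the external `tpchgen-cli` generator (which supersedes the removed `tpch convert` path and the dockerized dbgen step) and run the queries through the consolidated `dfbench` binary. All flags are taken from `data_tpch` and `run_tpch` in this diff; the `./data/tpch_sf1` output directory is illustrative (in `bench.sh` it is derived from `$DATA_DIR`).

```shell
# One-time install of the generator that bench.sh now depends on
cargo install tpchgen-cli

# Generate Scale Factor 1 parquet data directly: no tbl intermediate,
# no dockerized dbgen, and no in-process conversion step
tpchgen-cli --scale-factor 1 --format parquet --parquet-compression='ZSTD(1)' --parts=1 --output-dir ./data/tpch_sf1

# Run the TPC-H queries against it via the consolidated dfbench binary
cargo run --release --bin dfbench -- tpch --iterations 5 --path ./data/tpch_sf1 --format parquet
```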