diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index c02b08576eaa..24efab6c6ca5 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -142,6 +142,7 @@ main() { data_tpch "10" data_clickbench_1 data_clickbench_partitioned + data_imdb ;; tpch) data_tpch "1" @@ -166,6 +167,9 @@ main() { clickbench_extended) data_clickbench_1 ;; + imdb) + data_imdb + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for data generation" usage @@ -430,6 +434,85 @@ run_clickbench_extended() { $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o "${RESULTS_FILE}" } +# Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors) +# http://homepages.cwi.nl/~boncz/job/imdb.tgz +data_imdb() { + local imdb_dir="${DATA_DIR}/imdb" + local imdb_temp_gz="${imdb_dir}/imdb.tgz" + local imdb_url="https://homepages.cwi.nl/~boncz/job/imdb.tgz" + + # imdb has 21 files, we just separate them into 3 groups for better readability + local first_required_files=( + "aka_name.parquet" + "aka_title.parquet" + "cast_info.parquet" + "char_name.parquet" + "comp_cast_type.parquet" + "company_name.parquet" + "company_type.parquet" + ) + + local second_required_files=( + "complete_cast.parquet" + "info_type.parquet" + "keyword.parquet" + "kind_type.parquet" + "link_type.parquet" + "movie_companies.parquet" + "movie_info.parquet" + ) + + local third_required_files=( + "movie_info_idx.parquet" + "movie_keyword.parquet" + "movie_link.parquet" + "name.parquet" + "person_info.parquet" + "role_type.parquet" + "title.parquet" + ) + + # Combine the three arrays into one + local required_files=("${first_required_files[@]}" "${second_required_files[@]}" "${third_required_files[@]}") + local convert_needed=false + + # Create directory if it doesn't exist + mkdir -p "${imdb_dir}" + + # Check if required files exist + for file in "${required_files[@]}"; do + if [ ! -f "${imdb_dir}/${file}" ]; then + convert_needed=true + break + fi + done + + if [ "$convert_needed" = true ]; then + if [ ! -f "${imdb_dir}/imdb.tgz" ]; then + echo "Downloading IMDB dataset..." + + # Download the dataset + curl -o "${imdb_temp_gz}" "${imdb_url}" + + # Extract the dataset + tar -xzvf "${imdb_temp_gz}" -C "${imdb_dir}" + $CARGO_COMMAND --bin imdb -- convert --input ${imdb_dir} --output ${imdb_dir} --format parquet + else + echo "IMDB.tgz already exists." + + # Extract the dataset + tar -xzvf "${imdb_temp_gz}" -C "${imdb_dir}" + $CARGO_COMMAND --bin imdb -- convert --input ${imdb_dir} --output ${imdb_dir} --format parquet + fi + echo "IMDB dataset downloaded and extracted." + else + echo "IMDB dataset already exists and contains required parquet files." + fi +} + + + + compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" BRANCH1="$1" diff --git a/benchmarks/src/bin/imdb.rs b/benchmarks/src/bin/imdb.rs new file mode 100644 index 000000000000..40efb84b0501 --- /dev/null +++ b/benchmarks/src/bin/imdb.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! IMDB binary entrypoint + +use datafusion::error::Result; +use datafusion_benchmarks::imdb; +use structopt::StructOpt; + +#[cfg(all(feature = "snmalloc", feature = "mimalloc"))] +compile_error!( + "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the same time" +); + +#[cfg(feature = "snmalloc")] +#[global_allocator] +static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; + +#[cfg(feature = "mimalloc")] +#[global_allocator] +static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; + +#[derive(Debug, StructOpt)] +#[structopt(name = "IMDB", about = "IMDB Dataset Processing.")] +enum ImdbOpt { + Convert(imdb::ConvertOpt), +} + +#[tokio::main] +pub async fn main() -> Result<()> { + env_logger::init(); + match ImdbOpt::from_args() { + ImdbOpt::Convert(opt) => opt.run().await, + } +} diff --git a/benchmarks/src/imdb/convert.rs b/benchmarks/src/imdb/convert.rs new file mode 100644 index 000000000000..c95f7f8bf564 --- /dev/null +++ b/benchmarks/src/imdb/convert.rs @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion_common::instant::Instant; +use std::path::PathBuf; + +use datafusion::error::Result; +use datafusion::prelude::*; +use structopt::StructOpt; + +use datafusion::common::not_impl_err; + +use super::get_imdb_table_schema; +use super::IMDB_TABLES; + +#[derive(Debug, StructOpt)] +pub struct ConvertOpt { + /// Path to csv files + #[structopt(parse(from_os_str), required = true, short = "i", long = "input")] + input_path: PathBuf, + + /// Output path + #[structopt(parse(from_os_str), required = true, short = "o", long = "output")] + output_path: PathBuf, + + /// Output file format: `csv` or `parquet` + #[structopt(short = "f", long = "format")] + file_format: String, + + /// Batch size when reading CSV or Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "8192")] + batch_size: usize, +} + +impl ConvertOpt { + pub async fn run(self) -> Result<()> { + let input_path = self.input_path.to_str().unwrap(); + let output_path = self.output_path.to_str().unwrap(); + + for table in IMDB_TABLES { + let start = Instant::now(); + let schema = get_imdb_table_schema(table); + + let input_path = format!("{input_path}/{table}.csv"); + let output_path = format!("{output_path}/{table}.parquet"); + let options = CsvReadOptions::new() + .schema(&schema) + .has_header(false) + .delimiter(b',') + .escape(b'\\') + .file_extension(".csv"); + + let config = SessionConfig::new().with_batch_size(self.batch_size); + let ctx = SessionContext::new_with_config(config); + + let mut csv = ctx.read_csv(&input_path, options).await?; + + // Select all apart from the padding column + let selection = csv + .schema() + .iter() + .take(schema.fields.len()) + .map(Expr::from) + .collect(); + + csv = csv.select(selection)?; + + println!( + "Converting '{}' to {} files in directory '{}'", + &input_path, self.file_format, &output_path + ); + match self.file_format.as_str() { + "csv" => { + csv.write_csv( + output_path.as_str(), + DataFrameWriteOptions::new(), + None, + ) + .await?; + } + "parquet" => { + csv.write_parquet( + output_path.as_str(), + DataFrameWriteOptions::new(), + None, + ) + .await?; + } + other => { + return not_impl_err!("Invalid output format: {other}"); + } + } + println!("Conversion completed in {} ms", start.elapsed().as_millis()); + } + Ok(()) + } +} diff --git a/benchmarks/src/imdb/mod.rs b/benchmarks/src/imdb/mod.rs new file mode 100644 index 000000000000..8e2977c0384e --- /dev/null +++ b/benchmarks/src/imdb/mod.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmark derived from IMDB dataset. + +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +mod convert; +pub use convert::ConvertOpt; + +// we have 21 tables in the IMDB dataset +pub const IMDB_TABLES: &[&str] = &[ + "aka_name", + "aka_title", + "cast_info", + "char_name", + "comp_cast_type", + "company_name", + "company_type", + "complete_cast", + "info_type", + "keyword", + "kind_type", + "link_type", + "movie_companies", + "movie_info_idx", + "movie_keyword", + "movie_link", + "name", + "role_type", + "title", + "movie_info", + "person_info", +]; + +/// Get the schema for the IMDB dataset tables +/// see benchmarks/data/imdb/schematext.sql +pub fn get_imdb_table_schema(table: &str) -> Schema { + match table { + "aka_name" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("person_id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("name_pcode_cf", DataType::Utf8, true), + Field::new("name_pcode_nf", DataType::Utf8, true), + Field::new("surname_pcode", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "aka_title" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("title", DataType::Utf8, true), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("kind_id", DataType::Int32, false), + Field::new("production_year", DataType::Int32, true), + Field::new("phonetic_code", DataType::Utf8, true), + Field::new("episode_of_id", DataType::Int32, true), + Field::new("season_nr", DataType::Int32, true), + Field::new("episode_nr", DataType::Int32, true), + Field::new("note", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "cast_info" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("person_id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("person_role_id", DataType::Int32, true), + Field::new("note", DataType::Utf8, true), + Field::new("nr_order", DataType::Int32, true), + Field::new("role_id", DataType::Int32, false), + ]), + "char_name" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("imdb_id", DataType::Int32, true), + Field::new("name_pcode_nf", DataType::Utf8, true), + Field::new("surname_pcode", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "comp_cast_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("kind", DataType::Utf8, false), + ]), + "company_name" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("country_code", DataType::Utf8, true), + Field::new("imdb_id", DataType::Int32, true), + Field::new("name_pcode_nf", DataType::Utf8, true), + Field::new("name_pcode_sf", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "company_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("kind", DataType::Utf8, true), + ]), + "complete_cast" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, true), + Field::new("subject_id", DataType::Int32, false), + Field::new("status_id", DataType::Int32, false), + ]), + "info_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("info", DataType::Utf8, false), + ]), + "keyword" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("keyword", DataType::Utf8, false), + Field::new("phonetic_code", DataType::Utf8, true), + ]), + "kind_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("kind", DataType::Utf8, true), + ]), + "link_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("link", DataType::Utf8, false), + ]), + "movie_companies" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("company_id", DataType::Int32, false), + Field::new("company_type_id", DataType::Int32, false), + Field::new("note", DataType::Utf8, true), + ]), + "movie_info_idx" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("info_type_id", DataType::Int32, false), + Field::new("info", DataType::Utf8, false), + Field::new("note", DataType::Utf8, true), + ]), + "movie_keyword" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("keyword_id", DataType::Int32, false), + ]), + "movie_link" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("linked_movie_id", DataType::Int32, false), + Field::new("link_type_id", DataType::Int32, false), + ]), + "name" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("imdb_id", DataType::Int32, true), + Field::new("gender", DataType::Utf8, true), + Field::new("name_pcode_cf", DataType::Utf8, true), + Field::new("name_pcode_nf", DataType::Utf8, true), + Field::new("surname_pcode", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "role_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("role", DataType::Utf8, false), + ]), + "title" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("title", DataType::Utf8, false), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("kind_id", DataType::Int32, false), + Field::new("production_year", DataType::Int32, true), + Field::new("imdb_id", DataType::Int32, true), + Field::new("phonetic_code", DataType::Utf8, true), + Field::new("episode_of_id", DataType::Int32, true), + Field::new("season_nr", DataType::Int32, true), + Field::new("episode_nr", DataType::Int32, true), + Field::new("series_years", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "movie_info" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("info_type_id", DataType::Int32, false), + Field::new("info", DataType::Utf8, false), + Field::new("note", DataType::Utf8, true), + ]), + "person_info" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("person_id", DataType::Int32, false), + Field::new("info_type_id", DataType::Int32, false), + Field::new("info", DataType::Utf8, false), + Field::new("note", DataType::Utf8, true), + ]), + _ => unimplemented!("Schema for table {} is not implemented", table), + } +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index f81220aa2c94..52d81ca91816 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -17,6 +17,7 @@ //! DataFusion benchmark runner pub mod clickbench; +pub mod imdb; pub mod parquet_filter; pub mod sort; pub mod tpch;