From e34bd2a4774edf2da9484a6fa9223a446ae08d1a Mon Sep 17 00:00:00 2001 From: DouPache Date: Tue, 17 Sep 2024 13:51:06 +0800 Subject: [PATCH 1/4] imdb dataset --- benchmarks/bench.sh | 79 ++++++++++++++ benchmarks/src/bin/imdb.rs | 49 +++++++++ benchmarks/src/imdb/convert.rs | 111 +++++++++++++++++++ benchmarks/src/imdb/mod.rs | 188 +++++++++++++++++++++++++++++++++ benchmarks/src/lib.rs | 1 + 5 files changed, 428 insertions(+) create mode 100644 benchmarks/src/bin/imdb.rs create mode 100644 benchmarks/src/imdb/convert.rs create mode 100644 benchmarks/src/imdb/mod.rs diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index c02b08576eaa..44fb1444a393 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -142,6 +142,7 @@ main() { data_tpch "10" data_clickbench_1 data_clickbench_partitioned + data_imdb ;; tpch) data_tpch "1" @@ -166,6 +167,9 @@ main() { clickbench_extended) data_clickbench_1 ;; + imdb) + data_imdb + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for data generation" usage @@ -430,6 +434,81 @@ run_clickbench_extended() { $CARGO_COMMAND --bin dfbench -- clickbench --iterations 5 --path "${DATA_DIR}/hits.parquet" --queries-path "${SCRIPT_DIR}/queries/clickbench/extended.sql" -o "${RESULTS_FILE}" } +# Downloads the csv.gz files IMDB datasets from Peter Boncz's homepage(one of the JOB paper authors) +# http://homepages.cwi.nl/~boncz/job/imdb.tgz +data_imdb() { + local imdb_dir="${DATA_DIR}/imdb" + local imdb_temp_gz="${imdb_dir}/imdb.tgz" + local imdb_url="https://homepages.cwi.nl/~boncz/job/imdb.tgz" + + # imdb has 21 files, we just separate them into 3 groups for better readability + local first_required_files=( + "aka_name.parquet" + "aka_title.parquet" + "cast_info.parquet" + "char_name.parquet" + "comp_cast_type.parquet" + "company_name.parquet" + "company_type.parquet" + ) + + local second_required_files=( + "complete_cast.parquet" + "info_type.parquet" + "keyword.parquet" + "kind_type.parquet" + "link_type.parquet" + "movie_companies.parquet" + "movie_info.parquet" + ) + + local third_required_files=( + "movie_info_idx.parquet" + "movie_keyword.parquet" + "movie_link.parquet" + "name.parquet" + "person_info.parquet" + "role_type.parquet" + "title.parquet" + ) + + # Combine the three arrays into one + local required_files=("${first_required_files[@]}" "${second_required_files[@]}" "${third_required_files[@]}") + local convert_needed=false + + # Create directory if it doesn't exist + mkdir -p "${imdb_dir}" + + # Check if required files exist + for file in "${required_files[@]}"; do + if [ ! -f "${imdb_dir}/${file}" ]; then + convert_needed=true + break + fi + done + + if [ "$convert_needed" = true ]; then + if [ ! -f "${imdb_dir}/imdb.tgz" ]; then + echo "Downloading IMDB dataset..." + + # Download the dataset + curl -o "${imdb_temp_gz}" "${imdb_url}" + else + echo "IMDB.tgz already exists." + + # Extract the dataset + tar -xzvf "${imdb_temp_gz}" -C "${imdb_dir}" + $CARGO_COMMAND --bin imdb -- convert --input ${imdb_dir} --output ${imdb_dir} --format parquet + fi + echo "IMDB dataset downloaded and extracted." + else + echo "IMDB dataset already exists and contains required parquet files." + fi +} + + + + compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" BRANCH1="$1" diff --git a/benchmarks/src/bin/imdb.rs b/benchmarks/src/bin/imdb.rs new file mode 100644 index 000000000000..02f679be4070 --- /dev/null +++ b/benchmarks/src/bin/imdb.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! IMDB binary entrypoint + +use datafusion::error::Result; +use datafusion_benchmarks::imdb; +use structopt::StructOpt; + +#[cfg(all(feature = "snmalloc", feature = "mimalloc"))] +compile_error!( + "feature \"snmalloc\" and feature \"mimalloc\" cannot be enabled at the same time" +); + +#[cfg(feature = "snmalloc")] +#[global_allocator] +static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; + +#[cfg(feature = "mimalloc")] +#[global_allocator] +static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; + +#[derive(Debug, StructOpt)] +#[structopt(name = "IMDB", about = "IMDB Dataset Processing.")] +enum ImdbOpt { + Convert(imdb::ConvertOpt), +} + +#[tokio::main] +pub async fn main() -> Result<()> { + env_logger::init(); + match ImdbOpt::from_args() { + ImdbOpt::Convert(opt) => opt.run().await, + } +} \ No newline at end of file diff --git a/benchmarks/src/imdb/convert.rs b/benchmarks/src/imdb/convert.rs new file mode 100644 index 000000000000..4f5999ca1fe6 --- /dev/null +++ b/benchmarks/src/imdb/convert.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::dataframe::DataFrameWriteOptions; +use datafusion_common::instant::Instant; +use std::path::PathBuf; + +use datafusion::error::Result; +use datafusion::prelude::*; +use structopt::StructOpt; + +use datafusion::common::not_impl_err; + +use super::IMDB_TABLES; +use super::get_imdb_table_schema; + +#[derive(Debug, StructOpt)] +pub struct ConvertOpt { + /// Path to csv files + #[structopt(parse(from_os_str), required = true, short = "i", long = "input")] + input_path: PathBuf, + + /// Output path + #[structopt(parse(from_os_str), required = true, short = "o", long = "output")] + output_path: PathBuf, + + /// Output file format: `csv` or `parquet` + #[structopt(short = "f", long = "format")] + file_format: String, + + /// Batch size when reading CSV or Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "8192")] + batch_size: usize, +} + +impl ConvertOpt { + pub async fn run(self) -> Result<()> { + + let input_path = self.input_path.to_str().unwrap(); + let output_path = self.output_path.to_str().unwrap(); + + for table in IMDB_TABLES { + let start = Instant::now(); + let schema = get_imdb_table_schema(table); + + let input_path = format!("{input_path}/{table}.csv"); + let output_path = format!("{output_path}/{table}.parquet"); + let options = CsvReadOptions::new() + .schema(&schema) + .has_header(false) + .delimiter(b',') + .escape(b'\\') + .file_extension(".csv"); + + let config = SessionConfig::new().with_batch_size(self.batch_size); + let ctx = SessionContext::new_with_config(config); + + let mut csv = ctx.read_csv(&input_path, options).await?; + + // Select all apart from the padding column + let selection = csv + .schema() + .iter() + .take(schema.fields.len() - 1) + .map(Expr::from) + .collect(); + + csv = csv.select(selection)?; + + println!( + "Converting '{}' to {} files in directory '{}'", + &input_path, self.file_format, &output_path + ); + match self.file_format.as_str() { + "csv" => { + csv.write_csv( + output_path.as_str(), + DataFrameWriteOptions::new(), + None, + ).await?; + } + "parquet" => { + csv.write_parquet( + output_path.as_str(), + DataFrameWriteOptions::new(), + None, + ).await?; + } + other => { + return not_impl_err!("Invalid output format: {other}"); + } + } + println!("Conversion completed in {} ms", start.elapsed().as_millis()); + } + Ok(()) + } +} \ No newline at end of file diff --git a/benchmarks/src/imdb/mod.rs b/benchmarks/src/imdb/mod.rs new file mode 100644 index 000000000000..84e755e87a52 --- /dev/null +++ b/benchmarks/src/imdb/mod.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmark derived from IMDB dataset. + +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +mod convert; +pub use convert::ConvertOpt; + +// we have 21 tables in the IMDB dataset +pub const IMDB_TABLES: &[&str] = &[ + "aka_name", "aka_title", "cast_info", "char_name", "comp_cast_type", + "company_name", "company_type", "complete_cast", "info_type", "keyword", + "kind_type", "link_type", "movie_companies", "movie_info_idx", "movie_keyword", + "movie_link", "name", "role_type", "title", "movie_info", "person_info", +]; + +/// Get the schema for the IMDB dataset tables +/// see benchmarks/data/imdb/schematext.sql +pub fn get_imdb_table_schema(table: &str) -> Schema { + match table { + "aka_name" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("person_id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("name_pcode_cf", DataType::Utf8, true), + Field::new("name_pcode_nf", DataType::Utf8, true), + Field::new("surname_pcode", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "aka_title" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("title", DataType::Utf8, true), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("kind_id", DataType::Int32, false), + Field::new("production_year", DataType::Int32, true), + Field::new("phonetic_code", DataType::Utf8, true), + Field::new("episode_of_id", DataType::Int32, true), + Field::new("season_nr", DataType::Int32, true), + Field::new("episode_nr", DataType::Int32, true), + Field::new("note", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "cast_info" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("person_id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("person_role_id", DataType::Int32, true), + Field::new("note", DataType::Utf8, true), + Field::new("nr_order", DataType::Int32, true), + Field::new("role_id", DataType::Int32, false), + ]), + "char_name" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("imdb_id", DataType::Int32, true), + Field::new("name_pcode_nf", DataType::Utf8, true), + Field::new("surname_pcode", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "comp_cast_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("kind", DataType::Utf8, false), + ]), + "company_name" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("country_code", DataType::Utf8, true), + Field::new("imdb_id", DataType::Int32, true), + Field::new("name_pcode_nf", DataType::Utf8, true), + Field::new("name_pcode_sf", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "company_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("kind", DataType::Utf8, true), + ]), + "complete_cast" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, true), + Field::new("subject_id", DataType::Int32, false), + Field::new("status_id", DataType::Int32, false), + ]), + "info_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("info", DataType::Utf8, false), + ]), + "keyword" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("keyword", DataType::Utf8, false), + Field::new("phonetic_code", DataType::Utf8, true), + ]), + "kind_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("kind", DataType::Utf8, true), + ]), + "link_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("link", DataType::Utf8, false), + ]), + "movie_companies" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("company_id", DataType::Int32, false), + Field::new("company_type_id", DataType::Int32, false), + Field::new("note", DataType::Utf8, true), + ]), + "movie_info_idx" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("info_type_id", DataType::Int32, false), + Field::new("info", DataType::Utf8, false), + Field::new("note", DataType::Utf8, true), + ]), + "movie_keyword" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("keyword_id", DataType::Int32, false), + ]), + "movie_link" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("linked_movie_id", DataType::Int32, false), + Field::new("link_type_id", DataType::Int32, false), + ]), + "name" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("imdb_id", DataType::Int32, true), + Field::new("gender", DataType::Utf8, true), + Field::new("name_pcode_cf", DataType::Utf8, true), + Field::new("name_pcode_nf", DataType::Utf8, true), + Field::new("surname_pcode", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "role_type" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("role", DataType::Utf8, false), + ]), + "title" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("title", DataType::Utf8, false), + Field::new("imdb_index", DataType::Utf8, true), + Field::new("kind_id", DataType::Int32, false), + Field::new("production_year", DataType::Int32, true), + Field::new("imdb_id", DataType::Int32, true), + Field::new("phonetic_code", DataType::Utf8, true), + Field::new("episode_of_id", DataType::Int32, true), + Field::new("season_nr", DataType::Int32, true), + Field::new("episode_nr", DataType::Int32, true), + Field::new("series_years", DataType::Utf8, true), + Field::new("md5sum", DataType::Utf8, true), + ]), + "movie_info" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("movie_id", DataType::Int32, false), + Field::new("info_type_id", DataType::Int32, false), + Field::new("info", DataType::Utf8, false), + Field::new("note", DataType::Utf8, true), + ]), + "person_info" => Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("person_id", DataType::Int32, false), + Field::new("info_type_id", DataType::Int32, false), + Field::new("info", DataType::Utf8, false), + Field::new("note", DataType::Utf8, true), + ]), + _ => unimplemented!("Schema for table {} is not implemented", table), + } +} diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index f81220aa2c94..bb1c3d51a149 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -20,5 +20,6 @@ pub mod clickbench; pub mod parquet_filter; pub mod sort; pub mod tpch; +pub mod imdb; mod util; pub use util::*; From dced3aff6c4f648c9ab93839807bb219e8f37240 Mon Sep 17 00:00:00 2001 From: DouPache Date: Tue, 17 Sep 2024 14:23:53 +0800 Subject: [PATCH 2/4] cargo fmt --- benchmarks/src/bin/imdb.rs | 2 +- benchmarks/src/imdb/convert.rs | 19 ++++++++++--------- benchmarks/src/imdb/mod.rs | 25 +++++++++++++++++++++---- benchmarks/src/lib.rs | 2 +- 4 files changed, 33 insertions(+), 15 deletions(-) diff --git a/benchmarks/src/bin/imdb.rs b/benchmarks/src/bin/imdb.rs index 02f679be4070..40efb84b0501 100644 --- a/benchmarks/src/bin/imdb.rs +++ b/benchmarks/src/bin/imdb.rs @@ -46,4 +46,4 @@ pub async fn main() -> Result<()> { match ImdbOpt::from_args() { ImdbOpt::Convert(opt) => opt.run().await, } -} \ No newline at end of file +} diff --git a/benchmarks/src/imdb/convert.rs b/benchmarks/src/imdb/convert.rs index 4f5999ca1fe6..0e98c26795fa 100644 --- a/benchmarks/src/imdb/convert.rs +++ b/benchmarks/src/imdb/convert.rs @@ -25,8 +25,8 @@ use structopt::StructOpt; use datafusion::common::not_impl_err; -use super::IMDB_TABLES; use super::get_imdb_table_schema; +use super::IMDB_TABLES; #[derive(Debug, StructOpt)] pub struct ConvertOpt { @@ -42,21 +42,20 @@ pub struct ConvertOpt { #[structopt(short = "f", long = "format")] file_format: String, - /// Batch size when reading CSV or Parquet files - #[structopt(short = "s", long = "batch-size", default_value = "8192")] - batch_size: usize, + /// Batch size when reading CSV or Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "8192")] + batch_size: usize, } impl ConvertOpt { pub async fn run(self) -> Result<()> { - let input_path = self.input_path.to_str().unwrap(); let output_path = self.output_path.to_str().unwrap(); for table in IMDB_TABLES { let start = Instant::now(); let schema = get_imdb_table_schema(table); - + let input_path = format!("{input_path}/{table}.csv"); let output_path = format!("{output_path}/{table}.parquet"); let options = CsvReadOptions::new() @@ -91,14 +90,16 @@ impl ConvertOpt { output_path.as_str(), DataFrameWriteOptions::new(), None, - ).await?; + ) + .await?; } "parquet" => { csv.write_parquet( output_path.as_str(), DataFrameWriteOptions::new(), None, - ).await?; + ) + .await?; } other => { return not_impl_err!("Invalid output format: {other}"); @@ -108,4 +109,4 @@ impl ConvertOpt { } Ok(()) } -} \ No newline at end of file +} diff --git a/benchmarks/src/imdb/mod.rs b/benchmarks/src/imdb/mod.rs index 84e755e87a52..8e2977c0384e 100644 --- a/benchmarks/src/imdb/mod.rs +++ b/benchmarks/src/imdb/mod.rs @@ -23,10 +23,27 @@ pub use convert::ConvertOpt; // we have 21 tables in the IMDB dataset pub const IMDB_TABLES: &[&str] = &[ - "aka_name", "aka_title", "cast_info", "char_name", "comp_cast_type", - "company_name", "company_type", "complete_cast", "info_type", "keyword", - "kind_type", "link_type", "movie_companies", "movie_info_idx", "movie_keyword", - "movie_link", "name", "role_type", "title", "movie_info", "person_info", + "aka_name", + "aka_title", + "cast_info", + "char_name", + "comp_cast_type", + "company_name", + "company_type", + "complete_cast", + "info_type", + "keyword", + "kind_type", + "link_type", + "movie_companies", + "movie_info_idx", + "movie_keyword", + "movie_link", + "name", + "role_type", + "title", + "movie_info", + "person_info", ]; /// Get the schema for the IMDB dataset tables diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index bb1c3d51a149..52d81ca91816 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -17,9 +17,9 @@ //! DataFusion benchmark runner pub mod clickbench; +pub mod imdb; pub mod parquet_filter; pub mod sort; pub mod tpch; -pub mod imdb; mod util; pub use util::*; From 0bf209e93ec76fad8753bd00684f8dcaa72649d0 Mon Sep 17 00:00:00 2001 From: DouPache Date: Tue, 17 Sep 2024 18:00:35 +0800 Subject: [PATCH 3/4] we should also extrac the tar after download --- benchmarks/bench.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 44fb1444a393..24efab6c6ca5 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -493,6 +493,10 @@ data_imdb() { # Download the dataset curl -o "${imdb_temp_gz}" "${imdb_url}" + + # Extract the dataset + tar -xzvf "${imdb_temp_gz}" -C "${imdb_dir}" + $CARGO_COMMAND --bin imdb -- convert --input ${imdb_dir} --output ${imdb_dir} --format parquet else echo "IMDB.tgz already exists." From dec1fbde1e7d5c86735893b0f2fdefdabc264a51 Mon Sep 17 00:00:00 2001 From: DouPache Date: Fri, 20 Sep 2024 11:28:28 +0800 Subject: [PATCH 4/4] we should not skip last col --- benchmarks/src/imdb/convert.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/src/imdb/convert.rs b/benchmarks/src/imdb/convert.rs index 0e98c26795fa..c95f7f8bf564 100644 --- a/benchmarks/src/imdb/convert.rs +++ b/benchmarks/src/imdb/convert.rs @@ -74,7 +74,7 @@ impl ConvertOpt { let selection = csv .schema() .iter() - .take(schema.fields.len() - 1) + .take(schema.fields.len()) .map(Expr::from) .collect();