From 030e59d12455d44e7762e517db0e8e4d68a9f9ff Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 22 Mar 2024 09:44:34 -0400
Subject: [PATCH 1/7] loop split rb

---
 .../src/datasource/file_format/parquet.rs | 73 ++++++++++---------
 1 file changed, 39 insertions(+), 34 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index ec333bb557d2..bcf4e8a2c8e4 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -876,42 +876,47 @@ fn spawn_parquet_parallel_serialization_task(
     )?;

     let mut current_rg_rows = 0;

-    while let Some(rb) = data.recv().await {
-        if current_rg_rows + rb.num_rows() < max_row_group_rows {
-            send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
-                .await?;
-            current_rg_rows += rb.num_rows();
-        } else {
-            let rows_left = max_row_group_rows - current_rg_rows;
-            let a = rb.slice(0, rows_left);
-            send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
-                .await?;
+    while let Some(mut rb) = data.recv().await {
+        // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
+        // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
+        // function.
+        loop {
+            if current_rg_rows + rb.num_rows() < max_row_group_rows {
+                send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
+                    .await?;
+                current_rg_rows += rb.num_rows();
+                break;
+            } else {
+                let rows_left = max_row_group_rows - current_rg_rows;
+                let a = rb.slice(0, rows_left);
+                send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
+                    .await?;
+
+                // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
+                // on a separate task, so that we can immediately start on the next RG before waiting
+                // for the current one to finish.
+                drop(col_array_channels);
+                let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                    column_writer_handles,
+                    max_row_group_rows,
+                );
+
+                serialize_tx.send(finalize_rg_task).await.map_err(|_| {
+                    DataFusionError::Internal(
+                        "Unable to send closed RG to concat task!".into(),
+                    )
+                })?;

-            // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
-            // on a separate task, so that we can immediately start on the next RG before waiting
-            // for the current one to finish.
-            drop(col_array_channels);
-            let finalize_rg_task = spawn_rg_join_and_finalize_task(
-                column_writer_handles,
-                max_row_group_rows,
-            );
-
-            serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                DataFusionError::Internal(
-                    "Unable to send closed RG to concat task!".into(),
-                )
-            })?;
+                current_rg_rows = 0;
+                rb = rb.slice(rows_left, rb.num_rows() - rows_left);

-            let b = rb.slice(rows_left, rb.num_rows() - rows_left);
-            (column_writer_handles, col_array_channels) =
-                spawn_column_parallel_row_group_writer(
-                    schema.clone(),
-                    writer_props.clone(),
-                    max_buffer_rb,
-                )?;
-            send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
-                .await?;
-            current_rg_rows = b.num_rows();
+                (column_writer_handles, col_array_channels) =
+                    spawn_column_parallel_row_group_writer(
+                        schema.clone(),
+                        writer_props.clone(),
+                        max_buffer_rb,
+                    )?;
+            }
         }
     }
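The change above replaces the single if/else with an inner loop so that one incoming RecordBatch can be carved into several row groups when max_row_group_rows is smaller than the batch. The idea is easier to see in isolation: Arrow's RecordBatch::slice is zero-copy, so repeated slicing only touches metadata. A minimal standalone sketch of the same splitting loop follows (not part of the patch; the helper name and the fixed max_rows are illustrative, and the sketch assumes each row group starts empty, whereas the patch also accounts for rows already buffered in the current row group):

    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::error::ArrowError;
    use arrow::record_batch::RecordBatch;

    /// Split `batch` into slices of at most `max_rows` rows, mirroring the
    /// iterative (non-recursive) splitting used in the patch above.
    fn split_to_row_groups(batch: &RecordBatch, max_rows: usize) -> Vec<RecordBatch> {
        let mut out = Vec::new();
        let mut remaining = batch.clone(); // cheap: RecordBatch is Arc-backed
        loop {
            if remaining.num_rows() <= max_rows {
                out.push(remaining);
                break;
            }
            // Take the rows that still fit into the current row group...
            out.push(remaining.slice(0, max_rows));
            // ...and keep looping on whatever is left over.
            remaining = remaining.slice(max_rows, remaining.num_rows() - max_rows);
        }
        out
    }

    fn main() -> Result<(), ArrowError> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int32Array::from((0..10).collect::<Vec<i32>>()))],
        )?;

        // A 10-row batch with max_rows = 3 becomes slices of 3, 3, 3 and 1 rows.
        let sizes: Vec<usize> = split_to_row_groups(&batch, 3)
            .iter()
            .map(|b| b.num_rows())
            .collect();
        assert_eq!(sizes, vec![3, 3, 3, 1]);
        Ok(())
    }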
From 4d46fccae2ec95f430576c00a14753d11c3cbc83 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 22 Mar 2024 09:53:01 -0400
Subject: [PATCH 2/7] add test

---
 datafusion/core/src/dataframe/parquet.rs | 39 ++++++++++++++++++++++++
 1 file changed, 39 insertions(+)

diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index e3f606e322fe..e22cc670439b 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -166,4 +166,43 @@ mod tests {

         Ok(())
     }
+
+    #[tokio::test]
+    async fn write_parquet_with_small_rg_size() -> Result<()> {
+        let test_df = test_util::test_table().await?;
+        let output_path = "file://local/test.parquet";
+
+        for rg_size in 1..10 {
+            let df = test_df.clone();
+            let tmp_dir = TempDir::new()?;
+            let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+            let local_url = Url::parse("file://local").unwrap();
+            let ctx = &test_df.session_state;
+            ctx.runtime_env().register_object_store(&local_url, local);
+            let mut options = TableParquetOptions::default();
+            options.global.max_row_group_size = rg_size;
+            df.write_parquet(
+                output_path,
+                DataFrameWriteOptions::new().with_single_file_output(true),
+                Some(options),
+            )
+            .await?;
+
+            // Check that file actually used the specified compression
+            let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+
+            let reader =
+                parquet::file::serialized_reader::SerializedFileReader::new(file)
+                    .unwrap();
+
+            let parquet_metadata = reader.metadata();
+
+            let written_rows =
+                parquet_metadata.row_group(0).num_rows();
+
+            assert_eq!(written_rows as usize, rg_size);
+        }
+
+        Ok(())
+    }
 }

From 677272fe51ed8c421bb917caac8f2c8a2614bab3 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 22 Mar 2024 10:09:33 -0400
Subject: [PATCH 3/7] add new test

---
 datafusion/core/src/dataframe/parquet.rs | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index e22cc670439b..40b990f824de 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -169,10 +169,14 @@ mod tests {

     #[tokio::test]
     async fn write_parquet_with_small_rg_size() -> Result<()> {
-        let test_df = test_util::test_table().await?;
+        let mut test_df = test_util::test_table().await?;
+        // make the test data larger so there are multiple batches
+        for _ in 0..7{
+            test_df = test_df.clone().union(test_df)?;
+        }
         let output_path = "file://local/test.parquet";

-        for rg_size in 1..10 {
+        for rg_size in (1..7).step_by(5) {
             let df = test_df.clone();
             let tmp_dir = TempDir::new()?;
             let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
@@ -181,6 +185,7 @@ mod tests {
             ctx.runtime_env().register_object_store(&local_url, local);
             let mut options = TableParquetOptions::default();
             options.global.max_row_group_size = rg_size;
+            options.global.allow_single_file_parallelism = true;
             df.write_parquet(
                 output_path,
                 DataFrameWriteOptions::new().with_single_file_output(true),
@@ -188,7 +193,7 @@ mod tests {
             )
             .await?;

-            // Check that file actually used the specified compression
+            // Check that file actually used the correct rg size
            let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;

             let reader =
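The tests above assert on the first row group only; the same metadata API can list every row group in the file. For reference, a minimal standalone sketch of reading that layout back with the parquet crate the tests already use (the file name is illustrative):

    use std::fs::File;

    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::SerializedFileReader;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Point this at any parquet file, e.g. one written by the test above.
        let file = File::open("test.parquet")?;
        let reader = SerializedFileReader::new(file)?;
        let metadata = reader.metadata();

        // With the writer fix in place, every row group except possibly the
        // last one should hold exactly `max_row_group_size` rows.
        for i in 0..metadata.num_row_groups() {
            println!("row group {i}: {} rows", metadata.row_group(i).num_rows());
        }
        Ok(())
    }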
From 40cf8f958a543e152098fdbbf896b633a32af1dc Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 22 Mar 2024 10:13:03 -0400
Subject: [PATCH 4/7] fmt

---
 datafusion/core/src/dataframe/parquet.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 40b990f824de..dc51b1f42a85 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -171,7 +171,7 @@ mod tests {
     async fn write_parquet_with_small_rg_size() -> Result<()> {
         let mut test_df = test_util::test_table().await?;
         // make the test data larger so there are multiple batches
-        for _ in 0..7{
+        for _ in 0..7 {
             test_df = test_df.clone().union(test_df)?;
         }
         let output_path = "file://local/test.parquet";
@@ -202,8 +202,7 @@ mod tests {

             let parquet_metadata = reader.metadata();

-            let written_rows =
-                parquet_metadata.row_group(0).num_rows();
+            let written_rows = parquet_metadata.row_group(0).num_rows();

             assert_eq!(written_rows as usize, rg_size);
         }

From 8b67b12858a6de06769db5ebf9e40ccf00bcff30 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 22 Mar 2024 18:36:37 -0400
Subject: [PATCH 5/7] lower batch size in test

---
 datafusion/core/src/dataframe/parquet.rs | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index dc51b1f42a85..07c173c0f62e 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -74,6 +74,7 @@ impl DataFrame {

 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
     use std::sync::Arc;

     use super::super::Result;
@@ -81,9 +82,10 @@ mod tests {
     use crate::arrow::util::pretty;
     use crate::execution::context::SessionContext;
     use crate::execution::options::ParquetReadOptions;
-    use crate::test_util;
+    use crate::test_util::{self, register_aggregate_csv};

     use datafusion_common::file_options::parquet_writer::parse_compression_string;
+    use datafusion_execution::config::SessionConfig;
     use datafusion_expr::{col, lit};
     use object_store::local::LocalFileSystem;
@@ -169,11 +171,16 @@ mod tests {

     #[tokio::test]
     async fn write_parquet_with_small_rg_size() -> Result<()> {
-        let mut test_df = test_util::test_table().await?;
-        // make the test data larger so there are multiple batches
-        for _ in 0..7 {
-            test_df = test_df.clone().union(test_df)?;
-        }
+        let mut ctx = SessionContext::new_with_config(
+            SessionConfig::from_string_hash_map(HashMap::from_iter(
+                [("datafusion.execution.batch_size", "10")]
+                    .iter()
+                    .map(|(s1, s2)| (s1.to_string(), s2.to_string())),
+            ))?,
+        );
+        register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
+        let test_df = ctx.table("aggregate_test_100").await?;
+
         let output_path = "file://local/test.parquet";

         for rg_size in (1..7).step_by(5) {
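Pinning datafusion.execution.batch_size to 10 means every batch reaching the parquet sink holds at most 10 rows, so any max_row_group_size below 10 forces the splitting loop from the first patch to run. For reference, a typed equivalent of the string-keyed configuration used in the test (a sketch, assuming a DataFusion dependency and current SessionConfig builder methods):

    use datafusion::execution::config::SessionConfig;
    use datafusion::prelude::SessionContext;

    fn main() {
        // Same effect as setting "datafusion.execution.batch_size" = "10"
        // via SessionConfig::from_string_hash_map in the test above.
        let config = SessionConfig::new().with_batch_size(10);
        let ctx = SessionContext::new_with_config(config);
        assert_eq!(ctx.copied_config().batch_size(), 10);
    }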
From c9da8c84e407d0c8ae9faf6e4535ba61e7598528 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 22 Mar 2024 18:37:57 -0400
Subject: [PATCH 6/7] make test faster

---
 datafusion/core/src/dataframe/parquet.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 07c173c0f62e..6e5f971f96b8 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -171,6 +171,8 @@ mod tests {

     #[tokio::test]
     async fn write_parquet_with_small_rg_size() -> Result<()> {
+        // This test verifies writing a parquet file with small rg size
+        // relative to datafusion.execution.batch_size does not panic
         let mut ctx = SessionContext::new_with_config(
             SessionConfig::from_string_hash_map(HashMap::from_iter(
                 [("datafusion.execution.batch_size", "10")]
@@ -183,7 +185,7 @@ mod tests {

         let output_path = "file://local/test.parquet";

-        for rg_size in (1..7).step_by(5) {
+        for rg_size in 1..10 {
             let df = test_df.clone();
             let tmp_dir = TempDir::new()?;
             let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);

From 0dc697d833d56cb54fde58f5735d8eb9a8fd36b6 Mon Sep 17 00:00:00 2001
From: Devin D'Angelo
Date: Fri, 22 Mar 2024 18:42:21 -0400
Subject: [PATCH 7/7] use path not into_path

---
 datafusion/core/src/dataframe/parquet.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 6e5f971f96b8..7cc3201bf7e4 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -152,7 +152,7 @@ mod tests {
         .await?;

         // Check that file actually used the specified compression
-        let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+        let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;

         let reader =
             parquet::file::serialized_reader::SerializedFileReader::new(file)
@@ -203,7 +203,7 @@ mod tests {
             .await?;

             // Check that file actually used the correct rg size
-            let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+            let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;

             let reader =
                 parquet::file::serialized_reader::SerializedFileReader::new(file)
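The last patch swaps TempDir::into_path() for TempDir::path(). The difference matters for test hygiene: path() borrows the guard, so the temporary directory is still deleted automatically when it goes out of scope, while into_path() consumes the guard and persists the directory on disk, skipping that cleanup, presumably the motivation for the change. A small sketch of the distinction, using the tempfile crate the tests already depend on:

    use tempfile::TempDir;

    fn main() -> std::io::Result<()> {
        let tmp_dir = TempDir::new()?;

        // Borrowing keeps the guard alive; the directory is removed
        // automatically when `tmp_dir` drops at the end of this scope.
        let inside = tmp_dir.path().join("test.parquet");
        println!("{}", inside.display());

        // By contrast, `tmp_dir.into_path()` would consume the guard and
        // persist the directory on disk, disabling the automatic cleanup.
        Ok(())
    }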