diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index e3f606e322fe..7cc3201bf7e4 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -74,6 +74,7 @@ impl DataFrame {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     use super::super::Result;
@@ -81,9 +82,10 @@ mod tests {
     use crate::arrow::util::pretty;
     use crate::execution::context::SessionContext;
     use crate::execution::options::ParquetReadOptions;
-    use crate::test_util;
+    use crate::test_util::{self, register_aggregate_csv};
 
     use datafusion_common::file_options::parquet_writer::parse_compression_string;
+    use datafusion_execution::config::SessionConfig;
     use datafusion_expr::{col, lit};
 
     use object_store::local::LocalFileSystem;
@@ -150,7 +152,7 @@ mod tests {
             .await?;
 
         // Check that file actually used the specified compression
-        let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+        let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;
 
         let reader =
             parquet::file::serialized_reader::SerializedFileReader::new(file)
@@ -166,4 +168,54 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn write_parquet_with_small_rg_size() -> Result<()> {
+        // This test verifies writing a parquet file with small rg size
+        // relative to datafusion.execution.batch_size does not panic
+        let mut ctx = SessionContext::new_with_config(
+            SessionConfig::from_string_hash_map(HashMap::from_iter(
+                [("datafusion.execution.batch_size", "10")]
+                    .iter()
+                    .map(|(s1, s2)| (s1.to_string(), s2.to_string())),
+            ))?,
+        );
+        register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
+        let test_df = ctx.table("aggregate_test_100").await?;
+
+        let output_path = "file://local/test.parquet";
+
+        for rg_size in 1..10 {
+            let df = test_df.clone();
+            let tmp_dir = TempDir::new()?;
+            let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+            let local_url = Url::parse("file://local").unwrap();
+            let ctx = &test_df.session_state;
+            ctx.runtime_env().register_object_store(&local_url, local);
+            let mut options = TableParquetOptions::default();
+            options.global.max_row_group_size = rg_size;
+            options.global.allow_single_file_parallelism = true;
+            df.write_parquet(
+                output_path,
+                DataFrameWriteOptions::new().with_single_file_output(true),
+                Some(options),
+            )
+            .await?;
+
+            // Check that file actually used the correct rg size
+            let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;
+
+            let reader =
+                parquet::file::serialized_reader::SerializedFileReader::new(file)
+                    .unwrap();
+
+            let parquet_metadata = reader.metadata();
+
+            let written_rows = parquet_metadata.row_group(0).num_rows();
+
+            assert_eq!(written_rows as usize, rg_size);
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index ec333bb557d2..bcf4e8a2c8e4 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -876,42 +876,47 @@ fn spawn_parquet_parallel_serialization_task(
         )?;
         let mut current_rg_rows = 0;
 
-        while let Some(rb) = data.recv().await {
-            if current_rg_rows + rb.num_rows() < max_row_group_rows {
-                send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
-                    .await?;
-                current_rg_rows += rb.num_rows();
-            } else {
-                let rows_left = max_row_group_rows - current_rg_rows;
-                let a = rb.slice(0, rows_left);
-                send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
-                    .await?;
+        while let Some(mut rb) = data.recv().await {
+            // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
+            // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
+            // function.
+            loop {
+                if current_rg_rows + rb.num_rows() < max_row_group_rows {
+                    send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
+                        .await?;
+                    current_rg_rows += rb.num_rows();
+                    break;
+                } else {
+                    let rows_left = max_row_group_rows - current_rg_rows;
+                    let a = rb.slice(0, rows_left);
+                    send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
+                        .await?;
+
+                    // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
+                    // on a separate task, so that we can immediately start on the next RG before waiting
+                    // for the current one to finish.
+                    drop(col_array_channels);
+                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                        column_writer_handles,
+                        max_row_group_rows,
+                    );
+
+                    serialize_tx.send(finalize_rg_task).await.map_err(|_| {
+                        DataFusionError::Internal(
+                            "Unable to send closed RG to concat task!".into(),
+                        )
+                    })?;
 
-                // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
-                // on a separate task, so that we can immediately start on the next RG before waiting
-                // for the current one to finish.
-                drop(col_array_channels);
-                let finalize_rg_task = spawn_rg_join_and_finalize_task(
-                    column_writer_handles,
-                    max_row_group_rows,
-                );
-
-                serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                    DataFusionError::Internal(
-                        "Unable to send closed RG to concat task!".into(),
-                    )
-                })?;
+                    current_rg_rows = 0;
+                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);
 
-                let b = rb.slice(rows_left, rb.num_rows() - rows_left);
-                (column_writer_handles, col_array_channels) =
-                    spawn_column_parallel_row_group_writer(
-                        schema.clone(),
-                        writer_props.clone(),
-                        max_buffer_rb,
-                    )?;
-                send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
-                    .await?;
-                current_rg_rows = b.num_rows();
+                    (column_writer_handles, col_array_channels) =
+                        spawn_column_parallel_row_group_writer(
+                            schema.clone(),
+                            writer_props.clone(),
+                            max_buffer_rb,
+                        )?;
+                }
             }
         }
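For clarity, here is a minimal, self-contained sketch of the splitting logic the new inner loop implements: instead of assuming a RecordBatch spills into at most one extra row group, it keeps carving slices off the incoming batch until the remainder fits in the open row group. The function name split_into_row_groups, the plain usize row counts, and the Vec<Vec<usize>> return shape are invented for this illustration only; the real code works on Arrow RecordBatch slices and streams them through channels to parallel column writers.

// Sketch only (not DataFusion code): given incoming batch sizes and a row-group
// limit, return the slice lengths that land in each row group. Assumes
// max_rg_rows >= 1.
fn split_into_row_groups(batch_sizes: &[usize], max_rg_rows: usize) -> Vec<Vec<usize>> {
    let mut row_groups: Vec<Vec<usize>> = vec![Vec::new()];
    let mut current_rg_rows = 0;

    for &batch in batch_sizes {
        let mut remaining = batch;
        // Mirrors the `loop` in the patch: keep slicing the current batch
        // until what is left fits in the open row group.
        loop {
            if current_rg_rows + remaining < max_rg_rows {
                // The whole remainder fits: record it and move to the next batch.
                row_groups.last_mut().unwrap().push(remaining);
                current_rg_rows += remaining;
                break;
            } else {
                // Fill the open row group to its limit, close it, and start a
                // fresh one for the leftover rows.
                let rows_left = max_rg_rows - current_rg_rows;
                row_groups.last_mut().unwrap().push(rows_left);
                row_groups.push(Vec::new());
                current_rg_rows = 0;
                remaining -= rows_left;
            }
        }
    }
    row_groups
}

fn main() {
    // batch_size (10) larger than max_row_group_size (3): each batch spans
    // several row groups, which is exactly the case the patch fixes.
    let groups = split_into_row_groups(&[10, 10], 3);
    println!("{groups:?}"); // [[3], [3], [3], [1, 2], [3], [3], [2]]
}

As in the patch, the strict < comparison means a batch that exactly fills the remaining capacity takes the else path: the row group is closed right away and a zero-row remainder is carried into the next iteration.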