fix: parallel parquet can underflow when max_record_batch_rows < execution.batch_size #9737

Merged · 7 commits · Mar 23, 2024
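For context on the failure this PR fixes: in the pre-fix serialization loop (see datafusion/core/src/datasource/file_format/parquet.rs below), a RecordBatch that overflows the open row group is split once, and the entire remainder is carried into the next row group. The standalone sketch below is not part of the PR; the numbers are hypothetical but mirror the new test (execution.batch_size = 10, a row group limit smaller than that), and it only illustrates why `max_row_group_rows - current_rg_rows` can underflow on a usize once a row group has been over-filled.

```rust
fn main() {
    // Hypothetical numbers: execution.batch_size = 10, max_row_group_size = 3,
    // i.e. max_record_batch_rows (10) > max_row_group_rows (3).
    let max_row_group_rows: usize = 3;
    let batch_rows: usize = 10;

    // Old logic: the first 3 rows close row group 0, and the *entire* 7-row
    // remainder is pushed into the next row group in one piece.
    let current_rg_rows = batch_rows - max_row_group_rows; // 7

    // When the next batch arrives, the else-branch computes
    // `max_row_group_rows - current_rg_rows`, i.e. 3 - 7 on a usize:
    assert_eq!(max_row_group_rows.checked_sub(current_rg_rows), None); // underflow
}
```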
Changes from all commits
56 changes: 54 additions & 2 deletions datafusion/core/src/dataframe/parquet.rs
@@ -74,16 +74,18 @@ impl DataFrame {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
     use std::sync::Arc;
 
     use super::super::Result;
     use super::*;
     use crate::arrow::util::pretty;
     use crate::execution::context::SessionContext;
     use crate::execution::options::ParquetReadOptions;
-    use crate::test_util;
+    use crate::test_util::{self, register_aggregate_csv};
 
     use datafusion_common::file_options::parquet_writer::parse_compression_string;
+    use datafusion_execution::config::SessionConfig;
     use datafusion_expr::{col, lit};
 
     use object_store::local::LocalFileSystem;
@@ -150,7 +152,7 @@ mod tests {
             .await?;
 
             // Check that file actually used the specified compression
-            let file = std::fs::File::open(tmp_dir.into_path().join("test.parquet"))?;
+            let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;
Contributor: 👍

Contributor: thank you for the driveby cleanup

             let reader =
                 parquet::file::serialized_reader::SerializedFileReader::new(file)
@@ -166,4 +168,54 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn write_parquet_with_small_rg_size() -> Result<()> {
+        // This test verifies writing a parquet file with small rg size
+        // relative to datafusion.execution.batch_size does not panic
+        let mut ctx = SessionContext::new_with_config(
+            SessionConfig::from_string_hash_map(HashMap::from_iter(
+                [("datafusion.execution.batch_size", "10")]
+                    .iter()
+                    .map(|(s1, s2)| (s1.to_string(), s2.to_string())),
+            ))?,
+        );
+        register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
+        let test_df = ctx.table("aggregate_test_100").await?;
+
+        let output_path = "file://local/test.parquet";
+
+        for rg_size in 1..10 {
+            let df = test_df.clone();
+            let tmp_dir = TempDir::new()?;
+            let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+            let local_url = Url::parse("file://local").unwrap();
+            let ctx = &test_df.session_state;
+            ctx.runtime_env().register_object_store(&local_url, local);
+            let mut options = TableParquetOptions::default();
+            options.global.max_row_group_size = rg_size;
+            options.global.allow_single_file_parallelism = true;
+            df.write_parquet(
+                output_path,
+                DataFrameWriteOptions::new().with_single_file_output(true),
+                Some(options),
+            )
+            .await?;
+
+            // Check that file actually used the correct rg size
+            let file = std::fs::File::open(tmp_dir.path().join("test.parquet"))?;
+
+            let reader =
+                parquet::file::serialized_reader::SerializedFileReader::new(file)
+                    .unwrap();
+
+            let parquet_metadata = reader.metadata();
+
+            let written_rows = parquet_metadata.row_group(0).num_rows();
+
+            assert_eq!(written_rows as usize, rg_size);
+        }
+
+        Ok(())
+    }
 }
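As a usage note, the scenario the new test exercises can also be set up through SessionConfig's typed builder instead of the string hash map. The sketch below is not part of the PR; the CSV path and output file name are hypothetical, and the import paths assume a DataFusion version contemporary with this change (they may differ slightly between releases).

```rust
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // execution.batch_size (10) is larger than max_row_group_size (3):
    // exactly the shape that used to panic in the parallel parquet writer.
    let config = SessionConfig::new().with_batch_size(10);
    let ctx = SessionContext::new_with_config(config);

    // Hypothetical input file.
    let df = ctx
        .read_csv("tests/data/example.csv", CsvReadOptions::new())
        .await?;

    let mut options = TableParquetOptions::default();
    options.global.max_row_group_size = 3;
    options.global.allow_single_file_parallelism = true;

    df.write_parquet(
        "example.parquet",
        DataFrameWriteOptions::new().with_single_file_output(true),
        Some(options),
    )
    .await?;

    Ok(())
}
```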
73 changes: 39 additions & 34 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -876,42 +876,47 @@ fn spawn_parquet_parallel_serialization_task(
     )?;
     let mut current_rg_rows = 0;
 
-    while let Some(rb) = data.recv().await {
-        if current_rg_rows + rb.num_rows() < max_row_group_rows {
-            send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
-                .await?;
-            current_rg_rows += rb.num_rows();
-        } else {
-            let rows_left = max_row_group_rows - current_rg_rows;
-            let a = rb.slice(0, rows_left);
-            send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
-                .await?;
+    while let Some(mut rb) = data.recv().await {
+        // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
+        // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
+        // function.
+        loop {
+            if current_rg_rows + rb.num_rows() < max_row_group_rows {
+                send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
+                    .await?;
+                current_rg_rows += rb.num_rows();
+                break;
+            } else {
+                let rows_left = max_row_group_rows - current_rg_rows;
+                let a = rb.slice(0, rows_left);
+                send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
+                    .await?;
 
-            // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
-            // on a separate task, so that we can immediately start on the next RG before waiting
-            // for the current one to finish.
-            drop(col_array_channels);
-            let finalize_rg_task = spawn_rg_join_and_finalize_task(
-                column_writer_handles,
-                max_row_group_rows,
-            );
+                // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
+                // on a separate task, so that we can immediately start on the next RG before waiting
+                // for the current one to finish.
+                drop(col_array_channels);
+                let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                    column_writer_handles,
+                    max_row_group_rows,
+                );
 
-            serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                DataFusionError::Internal(
-                    "Unable to send closed RG to concat task!".into(),
-                )
-            })?;
+                serialize_tx.send(finalize_rg_task).await.map_err(|_| {
+                    DataFusionError::Internal(
+                        "Unable to send closed RG to concat task!".into(),
+                    )
+                })?;
 
-            let b = rb.slice(rows_left, rb.num_rows() - rows_left);
-            (column_writer_handles, col_array_channels) =
-                spawn_column_parallel_row_group_writer(
-                    schema.clone(),
-                    writer_props.clone(),
-                    max_buffer_rb,
-                )?;
-            send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
-                .await?;
-            current_rg_rows = b.num_rows();
+                current_rg_rows = 0;
+                rb = rb.slice(rows_left, rb.num_rows() - rows_left);
+
+                (column_writer_handles, col_array_channels) =
+                    spawn_column_parallel_row_group_writer(
+                        schema.clone(),
+                        writer_props.clone(),
+                        max_buffer_rb,
+                    )?;
+            }
         }
     }

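To make the behaviour of the new inner `loop` easy to check in isolation, here is a sketch that is not part of the PR: `split_into_row_groups` is a hypothetical helper that reduces the splitting logic to plain row counts, following the same branch structure as the code above. The assertion uses batch_size = 10 and max_row_group_size = 3 for illustration; no finished row group ever exceeds the limit, and the subtraction that used to underflow now always has `current_rg_rows <= max_row_group_rows`.

```rust
// Hedged sketch of the new splitting loop, on plain numbers instead of RecordBatches.
fn split_into_row_groups(batches: &[usize], max_row_group_rows: usize) -> Vec<usize> {
    let mut finished = Vec::new();
    let mut current_rg_rows = 0;
    for &rows in batches {
        let mut remaining = rows;
        // Mirrors the inner `loop` added in spawn_parquet_parallel_serialization_task:
        // keep closing row groups until the leftover fits into the open one.
        loop {
            if current_rg_rows + remaining < max_row_group_rows {
                current_rg_rows += remaining;
                break;
            } else {
                let rows_left = max_row_group_rows - current_rg_rows; // never underflows now
                finished.push(current_rg_rows + rows_left);
                remaining -= rows_left;
                current_rg_rows = 0;
            }
        }
    }
    if current_rg_rows > 0 {
        finished.push(current_rg_rows);
    }
    finished
}

fn main() {
    // Two 10-row batches with a 3-row limit: six full row groups plus a 2-row tail.
    assert_eq!(split_into_row_groups(&[10, 10], 3), vec![3, 3, 3, 3, 3, 3, 2]);
}
```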