loop split rb
devinjdangelo committed Mar 22, 2024
1 parent 2b69acc commit 030e59d
Showing 1 changed file with 39 additions and 34 deletions:
datafusion/core/src/datasource/file_format/parquet.rs
@@ -876,42 +876,47 @@ fn spawn_parquet_parallel_serialization_task(
         )?;
     let mut current_rg_rows = 0;
 
-    while let Some(rb) = data.recv().await {
-        if current_rg_rows + rb.num_rows() < max_row_group_rows {
-            send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
-                .await?;
-            current_rg_rows += rb.num_rows();
-        } else {
-            let rows_left = max_row_group_rows - current_rg_rows;
-            let a = rb.slice(0, rows_left);
-            send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
-                .await?;
+    while let Some(mut rb) = data.recv().await {
+        // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
+        // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
+        // function.
+        loop {
+            if current_rg_rows + rb.num_rows() < max_row_group_rows {
+                send_arrays_to_col_writers(&col_array_channels, &rb, schema.clone())
+                    .await?;
+                current_rg_rows += rb.num_rows();
+                break;
+            } else {
+                let rows_left = max_row_group_rows - current_rg_rows;
+                let a = rb.slice(0, rows_left);
+                send_arrays_to_col_writers(&col_array_channels, &a, schema.clone())
+                    .await?;
 
-            // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
-            // on a separate task, so that we can immediately start on the next RG before waiting
-            // for the current one to finish.
-            drop(col_array_channels);
-            let finalize_rg_task = spawn_rg_join_and_finalize_task(
-                column_writer_handles,
-                max_row_group_rows,
-            );
+                // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
+                // on a separate task, so that we can immediately start on the next RG before waiting
+                // for the current one to finish.
+                drop(col_array_channels);
+                let finalize_rg_task = spawn_rg_join_and_finalize_task(
+                    column_writer_handles,
+                    max_row_group_rows,
+                );
 
-            serialize_tx.send(finalize_rg_task).await.map_err(|_| {
-                DataFusionError::Internal(
-                    "Unable to send closed RG to concat task!".into(),
-                )
-            })?;
+                serialize_tx.send(finalize_rg_task).await.map_err(|_| {
+                    DataFusionError::Internal(
+                        "Unable to send closed RG to concat task!".into(),
+                    )
+                })?;
 
-            let b = rb.slice(rows_left, rb.num_rows() - rows_left);
-            (column_writer_handles, col_array_channels) =
-                spawn_column_parallel_row_group_writer(
-                    schema.clone(),
-                    writer_props.clone(),
-                    max_buffer_rb,
-                )?;
-            send_arrays_to_col_writers(&col_array_channels, &b, schema.clone())
-                .await?;
-            current_rg_rows = b.num_rows();
-        }
+                current_rg_rows = 0;
+                rb = rb.slice(rows_left, rb.num_rows() - rows_left);
+
+                (column_writer_handles, col_array_channels) =
+                    spawn_column_parallel_row_group_writer(
+                        schema.clone(),
+                        writer_props.clone(),
+                        max_buffer_rb,
+                    )?;
+            }
+        }
     }
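
The comment introduced by this commit explains the motivation: when max_row_group_rows is smaller than the execution batch size, a single incoming RecordBatch has to be split more than once, and the inner loop handles that iteratively instead of through a recursive async function. Below is a minimal, self-contained sketch of the same control flow, assuming simplified stand-ins: Batch in place of Arrow's RecordBatch, flush_row_group in place of finalizing a Parquet row group, and MAX_ROW_GROUP_ROWS as the configured limit. These names are illustrative only and are not DataFusion or Arrow APIs.

struct Batch {
    rows: usize,
}

impl Batch {
    // Remove the first `n` rows from this batch and return them as a new batch,
    // mimicking RecordBatch::slice followed by re-slicing the remainder.
    fn split_off_front(&mut self, n: usize) -> Batch {
        assert!(n <= self.rows);
        self.rows -= n;
        Batch { rows: n }
    }
}

const MAX_ROW_GROUP_ROWS: usize = 100;

// Pretend to finalize a row group by recording how many rows it contained.
fn flush_row_group(rows: usize, finished_groups: &mut Vec<usize>) {
    finished_groups.push(rows);
}

fn main() {
    // The first batch is larger than MAX_ROW_GROUP_ROWS, so it must be split repeatedly,
    // which is exactly the case the new inner loop handles.
    let incoming = vec![Batch { rows: 250 }, Batch { rows: 30 }, Batch { rows: 90 }];

    let mut finished_groups = Vec::new();
    let mut current_rg_rows = 0;

    for mut rb in incoming {
        // Loop until the (possibly repeatedly sliced) batch fits in the current row group.
        loop {
            if current_rg_rows + rb.rows < MAX_ROW_GROUP_ROWS {
                current_rg_rows += rb.rows;
                break;
            } else {
                // Fill the current row group with a prefix of the batch, flush it,
                // and keep looping with the remainder.
                let rows_left = MAX_ROW_GROUP_ROWS - current_rg_rows;
                let _front = rb.split_off_front(rows_left);
                flush_row_group(MAX_ROW_GROUP_ROWS, &mut finished_groups);
                current_rg_rows = 0;
            }
        }
    }

    // Flush whatever is left as a final, smaller row group.
    if current_rg_rows > 0 {
        flush_row_group(current_rg_rows, &mut finished_groups);
    }

    // 250 + 30 + 90 = 370 rows end up as row groups of [100, 100, 100, 70].
    println!("row groups: {:?}", finished_groups);
}

A general reason to prefer the loop over recursion here is that a recursive async fn in Rust would need its recursive call boxed (for example via Box::pin) to have a finite future size, whereas the iterative split needs no indirection.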


0 comments on commit 030e59d
