Skip to content

Commit

Permalink
move serialization to blocking threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
devinjdangelo committed Mar 14, 2024
1 parent 9d0c05b commit e6de971
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,11 @@ impl DataSink for ParquetSink {

/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter]
/// Once the channel is exhausted, returns the ArrowColumnWriter.
async fn column_serializer_task(
fn column_serializer_task(
mut rx: Receiver<ArrowLeafColumn>,
mut writer: ArrowColumnWriter,
) -> Result<ArrowColumnWriter> {
while let Some(col) = rx.recv().await {
while let Some(col) = rx.blocking_recv() {
writer.write(&col)?;
}
Ok(writer)
Expand Down Expand Up @@ -799,7 +799,8 @@ fn spawn_column_parallel_row_group_writer(
mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
col_array_channels.push(send_array);

let task = SpawnedTask::spawn(column_serializer_task(recieve_array, writer));
let task =
SpawnedTask::spawn_blocking(|| column_serializer_task(recieve_array, writer));
col_writer_tasks.push(task);
}

Expand Down

0 comments on commit e6de971

Please sign in to comment.