Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sort preserving merge (#362) #379

Merged
merged 6 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions datafusion/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ use std::fs::metadata;
use std::sync::Arc;
use std::task::{Context, Poll};

use super::{RecordBatchStream, SendableRecordBatchStream};
use crate::error::{DataFusionError, Result};

use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::{Stream, TryStreamExt};
use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use tokio::task::JoinHandle;

use crate::arrow::error::ArrowError;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::ExecutionPlan;

use super::{RecordBatchStream, SendableRecordBatchStream};

/// Stream of record batches
pub struct SizedRecordBatchStream {
Expand Down Expand Up @@ -113,3 +118,29 @@ fn build_file_list_recurse(
}
Ok(())
}

/// Spawns a task to the tokio threadpool and writes its outputs to the provided mpsc sender
pub(crate) fn spawn_execution(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nice abstraction (and we can probably use it elsewhere)

input: Arc<dyn ExecutionPlan>,
mut output: mpsc::Sender<ArrowResult<RecordBatch>>,
partition: usize,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut stream = match input.execute(partition).await {
Err(e) => {
// If send fails, plan being torn
// down, no place to send the error
let arrow_error = ArrowError::ExternalError(Box::new(e));
output.send(Err(arrow_error)).await.ok();
return;
}
Ok(stream) => stream,
};

while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error
output.send(item).await.ok();
}
})
}
29 changes: 3 additions & 26 deletions datafusion/src/physical_plan/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,19 @@ use std::any::Any;
use std::sync::Arc;

use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::Stream;

use async_trait::async_trait;

use arrow::record_batch::RecordBatch;
use arrow::{
datatypes::SchemaRef,
error::{ArrowError, Result as ArrowResult},
};
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult};

use super::RecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};

use super::SendableRecordBatchStream;
use crate::physical_plan::common::spawn_execution;
use pin_project_lite::pin_project;

/// Merge execution plan executes partitions in parallel and combines them into a single
Expand Down Expand Up @@ -121,26 +117,7 @@ impl ExecutionPlan for MergeExec {
// spawn independent tasks whose resulting streams (of batches)
// are sent to the channel for consumption.
for part_i in 0..input_partitions {
let input = self.input.clone();
let mut sender = sender.clone();
tokio::spawn(async move {
let mut stream = match input.execute(part_i).await {
Err(e) => {
// If send fails, plan being torn
// down, no place to send the error
let arrow_error = ArrowError::ExternalError(Box::new(e));
sender.send(Err(arrow_error)).await.ok();
return;
}
Ok(stream) => stream,
};

while let Some(item) = stream.next().await {
// If send fails, plan being torn down,
// there is no place to send the error
sender.send(item).await.ok();
}
});
spawn_execution(self.input.clone(), sender.clone(), part_i);
}

Ok(Box::pin(MergeStream {
Expand Down
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ pub mod projection;
pub mod regex_expressions;
pub mod repartition;
pub mod sort;
pub mod sort_preserving_merge;
pub mod source;
pub mod string_expressions;
pub mod type_coercion;
Expand Down
Loading