Skip to content

Commit

Permalink
parallalize window function calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu authored and jimexist committed Jun 26, 2021
1 parent 8858d95 commit ed55aa2
Showing 1 changed file with 24 additions and 6 deletions.
30 changes: 24 additions & 6 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::task;

/// Window execution plan
#[derive(Debug)]
Expand Down Expand Up @@ -467,14 +468,29 @@ pin_project! {
}

/// Compute the window aggregate columns
fn compute_window_aggregates(
async fn compute_window_aggregates(
window_expr: Vec<Arc<dyn WindowExpr>>,
batch: &RecordBatch,
batch: Arc<RecordBatch>,
) -> Result<Vec<ArrayRef>> {
window_expr
let handles = window_expr
.iter()
.map(|window_expr| window_expr.evaluate(batch))
.collect()
.map(|window_expr| {
let batch = batch.clone();
let window_expr = window_expr.clone();
task::spawn_blocking(move || window_expr.evaluate(&batch))
})
.collect::<Vec<_>>();
let mut result = vec![];
for handle in handles {
let arr = handle.await.map_err(|e| {
DataFusionError::Execution(format!(
"Failed to join window aggregation handle {}",
e
))
})??;
result.push(arr);
}
Ok(result)
}

impl WindowAggStream {
Expand Down Expand Up @@ -510,8 +526,10 @@ impl WindowAggStream {
.map_err(DataFusionError::into_arrow_external_error)?;
let batch = common::combine_batches(&batches, input_schema.clone())?;
if let Some(batch) = batch {
let batch = Arc::new(batch);
// calculate window cols
let mut columns = compute_window_aggregates(window_expr, &batch)
let mut columns = compute_window_aggregates(window_expr, batch.clone())
.await
.map_err(DataFusionError::into_arrow_external_error)?;
// combine with the original cols
// note the setup of window aggregates is that they newly calculated window
Expand Down

0 comments on commit ed55aa2

Please sign in to comment.