Skip to content

Commit

Permalink
allow window aggr to be parallelizable
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 16, 2021
1 parent 64d2bf3 commit 0ac4a3a
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 21 deletions.
11 changes: 11 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
/// using the provided `concurrency` level
pub repartition_aggregations: bool,
/// Should DataFusion repartition data using the partition keys to execute window functions in
/// parallel using the provided `concurrency` level
pub repartition_windows: bool,
}

impl ExecutionConfig {
Expand Down Expand Up @@ -669,6 +672,7 @@ impl ExecutionConfig {
information_schema: false,
repartition_joins: true,
repartition_aggregations: true,
repartition_windows: true,
}
}

Expand Down Expand Up @@ -752,11 +756,18 @@ impl ExecutionConfig {
self.repartition_joins = enabled;
self
}

/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.repartition_aggregations = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.repartition_windows = enabled;
self
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl DefaultPhysicalPlanner {

let can_repartition = !partition_keys.is_empty()
&& ctx_state.config.concurrency > 1
&& ctx_state.config.repartition_aggregations;
&& ctx_state.config.repartition_windows;

let input_exec = if can_repartition {
let partition_keys = partition_keys
Expand Down
22 changes: 5 additions & 17 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,14 @@ impl ExecutionPlan for WindowAggExec {

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
// because we can have repartitioning using the partition keys
// this would be either 1 or more than 1 depending on the presense of
// repartitioning
self.input.output_partitioning()
}

fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
Distribution::UnspecifiedDistribution
}

fn with_new_children(
Expand All @@ -436,22 +439,7 @@ impl ExecutionPlan for WindowAggExec {
}

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"WindowAggExec invalid partition {}",
partition
)));
}

// window needs to operate on a single partition currently
if 1 != self.input.output_partitioning().partition_count() {
return Err(DataFusionError::Internal(
"WindowAggExec requires a single input partition".to_owned(),
));
}

let input = self.input.execute(partition).await?;

let stream = Box::pin(WindowAggStream::new(
self.schema.clone(),
self.window_expr.clone(),
Expand Down
4 changes: 3 additions & 1 deletion datafusion/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
groups.reverse();
for (_, exprs) in groups {
let window_exprs: Vec<Expr> = exprs.into_iter().cloned().collect();
let window_exprs = exprs.into_iter().cloned().collect();
// the partition and sort itself is done at physical level, see physical_planner's
// fn create_initial_plan
plan = LogicalPlanBuilder::from(&plan)
.window(window_exprs)?
.build()?;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ pub(crate) fn generate_sort_key(
/// keys.
pub(crate) fn window_expr_common_partition_keys(
window_exprs: &[Expr],
) -> Result<Vec<Expr>> {
) -> Result<&[Expr]> {
let all_partition_keys = window_exprs
.iter()
.map(|expr| match expr {
Expand All @@ -477,7 +477,7 @@ pub(crate) fn window_expr_common_partition_keys(
let result = all_partition_keys.iter().min_by_key(|s| s.len()).ok_or(
DataFusionError::Execution("No window expressions found".to_owned()),
)?;
Ok(result.to_vec())
Ok(result)
}

/// group a slice of window expression expr by their order by expressions
Expand Down

0 comments on commit 0ac4a3a

Please sign in to comment.