Skip to content

Commit

Permalink
allow window aggr to be parallelizable
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu committed Jun 23, 2021
1 parent 13daed7 commit 809cd80
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 65 deletions.
11 changes: 11 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,9 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
/// using the provided `concurrency` level
pub repartition_aggregations: bool,
/// Should DataFusion repartition data using the partition keys to execute window functions in
/// parallel using the provided `concurrency` level
pub repartition_windows: bool,
}

impl Default for ExecutionConfig {
Expand Down Expand Up @@ -652,6 +655,7 @@ impl Default for ExecutionConfig {
information_schema: false,
repartition_joins: true,
repartition_aggregations: true,
repartition_windows: true,
}
}
}
Expand Down Expand Up @@ -742,11 +746,18 @@ impl ExecutionConfig {
self.repartition_joins = enabled;
self
}

/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.repartition_aggregations = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.repartition_windows = enabled;
self
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
Expand Down
8 changes: 0 additions & 8 deletions datafusion/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,14 +352,6 @@ impl LogicalPlanBuilder {
}

/// Apply a window
///
/// NOTE: this feature is under development and this API will be changing
///
/// - https://github.com/apache/arrow-datafusion/issues/359 basic structure
/// - https://github.com/apache/arrow-datafusion/issues/298 empty over clause
/// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause
/// - https://github.com/apache/arrow-datafusion/issues/360 with order by
/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame
pub fn window(&self, window_expr: Vec<Expr>) -> Result<Self> {
let all_expr = window_expr.iter();
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;
Expand Down
11 changes: 8 additions & 3 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,18 @@ impl DefaultPhysicalPlanner {

let can_repartition = !partition_keys.is_empty()
&& ctx_state.config.concurrency > 1
&& ctx_state.config.repartition_aggregations;
&& ctx_state.config.repartition_windows;

let input_exec = if can_repartition {
let partition_keys = partition_keys
.iter()
.map(|e| {
self.create_physical_expr(e, &input_exec.schema(), ctx_state)
self.create_physical_expr(
e,
input.schema(),
&input_exec.schema(),
ctx_state,
)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
Arc::new(RepartitionExec::try_new(
Expand Down Expand Up @@ -337,7 +342,7 @@ impl DefaultPhysicalPlanner {
.map(|e| {
self.create_window_expr(
e,
&logical_input_schema,
logical_input_schema,
&physical_input_schema,
ctx_state,
)
Expand Down
54 changes: 5 additions & 49 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,14 @@ impl ExecutionPlan for WindowAggExec {

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
// because we can have repartitioning using the partition keys
// this would be either 1 or more than 1 depending on the presense of
// repartitioning
self.input.output_partitioning()
}

fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
Distribution::UnspecifiedDistribution
}

fn with_new_children(
Expand All @@ -436,22 +439,7 @@ impl ExecutionPlan for WindowAggExec {
}

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"WindowAggExec invalid partition {}",
partition
)));
}

// window needs to operate on a single partition currently
if 1 != self.input.output_partitioning().partition_count() {
return Err(DataFusionError::Internal(
"WindowAggExec requires a single input partition".to_owned(),
));
}

let input = self.input.execute(partition).await?;

let stream = Box::pin(WindowAggStream::new(
self.schema.clone(),
self.window_expr.clone(),
Expand Down Expand Up @@ -591,38 +579,6 @@ mod tests {
Ok((input, schema))
}

#[tokio::test]
async fn window_function_input_partition() -> Result<()> {
let (input, schema) = create_test_schema(4)?;

let window_exec = Arc::new(WindowAggExec::try_new(
vec![create_window_expr(
&WindowFunction::AggregateFunction(AggregateFunction::Count),
"count".to_owned(),
&[col("c3", &schema)?],
&[],
&[],
Some(WindowFrame::default()),
schema.as_ref(),
)?],
input,
schema.clone(),
)?);

let result = collect(window_exec).await;

assert!(result.is_err());
if let Some(DataFusionError::Internal(msg)) = result.err() {
assert_eq!(
msg,
"WindowAggExec requires a single input partition".to_owned()
);
} else {
unreachable!("Expect an internal error to happen");
}
Ok(())
}

#[tokio::test]
async fn window_function() -> Result<()> {
let (input, schema) = create_test_schema(1)?;
Expand Down
13 changes: 8 additions & 5 deletions datafusion/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ pub(crate) fn generate_sort_key(
/// keys.
pub(crate) fn window_expr_common_partition_keys(
window_exprs: &[Expr],
) -> Result<Vec<Expr>> {
) -> Result<&[Expr]> {
let all_partition_keys = window_exprs
.iter()
.map(|expr| match expr {
Expand All @@ -474,10 +474,13 @@ pub(crate) fn window_expr_common_partition_keys(
))),
})
.collect::<Result<Vec<_>>>()?;
let result = all_partition_keys.iter().min_by_key(|s| s.len()).ok_or(
DataFusionError::Execution("No window expressions found".to_owned()),
)?;
Ok(result.to_vec())
let result = all_partition_keys
.iter()
.min_by_key(|s| s.len())
.ok_or_else(|| {
DataFusionError::Execution("No window expressions found".to_owned())
})?;
Ok(result)
}

/// group a slice of window expression expr by their order by expressions
Expand Down

0 comments on commit 809cd80

Please sign in to comment.