Skip to content

Commit

Permalink
implement window functions with partition by
Browse files Browse the repository at this point in the history
  • Loading branch information
Jiayu Liu authored and jimexist committed Jun 24, 2021
1 parent aead7f8 commit 9f18613
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 53 deletions.
11 changes: 11 additions & 0 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,9 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
/// using the provided `concurrency` level
pub repartition_aggregations: bool,
/// Should DataFusion repartition data using the partition keys to execute window functions in
/// parallel using the provided `concurrency` level
pub repartition_windows: bool,
}

impl Default for ExecutionConfig {
Expand Down Expand Up @@ -652,6 +655,7 @@ impl Default for ExecutionConfig {
information_schema: false,
repartition_joins: true,
repartition_aggregations: true,
repartition_windows: true,
}
}
}
Expand Down Expand Up @@ -742,11 +746,18 @@ impl ExecutionConfig {
self.repartition_joins = enabled;
self
}

/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.repartition_aggregations = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.repartition_windows = enabled;
self
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
Expand Down
42 changes: 38 additions & 4 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::physical_plan::{
};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::sql::utils::generate_sort_key;
use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
use crate::variable::VarType;
use crate::{
error::{DataFusionError, Result},
Expand Down Expand Up @@ -264,6 +264,38 @@ impl DefaultPhysicalPlanner {
"Impossibly got empty window expression".to_owned(),
));
}

let input_exec = self.create_initial_plan(input, ctx_state)?;

// at this moment we are guaranteed by the logical planner
// to have all the window_expr to have equal sort key
let partition_keys = window_expr_common_partition_keys(window_expr)?;

let can_repartition = !partition_keys.is_empty()
&& ctx_state.config.concurrency > 1
&& ctx_state.config.repartition_windows;

let input_exec = if can_repartition {
let partition_keys = partition_keys
.iter()
.map(|e| {
self.create_physical_expr(
e,
input.schema(),
&input_exec.schema(),
ctx_state,
)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
Arc::new(RepartitionExec::try_new(
input_exec,
Partitioning::Hash(partition_keys, ctx_state.config.concurrency),
)?)
} else {
input_exec
};

// add a sort phase
let get_sort_keys = |expr: &Expr| match expr {
Expr::WindowFunction {
ref partition_by,
Expand All @@ -272,7 +304,6 @@ impl DefaultPhysicalPlanner {
} => generate_sort_key(partition_by, order_by),
_ => unreachable!(),
};

let sort_keys = get_sort_keys(&window_expr[0]);
if window_expr.len() > 1 {
debug_assert!(
Expand All @@ -283,7 +314,6 @@ impl DefaultPhysicalPlanner {
);
}

let input_exec = self.create_initial_plan(input, ctx_state)?;
let logical_input_schema = input.schema();

let input_exec = if sort_keys.is_empty() {
Expand All @@ -310,7 +340,11 @@ impl DefaultPhysicalPlanner {
_ => unreachable!(),
})
.collect::<Result<Vec<_>>>()?;
Arc::new(SortExec::try_new(sort_keys, input_exec)?)
Arc::new(if can_repartition {
SortExec::new_with_partitioning(sort_keys, input_exec, true)
} else {
SortExec::try_new(sort_keys, input_exec)?
})
};

let physical_input_schema = input_exec.schema();
Expand Down
54 changes: 5 additions & 49 deletions datafusion/src/physical_plan/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,14 @@ impl ExecutionPlan for WindowAggExec {

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
// because we can have repartitioning using the partition keys
// this would be either 1 or more than 1 depending on the presense of
// repartitioning
self.input.output_partitioning()
}

fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
Distribution::UnspecifiedDistribution
}

fn with_new_children(
Expand All @@ -431,22 +434,7 @@ impl ExecutionPlan for WindowAggExec {
}

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"WindowAggExec invalid partition {}",
partition
)));
}

// window needs to operate on a single partition currently
if 1 != self.input.output_partitioning().partition_count() {
return Err(DataFusionError::Internal(
"WindowAggExec requires a single input partition".to_owned(),
));
}

let input = self.input.execute(partition).await?;

let stream = Box::pin(WindowAggStream::new(
self.schema.clone(),
self.window_expr.clone(),
Expand Down Expand Up @@ -583,38 +571,6 @@ mod tests {
Ok((input, schema))
}

#[tokio::test]
async fn window_function_input_partition() -> Result<()> {
let (input, schema) = create_test_schema(4)?;

let window_exec = Arc::new(WindowAggExec::try_new(
vec![create_window_expr(
&WindowFunction::AggregateFunction(AggregateFunction::Count),
"count".to_owned(),
&[col("c3", &schema)?],
&[],
&[],
Some(WindowFrame::default()),
schema.as_ref(),
)?],
input,
schema.clone(),
)?);

let result = collect(window_exec).await;

assert!(result.is_err());
if let Some(DataFusionError::Internal(msg)) = result.err() {
assert_eq!(
msg,
"WindowAggExec requires a single input partition".to_owned()
);
} else {
unreachable!("Expect an internal error to happen");
}
Ok(())
}

#[tokio::test]
async fn window_function() -> Result<()> {
let (input, schema) = create_test_schema(1)?;
Expand Down
24 changes: 24 additions & 0 deletions datafusion/src/sql/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,30 @@ pub(crate) fn generate_sort_key(
sort_key
}

/// given a slice of window expressions sharing the same sort key, find their common partition
/// keys.
pub(crate) fn window_expr_common_partition_keys(
window_exprs: &[Expr],
) -> Result<&[Expr]> {
let all_partition_keys = window_exprs
.iter()
.map(|expr| match expr {
Expr::WindowFunction { partition_by, .. } => Ok(partition_by),
expr => Err(DataFusionError::Execution(format!(
"Impossibly got non-window expr {:?}",
expr
))),
})
.collect::<Result<Vec<_>>>()?;
let result = all_partition_keys
.iter()
.min_by_key(|s| s.len())
.ok_or_else(|| {
DataFusionError::Execution("No window expressions found".to_owned())
})?;
Ok(result)
}

/// group a slice of window expression expr by their order by expressions
pub(crate) fn group_window_expr_by_sort_keys(
window_expr: &[Expr],
Expand Down

0 comments on commit 9f18613

Please sign in to comment.