Use repartition in window functions to speed up #569

Merged (2 commits) on Jun 30, 2021
11 changes: 11 additions & 0 deletions datafusion/src/execution/context.rs
@@ -630,6 +630,9 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
/// using the provided `concurrency` level
pub repartition_aggregations: bool,
/// Should DataFusion repartition data using the partition keys to execute window functions in
/// parallel using the provided `concurrency` level
pub repartition_windows: bool,
}

impl Default for ExecutionConfig {
@@ -659,6 +662,7 @@ impl Default for ExecutionConfig {
information_schema: false,
repartition_joins: true,
repartition_aggregations: true,
repartition_windows: true,
}
}
}
@@ -749,11 +753,18 @@ impl ExecutionConfig {
self.repartition_joins = enabled;
self
}

/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.repartition_aggregations = enabled;
self
}

/// Enables or disables the use of repartitioning for window functions to improve parallelism
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
self.repartition_windows = enabled;
self
}
}

/// Holds per-execution properties and data (such as starting timestamps, etc).
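For orientation, a minimal sketch of how the new option is used from an application. Only with_repartition_windows is added by this PR; ExecutionConfig::new, with_concurrency, and ExecutionContext::with_config are the existing builder API, and the query and table name below are made up:

use datafusion::execution::context::{ExecutionConfig, ExecutionContext};

// Window repartitioning defaults to true; pass false to keep the previous
// single-partition behaviour.
let config = ExecutionConfig::new()
    .with_concurrency(8)
    .with_repartition_windows(true);
let mut ctx = ExecutionContext::with_config(config);

// A query such as
//   SELECT c1, SUM(c2) OVER (PARTITION BY c1) FROM t
// can now have its window input hash-partitioned on c1 across 8 partitions.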
42 changes: 38 additions & 4 deletions datafusion/src/physical_plan/planner.rs
@@ -44,7 +44,7 @@ use crate::physical_plan::{
};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
use crate::sql::utils::generate_sort_key;
use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
use crate::variable::VarType;
use crate::{
error::{DataFusionError, Result},
@@ -264,6 +264,38 @@ impl DefaultPhysicalPlanner {
"Impossibly got empty window expression".to_owned(),
));
}

let input_exec = self.create_initial_plan(input, ctx_state)?;

// at this point we are guaranteed by the logical planner
// that all window_expr share the same sort key
let partition_keys = window_expr_common_partition_keys(window_expr)?;

let can_repartition = !partition_keys.is_empty()
&& ctx_state.config.concurrency > 1
&& ctx_state.config.repartition_windows;

let input_exec = if can_repartition {
let partition_keys = partition_keys
.iter()
.map(|e| {
self.create_physical_expr(
e,
input.schema(),
&input_exec.schema(),
ctx_state,
)
})
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
Arc::new(RepartitionExec::try_new(
input_exec,
Partitioning::Hash(partition_keys, ctx_state.config.concurrency),
)?)
} else {
input_exec
};

// add a sort phase
let get_sort_keys = |expr: &Expr| match expr {
Expr::WindowFunction {
ref partition_by,
@@ -272,7 +304,6 @@
} => generate_sort_key(partition_by, order_by),
_ => unreachable!(),
};

let sort_keys = get_sort_keys(&window_expr[0]);
if window_expr.len() > 1 {
debug_assert!(
@@ -283,7 +314,6 @@
);
}

let input_exec = self.create_initial_plan(input, ctx_state)?;
let logical_input_schema = input.schema();

let input_exec = if sort_keys.is_empty() {
@@ -310,7 +340,11 @@
_ => unreachable!(),
})
.collect::<Result<Vec<_>>>()?;
Arc::new(SortExec::try_new(sort_keys, input_exec)?)
Arc::new(if can_repartition {
SortExec::new_with_partitioning(sort_keys, input_exec, true)
} else {
SortExec::try_new(sort_keys, input_exec)?
})
};

let physical_input_schema = input_exec.schema();
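To make the planner change concrete, here is an illustrative before/after shape of the physical plan for a window query with a PARTITION BY clause, assuming repartition_windows = true and concurrency = 8. This is a sketch, not exact EXPLAIN output; the sort keys come from generate_sort_key over the partition and order expressions:

// Before: the window input is reduced to a single partition and sorted once
//   WindowAggExec
//     SortExec: [c1 ASC, c2 ASC]
//       <input: 1 partition>
//
// After: the input is hash-repartitioned on the common partition keys and
// each partition is sorted independently (preserve_partitioning), so the
// window functions run in parallel
//   WindowAggExec
//     SortExec: [c1 ASC, c2 ASC] (preserve_partitioning)
//       RepartitionExec: Hash([c1], 8)
//         <input>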
62 changes: 13 additions & 49 deletions datafusion/src/physical_plan/windows.rs
@@ -404,11 +404,22 @@ impl ExecutionPlan for WindowAggExec {

/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
// because the input can be repartitioned on the partition keys,
// this is either 1 or more than 1, depending on the presence of
// repartitioning
self.input.output_partitioning()
}

fn required_child_distribution(&self) -> Distribution {
Distribution::SinglePartition
if self
.window_expr()
.iter()
.all(|expr| expr.partition_by().is_empty())
{
Distribution::SinglePartition
} else {
Distribution::UnspecifiedDistribution
}
}

fn with_new_children(
@@ -428,22 +439,7 @@ impl ExecutionPlan for WindowAggExec {
}

async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
if 0 != partition {
return Err(DataFusionError::Internal(format!(
"WindowAggExec invalid partition {}",
partition
)));
}

// window needs to operate on a single partition currently
if 1 != self.input.output_partitioning().partition_count() {
return Err(DataFusionError::Internal(
"WindowAggExec requires a single input partition".to_owned(),
));
}

let input = self.input.execute(partition).await?;

let stream = Box::pin(WindowAggStream::new(
self.schema.clone(),
self.window_expr.clone(),
@@ -580,38 +576,6 @@ mod tests {
Ok((input, schema))
}

#[tokio::test]
async fn window_function_input_partition() -> Result<()> {
let (input, schema) = create_test_schema(4)?;

let window_exec = Arc::new(WindowAggExec::try_new(
vec![create_window_expr(
&WindowFunction::AggregateFunction(AggregateFunction::Count),
"count".to_owned(),
&[col("c3", &schema)?],
&[],
&[],
Some(WindowFrame::default()),
schema.as_ref(),
)?],
input,
schema.clone(),
)?);

let result = collect(window_exec).await;

assert!(result.is_err());
if let Some(DataFusionError::Internal(msg)) = result.err() {
assert_eq!(
msg,
"WindowAggExec requires a single input partition".to_owned()
);
} else {
unreachable!("Expect an internal error to happen");
}
Ok(())
}

#[tokio::test]
async fn window_function() -> Result<()> {
let (input, schema) = create_test_schema(1)?;
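The new distribution rule can be restated as a small hypothetical helper that mirrors the logic above (the queries in the comments are invented examples):

// Hypothetical restatement of WindowAggExec::required_child_distribution
fn required_distribution(all_partition_by_empty: bool) -> Distribution {
    if all_partition_by_empty {
        // e.g. SELECT SUM(c2) OVER () FROM t
        // a single global frame: every row must end up in the same partition
        Distribution::SinglePartition
    } else {
        // e.g. SELECT SUM(c2) OVER (PARTITION BY c1) FROM t
        // upstream hash repartitioning already co-locates rows with equal
        // partition keys, so no particular distribution is required
        Distribution::UnspecifiedDistribution
    }
}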
24 changes: 24 additions & 0 deletions datafusion/src/sql/utils.rs
@@ -462,6 +462,30 @@ pub(crate) fn generate_sort_key(
sort_key
}

/// given a slice of window expressions sharing the same sort key, find their common partition
/// keys.
pub(crate) fn window_expr_common_partition_keys(
window_exprs: &[Expr],
) -> Result<&[Expr]> {
let all_partition_keys = window_exprs
.iter()
.map(|expr| match expr {
Expr::WindowFunction { partition_by, .. } => Ok(partition_by),
expr => Err(DataFusionError::Execution(format!(
"Impossibly got non-window expr {:?}",
expr
))),
})
.collect::<Result<Vec<_>>>()?;
let result = all_partition_keys
.iter()
.min_by_key(|s| s.len())
.ok_or_else(|| {
DataFusionError::Execution("No window expressions found".to_owned())
})?;
Ok(result)
}

/// group a slice of window expression expr by their order by expressions
pub(crate) fn group_window_expr_by_sort_keys(
window_expr: &[Expr],
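A hypothetical worked example of what window_expr_common_partition_keys computes (column names invented; both expressions share the sort key (c1, c3, c4), as the logical planner requires):

//   window_exprs = [
//       SUM(c2) OVER (PARTITION BY c1, c3 ORDER BY c4),
//       AVG(c2) OVER (PARTITION BY c1 ORDER BY c3, c4),
//   ]
//   window_expr_common_partition_keys(&window_exprs) == Ok(&[col("c1")])
//
// The shortest PARTITION BY list is returned as the common key set; hashing
// on c1 alone still co-locates all rows that belong to the same window
// partition for both expressions, so it is a valid repartitioning key.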