Skip to content

Commit

Permalink
Tests, linter fixes and docs
Browse files Browse the repository at this point in the history
  • Loading branch information
thinkharderdev committed Jun 11, 2022
1 parent 9f477a6 commit dfbfee2
Show file tree
Hide file tree
Showing 6 changed files with 440 additions and 47 deletions.
10 changes: 6 additions & 4 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ use datafusion_physical_expr::aggregate::row_accumulator::RowAccumulator;
pub use datafusion_physical_expr::expressions::create_aggregate_expr;
use datafusion_row::{row_supported, RowType};

/// Represents of physical grouping set.
pub type PhysicalGroupingSetExpr = Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>;

/// Hash aggregate modes
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AggregateMode {
Expand All @@ -75,7 +78,7 @@ pub struct AggregateExec {
/// In the normal group by case the first element in this list will contain the group by expressions.
/// In the case of GROUPING SETS, CUBE or ROLLUP, we expect a list of physical expressions with the same
/// length and schema.
grouping_set_expr: Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>,
grouping_set_expr: PhysicalGroupingSetExpr,
/// Aggregate expressions
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
/// Input plan, could be a partial aggregate or the input to the aggregate
Expand All @@ -94,7 +97,7 @@ impl AggregateExec {
/// Create a new hash aggregate execution plan
pub fn try_new(
mode: AggregateMode,
grouping_set_expr: Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>,
grouping_set_expr: PhysicalGroupingSetExpr,
aggr_expr: Vec<Arc<dyn AggregateExpr>>,
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
Expand Down Expand Up @@ -143,7 +146,7 @@ impl AggregateExec {

/// Grouping expressions
pub fn group_expr(&self) -> &[(Arc<dyn PhysicalExpr>, String)] {
// TODO This probably isn't right....
// TODO Is this right?
&self.grouping_set_expr[0]
}

Expand Down Expand Up @@ -209,7 +212,6 @@ impl ExecutionPlan for AggregateExec {
fn required_child_distribution(&self) -> Distribution {
match &self.mode {
AggregateMode::Partial => Distribution::UnspecifiedDistribution,
// TODO this isn't right, we need to pass in the partition exprs explicitly I think
AggregateMode::FinalPartitioned => Distribution::HashPartitioned(
self.grouping_set_expr
.iter()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl GroupedHashAggregateStreamV2 {
baseline_metrics: BaselineMetrics,
) -> Result<Self> {
assert!(
group_expr.len() > 0,
!group_expr.is_empty(),
"Require non-zero set of grouping expressions"
);

Expand Down
100 changes: 61 additions & 39 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ use crate::logical_plan::{
use crate::logical_plan::{Limit, Values};
use crate::physical_expr::create_physical_expr;
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupingSetExpr,
};
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
Expand Down Expand Up @@ -538,7 +540,7 @@ impl DefaultPhysicalPlanner {
let logical_input_schema = input.as_ref().schema();

let groups = self.create_grouping_physical_expr(
&group_expr,
group_expr,
logical_input_schema,
&physical_input_schema,
session_state)?;
Expand Down Expand Up @@ -573,7 +575,7 @@ impl DefaultPhysicalPlanner {
.flat_map(|x| x.0.data_type(physical_input_schema.as_ref()))
.any(|x| matches!(x, DataType::Dictionary(_, _)));

let can_repartition = !(groups.iter().flatten().count() == 0)
let can_repartition = groups.iter().flatten().count() != 0
&& session_state.config.target_partitions > 1
&& session_state.config.repartition_aggregations
&& !contains_dict;
Expand Down Expand Up @@ -1005,26 +1007,31 @@ impl DefaultPhysicalPlanner {
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>> {
) -> Result<PhysicalGroupingSetExpr> {
if group_expr.len() == 1 {
match &group_expr[0] {
Expr::GroupingSet(GroupingSet::GroupingSets(grouping_sets)) => {
merge_grouping_set_expr(
&grouping_sets,
merge_grouping_set_physical_expr(
grouping_sets,
input_dfschema,
input_schema,
session_state,
)
}
Expr::GroupingSet(GroupingSet::Cube(exprs)) => {
create_cube_expr(&exprs, input_dfschema, input_schema, session_state)
}
Expr::GroupingSet(GroupingSet::Rollup(exprs)) => create_rollup_expr(
&exprs,
Expr::GroupingSet(GroupingSet::Cube(exprs)) => create_cube_physical_expr(
exprs,
input_dfschema,
input_schema,
session_state,
),
Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
create_rollup_physical_expr(
exprs,
input_dfschema,
input_schema,
session_state,
)
}
expr => Ok(vec![vec![tuple_err((
self.create_physical_expr(
expr,
Expand Down Expand Up @@ -1054,12 +1061,22 @@ impl DefaultPhysicalPlanner {
}
}

fn merge_grouping_set_expr(
/// Expand and align a GROUPING SET expression.
/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
///
/// This will take a list of grouping sets and ensure that each group is
/// properly aligned for the physical execution plan. We do this by
/// identifying all unique expression in each group and conforming each
/// group to the same set of expression types and ordering.
/// For example, if we have something like `GROUPING SETS ((a,b,c),(a),(b),(b,c))`
/// we would expand this to `GROUPING SETS ((a,b,c),(a,NULL,NULL),(NULL,b,NULL),(NULL,b,c))
/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
fn merge_grouping_set_physical_expr(
grouping_sets: &[Vec<Expr>],
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>> {
) -> Result<PhysicalGroupingSetExpr> {
let num_groups = grouping_sets.len();
let mut all_exprs: Vec<Expr> = vec![];
let mut null_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
Expand All @@ -1082,11 +1099,11 @@ fn merge_grouping_set_expr(

let expr_count = all_exprs.len();

for group_idx in 0..num_groups {
for expr_group in grouping_sets.iter() {
let mut group: Vec<(Arc<dyn PhysicalExpr>, String)> =
Vec::with_capacity(expr_count);
for idx in 0..expr_count {
if grouping_sets[group_idx].contains(&all_exprs[idx]) {
if expr_group.contains(&all_exprs[idx]) {
group.push(get_physical_expr_pair(
&all_exprs[idx],
input_dfschema,
Expand All @@ -1104,12 +1121,14 @@ fn merge_grouping_set_expr(
Ok(merged_sets)
}

fn create_cube_expr(
/// Expand and align a CUBE expression. This is a special case of GROUPING SETS
/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
fn create_cube_physical_expr(
exprs: &[Expr],
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>> {
) -> Result<PhysicalGroupingSetExpr> {
let num_terms = exprs.len();
let num_groups = num_terms * num_terms;

Expand All @@ -1134,8 +1153,7 @@ fn create_cube_expr(
)?)
}

let mut groups: Vec<Vec<(Arc<dyn PhysicalExpr>, String)>> =
Vec::with_capacity(num_groups);
let mut groups: PhysicalGroupingSetExpr = Vec::with_capacity(num_groups);

groups.push(all_exprs.clone());

Expand All @@ -1152,32 +1170,33 @@ fn create_cube_expr(
Ok(groups)
}

fn create_rollup_expr(
/// Expand and align a ROLLUP expression. This is a special case of GROUPING SETS
/// (see https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS)
fn create_rollup_physical_expr(
exprs: &[Expr],
input_dfschema: &DFSchema,
input_schema: &Schema,
session_state: &SessionState,
) -> Result<Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>> {
) -> Result<PhysicalGroupingSetExpr> {
let num_of_exprs = exprs.len();

let mut groups: Vec<Vec<(Arc<dyn PhysicalExpr>, String)>> =
Vec::with_capacity(num_of_exprs + 1);
let mut groups: PhysicalGroupingSetExpr = Vec::with_capacity(num_of_exprs + 1);

for total in 0..num_of_exprs + 1 {
for total in 0..=num_of_exprs {
let mut group: Vec<(Arc<dyn PhysicalExpr>, String)> =
Vec::with_capacity(num_of_exprs);

for index in 0..num_of_exprs {
for (index, expr) in exprs.iter().enumerate() {
if index < total {
group.push(get_physical_expr_pair(
&exprs[index],
expr,
input_dfschema,
input_schema,
session_state,
)?);
} else {
group.push(get_null_physical_expr_pair(
&exprs[index],
expr,
input_dfschema,
input_schema,
session_state,
Expand All @@ -1191,6 +1210,7 @@ fn create_rollup_expr(
Ok(groups)
}

/// For a given logical expr, get a properly typed NULL ScalarValue physical expression
fn get_null_physical_expr_pair(
expr: &Expr,
input_dfschema: &DFSchema,
Expand Down Expand Up @@ -1592,12 +1612,13 @@ mod tests {
let logical_input_schema = logical_plan.schema();
let session_state = make_session_state();

let cube: Result<Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>> = create_cube_expr(
&exprs,
&logical_input_schema,
&physical_input_schema,
&session_state,
);
let cube: Result<Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>> =
create_cube_physical_expr(
&exprs,
&logical_input_schema,
&physical_input_schema,
&session_state,
);

let expected = r#"Ok([[(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], [(Literal { value: Utf8(NULL) }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")], [(Column { name: "c1", index: 0 }, "c1"), (Literal { value: Int64(NULL) }, "c2"), (Column { name: "c3", index: 2 }, "c3")], [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Literal { value: Int64(NULL) }, "c3")], [(Literal { value: Utf8(NULL) }, "c1"), (Literal { value: Int64(NULL) }, "c2"), (Column { name: "c3", index: 2 }, "c3")], [(Literal { value: Utf8(NULL) }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Literal { value: Int64(NULL) }, "c3")], [(Column { name: "c1", index: 0 }, "c1"), (Literal { value: Int64(NULL) }, "c2"), (Literal { value: Int64(NULL) }, "c3")], [(Literal { value: Utf8(NULL) }, "c1"), (Literal { value: Int64(NULL) }, "c2"), (Literal { value: Int64(NULL) }, "c3")]])"#;

Expand Down Expand Up @@ -1626,12 +1647,13 @@ mod tests {
let logical_input_schema = logical_plan.schema();
let session_state = make_session_state();

let cube: Result<Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>> = create_rollup_expr(
&exprs,
&logical_input_schema,
&physical_input_schema,
&session_state,
);
let cube: Result<Vec<Vec<(Arc<dyn PhysicalExpr>, String)>>> =
create_rollup_physical_expr(
&exprs,
&logical_input_schema,
&physical_input_schema,
&session_state,
);

let expected = r#"Ok([[(Literal { value: Utf8(NULL) }, "c1"), (Literal { value: Int64(NULL) }, "c2"), (Literal { value: Int64(NULL) }, "c3")], [(Column { name: "c1", index: 0 }, "c1"), (Literal { value: Int64(NULL) }, "c2"), (Literal { value: Int64(NULL) }, "c3")], [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Literal { value: Int64(NULL) }, "c3")], [(Column { name: "c1", index: 0 }, "c1"), (Column { name: "c2", index: 1 }, "c2"), (Column { name: "c3", index: 2 }, "c3")]])"#;

Expand Down
Loading

0 comments on commit dfbfee2

Please sign in to comment.