Break complex Reduce operators into simpler ones (#17013)
This is an experimental PR where we break apart `Reduce` operators that
would be rendered with the `Collation` plan into atomic `Reduce`
operators that are joined together instead. This has the potential to be
(much) better than the collation plan, or to be worse. If we put an
`ArrangeBy` at the end it wouldn't be worse, but it could be much
better.

This PR is mostly to look at some plans and see what changes and how.

Edit: More explanation here:
https://materializeinc.slack.com/archives/C02PPB50ZHS/p1673028168703109

Edit2: Slack message copy/pasted for visibility:

> Other Reduce thought: We have various flavors of reduce plans, roughly
> three (accumulable, hierarchical, and "generic"). We collate these
> together with another reduce though .. we should just use a delta join.
> All of the constituents present arrangements of their outputs, and a
> delta join would use no additional memory (unlike the collation
> operator). Moreover, we could then push down predicates and projection
> and mapping and such.
>
> Downside: delta joins don't produce an output arrangement, so it
> wouldn't always be the case that Reduce has an arrangement of its
> output. We could always make one at not much additional cost (and I
> think strictly less than the collation operator).

Edit 3: the scope has changed: we now only break apart enough aggregates
to prevent collation. Fixes
MaterializeInc/database-issues#2273
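
As a concrete illustration (taken from the test changes below): a query
that mixes an accumulable aggregate (`sum`) with a hierarchical one
(`max`) previously rendered as a single collated `Reduce`; with this
change it is planned as one `Reduce` per aggregation type, joined on the
grouping key.

```sql
-- sum(...) is accumulable and max(...) is hierarchical; mixing them
-- previously forced a collation step. With this change the query plans
-- as two Reduce operators joined on the (e % 5) grouping key.
SELECT sum(e * f), max(f) FROM v GROUP BY mod(e, 5);
```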

### Motivation

See the PR description above; this change fixes
MaterializeInc/database-issues#2273.


### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered.
- [ ] This PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way) and therefore is tagged with
a `T-proto` label.
- [ ] This PR includes the following [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note):


---------

Signed-off-by: Moritz Hoffmann <mh@materialize.com>
Co-authored-by: Moritz Hoffmann <mh@materialize.com>
frankmcsherry and antiguru authored Nov 21, 2024
1 parent 94c514b commit 38cce6b
Showing 20 changed files with 8,259 additions and 3,774 deletions.
2 changes: 2 additions & 0 deletions src/repr/src/optimize.rs
@@ -122,6 +122,8 @@ optimizer_feature_flags!({
     enable_reduce_unnest_list_fusion: bool,
     // See the feature flag of the same name.
     enable_window_aggregation_fusion: bool,
+    // See the feature flag of the same name.
+    enable_reduce_reduction: bool,
 });
 
 /// A trait used to implement layered config construction.
1 change: 1 addition & 0 deletions src/sql/src/plan/statement/ddl.rs
@@ -4661,6 +4661,7 @@ pub fn unplan_create_cluster(
         enable_value_window_function_fusion,
         enable_reduce_unnest_list_fusion,
         enable_window_aggregation_fusion,
+        enable_reduce_reduction: _,
     } = optimizer_feature_overrides;
     let features_extracted = ClusterFeatureExtracted {
         // Seen is ignored when unplanning.
1 change: 1 addition & 0 deletions src/sql/src/plan/statement/dml.rs
@@ -425,6 +425,7 @@ impl TryFrom<ExplainPlanOptionExtracted> for ExplainConfig {
                 enable_value_window_function_fusion: v.enable_value_window_function_fusion,
                 enable_reduce_unnest_list_fusion: v.enable_reduce_unnest_list_fusion,
                 enable_window_aggregation_fusion: v.enable_window_aggregation_fusion,
+                enable_reduce_reduction: Default::default(),
             },
         })
     }
5 changes: 5 additions & 0 deletions src/sql/src/session/vars.rs
@@ -1281,6 +1281,7 @@ impl SystemVars {
         &ENABLE_STORAGE_SHARD_FINALIZATION,
         &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
         &ENABLE_DEFAULT_CONNECTION_VALIDATION,
+        &ENABLE_REDUCE_REDUCTION,
         &MIN_TIMESTAMP_INTERVAL,
         &MAX_TIMESTAMP_INTERVAL,
         &LOGGING_FILTER,
@@ -2109,6 +2110,10 @@ impl SystemVars {
         *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
     }
 
+    pub fn enable_reduce_reduction(&self) -> bool {
+        *self.expect_value(&ENABLE_REDUCE_REDUCTION)
+    }
+
     /// Returns the `enable_default_connection_validation` configuration parameter.
     pub fn enable_default_connection_validation(&self) -> bool {
         *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
8 changes: 8 additions & 0 deletions src/sql/src/session/vars/definitions.rs
@@ -1481,6 +1481,13 @@ pub static ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE: VarDefinition = VarDefinition::new(
     true,
 );
 
+pub static ENABLE_REDUCE_REDUCTION: VarDefinition = VarDefinition::new(
+    "enable_reduce_reduction",
+    value!(bool; true),
+    "Split complex reductions into simpler ones and a join (Materialize).",
+    true,
+);
+
 pub static MIN_TIMESTAMP_INTERVAL: VarDefinition = VarDefinition::new(
     "min_timestamp_interval",
     value!(Duration; Duration::from_millis(1000)),
@@ -2226,6 +2233,7 @@ impl From<&super::SystemVars> for OptimizerFeatures {
             enable_value_window_function_fusion: vars.enable_value_window_function_fusion(),
             enable_reduce_unnest_list_fusion: vars.enable_reduce_unnest_list_fusion(),
             enable_window_aggregation_fusion: vars.enable_window_aggregation_fusion(),
+            enable_reduce_reduction: vars.enable_reduce_reduction(),
             persist_fast_path_limit: vars.persist_fast_path_limit(),
             reoptimize_imported_views: false,
         }
3 changes: 3 additions & 0 deletions src/transform/src/lib.rs
@@ -56,6 +56,7 @@ pub mod notice;
 pub mod ordering;
 pub mod predicate_pushdown;
 pub mod reduce_elision;
+pub mod reduce_reduction;
 pub mod reduction_pushdown;
 pub mod redundant_join;
 pub mod semijoin_idempotence;
@@ -610,6 +611,8 @@ impl Optimizer {
             // Replaces reduces with maps when the group keys are
             // unique.
             Box::new(reduce_elision::ReduceElision),
+            // Rips complex reduces apart.
+            Box::new(reduce_reduction::ReduceReduction),
             // Converts `Cross Join {Constant(Literal) + Input}` to
             // `Map {Cross Join (Input, Constant()), Literal}`.
             // Join fusion will clean this up to `Map{Input, Literal}`
142 changes: 142 additions & 0 deletions src/transform/src/reduce_reduction.rs
@@ -0,0 +1,142 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Breaks complex `Reduce` variants into a join of simpler variants.
//!
//! Specifically, any `Reduce` that contains two different "types" of aggregation,
//! in the sense of `ReductionType`, will be broken into one `Reduce` for each
//! type of aggregation, each containing the aggregations of that type,
//! and the results are then joined back together.
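//!
//! For example (illustrative): `SELECT sum(a), max(b) FROM t GROUP BY c`
//! mixes an accumulable aggregate (`sum`) with a hierarchical one (`max`),
//! and so is planned as two single-type `Reduce` operators joined on the
//! grouping key `c`.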

use crate::TransformCtx;
use mz_compute_types::plan::reduce::reduction_type;
use mz_expr::MirRelationExpr;

/// Breaks complex `Reduce` variants into a join of simpler variants.
#[derive(Debug)]
pub struct ReduceReduction;

impl crate::Transform for ReduceReduction {
    /// Transforms an expression through accumulated knowledge.
    #[mz_ore::instrument(
        target = "optimizer",
        level = "debug",
        fields(path.segment = "reduce_reduction")
    )]
    fn transform(
        &self,
        relation: &mut MirRelationExpr,
        ctx: &mut TransformCtx,
    ) -> Result<(), crate::TransformError> {
        if ctx.features.enable_reduce_reduction {
            relation.visit_pre_mut(&mut Self::action);
            mz_repr::explain::trace_plan(&*relation);
        }
        Ok(())
    }
}

impl ReduceReduction {
    /// Breaks complex `Reduce` variants into a join of simpler variants.
    pub fn action(relation: &mut MirRelationExpr) {
        if let MirRelationExpr::Reduce {
            input,
            group_key,
            aggregates,
            monotonic,
            expected_group_size,
        } = relation
        {
            // We start by segmenting the aggregates into those that should be rendered independently.
            // Each element of this list is a pair of lists describing a bundle of aggregations that
            // should be applied independently. Each pair of lists corresponds to the aggregates and
            // the column positions in which they should appear in the output.
            // Perhaps these should be lists of pairs, to ensure they align, but their subsequent use
            // is as the shredded lists.
            let mut segmented_aggregates: Vec<(Vec<mz_expr::AggregateExpr>, Vec<usize>)> =
                Vec::new();

            // Our rendering currently produces independent dataflow paths for 1. all accumulable aggregations,
            // 2. all hierarchical aggregations, and 3. *each* basic aggregation.
            // We'll form groups for accumulable, hierarchical, and a list of basic aggregates.
            let mut accumulable = (Vec::new(), Vec::new());
            let mut hierarchical = (Vec::new(), Vec::new());

            use mz_compute_types::plan::reduce::ReductionType;
            for (index, aggr) in aggregates.iter().enumerate() {
                match reduction_type(&aggr.func) {
                    ReductionType::Accumulable => {
                        accumulable.0.push(aggr.clone());
                        accumulable.1.push(group_key.len() + index);
                    }
                    ReductionType::Hierarchical => {
                        hierarchical.0.push(aggr.clone());
                        hierarchical.1.push(group_key.len() + index);
                    }
                    ReductionType::Basic => segmented_aggregates
                        .push((vec![aggr.clone()], vec![group_key.len() + index])),
                }
            }

            // Fold in hierarchical and accumulable aggregates.
            if !hierarchical.0.is_empty() {
                segmented_aggregates.push(hierarchical);
            }
            if !accumulable.0.is_empty() {
                segmented_aggregates.push(accumulable);
            }
            segmented_aggregates.sort();

            // Do nothing unless there are at least two distinct types of aggregations.
            if segmented_aggregates.len() < 2 {
                return;
            }

            // For each type of aggregation we'll plan the corresponding `Reduce`,
            // and then join the at-least-two `Reduce` stages together.
            // TODO: Perhaps we should introduce a `Let` stage rather than clone the input?
            let mut reduces = Vec::with_capacity(segmented_aggregates.len());
            // Track the current and intended locations of each output column.
            let mut columns = Vec::new();

            for (aggrs, indexes) in segmented_aggregates {
                columns.extend(0..group_key.len());
                columns.extend(indexes);

                reduces.push(MirRelationExpr::Reduce {
                    input: input.clone(),
                    group_key: group_key.clone(),
                    aggregates: aggrs,
                    monotonic: *monotonic,
                    expected_group_size: *expected_group_size,
                });
            }

            // Now build a `Join` of the reduces, on their keys, followed by a permutation of their aggregates.
            // Equate all `group_key` columns in all inputs.
            let mut equivalences = vec![Vec::with_capacity(reduces.len()); group_key.len()];
            for column in 0..group_key.len() {
                for input in 0..reduces.len() {
                    equivalences[column].push((input, column));
                }
            }

            // Determine the projection that puts aggregate columns in their intended locations,
            // and projects away repeated key columns.
            let max_column = columns.iter().max().expect("Non-empty aggregates expected");
            let mut projection = Vec::with_capacity(max_column + 1);
            for column in 0..max_column + 1 {
                projection.push(columns.iter().position(|c| *c == column).unwrap())
            }

            // Now make the join.
            *relation = MirRelationExpr::join(reduces, equivalences).project(projection);
        }
    }
}
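
The transform is gated on the new `enable_reduce_reduction` system
variable (default `true`), which makes it easy to compare the two plan
shapes. A sketch of one way to eyeball the before/after, assuming a
session that is permitted to set system variables (`v` stands in for any
table with numeric columns):

```sql
-- Sketch: compare the collation plan against the split-and-join plan.
ALTER SYSTEM SET enable_reduce_reduction = false;
EXPLAIN OPTIMIZED PLAN AS TEXT FOR
  SELECT sum(e * f), max(f) FROM v GROUP BY mod(e, 5);

ALTER SYSTEM SET enable_reduce_reduction = true;
EXPLAIN OPTIMIZED PLAN AS TEXT FOR
  SELECT sum(e * f), max(f) FROM v GROUP BY mod(e, 5);
```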
11 changes: 8 additions & 3 deletions test/sqllogictest/attributes/mir_arity.slt
@@ -96,9 +96,14 @@ EXPLAIN OPTIMIZED PLAN WITH(arity) AS TEXT FOR
 SELECT sum(e * f), max(f) FROM v GROUP BY mod(e, 5)
 ----
 Explained Query:
-  Project (#1, #2) // { arity: 2 }
-    Reduce group_by=[(#0 % 5)] aggregates=[sum((#0 * #1)), max(#1)] // { arity: 3 }
-      ReadStorage materialize.public.v // { arity: 2 }
+  Project (#3, #1) // { arity: 2 }
+    Join on=(#0 = #2) type=differential // { arity: 4 }
+      ArrangeBy keys=[[#0]] // { arity: 2 }
+        Reduce group_by=[(#0 % 5)] aggregates=[max(#1)] // { arity: 2 }
+          ReadStorage materialize.public.v // { arity: 2 }
+      ArrangeBy keys=[[#0]] // { arity: 2 }
+        Reduce group_by=[(#0 % 5)] aggregates=[sum((#0 * #1))] // { arity: 2 }
+          ReadStorage materialize.public.v // { arity: 2 }
 
 Source materialize.public.v
 
73 changes: 45 additions & 28 deletions test/sqllogictest/attributes/mir_column_types.slt
@@ -73,9 +73,16 @@ Explained Query:
   Threshold // { types: "(bigint?, text?, date?)" }
     Union // { types: "(bigint?, text?, date?)" }
       Negate // { types: "(bigint?, text?, date?)" }
-        Project (#1, #2, #0) // { types: "(bigint?, text?, date?)" }
-          Reduce group_by=[#2] aggregates=[sum(#0), max(#1)] // { types: "(date?, bigint?, text?)" }
-            ReadStorage materialize.public.t // { types: "(integer?, text?, date?)" }
+        Project (#3, #1, #0) // { types: "(bigint?, text?, date?)" }
+          Join on=(#0 = #2) type=differential // { types: "(date?, text?, date?, bigint?)" }
+            ArrangeBy keys=[[#0]] // { types: "(date?, text?)" }
+              Reduce group_by=[#1] aggregates=[max(#0)] // { types: "(date?, text?)" }
+                Project (#1, #2) // { types: "(text?, date?)" }
+                  ReadStorage materialize.public.t // { types: "(integer?, text?, date?)" }
+            ArrangeBy keys=[[#0]] // { types: "(date?, bigint?)" }
+              Reduce group_by=[#1] aggregates=[sum(#0)] // { types: "(date?, bigint?)" }
+                Project (#0, #2) // { types: "(integer?, date?)" }
+                  ReadStorage materialize.public.t // { types: "(integer?, text?, date?)" }
       Constant // { types: "(bigint, text, date?)" }
         - (1, "hello", null)

@@ -133,31 +140,41 @@ EXPLAIN OPTIMIZED PLAN WITH(types) AS TEXT FOR
 (SELECT null::boolean as f1, 10 as f2) EXCEPT (SELECT min(f), count(*) FROM v WHERE (select d::double FROM u) = v.e GROUP BY e LIMIT 1)
 ----
 Explained Query:
-  Threshold // { types: "(boolean?, bigint)" }
-    Union // { types: "(boolean?, bigint)" }
-      Negate // { types: "(boolean?, bigint)" }
-        TopK limit=1 // { types: "(boolean?, bigint)" }
-          Project (#1, #2) // { types: "(boolean?, bigint)" }
-            Reduce group_by=[#0] aggregates=[min(#1), count(*)] // { types: "(double precision, boolean?, bigint)" }
-              Project (#0, #1) // { types: "(double precision, boolean?)" }
-                Join on=(#0 = #2) type=differential // { types: "(double precision, boolean?, double precision)" }
-                  ArrangeBy keys=[[#0]] // { types: "(double precision, boolean?)" }
-                    Filter (#0) IS NOT NULL // { types: "(double precision, boolean?)" }
-                      ReadStorage materialize.public.v // { types: "(double precision?, boolean?)" }
-                  ArrangeBy keys=[[#0]] // { types: "(double precision?)" }
-                    Union // { types: "(double precision?)" }
-                      Project (#1) // { types: "(double precision?)" }
-                        Filter (#0) IS NOT NULL // { types: "(integer, double precision?)" }
-                          Map (integer_to_double(#0)) // { types: "(integer?, double precision?)" }
-                            ReadStorage materialize.public.u // { types: "(integer?)" }
-                      Map (error("more than one record produced in subquery")) // { types: "(double precision)" }
-                        Project () // { types: "()" }
-                          Filter (#0 > 1) // { types: "(bigint)" }
-                            Reduce aggregates=[count(*)] // { types: "(bigint)" }
-                              Project () // { types: "()" }
-                                ReadStorage materialize.public.u // { types: "(integer?)" }
-      Constant // { types: "(boolean?, bigint)" }
-        - (null, 10)
+  Return // { types: "(boolean?, bigint)" }
+    Threshold // { types: "(boolean?, bigint)" }
+      Union // { types: "(boolean?, bigint)" }
+        Negate // { types: "(boolean?, bigint)" }
+          TopK limit=1 // { types: "(boolean?, bigint)" }
+            Project (#1, #3) // { types: "(boolean?, bigint)" }
+              Join on=(#0 = #2) type=differential // { types: "(double precision, boolean?, double precision, bigint)" }
+                ArrangeBy keys=[[#0]] // { types: "(double precision, boolean?)" }
+                  Reduce group_by=[#0] aggregates=[min(#1)] // { types: "(double precision, boolean?)" }
+                    Get l0 // { types: "(double precision, boolean?)" }
+                ArrangeBy keys=[[#0]] // { types: "(double precision, bigint)" }
+                  Reduce group_by=[#0] aggregates=[count(*)] // { types: "(double precision, bigint)" }
+                    Project (#0) // { types: "(double precision)" }
+                      Get l0 // { types: "(double precision, boolean?)" }
+        Constant // { types: "(boolean?, bigint)" }
+          - (null, 10)
+  With
+    cte l0 =
+      Project (#0, #1) // { types: "(double precision, boolean?)" }
+        Join on=(#0 = #2) type=differential // { types: "(double precision, boolean?, double precision)" }
+          ArrangeBy keys=[[#0]] // { types: "(double precision, boolean?)" }
+            Filter (#0) IS NOT NULL // { types: "(double precision, boolean?)" }
+              ReadStorage materialize.public.v // { types: "(double precision?, boolean?)" }
+          ArrangeBy keys=[[#0]] // { types: "(double precision?)" }
+            Union // { types: "(double precision?)" }
+              Project (#1) // { types: "(double precision?)" }
+                Filter (#0) IS NOT NULL // { types: "(integer, double precision?)" }
+                  Map (integer_to_double(#0)) // { types: "(integer?, double precision?)" }
+                    ReadStorage materialize.public.u // { types: "(integer?)" }
+              Map (error("more than one record produced in subquery")) // { types: "(double precision)" }
+                Project () // { types: "()" }
+                  Filter (#0 > 1) // { types: "(bigint)" }
+                    Reduce aggregates=[count(*)] // { types: "(bigint)" }
+                      Project () // { types: "()" }
+                        ReadStorage materialize.public.u // { types: "(integer?)" }
 
 Source materialize.public.u
 Source materialize.public.v
32 changes: 28 additions & 4 deletions test/sqllogictest/autogenerated/all_parts_essential.slt
@@ -306,8 +306,20 @@ GROUP BY 1,
 2
 ----
 Explained Query:
-  Project (#0..=#4, #3) // { arity: 6 }
-    Reduce group_by=[#1, #2] aggregates=[count(*), min(#2), min(#0)] // { arity: 5 }
+  Return // { arity: 6 }
+    Project (#0, #1, #6, #2, #3, #2) // { arity: 6 }
+      Join on=(#0 = #4 AND #1 = #5) type=differential // { arity: 7 }
+        implementation
+          %0[#0, #1]UKKA » %1[#0, #1]UKKA
+        ArrangeBy keys=[[#0, #1]] // { arity: 4 }
+          Reduce group_by=[#1, #2] aggregates=[min(#2), min(#0)] // { arity: 4 }
+            Get l0 // { arity: 3 }
+        ArrangeBy keys=[[#0, #1]] // { arity: 3 }
+          Reduce group_by=[#0, #1] aggregates=[count(*)] // { arity: 3 }
+            Project (#1, #2) // { arity: 2 }
+              Get l0 // { arity: 3 }
+  With
+    cte l0 =
       Project (#0, #1, #28) // { arity: 3 }
         Filter (#10 <= 1995-11-14) AND (#30 <= 12907776) AND (#10 >= 1995-04-19) AND (#30 >= #5) // { arity: 33 }
           Join on=(#0 = #16 AND #17 = #25) type=delta // { arity: 33 }
@@ -608,8 +620,20 @@ GROUP BY 1,
 2
 ----
 Explained Query:
-  Project (#0, #0..=#2) // { arity: 4 }
-    Reduce group_by=[#1] aggregates=[min(#0), count(*)] // { arity: 3 }
+  Return // { arity: 4 }
+    Project (#0, #0, #1, #3) // { arity: 4 }
+      Join on=(#0 = #2) type=differential // { arity: 4 }
+        implementation
+          %0[#0]UKA » %1[#0]UKA
+        ArrangeBy keys=[[#0]] // { arity: 2 }
+          Reduce group_by=[#1] aggregates=[min(#0)] // { arity: 2 }
+            Get l0 // { arity: 2 }
+        ArrangeBy keys=[[#0]] // { arity: 2 }
+          Reduce group_by=[#0] aggregates=[count(*)] // { arity: 2 }
+            Project (#1) // { arity: 1 }
+              Get l0 // { arity: 2 }
+  With
+    cte l0 =
       Project (#2, #3) // { arity: 2 }
         Filter (#1 < #5) AND (#11 > #4) AND (#11 >= #0) // { arity: 14 }
           Join on=(#3 = #6) type=delta // { arity: 14 }