Commit: Update tests

frankmcsherry committed Jul 12, 2024
1 parent dc716c5 commit f3234ee
Showing 88 changed files with 10,756 additions and 8,238 deletions.
121 changes: 57 additions & 64 deletions src/transform/src/reduce_reduction.rs
@@ -8,12 +8,17 @@
 // by the Apache License, Version 2.0.
 
 //! Breaks complex `Reduce` variants into a join of simpler variants.
+//!
+//! Specifically, any `Reduce` that contains two different "types" of aggregation,
+//! in the sense of `ReductionType`, will be broken into one `Reduce` for each
+//! type of aggregation, each containing the aggregations of that type,
+//! and the results are then joined back together.
 
 use crate::TransformCtx;
-use mz_compute_types::plan::reduce::{reduction_type, ReductionType};
+use mz_compute_types::plan::reduce::reduction_type;
 use mz_expr::MirRelationExpr;
 
-/// Fuses multiple `Filter` operators into one and deduplicates predicates.
+/// Breaks complex `Reduce` variants into a join of simpler variants.
 #[derive(Debug)]
 pub struct ReduceReduction;
 
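For readers of the diff: the following is a minimal, self-contained sketch of the segmentation the new doc comment describes — illustrative only, not part of the commit. The stand-in `ReductionType` mirrors the real enum in `mz_compute_types::plan::reduce`, and the aggregate list is hypothetical.

use std::collections::BTreeMap;

// Stand-in for mz_compute_types::plan::reduce::ReductionType.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum ReductionType {
    Accumulable,  // e.g. sum, count
    Hierarchical, // e.g. min, max
    Basic,        // everything else
}

fn main() {
    // Hypothetical aggregates for `SELECT k, sum(a), max(b), count(*) FROM t GROUP BY k`.
    let aggregates = [
        ("sum", ReductionType::Accumulable),
        ("max", ReductionType::Hierarchical),
        ("count", ReductionType::Accumulable),
    ];
    // Segment the aggregates by reduction type, as the transform does.
    let mut segmented: BTreeMap<ReductionType, Vec<&str>> = BTreeMap::new();
    for (name, ty) in aggregates {
        segmented.entry(ty).or_default().push(name);
    }
    // Two distinct types, so this Reduce would be split into one Reduce per
    // type, and the results joined back together on `k`.
    assert_eq!(segmented.len(), 2);
    assert_eq!(segmented[&ReductionType::Accumulable], vec!["sum", "count"]);
    assert_eq!(segmented[&ReductionType::Hierarchical], vec!["max"]);
}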
@@ -46,72 +51,60 @@ impl ReduceReduction {
             expected_group_size,
         } = relation
         {
-            // Do nothing if `aggregates` contains 1. nothing, 2. a single aggregate, or 3. only accumulable aggregates.
-            // Otherwise, rip apart `aggregates` into accumulable aggregates and each other aggregate.
-            let mut accumulable = Vec::new();
-            let mut all_others = Vec::new();
-            for (index, aggregate) in aggregates.iter().enumerate() {
-                if reduction_type(&aggregate.func) == ReductionType::Accumulable {
-                    accumulable.push((index, aggregate));
-                } else {
-                    all_others.push((index, aggregate));
-                }
-            }
-
-            // We only leap in to action if there are things to break apart.
-            if all_others.len() > 1 || (!accumulable.is_empty() && !all_others.is_empty()) {
-                let mut reduces = Vec::new();
-                for (_index, aggr) in all_others.iter() {
-                    reduces.push(MirRelationExpr::Reduce {
-                        input: input.clone(),
-                        group_key: group_key.clone(),
-                        aggregates: vec![(*aggr).clone()],
-                        monotonic: *monotonic,
-                        expected_group_size: *expected_group_size,
-                    });
-                }
-                if !accumulable.is_empty() {
-                    reduces.push(MirRelationExpr::Reduce {
-                        input: input.clone(),
-                        group_key: group_key.clone(),
-                        aggregates: accumulable
-                            .iter()
-                            .map(|(_index, aggr)| (*aggr).clone())
-                            .collect(),
-                        monotonic: *monotonic,
-                        expected_group_size: *expected_group_size,
-                    });
-                }
-                // Now build a `Join` of the reduces, on their keys, followed by a permutation of their aggregates.
-                // Equate all `group_key` columns in all inputs.
-                let mut equivalences = vec![Vec::with_capacity(reduces.len()); group_key.len()];
-                for column in 0..group_key.len() {
-                    for input in 0..reduces.len() {
-                        equivalences[column].push((input, column));
-                    }
-                }
-                // Project the group key and then each of the aggregates in order.
-                let mut projection =
-                    Vec::with_capacity(group_key.len() + all_others.len() + accumulable.len());
-                projection.extend(0..group_key.len());
-                let mut accumulable_count = 0;
-                let mut all_others_count = 0;
-                for aggr in aggregates.iter() {
-                    if reduction_type(&aggr.func) == ReductionType::Accumulable {
-                        projection.push(
-                            (group_key.len() + 1) * all_others.len()
-                                + group_key.len()
-                                + accumulable_count,
-                        );
-                        accumulable_count += 1;
-                    } else {
-                        projection.push((group_key.len() + 1) * all_others_count + group_key.len());
-                        all_others_count += 1;
-                    }
-                }
-                // Now make the join.
-                *relation = MirRelationExpr::join(reduces, equivalences).project(projection);
-            }
+            // Segment the aggregates by reduction type.
+            let mut segmented_aggregates = std::collections::BTreeMap::default();
+            for (index, aggr) in aggregates.iter().enumerate() {
+                let (aggrs, indexes) = segmented_aggregates
+                    .entry(reduction_type(&aggr.func))
+                    .or_insert_with(|| (Vec::default(), Vec::default()));
+                indexes.push(group_key.len() + index);
+                aggrs.push(aggr.clone());
+            }
+
+            // Do nothing unless there are at least two distinct types of aggregations.
+            if segmented_aggregates.len() < 2 {
+                return;
+            }
+
+            // For each type of aggregation we'll plan the corresponding `Reduce`,
+            // and then join the at-least-two `Reduce` stages together.
+            // TODO: Perhaps we should introduce a `Let` stage rather than clone the input?
+            let mut reduces = Vec::with_capacity(segmented_aggregates.keys().count());
+            // Track the current and intended locations of each output column.
+            let mut columns = Vec::new();
+
+            for (_aggr_type, (aggrs, indexes)) in segmented_aggregates {
+                columns.extend(0..group_key.len());
+                columns.extend(indexes);
+
+                reduces.push(MirRelationExpr::Reduce {
+                    input: input.clone(),
+                    group_key: group_key.clone(),
+                    aggregates: aggrs,
+                    monotonic: *monotonic,
+                    expected_group_size: *expected_group_size,
+                });
+            }
+
+            // Now build a `Join` of the reduces, on their keys, followed by a permutation of their aggregates.
+            // Equate all `group_key` columns in all inputs.
+            let mut equivalences = vec![Vec::with_capacity(reduces.len()); group_key.len()];
+            for column in 0..group_key.len() {
+                for input in 0..reduces.len() {
+                    equivalences[column].push((input, column));
+                }
+            }
+
+            // Determine projection that puts aggregate columns in their intended locations,
+            // and projects away repeated key columns.
+            let max_column = columns.iter().max().expect("Non-empty aggregates expected");
+            let mut projection = Vec::with_capacity(max_column + 1);
+            for column in 0..max_column + 1 {
+                projection.push(columns.iter().position(|c| *c == column).unwrap())
+            }
+
+            // Now make the join.
+            *relation = MirRelationExpr::join(reduces, equivalences).project(projection);
         }
     }
 }
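The subtle part of the new code is the `columns`/`projection` bookkeeping: every `Reduce` re-emits the group key, and because `BTreeMap` iteration orders segments by `ReductionType` rather than by original aggregate position, the join output interleaves repeated keys with aggregates out of order. The final projection inverts the `columns` map to route each aggregate back to its original position and drop the repeated keys. A minimal sketch of that inversion — illustrative only, not part of the commit — assuming a group key of width 1 and three aggregates whose first and third share a reduction type:

fn main() {
    let group_key_len = 1;

    // `columns[i]` records the intended final position of column `i` of the
    // join output. Each Reduce contributes its (repeated) copy of the group
    // key, then the intended positions of the aggregates it carries.
    let mut columns: Vec<usize> = Vec::new();
    columns.extend(0..group_key_len); // keys from the first Reduce
    columns.extend([1, 3]);           // its aggregates: original indices 0 and 2
    columns.extend(0..group_key_len); // repeated keys from the second Reduce
    columns.extend([2]);              // its aggregate: original index 1
    assert_eq!(columns, vec![0, 1, 3, 0, 2]);

    // Invert the map: output column `c` reads from the first join column whose
    // intended position is `c`. Repeated key columns are never chosen, so they
    // are projected away.
    let max_column = *columns.iter().max().expect("non-empty aggregates");
    let projection: Vec<usize> = (0..=max_column)
        .map(|c| columns.iter().position(|x| *x == c).unwrap())
        .collect();
    assert_eq!(projection, vec![0, 1, 4, 2]);
}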
36 changes: 18 additions & 18 deletions test/sqllogictest/aggregates.slt
@@ -228,8 +228,8 @@ query T multiline
 EXPLAIN WITH(arity, join implementations) SELECT a, sum(b) from agg_pk group by a
 ----
 Explained Query:
-  Project (#0, #3) // { arity: 2 }
-    Map (integer_to_bigint(#1)) // { arity: 4 }
+  Project (#0{a}, #3) // { arity: 2 }
+    Map (integer_to_bigint(#1{b})) // { arity: 4 }
       ReadStorage materialize.public.agg_pk // { arity: 3 }
 
 Source materialize.public.agg_pk
@@ -249,8 +249,8 @@ query T multiline
 EXPLAIN WITH(arity, join implementations) SELECT a, sum(c) from agg_pk group by a
 ----
 Explained Query:
-  Project (#0, #3) // { arity: 2 }
-    Map (bigint_to_numeric(#2)) // { arity: 4 }
+  Project (#0{a}, #3) // { arity: 2 }
+    Map (bigint_to_numeric(#2{c})) // { arity: 4 }
       ReadStorage materialize.public.agg_pk // { arity: 3 }
 
 Source materialize.public.agg_pk
@@ -722,23 +722,23 @@ query T multiline
 EXPLAIN OPTIMIZED PLAN FOR SELECT stddev(x), sum(x) FROM t_variance;
 ----
 Explained Query:
-  Return
-    Project (#4, #3)
-      Map (sqrtf64(case when ((#0) IS NULL OR (#1) IS NULL OR (case when (#2 = 0) then null else #2 end) IS NULL OR (case when (0 = (#2 - 1)) then null else (#2 - 1) end) IS NULL) then null else greatest(((#0 - ((#1 * #1) / bigint_to_double(case when (#2 = 0) then null else #2 end))) / bigint_to_double(case when (0 = (#2 - 1)) then null else (#2 - 1) end)), 0) end))
-        Union
-          Project (#0..=#2, #1)
-            Get l0
-          Map (null, null, 0, null)
-            Union
-              Negate
-                Project ()
-                  Get l0
-              Constant
+  Return // { arity: 2 }
+    Project (#4, #3) // { arity: 2 }
+      Map (sqrtf64(case when ((#0{sum}) IS NULL OR (#1{sum_x}) IS NULL OR (case when (#2{count_x} = 0) then null else #2{count_x} end) IS NULL OR (case when (0 = (#2{count_x} - 1)) then null else (#2{count_x} - 1) end) IS NULL) then null else greatest(((#0{sum} - ((#1{sum_x} * #1{sum_x}) / bigint_to_double(case when (#2{count_x} = 0) then null else #2{count_x} end))) / bigint_to_double(case when (0 = (#2{count_x} - 1)) then null else (#2{count_x} - 1) end)), 0) end)) // { arity: 5 }
+        Union // { arity: 4 }
+          Project (#0{sum}..=#2{count_x}, #1{sum_x}) // { arity: 4 }
+            Get l0 // { arity: 3 }
+          Map (null, null, 0, null) // { arity: 4 }
+            Union // { arity: 0 }
+              Negate // { arity: 0 }
+                Project () // { arity: 0 }
+                  Get l0 // { arity: 3 }
+              Constant // { arity: 0 }
                 - ()
   With
     cte l0 =
-      Reduce aggregates=[sum((#0 * #0)), sum(#0), count(#0)]
-        ReadStorage materialize.public.t_variance
+      Reduce aggregates=[sum((#0{x} * #0{x})), sum(#0{x}), count(#0{x})] // { arity: 3 }
+        ReadStorage materialize.public.t_variance // { arity: 1 }
 
 Source materialize.public.t_variance
 
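An aside on the hunk above: the plan computes `stddev` from three aggregates — `sum(x * x)`, `sum(x)`, and `count(x)` — which are all accumulable, so the new `ReduceReduction` transform leaves this single `Reduce` untouched; the diff only adds arity and column-name annotations. A quick numeric check of the identity the `Map` expression encodes (a sketch, not part of the commit):

// stddev_samp(x) = sqrt((sum(x*x) - sum(x)^2 / n) / (n - 1)), with the same
// guards the plan's `case ... when` branches encode for n == 0 and n == 1.
fn stddev_samp(xs: &[f64]) -> Option<f64> {
    let n = xs.len() as f64;
    if n == 0.0 || n == 1.0 {
        return None; // the plan's NULL-producing branches
    }
    let sum_x2: f64 = xs.iter().map(|x| x * x).sum();
    let sum_x: f64 = xs.iter().sum();
    // `greatest(..., 0)` in the plan clamps tiny negative rounding errors.
    Some(((sum_x2 - sum_x * sum_x / n) / (n - 1.0)).max(0.0).sqrt())
}

fn main() {
    let xs = [1.0, 2.0, 3.0, 4.0];
    // The sample variance of 1, 2, 3, 4 is 5/3, so stddev is sqrt(5/3).
    assert!((stddev_samp(&xs).unwrap() - (5.0_f64 / 3.0).sqrt()).abs() < 1e-12);
}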
10 changes: 5 additions & 5 deletions test/sqllogictest/arithmetic.slt
@@ -958,7 +958,7 @@ FROM nums
 ----
 Explained Query:
   Project (#9, #10) // { arity: 2 }
-    Map (((#0 >> smallint_to_integer(#1)) << smallint_to_integer(#2)), ((#3 << #4) >> #5)) // { arity: 11 }
+    Map (((#0{x1} >> smallint_to_integer(#1{x2})) << smallint_to_integer(#2{x3})), ((#3{y1} << #4{y2}) >> #5{y3})) // { arity: 11 }
       ReadStorage materialize.public.nums // { arity: 9 }
 
 Source materialize.public.nums
@@ -993,7 +993,7 @@ FROM nums
 ----
 Explained Query:
   Project (#9, #10) // { arity: 2 }
-    Map (((#0 >> smallint_to_integer(#1)) & #2), ((#3 << #4) & #5)) // { arity: 11 }
+    Map (((#0{x1} >> smallint_to_integer(#1{x2})) & #2{x3}), ((#3{y1} << #4{y2}) & #5{y3})) // { arity: 11 }
       ReadStorage materialize.public.nums // { arity: 9 }
 
 Source materialize.public.nums
@@ -1030,7 +1030,7 @@ FROM nums
 ----
 Explained Query:
   Project (#9..=#12) // { arity: 4 }
-    Map (((#0 & #1) | #2), ((#3 & #4) | #5), ((#6 & #7) | #8), (integer_to_bigint((smallint_to_integer(#0) & #4)) | #8)) // { arity: 13 }
+    Map (((#0{x1} & #1{x2}) | #2{x3}), ((#3{y1} & #4{y2}) | #5{y3}), ((#6{z1} & #7{z2}) | #8{z3}), (integer_to_bigint((smallint_to_integer(#0{x1}) & #4{y2})) | #8{z3})) // { arity: 13 }
       ReadStorage materialize.public.nums // { arity: 9 }
 
 Source materialize.public.nums
@@ -1067,7 +1067,7 @@ FROM nums
 ----
 Explained Query:
   Project (#9..=#12) // { arity: 4 }
-    Map (((#0 # #1) & #2), ((#3 # #4) & #5), ((#6 # #7) & #8), (integer_to_bigint((smallint_to_integer(#0) # #4)) & #8)) // { arity: 13 }
+    Map (((#0{x1} # #1{x2}) & #2{x3}), ((#3{y1} # #4{y2}) & #5{y3}), ((#6{z1} # #7{z2}) & #8{z3}), (integer_to_bigint((smallint_to_integer(#0{x1}) # #4{y2})) & #8{z3})) // { arity: 13 }
       ReadStorage materialize.public.nums // { arity: 9 }
 
 Source materialize.public.nums
@@ -1104,7 +1104,7 @@ FROM nums
 ----
 Explained Query:
   Project (#9..=#12) // { arity: 4 }
-    Map (((#0 # #1) | #2), ((#3 # #4) | #5), ((#6 # #7) | #8), (integer_to_bigint((smallint_to_integer(#0) # #4)) | #8)) // { arity: 13 }
+    Map (((#0{x1} # #1{x2}) | #2{x3}), ((#3{y1} # #4{y2}) | #5{y3}), ((#6{z1} # #7{z2}) | #8{z3}), (integer_to_bigint((smallint_to_integer(#0{x1}) # #4{y2})) | #8{z3})) // { arity: 13 }
       ReadStorage materialize.public.nums // { arity: 9 }
 
 Source materialize.public.nums
19 changes: 10 additions & 9 deletions test/sqllogictest/boolean.slt
@@ -266,7 +266,7 @@ FROM x
 ----
 Explained Query:
   Project (#4..=#9, #3, #10) // { arity: 8 }
-    Map ((#0 != #1), (#0 = #1), (#0 >= #1), (#0 <= #1), (#0 < #1), (#0 > #1), NOT((#2 @> {}))) // { arity: 11 }
+    Map ((#0{a} != #1{u}), (#0{a} = #1{u}), (#0{a} >= #1{u}), (#0{a} <= #1{u}), (#0{a} < #1{u}), (#0{a} > #1{u}), NOT((#2{j} @> {}))) // { arity: 11 }
       ReadStorage materialize.public.x // { arity: 4 }
 
 Source materialize.public.x
@@ -285,7 +285,7 @@ EXPLAIN WITH(arity, join implementations) SELECT
 FROM y
 ----
 Explained Query:
-  Project (#0) // { arity: 1 }
+  Project (#0{a}) // { arity: 1 }
     ReadStorage materialize.public.y // { arity: 2 }
 
 Source materialize.public.y
@@ -302,7 +302,7 @@ FROM y
 ----
 Explained Query:
   Project (#2) // { arity: 1 }
-    Map ((null OR NOT(#1) OR (#1) IS NULL)) // { arity: 3 }
+    Map ((null OR NOT(#1{b}) OR (#1{b}) IS NULL)) // { arity: 3 }
       ReadStorage materialize.public.y // { arity: 2 }
 
 Source materialize.public.y
@@ -320,7 +320,7 @@ FROM y
 ----
 Explained Query:
   Project (#2) // { arity: 1 }
-    Map ((#1 AND null AND (#1) IS NOT NULL)) // { arity: 3 }
+    Map ((#1{b} AND null AND (#1{b}) IS NOT NULL)) // { arity: 3 }
       ReadStorage materialize.public.y // { arity: 2 }
 
 Source materialize.public.y
@@ -337,7 +337,7 @@ FROM y
 ----
 Explained Query:
   Project (#2) // { arity: 1 }
-    Map ((null OR (#1 AND (#1) IS NOT NULL))) // { arity: 3 }
+    Map ((null OR (#1{b} AND (#1{b}) IS NOT NULL))) // { arity: 3 }
       ReadStorage materialize.public.y // { arity: 2 }
 
 Source materialize.public.y
@@ -354,7 +354,7 @@ FROM y
 ----
 Explained Query:
   Project (#2) // { arity: 1 }
-    Map ((null AND (NOT(#1) OR (#1) IS NULL))) // { arity: 3 }
+    Map ((null AND (NOT(#1{b}) OR (#1{b}) IS NULL))) // { arity: 3 }
      ReadStorage materialize.public.y // { arity: 2 }
 
 Source materialize.public.y
@@ -370,7 +370,7 @@ FROM y
 ----
 Explained Query:
   Project (#2) // { arity: 1 }
-    Map ((NOT(#1) OR (#1) IS NULL)) // { arity: 3 }
+    Map ((NOT(#1{b}) OR (#1{b}) IS NULL)) // { arity: 3 }
       ReadStorage materialize.public.y // { arity: 2 }
 
 Source materialize.public.y
@@ -391,11 +391,12 @@ FROM z
 WHERE CASE WHEN a > b THEN FALSE ELSE TRUE END
 ----
 Explained Query:
-  Filter ((#0) IS NULL OR (#1) IS NULL OR (#0 <= #1)) // { arity: 2 }
+  Filter ((#0{a}) IS NULL OR (#1{b}) IS NULL OR (#0{a} <= #1{b})) // { arity: 2 }
     ReadStorage materialize.public.z // { arity: 2 }
 
 Source materialize.public.z
-  filter=(((#0) IS NULL OR (#1) IS NULL OR (#0 <= #1)))
+  filter=(((#0{a}) IS NULL OR (#1{b}) IS NULL OR (#0{a} <= #1{b})))
+  pushdown=(((#0{a}) IS NULL OR (#1{b}) IS NULL OR (#0{a} <= #1{b})))
 
 Target cluster: quickstart
 
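A note on the final hunk: the optimizer rewrites `CASE WHEN a > b THEN FALSE ELSE TRUE END` as `((#0{a}) IS NULL OR (#1{b}) IS NULL OR (#0{a} <= #1{b}))` — now also reported as a `pushdown` predicate — because a comparison against NULL is unknown and falls through to the `ELSE TRUE` branch. A small exhaustive check of that equivalence, modeling nullable columns as `Option<i32>` (a sketch, not part of the commit):

// WHERE CASE WHEN a > b THEN FALSE ELSE TRUE END: `a > b` is unknown (NULL)
// when either side is NULL, and CASE routes unknown to the ELSE branch.
fn case_filter(a: Option<i32>, b: Option<i32>) -> bool {
    match (a, b) {
        (Some(a), Some(b)) => !(a > b),
        _ => true, // comparison is NULL => ELSE TRUE
    }
}

// The rewritten filter: ((a) IS NULL OR (b) IS NULL OR (a <= b)).
fn rewritten_filter(a: Option<i32>, b: Option<i32>) -> bool {
    match (a, b) {
        (Some(a), Some(b)) => a <= b,
        _ => true, // an IS NULL disjunct fires
    }
}

fn main() {
    let vals = [None, Some(1), Some(2)];
    for &a in &vals {
        for &b in &vals {
            assert_eq!(case_filter(a, b), rewritten_filter(a, b));
        }
    }
}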