
Break complex Reduce operators into simpler ones #17013

Merged — 9 commits merged into MaterializeInc:main from reduce_reduction on Nov 21, 2024

Conversation

@frankmcsherry (Contributor) commented Jan 6, 2023

This is an experimental PR where we break apart Reduce operators that would be rendered with the Collation plan into atomic Reduce operators that are joined together instead. This has the potential to be (much) better than the collation plan, or .. to be worse. If we put an ArrangeBy at the end it wouldn't be worse, but it could be much better.

This PR is mostly to look at some plans and see what changes and how.

Edit: More explanation here: https://materializeinc.slack.com/archives/C02PPB50ZHS/p1673028168703109

Edit2: Slack message copy/pasted for visibility:

Other Reduce thought: We have various flavors of reduce plans, roughly three (accumulable, hierarchical, and "generic"). We collate these together with another reduce though .. we should just use a delta join. All of the constituents present arrangements of their outputs, and a delta join would use no additional memory (unlike the collation operator). Moreover, we could then push down predicates and projection and mapping and such.
Downside: delta joins don't produce an output arrangement, so it wouldn't always be the case that Reduce has an arrangement of its output. We could always make one at not much additional cost (and I think strictly less than the collation operator).
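
To make the three flavors concrete, here is a hypothetical query (table and column names invented for illustration) whose aggregates span all three, and which would therefore be planned with a collation today:

SELECT
    key,
    SUM(amount)     AS total,   -- accumulable
    MAX(updated_at) AS latest,  -- hierarchical
    jsonb_agg(note) AS notes    -- basic / "generic"
FROM events
GROUP BY key;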

Edit 3: scope has changed to only breaking apart enough aggregates to prevent collation. Fixes https://github.com/MaterializeInc/database-issues/issues/2273

Motivation

Tips for reviewer

Checklist

@vmarcos (Contributor) left a comment

The shape of the resulting plans looks pretty good to me; the style starts to resemble how planning might look under vertical partitioning. I am not sure about splitting and rejoining min/max aggregates on the same attributes, though, so most of my questions relate to that point.

Comment on lines 382 to 392
Join on=(#0 = #3 AND #1 = #4) type=differential // { arity: 6 }
implementation
%1[#0, #1] » %0[#0, #1]UKKA
ArrangeBy keys=[[#0, #1]] // { arity: 3 }
Reduce group_by=[(#1 * #2), #3] aggregates=[max(#0)] // { arity: 3 }
Project (#1..=#4) // { arity: 4 }
Get l0 // { arity: 5 }
ArrangeBy keys=[[#0, #1]] // { arity: 3 }
Reduce group_by=[(#1 * #2), #3] aggregates=[max(#0)] // { arity: 3 }
Project (#0, #2..=#4) // { arity: 4 }
Get l0 // { arity: 5 }
Contributor

Would two max aggregates with the same group-by key now necessitate a join? Perhaps this is one example where the join plan would be adding unnecessary overhead?

Contributor Author

Yes, and it's actually intentional. The code used to do this but a design regression lost that ability at one point. The reason you might want this is that if you are doing

SELECT
    key,
    MAX(height),
    MAX(weight),
    MAX(age),
    MAX(salary),
...
GROUP BY key

then with the join plan you keep a footprint proportional to the sum of distinct values for each of the attributes. With the non-join plan, as we currently do it, you keep a footprint proportional to the distinct products of values, which can be substantially larger (and was for the only user we ever asked).

Though, whether doing this is 100% the right call isn't obvious: I think the join plan reduces the worst-case bound, at the potential cost of the best-case bound (maybe height and weight are correlated, and there aren't that many distinct pairs?).
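
To put rough, purely illustrative numbers on that distinction: with four MAX aggregates and, say, 1,000 distinct values per attribute within each key, the join plan maintains on the order of 4 × 1,000 = 4,000 values per key, whereas the fused plan can in the worst case maintain up to 1,000^4 = 10^12 distinct (height, weight, age, salary) combinations per key; correlation between the attributes is what pulls that worst case back down toward the join plan's footprint.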

Contributor

That's an interesting point, and I am definitely all for ensuring good worst-case behavior, even if it costs us a bit on the best case. Does it make sense here to make a distinction between monotonic vs. non-monotonic plans? Perhaps I am missing something, but the monotonic rendering seems to be better behaved wrt. the effect you describe?

Comment on lines 205 to 211
CrossJoin type=differential
ArrangeBy keys=[[]]
Reduce aggregates=[min(#0)]
Get l0
ArrangeBy keys=[[]]
Reduce aggregates=[max(#0)]
Get l0
Contributor

Another instance here of two hierarchical aggregates, though in this case the cost of the join should be quite small.

@@ -3695,748 +3564,3105 @@ GROUP BY a
}
EOF

# Test Reduce::Collated (global aggregate).
# Test Reduce::Collated (with GROUP BY).
Contributor

After this PR, we'd not be rendering a collation here, right?

Contributor Author

Tbh, I have no idea what this text is and did not intentionally change it.

Comment on lines 263 to 277
ArrangeBy keys=[[#0]] // { arity: 2 }
Reduce group_by=[#0] aggregates=[max(#1)] // { arity: 2 }
Get l2 // { arity: 2 }
ArrangeBy keys=[[#0]] // { arity: 2 }
Reduce group_by=[#0] aggregates=[min(#1)] // { arity: 2 }
Get l2 // { arity: 2 }
ArrangeBy keys=[[#0]] // { arity: 2 }
Reduce group_by=[#0] aggregates=[min(#1)] // { arity: 2 }
Get l3 // { arity: 2 }
ArrangeBy keys=[[#0]] // { arity: 2 }
Reduce group_by=[#0] aggregates=[max(#1)] // { arity: 2 }
Get l3 // { arity: 2 }
ArrangeBy keys=[[#0]] // { arity: 5 }
Reduce group_by=[#0] aggregates=[count(#2), sum(#2), count(#1), sum(#1)] // { arity: 5 }
Get l1 // { arity: 3 }
Contributor

This is an interesting case in that it would not be a bad idea to split the hierarchical aggregates that operate on different attributes, but it still seems better to compute min/max on the same attribute in a single aggregation, as you pointed out in Slack.

@vmarcos (Contributor) commented Jan 10, 2023

For what it is worth, here is what we get with this PR on a query adapted and extended from CH-benCHmark's Q1:

materialize=> select  l_linenumber,
         sum(l_quantity) as sum_qty,
         sum(l_extendedprice) as sum_price,
         avg(l_quantity) as avg_qty,
         avg(l_extendedprice) as avg_price,
         count(*) as count_order,
         max(l_discount) as max_disc,
         min(l_discount) as min_disc,
         max(l_tax) as max_tax,
         min(l_tax) as min_tax
from     lineitem
group by l_linenumber 
order by l_linenumber;
 l_linenumber | sum_qty  |   sum_price    |      avg_qty       |     avg_price      | count_order | max_disc | min_disc | max_tax | min_tax 
--------------+----------+----------------+--------------------+--------------------+-------------+----------+----------+---------+---------
            1 | 38248246 | 57357083080.11 | 25.498830666666667 |     38238.05538674 |     1500000 |      0.1 |        0 |    0.08 |       0
            2 | 32789215 | 49179568260.61 | 25.500467403105237 | 38247.392544422735 |     1285828 |      0.1 |        0 |    0.08 |       0
            3 | 27349884 | 41020910796.75 |  25.52738208352856 |  38287.41881768052 |     1071394 |      0.1 |        0 |    0.08 |       0
            4 | 21857330 |  32777528689.6 | 25.504022683383603 | 38246.155189349076 |      857015 |      0.1 |        0 |    0.08 |       0
            5 | 16411322 | 24616149732.39 | 25.511664311574773 | 38266.201139444755 |      643287 |      0.1 |        0 |    0.08 |       0
            6 | 10937358 | 16400552024.22 | 25.490847647237047 |  38223.48806539725 |      429070 |      0.1 |        0 |    0.08 |       0
            7 |  5485440 |  8225518317.52 | 25.558729108521533 | 38325.785070053724 |      214621 |      0.1 |        0 |    0.08 |       0
(7 rows)

Time: 2816.371 ms (00:02.816)

While this is not a highly controlled run, on main, performance appears to be better:

materialize=> select  l_linenumber,
         sum(l_quantity) as sum_qty,
         sum(l_extendedprice) as sum_price,
         avg(l_quantity) as avg_qty,
         avg(l_extendedprice) as avg_price,
         count(*) as count_order,
         max(l_discount) as max_disc,
         min(l_discount) as min_disc,
         max(l_tax) as max_tax,
         min(l_tax) as min_tax
from     lineitem
group by l_linenumber 
order by l_linenumber;
 l_linenumber | sum_qty  |   sum_price    |      avg_qty       |     avg_price      | count_order | max_disc | min_disc | max_tax | min_tax 
--------------+----------+----------------+--------------------+--------------------+-------------+----------+----------+---------+---------
            1 | 38248246 | 57357083080.11 | 25.498830666666667 |     38238.05538674 |     1500000 |      0.1 |        0 |    0.08 |       0
            2 | 32789215 | 49179568260.61 | 25.500467403105237 | 38247.392544422735 |     1285828 |      0.1 |        0 |    0.08 |       0
            3 | 27349884 | 41020910796.75 |  25.52738208352856 |  38287.41881768052 |     1071394 |      0.1 |        0 |    0.08 |       0
            4 | 21857330 |  32777528689.6 | 25.504022683383603 | 38246.155189349076 |      857015 |      0.1 |        0 |    0.08 |       0
            5 | 16411322 | 24616149732.39 | 25.511664311574773 | 38266.201139444755 |      643287 |      0.1 |        0 |    0.08 |       0
            6 | 10937358 | 16400552024.22 | 25.490847647237047 |  38223.48806539725 |      429070 |      0.1 |        0 |    0.08 |       0
            7 |  5485440 |  8225518317.52 | 25.558729108521533 | 38325.785070053724 |      214621 |      0.1 |        0 |    0.08 |       0
(7 rows)

Time: 1934.263 ms (00:01.934)

@vmarcos (Contributor) commented Jan 10, 2023

The difference drops somewhat when we combine with only one hierarchical aggregate instead; however, the comparison still favors main. With this PR:

materialize=> select  l_linenumber,
         sum(l_quantity) as sum_qty,
         sum(l_extendedprice) as sum_price,
         avg(l_quantity) as avg_qty,
         avg(l_extendedprice) as avg_price,
         count(*) as count_order,
         max(l_discount) as max_disc
from     lineitem
group by l_linenumber 
order by l_linenumber;
 l_linenumber | sum_qty  |   sum_price    |      avg_qty       |     avg_price      | count_order | max_disc 
--------------+----------+----------------+--------------------+--------------------+-------------+----------
            1 | 38248246 | 57357083080.11 | 25.498830666666667 |     38238.05538674 |     1500000 |      0.1
            2 | 32789215 | 49179568260.61 | 25.500467403105237 | 38247.392544422735 |     1285828 |      0.1
            3 | 27349884 | 41020910796.75 |  25.52738208352856 |  38287.41881768052 |     1071394 |      0.1
            4 | 21857330 |  32777528689.6 | 25.504022683383603 | 38246.155189349076 |      857015 |      0.1
            5 | 16411322 | 24616149732.39 | 25.511664311574773 | 38266.201139444755 |      643287 |      0.1
            6 | 10937358 | 16400552024.22 | 25.490847647237047 |  38223.48806539725 |      429070 |      0.1
            7 |  5485440 |  8225518317.52 | 25.558729108521533 | 38325.785070053724 |      214621 |      0.1
(7 rows)

Time: 1627.088 ms (00:01.627)

And on main:

materialize=> select  l_linenumber,
         sum(l_quantity) as sum_qty,
         sum(l_extendedprice) as sum_price,
         avg(l_quantity) as avg_qty,
         avg(l_extendedprice) as avg_price,
         count(*) as count_order,
         max(l_discount) as max_disc
from     lineitem
group by l_linenumber 
order by l_linenumber;
 l_linenumber | sum_qty  |   sum_price    |      avg_qty       |     avg_price      | count_order | max_disc 
--------------+----------+----------------+--------------------+--------------------+-------------+----------
            1 | 38248246 | 57357083080.11 | 25.498830666666667 |     38238.05538674 |     1500000 |      0.1
            2 | 32789215 | 49179568260.61 | 25.500467403105237 | 38247.392544422735 |     1285828 |      0.1
            3 | 27349884 | 41020910796.75 |  25.52738208352856 |  38287.41881768052 |     1071394 |      0.1
            4 | 21857330 |  32777528689.6 | 25.504022683383603 | 38246.155189349076 |      857015 |      0.1
            5 | 16411322 | 24616149732.39 | 25.511664311574773 | 38266.201139444755 |      643287 |      0.1
            6 | 10937358 | 16400552024.22 | 25.490847647237047 |  38223.48806539725 |      429070 |      0.1
            7 |  5485440 |  8225518317.52 | 25.558729108521533 | 38325.785070053724 |      214621 |      0.1
(7 rows)

Time: 1379.412 ms (00:01.379)

@vmarcos (Contributor) commented Jan 20, 2023

I am trying to get a better feel for the performance characteristics of this PR by running variations of the first query above adapted from CH-benCHmark's Q1. For full disclosure, the numbers in my previous comments were generated with a monotonic source, so this means that we were in the happy case of aggregate computation. All numbers in the following are with 1 worker in my local development environment and I created all PK and FK indexes on the TPC-H schema with a database at scale factor 1.

With a non-monotonic source and 1 worker, we get higher absolute query processing times, though main still performs better. With this PR:

materialize=> select  l_linenumber,
         sum(l_quantity) as sum_qty,
         sum(l_extendedprice) as sum_price,
         avg(l_quantity) as avg_qty,
         avg(l_extendedprice) as avg_price,
         count(*) as count_order,
         max(l_discount) as max_disc,
         min(l_discount) as min_disc,
         max(l_tax) as max_tax,
         min(l_tax) as min_tax
from     lineitem
group by l_linenumber 
order by l_linenumber;
 l_linenumber | sum_qty  |  sum_price  |      avg_qty       |     avg_price      | count_order | max_disc | min_disc | max_tax | min_tax 
--------------+----------+-------------+--------------------+--------------------+-------------+----------+----------+---------+---------
            1 | 38255417 | 57347642187 | 25.503611333333332 |       38231.761458 |     1500000 |     0.08 |        0 |     0.1 |       0
            2 | 32744738 | 49079433329 |  25.46966056515175 |  38175.18734215023 |     1285637 |     0.08 |        0 |     0.1 |       0
            3 | 27341357 | 40979779888 | 25.496839164334546 |  38215.17918054204 |     1072343 |     0.08 |        0 |     0.1 |       0
            4 | 21874643 | 32784671314 |  25.49426821223698 | 38209.592893888504 |      858022 |     0.08 |        0 |     0.1 |       0
            5 | 16393227 | 24574702761 | 25.468292857231635 |  38178.92150194509 |      643672 |     0.08 |        0 |     0.1 |       0
            6 | 10950676 | 16420605972 | 25.504292375269582 | 38243.843182739205 |      429366 |     0.08 |        0 |     0.1 |       0
            7 |  5484325 |  8220701677 | 25.524990575302173 |  38260.55764889859 |      214861 |     0.08 |        0 |     0.1 |       0
(7 rows)

Time: 21308.044 ms (00:21.308)

On main:

materialize=> select  l_linenumber,
         sum(l_quantity) as sum_qty,
         sum(l_extendedprice) as sum_price,
         avg(l_quantity) as avg_qty,
         avg(l_extendedprice) as avg_price,
         count(*) as count_order,
         max(l_discount) as max_disc,
         min(l_discount) as min_disc,
         max(l_tax) as max_tax,
         min(l_tax) as min_tax
from     lineitem
group by l_linenumber 
order by l_linenumber;
 l_linenumber | sum_qty  |  sum_price  |      avg_qty       |     avg_price      | count_order | max_disc | min_disc | max_tax | min_tax 
--------------+----------+-------------+--------------------+--------------------+-------------+----------+----------+---------+---------
            1 | 38255417 | 57347642187 | 25.503611333333332 |       38231.761458 |     1500000 |     0.08 |        0 |     0.1 |       0
            2 | 32744738 | 49079433329 |  25.46966056515175 |  38175.18734215023 |     1285637 |     0.08 |        0 |     0.1 |       0
            3 | 27341357 | 40979779888 | 25.496839164334546 |  38215.17918054204 |     1072343 |     0.08 |        0 |     0.1 |       0
            4 | 21874643 | 32784671314 |  25.49426821223698 | 38209.592893888504 |      858022 |     0.08 |        0 |     0.1 |       0
            5 | 16393227 | 24574702761 | 25.468292857231635 |  38178.92150194509 |      643672 |     0.08 |        0 |     0.1 |       0
            6 | 10950676 | 16420605972 | 25.504292375269582 | 38243.843182739205 |      429366 |     0.08 |        0 |     0.1 |       0
            7 |  5484325 |  8220701677 | 25.524990575302173 |  38260.55764889859 |      214861 |     0.08 |        0 |     0.1 |       0
(7 rows)

Time: 12227.115 ms (00:12.227)

@vmarcos (Contributor) commented Jan 20, 2023

Now, let's change the query to manifest the effect pointed out in #17013 (comment), again with a non-monotonic source. The domains of l_discount and l_tax are pretty small; let's use instead l_orderkey and l_partkey.

With this PR:

materialize=> select  l_linenumber,
         sum(l_quantity) as sum_qty,
         sum(l_extendedprice) as sum_price,
         avg(l_quantity) as avg_qty,
         avg(l_extendedprice) as avg_price,
         count(*) as count_order,
         max(l_orderkey) as max_order,
         min(l_orderkey) as min_order,
         max(l_partkey) as max_part,
         min(l_partkey) as min_part
from     lineitem
group by l_linenumber 
order by l_linenumber;
 l_linenumber | sum_qty  |  sum_price  |      avg_qty       |     avg_price      | count_order | max_order | min_order | max_part | min_part 
--------------+----------+-------------+--------------------+--------------------+-------------+-----------+-----------+----------+----------
            1 | 38255417 | 57347642187 | 25.503611333333332 |       38231.761458 |     1500000 |   6000000 |         1 |   200000 |        1
            2 | 32744738 | 49079433329 |  25.46966056515175 |  38175.18734215023 |     1285637 |   6000000 |         1 |   200000 |        1
            3 | 27341357 | 40979779888 | 25.496839164334546 |  38215.17918054204 |     1072343 |   6000000 |         1 |   200000 |        1
            4 | 21874643 | 32784671314 |  25.49426821223698 | 38209.592893888504 |      858022 |   6000000 |         2 |   200000 |        1
            5 | 16393227 | 24574702761 | 25.468292857231635 |  38178.92150194509 |      643672 |   6000000 |         2 |   200000 |        1
            6 | 10950676 | 16420605972 | 25.504292375269582 | 38243.843182739205 |      429366 |   6000000 |         2 |   200000 |        1
            7 |  5484325 |  8220701677 | 25.524990575302173 |  38260.55764889859 |      214861 |   5999971 |        64 |   200000 |        1
(7 rows)

Time: 82687.735 ms (01:22.688)

With main:

materialize=> select  l_linenumber,
         sum(l_quantity) as sum_qty,
         sum(l_extendedprice) as sum_price,
         avg(l_quantity) as avg_qty,
         avg(l_extendedprice) as avg_price,
         count(*) as count_order,
         max(l_orderkey) as max_order,
         min(l_orderkey) as min_order,
         max(l_partkey) as max_part,
         min(l_partkey) as min_part
from     lineitem
group by l_linenumber 
order by l_linenumber;
 l_linenumber | sum_qty  |  sum_price  |      avg_qty       |     avg_price      | count_order | max_order | min_order | max_part | min_part 
--------------+----------+-------------+--------------------+--------------------+-------------+-----------+-----------+----------+----------
            1 | 38255417 | 57347642187 | 25.503611333333332 |       38231.761458 |     1500000 |   6000000 |         1 |   200000 |        1
            2 | 32744738 | 49079433329 |  25.46966056515175 |  38175.18734215023 |     1285637 |   6000000 |         1 |   200000 |        1
            3 | 27341357 | 40979779888 | 25.496839164334546 |  38215.17918054204 |     1072343 |   6000000 |         1 |   200000 |        1
            4 | 21874643 | 32784671314 |  25.49426821223698 | 38209.592893888504 |      858022 |   6000000 |         2 |   200000 |        1
            5 | 16393227 | 24574702761 | 25.468292857231635 |  38178.92150194509 |      643672 |   6000000 |         2 |   200000 |        1
            6 | 10950676 | 16420605972 | 25.504292375269582 | 38243.843182739205 |      429366 |   6000000 |         2 |   200000 |        1
            7 |  5484325 |  8220701677 | 25.524990575302173 |  38260.55764889859 |      214861 |   5999971 |        64 |   200000 |        1
(7 rows)

Time: 39159.046 ms (00:39.159)

I expected to see increased query processing times; however, we see the same pattern as before, with this PR still consistently close to 2x slower than main, which is pretty puzzling.

To understand a bit better, I tried to create the query above using l_orderkey and l_partkey as an indexed view and check how many records the dataflow is keeping around. With main, we get 30,949,728 records, while with this PR, we get 73,246,205 records.

From what I can tell, the effect is coming from our hierarchical reduces being forced to maintain so many records. Since we are vertically partitioning hierarchical aggregates in this PR, we are increasing the number of arrangement hierarchies. Perhaps one less bold step would be to only break down reductions by type (more in line with a reduce collation) and not by component columns? It would still not get rid of the effect pointed out in #17013 (comment), but IIUC this would be enough for us to get rid of the reduce collation rendering?

@frankmcsherry (Contributor, Author) commented Jan 20, 2023

The reason is that in that query the number of distinct values for l_discount and l_tax is relatively small, and the number of groups is also small. There is no harm in doing the primitive aggregation done on main. I suspect if you try it with more heterogeneity in each of a larger number of groups you may see different results. For example, consider data with columns (ZIP, height, weight, age), a cross product of all reasonable heights and weights and ages in each group, and then query

SELECT ZIP,
    MAX(height),
    MAX(weight),
    MAX(age)
FROM that_data
GROUP BY ZIP;

The memory footprint of this should be proportional to the sum of distinct (ZIP, height), distinct (ZIP, weight), and distinct (ZIP, age) records, rather than the distinct (ZIP, height, weight, age) records.

Edit: I responded to your first comment and haven't read the second comment yet, which seems like it may speak to this.

@frankmcsherry (Contributor, Author) commented Jan 20, 2023

To do a thing I should have done with the PR, here is a scenario that demonstrates the difference between the approaches. It is meant to isolate that difference, and I hope I have not screwed it up by computing two different things. :)

First, let's create some maximally heterogeneous data, unioning in a table to make sure the data aren't closed out and we can see the consequences of their dataflows once live.

CREATE TABLE foo (a int, b int, c int, d int);

CREATE MATERIALIZED VIEW test AS 
SELECT * FROM 
    generate_series(1, 100) as a, 
    generate_series(1, 100) as b, 
    generate_series(1, 100) as c, 
    generate_series(1, 100) as d
UNION ALL
SELECT * FROM foo;

At this point, on main, we can do two styles of queries that determine the same thing.

CREATE MATERIALIZED VIEW max_old AS
SELECT a, MAX(b) as b, MAX(c) as c, MAX(d) as d
FROM test
GROUP BY a;

I wanted this to work, but I bailed out at 100GB of memory on my laptop (after many minutes).

CREATE MATERIALIZED VIEW max_new AS
WITH
    b_max as (SELECT a, MAX(b) as b from test GROUP BY a),
    c_max as (SELECT a, MAX(c) as c from test GROUP BY a),
    d_max as (SELECT a, MAX(d) as d from test GROUP BY a)
SELECT b_max.a, b, c, d
FROM
    b_max, c_max, d_max
WHERE
    b_max.a = c_max.a AND b_max.a = d_max.a
;

This is currently being maintained on my laptop in 243MB of memory, some 100MB of which was already part of clusterd when I started.
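
For intuition, the plan that this version of the PR aims to produce automatically for max_old has roughly the shape of the hand-written max_new: one Reduce per aggregate, each arranged by the group key and joined back together. A rough sketch in the plan notation used above (the column numbers and join details are illustrative only, not actual EXPLAIN output):

Join on=(#0 = #2 AND #0 = #4) type=delta
  ArrangeBy keys=[[#0]]
    Reduce group_by=[#0] aggregates=[max(#1)]  // MAX(b)
      Get test
  ArrangeBy keys=[[#0]]
    Reduce group_by=[#0] aggregates=[max(#2)]  // MAX(c)
      Get test
  ArrangeBy keys=[[#0]]
    Reduce group_by=[#0] aggregates=[max(#3)]  // MAX(d)
      Get test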

@frankmcsherry (Contributor, Author)

I removed the d column (bringing the records down to 1M total) and just the query times are already different:

max_old: Time: 7132.956 ms (00:07.133)
max_new: Time: 2313.396 ms (00:02.313)

The memory cost of each as a materialized view is imperceptible for max_new (from 204MB to 206MB), and over a GB for max_old (from 206MB to 1.24GB).

@vmarcos (Contributor) commented Jan 23, 2023

Ah, your comments clarify things a lot, thanks! My second set of measurements above did not manifest the effect because (l_orderkey, linenumber) is a key for lineitem. So this functional dependency meant that the size of the relation is not too far from the sums of the sizes of the projections.

Your explanation and scenario make it clear that this PR is extremely desirable to avoid terrible worst-case performance, even if it costs us some regressions in other better behaved cases.

I have a few follow-up questions, also formulated as SQL against main, as you did:

  1. Consider computing MAX/MIN on the same attribute. In such a case, this PR seems to vertically partition more than desirable? For example, consider:
CREATE MATERIALIZED VIEW maxmin_old AS
SELECT a, MAX(b) as max_b, MIN(b) as min_b
FROM test
GROUP BY a;

and

CREATE MATERIALIZED VIEW maxmin_new AS
WITH
    b_max as (SELECT a, MAX(b) as max_b from test GROUP BY a),
    b_min as (SELECT a, MIN(b) as min_b from test GROUP BY a)
SELECT b_max.a, max_b, min_b
FROM
    b_max, b_min
WHERE
    b_max.a = b_min.a;

In maxmin_new, we maintain two stacks of arrangements when one would have been sufficient. I get 93,200 dataflow records for maxmin_old and 180,200 for maxmin_new. Query processing times also differ by a bit (573.509 ms vs. 629.909 ms on my machine).

  2. The PR does not currently consider the monotonic flag. Would it be desirable to vertically partition the hierarchical aggregates under monotonic rendering? I put the contents of test without column d into a Kafka topic and created a corresponding source and view:
CREATE CONNECTION kafka FOR KAFKA BROKER 'localhost:9092';

CREATE SOURCE test_src
FROM  KAFKA CONNECTION kafka (TOPIC 'test')
FORMAT CSV
WITH 3 COLUMNS
DELIMITED BY '  '
WITH (SIZE = '1');

CREATE VIEW test_m AS
SELECT
    column1::integer as a,
    column2::integer as b,
    column3::integer as c
FROM test_src;

Revising your materialized view definitions to work on test_m and without the d column, we get about 200 dataflow records maintained under monotonic rendering for max_old and 400 for max_new, and the query processing times are also better for the SQL in max_old (roughly 1411.590 ms vs 2278.332 ms on my machine).

Given that this PR is fixing a huge potential problem, I would not oppose merging it without addressing the points above (if they would make sense to address anyhow).

@frankmcsherry (Contributor, Author)

> because (l_orderkey, linenumber) is a key for lineitem.

Ah, that makes sense.

> Consider computing MAX/MIN on the same attribute. In such a case, this PR seems to vertically partition more than desirable?

Yeah I think this is in Slack somewhere. A MinMax aggregate is almost free, because the heterogeneity of values is reduced owing to using the same expr. Seems reasonable to do the slice there rather than wait for the MinMax function to show up (it could, but the new slice should also pan out).

> The PR does not currently consider the monotonic flag.

Afaiu, monotonicity gets discovered in the dataflow pre-rendering optimization, not earlier. So, there's an open question of whether this transform is a rendering hack, in which case we can await that information, or whether it would be useful as a general transform (it might inform join planning / fuse with existing joins).

I haven't figured this out, but .. it does seem like it will be hard to invert whenever we apply the transform. That's maybe a reason to do it later in the process, and just consider it a physical plan optimization rather than a clever logical plan change.

@frankmcsherry (Contributor, Author)

My sense at the moment is that this perhaps all belongs in LIR planning for Reduce, as many of the nits and curious bits relate to things like monotonicity discovered late in the process, or quirks of how we end up rendering things (e.g. that MinMax could be fused together, but they shouldn't be fused with JsonbAgg even if applied to the same expr, although JsonbAgg could be fused with ListConcat because they have the same strategy, etc.). We also need to mind whether the distinct bool is set, stuff like that.

@vmarcos (Contributor) commented Jan 23, 2023

I see... I'd defer here to someone with more optimizer ownership, perhaps @ggevay. The discussion so far indicates that some low-level planning concerns (types of aggregates, monotonicity) can help us make a better determination of situations where this transform could degrade performance (by a small factor). So it seems better suited for a planning pass closer to rendering than at the logical level. One way or another, I am in favor of having it in place since it can avoid very large performance degradations in some instances.

@ggevay (Contributor) commented Jan 23, 2023

If monotonicity info is important, then we don't have much choice: we indeed have to put it in the LIR planning, as @frankmcsherry said above.

@frankmcsherry (Contributor, Author)

Although I don't disagree about putting it in LIR, I think the thread above tells us that we can certainly make more informed decisions if the call gets made with information like unique key structure that informs us about heterogeneity (or other proxies for stats). LIR seems super-safe for the moment, though.

@ggevay (Contributor) commented Jan 23, 2023

> we can certainly make more informed decisions if the call gets made with information like unique key structure

We do have unique key info during the LIR planning, because we are looking at an MIR plan, so we can just call typ, right? Or am I misunderstanding something? (Is the quoted sentence not meant to be a counter-argument against putting it in LIR planning?)

@frankmcsherry (Contributor, Author)

Historically, LIR has been a translation of things expressible in MIR into more specific implementation details. Given that the whole of this transform can be expressed in MIR, and given that MIR is where we have the type information like keys, MIR seems like a more natural home for the "optimization". LIR seems a bit like a (fine for now) place of last resort, as it isn't meant to be a place for further optimizations.

@frankmcsherry (Contributor, Author)

Sorry, just to restate a take: MIR is a fine place to end up having this; for example, MIR is where we do monotonicity analysis, we just do it as part of the whole-dataflow analysis because it cannot be done without sources or as_of/until information. That suggests that having this as a plain transformation isn't great, as we usually apply those before we do whole-dataflow optimization. But even that happens before we lower to LIR.

@mgree (Contributor) commented Jun 27, 2023

I am leaving this comment to remind myself that Frank and I talked about this as a possible place to apply cost-based decisions.

@vmarcos (Contributor) commented Oct 26, 2023

Reflecting on the discussion in this PR, I see the approach here tackling simultaneously two different problems:

  1. ReducePlan::Collation takes up more memory than necessary (solvable by using a delta join instead).
  2. HierarchicalPlan::Bucketed suffers when multiple uncorrelated aggregates are computed (solvable by vertical partitioning of hierarchical aggregates when neither monotonic nor having a strong correlation signal, e.g., FDs).

We can try to tackle both problems at once, as in this PR, or separate the work out on each. There might be an advantage to the latter in that: (a) A solution to problem 1 could be executed at MIR-level and a solution to problem 2 at LIR-level; (b) While both problems are important, problem 2 seems to be more serious than problem 1.

To support prioritization, I collected some data and wrote down observations in an internal document.

@frankmcsherry force-pushed the reduce_reduction branch 2 times, most recently from f3234ee to 8dc8d70 on July 12, 2024 16:54
Project (#0, #1, #0) // { arity: 3 }
Get l1 // { arity: 2 }
Project (#0..=#2, #6, #7, #3, #4, #8, #9) // { arity: 9 }
Filter (#4 >= 0) // { arity: 10 }
Contributor Author

It looks like this should be able to be pushed down to the first Reduce, essentially becoming HAVING SUM(#1) >= 0.
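
In SQL terms, the hoped-for rewrite is roughly the following (a hypothetical sketch with invented names t, k, x), with the predicate evaluated as part of the Reduce rather than after the join:

SELECT k, SUM(x) AS s
FROM t
GROUP BY k
HAVING SUM(x) >= 0;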

Contributor

It's not pushed in MIR because that would prevent arrangement re-use, so JoinImplementation actually lifts it away, I guess.

(But I guess it's pushed during the mir-to-lir lowering into the closure that is after the corresponding join stage.)

Contributor Author

Ok, it feels like there's maybe a tension that we want the arrangement output from Reduce, but also we know how to push filters into Reduce; does the LIR translation do that level of MFP pushdown (through the entire Join operator, and on to the next operator)? It did not when I originally wrote it, iirc, and if that's still the case perhaps we want to think about pushing it down despite the join arrangement interactions.

@ggevay (Contributor), Jul 16, 2024

No, the filter will currently end up inside the LIR Join node.

I guess we could add a special case in JoinImplementation's implement_arrangements: If an already-arranged input is a Reduce, then there is no need to lift away the Filter, because arrangement reuse will happen even with the Filter being left there, because lowering will push the Filter into the mfp_after of the Reduce. Do I understand this correctly?

Contributor Author

I think the only case we don't want to push down the filter is when it is a Get operator with an arrangement it references? If it is a bespoke ArrangeBy .. we want to push down the filter. If it is a Reduce we should push it down. If it is a Threshold or a TopK, .. I don't think we make those arrangements available .. but if we did we could push down a filter to them.

@frankmcsherry (Contributor, Author)

Rebased and rewritten to only shred Reduce enough to avoid the ReducePlan::Collation variant. This means that e.g. mins and maxes remain in the same Reduce, and all the various _agg things also get one Reduce. This should be pretty close to regression free, in the sense that we would have rendered them as this, followed by a DD reduce to collate the results. Instead we use a Join which avoids two arrangements.

Still lots of opportunities for regressions, of course. There seem to be several filters not being pushed down, for example.
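
To illustrate the narrowed scope (hypothetical queries, not taken from the PR's test files):

-- Same flavor throughout: stays a single hierarchical Reduce, no join.
SELECT key, MIN(a), MAX(a), MAX(b) FROM t GROUP BY key;

-- Mixes accumulable, hierarchical, and basic aggregates: previously rendered
-- with a collation; with this PR, one Reduce per flavor, joined on the key.
SELECT key, SUM(a), MAX(b), jsonb_agg(c) FROM t GROUP BY key;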

@frankmcsherry force-pushed the reduce_reduction branch 2 times, most recently from b2be33c to 9826b00 on July 12, 2024 19:45
Comment on lines 324 to 330
Project (#5, #6, #0..=#4, #7) // { types: "(integer, integer, bigint, bigint, numeric, numeric, bigint, integer list)" }
Get l1 // { types: "(bigint, bigint, numeric, numeric, bigint, integer, integer, integer list)" }
Map (null, null, null, 0, null, null, 0, null) // { types: "(integer?, integer?, bigint?, bigint, numeric?, numeric?, bigint, integer list?)" }
Union // { types: "()" }
Negate // { types: "()" }
Project () // { types: "()" }
Get l0 // { types: "(integer, integer, bigint, bigint, numeric, numeric, bigint, integer list)" }
Get l1 // { types: "(bigint, bigint, numeric, numeric, bigint, integer, integer, integer list)" }
Contributor Author

This looks like a projection pushdown failure. Or .. given that it only happens later in the pipeline, an apparent regression that potentially gets fixed by projection pushdown. The new Project (#5 ... stuff is just re-ordering the aggregate fields from the l1 = CrossJoin .. term (and potentially projecting away duplicate keys, but it is a cross join so .. no keys).

@ggevay (Contributor), Jul 16, 2024

ProjectionPushdown is able to push reorderings through some operators, but not through others. Specifically, it doesn't do it for Gets, because it keeps a BTreeSet of columns for each Get id, rather than some other data structure that would be able to represent a reordering.

If it's desirable to push down reorderings, I can rework how ProjectionPushdown behaves with respect to reorderings as part of fixing https://github.com/MaterializeInc/materialize/issues/17522 (which will involve a pretty big refactor of ProjectionPushdown anyway). But my feeling is that it's generally pretty hard to tell in MIR where a reordering should optimally be, because this is influenced by which operator is able to absorb it in LIR.

@ggevay (Contributor) commented Jul 30, 2024

Kicked off a new Nightly: https://buildkite.com/materialize/nightly/builds/8868

@ggevay (Contributor) commented Oct 29, 2024

I think this is close to merging. Could you add a feature flag (on by default) and rebase? Then we'll run a fresh Nightly, and then hopefully merge.

@antiguru self-requested a review October 29, 2024 18:01
shepherdlybot (bot) commented Nov 21, 2024

Risk Score: 83/100 · Bug Hotspots: 2 · Resilience Coverage: 60%

Mitigations

Completing required mitigations increases Resilience Coverage.

  • (Required) Code Review 🔍 Detected
  • (Required) Feature Flag
  • (Required) Integration Test 🔍 Detected
  • (Required) Observability
  • (Required) QA Review
  • Unit Test
Risk Summary:

This pull request has a high risk score of 83, driven by predictors such as File Diffusion and Large Files. Historically, PRs with these predictors are 155% more likely to cause a bug than the repository baseline. The repository's observed bug trend remains steady, while there are two file hotspots involved in this PR.

Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.

Bug Hotspots:

File                 Percentile
../src/lib.rs        90
../statement/ddl.rs  95

@ggevay self-requested a review November 21, 2024 13:26
@ggevay (Contributor) commented Nov 21, 2024

@ggevay (Contributor) commented Nov 21, 2024

There is a new Testdrive failure, at session.td:18, because of the new feature flag.

@antiguru (Member)

Pushed an update to fix sessions.td.

@jkosh44 removed request for a team and jkosh44 November 21, 2024 17:08
@antiguru merged commit 38cce6b into MaterializeInc:main Nov 21, 2024
80 checks passed
@def- mentioned this pull request Nov 22, 2024