-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
groupby_funcs.go
374 lines (333 loc) · 12.6 KB
/
groupby_funcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package norm
import (
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/opt/props/physical"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
// RemoveGroupingCols builds a copy of the given grouping private whose
// grouping column set no longer contains any of the given columns. All other
// fields are carried over unchanged; only the copy's GroupingCols field is
// reassigned.
func (c *CustomFuncs) RemoveGroupingCols(
	private *memo.GroupingPrivate, cols opt.ColSet,
) *memo.GroupingPrivate {
	// Shallow-copy the struct so that the remaining fields carry over as-is.
	trimmed := new(memo.GroupingPrivate)
	*trimmed = *private
	trimmed.GroupingCols = private.GroupingCols.Difference(cols)
	return trimmed
}
// AppendAggCols returns a new Aggregations list made up of the aggregates from
// the given list followed by one additional aggregate for every column in the
// given set. Each added aggregate is of the given operator type (see
// makeAggCols for the supported operators).
func (c *CustomFuncs) AppendAggCols(
	aggs memo.AggregationsExpr, aggOp opt.Operator, cols opt.ColSet,
) memo.AggregationsExpr {
	existing := len(aggs)
	combined := make(memo.AggregationsExpr, existing+cols.Len())

	// The head of the new list holds the existing aggregates; the tail is
	// filled in with the newly constructed ones.
	copy(combined[:existing], aggs)
	c.makeAggCols(aggOp, cols, combined[existing:])
	return combined
}
// AppendAggCols2 is the two-set variant of AppendAggCols. It returns a new
// Aggregations list that contains, in order:
//
//   1. the aggregates from the given list,
//   2. one aggregate of type aggOp for every column in cols,
//   3. one aggregate of type aggOp2 for every column in cols2.
//
func (c *CustomFuncs) AppendAggCols2(
	aggs memo.AggregationsExpr,
	aggOp opt.Operator,
	cols opt.ColSet,
	aggOp2 opt.Operator,
	cols2 opt.ColSet,
) memo.AggregationsExpr {
	base := len(aggs)
	firstLen, secondLen := cols.Len(), cols2.Len()

	combined := make(memo.AggregationsExpr, base+firstLen+secondLen)
	copy(combined, aggs)

	// Carve the tail of the new list into one region per column set and let
	// makeAggCols populate each region.
	firstEnd := base + firstLen
	c.makeAggCols(aggOp, cols, combined[base:firstEnd])
	c.makeAggCols(aggOp2, cols2, combined[firstEnd:])
	return combined
}
// makeAggCols is a helper method that builds one aggregate of the given
// operator type for each column in the given set and stores the results in
// consecutive slots of outAggs, starting at index 0. Columns are visited in the
// order produced by iterating the set with Next. Each aggregate takes a
// Variable reference to its column as input and is wrapped in an
// AggregationsItem constructed with that same column ID. As an example, for
// columns (1,2) and operator ConstAggOp, makeAggCols will set the following:
//
//   outAggs[0] = AggregationsItem for (ConstAgg (Variable 1)), column 1
//   outAggs[1] = AggregationsItem for (ConstAgg (Variable 2)), column 2
//
// Only ConstAggOp, AnyNotNullAggOp and FirstAggOp are supported; any other
// operator causes a panic when the first column is processed. outAggs must
// have room for at least cols.Len() items.
func (c *CustomFuncs) makeAggCols(
	aggOp opt.Operator, cols opt.ColSet, outAggs memo.AggregationsExpr,
) {
	slot := 0
	colID, found := cols.Next(0)
	for found {
		// Every aggregate takes a Variable reference to the column as input.
		input := c.f.ConstructVariable(colID)

		var agg opt.ScalarExpr
		switch aggOp {
		case opt.FirstAggOp:
			agg = c.f.ConstructFirstAgg(input)

		case opt.AnyNotNullAggOp:
			agg = c.f.ConstructAnyNotNullAgg(input)

		case opt.ConstAggOp:
			agg = c.f.ConstructConstAgg(input)

		default:
			panic(errors.AssertionFailedf("unrecognized aggregate operator type: %v", log.Safe(aggOp)))
		}

		outAggs[slot] = c.f.ConstructAggregationsItem(agg, colID)
		slot++

		colID, found = cols.Next(colID + 1)
	}
}
// CanRemoveAggDistinctForKeys returns true if the input column of the given
// aggregate function, together with the grouping columns, forms a strict key
// of the input expression. In this case, the wrapper AggDistinct can be
// removed.
//
// The method returns false (rather than panicking) when the aggregate has no
// children or when its first child is not a direct Variable reference, since
// no input column can be identified in those cases.
func (c *CustomFuncs) CanRemoveAggDistinctForKeys(
	input memo.RelExpr, private *memo.GroupingPrivate, agg opt.ScalarExpr,
) bool {
	if agg.ChildCount() == 0 {
		return false
	}

	inputFDs := &input.Relational().FuncDeps
	variable, ok := agg.Child(0).(*memo.VariableExpr)
	if !ok {
		// The aggregate does not directly aggregate on a column, so there is
		// no column to test for key-ness.
		return false
	}

	// AddColToSet is defined elsewhere on CustomFuncs; it is expected to yield
	// the grouping columns plus the aggregate's input column.
	cols := c.AddColToSet(private.GroupingCols, variable.Col)
	return inputFDs.ColsAreStrictKey(cols)
}
// ReplaceAggregationsItem returns a copy of the given list in which the search
// item has been swapped out for a new item built from the replace expression
// and the search item's output column. The search item is located by pointer
// identity, so it must point at an element of aggs; at most one element can
// therefore match. The method panics if no element of the list matches.
func (c *CustomFuncs) ReplaceAggregationsItem(
	aggs memo.AggregationsExpr, search *memo.AggregationsItem, replace opt.ScalarExpr,
) memo.AggregationsExpr {
	// Locate the position of the search item within the list.
	pos := -1
	for i := range aggs {
		if &aggs[i] == search {
			pos = i
			break
		}
	}
	if pos < 0 {
		panic(errors.AssertionFailedf("item to replace is not in the list: %v", search))
	}

	// Duplicate the list, then overwrite the matching slot.
	result := make([]memo.AggregationsItem, len(aggs))
	copy(result, aggs)
	result[pos] = c.f.ConstructAggregationsItem(replace, search.Col)
	return result
}
// HasNoGroupingCols reports whether the grouping column set stored in the
// given private is empty.
func (c *CustomFuncs) HasNoGroupingCols(private *memo.GroupingPrivate) bool {
	groupingCols := private.GroupingCols
	return groupingCols.Empty()
}
// GroupingInputOrdering returns the value of the Ordering field of the given
// grouping private.
func (c *CustomFuncs) GroupingInputOrdering(private *memo.GroupingPrivate) physical.OrderingChoice {
	ordering := private.Ordering
	return ordering
}
// ConstructProjectionFromDistinctOn rewrites a DistinctOn as a Project over
// the same input. The rewrite is only valid when every input grouping contains
// at most one row (i.e. the input is already distinct). Note that DistinctOn
// can only have aggregations of type FirstAgg or ConstAgg.
//
// Grouping columns are always passed through. For each aggregation, if its
// output column is the same as the column of its input variable, that column
// is passed through as well; otherwise a projection item is added that maps
// the input variable to the aggregation's output column.
func (c *CustomFuncs) ConstructProjectionFromDistinctOn(
	input memo.RelExpr, groupingCols opt.ColSet, aggs memo.AggregationsExpr,
) memo.RelExpr {
	// Start from a copy so that the caller's set is not the one being added to.
	passthrough := groupingCols.Copy()

	var projections memo.ProjectionsExpr
	for i := range aggs {
		item := &aggs[i]
		firstVar := memo.ExtractAggFirstVar(item.Agg)
		if firstVar.Col == item.Col {
			// No renaming needed; the column flows through unchanged.
			passthrough.Add(item.Col)
			continue
		}
		projections = append(projections, c.f.ConstructProjectionsItem(firstVar, item.Col))
	}
	return c.f.ConstructProject(input, projections, passthrough)
}
// AreValuesDistinct returns true if a constant Values operator input contains
// only rows that are already distinct with respect to the given grouping
// columns. The Values operator may be wrapped by any combination of Select,
// Project, LeftJoin, UpsertDistinctOn and EnsureUpsertDistinctOn operators;
// the method recurses through those wrappers as long as the grouping columns
// come from the wrapped input. Any other operator yields false.
//
// If nullsAreDistinct is true, then NULL values are treated as not equal to one
// another, and therefore rows containing a NULL value in any grouping column
// are always distinct.
func (c *CustomFuncs) AreValuesDistinct(
	input memo.RelExpr, groupingCols opt.ColSet, nullsAreDistinct bool,
) bool {
	switch t := input.(type) {
	case *memo.ValuesExpr:
		return c.areRowsDistinct(t.Rows, t.Cols, groupingCols, nullsAreDistinct)

	case *memo.SelectExpr:
		return c.AreValuesDistinct(t.Input, groupingCols, nullsAreDistinct)

	case *memo.ProjectExpr:
		// Only look through the Project when grouping on columns that come
		// from its input.
		if !groupingCols.SubsetOf(t.Input.Relational().OutputCols) {
			return false
		}
		return c.AreValuesDistinct(t.Input, groupingCols, nullsAreDistinct)

	case *memo.LeftJoinExpr:
		// Only look through the join when grouping on columns of its left
		// input, and only when the join cannot duplicate left rows.
		leftCols := t.Left.Relational().OutputCols
		rightCols := t.Right.Relational().OutputCols
		if !groupingCols.SubsetOf(leftCols) {
			return false
		}

		// Collect the right-side columns that are equality joined to left-side
		// columns. If they form a key (lax or strict) of the right input, then
		// the left join will never cause duplication of left rows.
		var eqCols opt.ColSet
		for i := range t.On {
			if isEq, _, rightCol := memo.ExtractJoinEquality(
				leftCols, rightCols, t.On[i].Condition,
			); isEq {
				eqCols.Add(rightCol)
			}
		}
		if !t.Right.Relational().FuncDeps.ColsAreLaxKey(eqCols) {
			// Not joining on a right input key.
			return false
		}
		return c.AreValuesDistinct(t.Left, groupingCols, nullsAreDistinct)

	case *memo.UpsertDistinctOnExpr:
		// Only look through the operator when grouping on columns that come
		// from its input.
		if !groupingCols.SubsetOf(t.Input.Relational().OutputCols) {
			return false
		}
		return c.AreValuesDistinct(t.Input, groupingCols, nullsAreDistinct)

	case *memo.EnsureUpsertDistinctOnExpr:
		// Only look through the operator when grouping on columns that come
		// from its input.
		if !groupingCols.SubsetOf(t.Input.Relational().OutputCols) {
			return false
		}
		return c.AreValuesDistinct(t.Input, groupingCols, nullsAreDistinct)
	}
	return false
}
// areRowsDistinct returns true if the given rows are unique on the given
// grouping columns. cols gives the column ID of each tuple element, by
// position. If nullsAreDistinct is true, then NULL values are treated as
// unique, and therefore a row containing a NULL value in any grouping column
// is always distinct from every other row.
//
// The method is conservative: it returns false if any grouping column of any
// row is not a constant value, or if a value cannot be key-encoded, because
// distinctness cannot be established in those cases.
func (c *CustomFuncs) areRowsDistinct(
	rows memo.ScalarListExpr, cols opt.ColList, groupingCols opt.ColSet, nullsAreDistinct bool,
) bool {
	var seenKeys map[string]bool
	var keyBuf []byte
	for _, rowExpr := range rows {
		tuple := rowExpr.(*memo.TupleExpr)

		// Reuse the scratch buffer for each row's key.
		keyBuf = keyBuf[:0]
		hasDistinctNull := false
		for idx, colID := range cols {
			if !groupingCols.Contains(colID) {
				// Non-grouping columns do not participate in the key.
				continue
			}

			// Check IsConstValueOp before extracting the datum, since this
			// code doesn't handle the tuples and arrays that ExtractConstDatum
			// can return.
			elem := tuple.Elems[idx]
			if !opt.IsConstValueOp(elem) {
				// A non-constant grouping value makes it impossible to decide
				// whether the rows are distinct.
				return false
			}
			datum := memo.ExtractConstDatum(elem)

			// When NULLs are distinct, a single NULL in a grouping column is
			// enough to make the whole row distinct; stop building the key.
			if nullsAreDistinct && datum == tree.DNull {
				hasDistinctNull = true
				break
			}

			// Append the key encoding of this datum to the row's key. The
			// encodings of multiple columns are simply concatenated.
			var err error
			keyBuf, err = sqlbase.EncodeTableKey(keyBuf, datum, encoding.Ascending)
			if err != nil {
				// Assume rows are not distinct if an encoding error occurs.
				return false
			}
		}

		// Allocate the map lazily, on the first row that reaches this point.
		if seenKeys == nil {
			seenKeys = make(map[string]bool, len(rows))
		}

		// A repeated key means a duplicate row, unless the row was forced to
		// be distinct by a NULL value.
		key := string(keyBuf)
		if !hasDistinctNull && seenKeys[key] {
			return false
		}

		// Record the key. Note that this also happens for rows containing a
		// distinct NULL, in which case the key may be only partially built.
		seenKeys[key] = true
	}
	return true
}
// CanMergeAggs returns true if the given inner and outer AggregationsExprs can
// be replaced with a single equivalent AggregationsExpr. That requires all of
// the following to hold:
//
//   - every inner and outer item is an aggregate operator,
//   - every outer aggregate has exactly one child, which is a Variable
//     referencing the output column of one of the inner aggregates,
//   - opt.AggregatesCanMerge accepts each (inner, outer) operator pair.
//
func (c *CustomFuncs) CanMergeAggs(innerAggs, outerAggs memo.AggregationsExpr) bool {
	// Index the inner aggregates' operator types by their output ColumnID.
	innerOps := make(map[opt.ColumnID]opt.Operator, len(innerAggs))
	for i := range innerAggs {
		agg := innerAggs[i].Agg
		if !opt.IsAggregateOp(agg) {
			return false
		}
		innerOps[innerAggs[i].Col] = agg.Op()
	}

	for i := range outerAggs {
		agg := outerAggs[i].Agg

		// There are no valid inner-outer aggregate pairs for which the outer
		// is not an aggregate with exactly one child.
		if !opt.IsAggregateOp(agg) || agg.ChildCount() != 1 {
			return false
		}

		// The outer aggregate must directly aggregate on a column.
		ref, isVar := agg.Child(0).(*memo.VariableExpr)
		if !isVar {
			return false
		}

		// That column must be produced by an inner aggregate, and a single
		// aggregate must be able to replace the pair.
		innerOp, found := innerOps[ref.Col]
		if !found || !opt.AggregatesCanMerge(innerOp, agg.Op()) {
			return false
		}
	}
	return true
}
// MergeAggs returns an AggregationsExpr that is equivalent to the two given
// AggregationsExprs. The result has one item per outer aggregate; each item
// pairs the Agg expression of the inner aggregate that the outer aggregate
// reads from with the output column of the outer aggregate.
//
// Callers are expected to have verified the inputs with CanMergeAggs first.
// The method panics if an outer aggregate's first child is not a Variable, or
// if that Variable does not reference the output column of an inner aggregate.
func (c *CustomFuncs) MergeAggs(innerAggs, outerAggs memo.AggregationsExpr) memo.AggregationsExpr {
	// Create a mapping from the output ColumnIDs of the inner aggregates to their
	// indices in innerAggs.
	innerColsToAggs := make(map[opt.ColumnID]int, len(innerAggs))
	for i := range innerAggs {
		innerColsToAggs[innerAggs[i].Col] = i
	}

	newAggs := make(memo.AggregationsExpr, len(outerAggs))
	for i := range outerAggs {
		// For each outer aggregate, construct a new aggregate that takes the Agg
		// field of the referenced inner aggregate and the Col field of the outer
		// aggregate.
		inputCol := outerAggs[i].Agg.Child(0).(*memo.VariableExpr).Col

		// A plain map index would yield 0 for a missing column and silently
		// merge with the first inner aggregate, so check for presence.
		innerIdx, ok := innerColsToAggs[inputCol]
		if !ok {
			panic(errors.AssertionFailedf(
				"outer aggregate does not reference an inner aggregate column: %v",
				log.Safe(inputCol),
			))
		}
		newAggs[i] = c.f.ConstructAggregationsItem(innerAggs[innerIdx].Agg, outerAggs[i].Col)
	}
	return newAggs
}