Merge #42137
42137: colexec: add support for percent_rank and cume_dist window functions r=yuzefovich a=yuzefovich

**colexec: add window peer grouper**

This commit adds an operator that populates a boolean column indicating
whether the corresponding tuple starts a new peer group. Peers are tuples
that belong to the same partition and are equal on the ordering columns.
Some window functions must return the same output for all peers in a peer
group. This operator is not yet used.
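
For intuition, here is a minimal, row-at-a-time Go sketch of what the peers column encodes; the function name, the precomputed partition-start column, and the integer ordering columns are illustrative assumptions, not the templated columnar operator this commit generates.

```go
package main

import "fmt"

// peerGroupStarts is a minimal, row-oriented sketch of the peer-grouping
// idea. It returns a boolean per row that is true when the row starts a new
// peer group, i.e. when it begins a new partition or differs from the
// previous row on any ordering column.
func peerGroupStarts(partitionStart []bool, orderingCols [][]int) []bool {
	n := len(partitionStart)
	starts := make([]bool, n)
	for i := 0; i < n; i++ {
		if i == 0 || partitionStart[i] {
			starts[i] = true
			continue
		}
		for _, col := range orderingCols {
			if col[i] != col[i-1] {
				starts[i] = true
				break
			}
		}
	}
	return starts
}

func main() {
	// Two partitions; the second and third rows of the first partition are
	// peers because they agree on the single ordering column.
	partitionStart := []bool{true, false, false, true, false}
	ordering := [][]int{{1, 2, 2, 5, 6}}
	fmt.Println(peerGroupStarts(partitionStart, ordering)) // [true true false true true]
}
```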

Release note: None

**colexec: minor refactor of rank and denseRank operators**

Previously, the rank and denseRank operators contained the logic for figuring
out the boundaries of the peer groups. Now that we have a window peer
grouper, that logic is no longer necessary, which simplifies the code a
little bit.
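
As an illustration of why the rank operators no longer need that logic, here is a hedged, non-vectorized sketch that derives both rank and dense_rank from the peers column alone, assuming a single partition whose first row starts a peer group; it is not the generated operator.

```go
package main

import "fmt"

// rankFromPeers derives rank (or dense_rank) purely from the peers column:
// peersStart[i] is true when row i starts a new peer group. Rank jumps to the
// absolute row position at each group start; dense_rank just counts groups.
func rankFromPeers(peersStart []bool, dense bool) []int64 {
	out := make([]int64, len(peersStart))
	var rank, denseRank int64
	for i, start := range peersStart {
		if start {
			rank = int64(i) + 1
			denseRank++
		}
		if dense {
			out[i] = denseRank
		} else {
			out[i] = rank
		}
	}
	return out
}

func main() {
	peers := []bool{true, true, false, true} // rows 2 and 3 are peers
	fmt.Println(rankFromPeers(peers, false)) // [1 2 2 4]
	fmt.Println(rankFromPeers(peers, true))  // [1 2 2 3]
}
```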

Release note: None

**colexec: add support for PERCENT_RANK window function**

This commit adds support for the PERCENT_RANK window function. This function
differs from the two other rank variants in that it needs to know the number
of tuples in the partition. If there is no PARTITION BY clause, we have no
choice but to buffer the input fully. If a PARTITION BY clause is present, we
only need to buffer the tuples that belong to each partition before we can
populate the output; however, for simplicity, the current implementation with
a PARTITION BY clause also fully buffers the whole input before emitting any
output.
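
For reference, PERCENT_RANK is defined as (rank - 1) / (partition row count - 1). The sketch below only illustrates that formula for one fully buffered partition, operating on a precomputed rank column with hypothetical names; it is not the generated operator, but it shows why the partition size must be known before any output row can be emitted.

```go
package main

import "fmt"

// percentRank illustrates the PERCENT_RANK definition:
// (rank - 1) / (partition row count - 1), with a single-row partition
// yielding 0. The whole partition must be available before any output.
func percentRank(ranks []int64) []float64 {
	n := len(ranks)
	out := make([]float64, n)
	if n <= 1 {
		return out
	}
	for i, r := range ranks {
		out[i] = float64(r-1) / float64(n-1)
	}
	return out
}

func main() {
	// Ranks within one partition (peers share a rank).
	fmt.Println(percentRank([]int64{1, 2, 2, 4})) // [0 0.333... 0.333... 1]
}
```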

This commit also adds a couple of "vec-on" configs to the 'window' logic
test. This increases the test coverage of window functions supported by the
vectorized engine.

Addresses: #37035.

Release note: None

**colexec: add support for cume_dist window function**

This commit adds support for the CUME_DIST window function. This function is
quite similar to PERCENT_RANK, so it reuses the same template; the
"percent_rank" names have been renamed to "relative_rank".

This commit also enables running `row_number`, `rank`, and `dense_rank` with
`vectorize=auto`. The reason is that these window functions are streaming and
internally they might use a sorter, which can fall back to disk if necessary.

Addresses: #37035.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed Mar 3, 2020
2 parents cf8cd92 + f91d902 commit c898b2d
Showing 16 changed files with 962 additions and 210 deletions.
6 changes: 5 additions & 1 deletion Makefile
@@ -825,6 +825,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/proj_non_const_ops.eg.go \
pkg/sql/colexec/quicksort.eg.go \
pkg/sql/colexec/rank.eg.go \
pkg/sql/colexec/relative_rank.eg.go \
pkg/sql/colexec/row_number.eg.go \
pkg/sql/colexec/rowstovec.eg.go \
pkg/sql/colexec/selection_ops.eg.go \
@@ -833,7 +834,8 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/substring.eg.go \
pkg/sql/colexec/sum_agg.eg.go \
pkg/sql/colexec/tuples_differ.eg.go \
pkg/sql/colexec/vec_comparators.eg.go
pkg/sql/colexec/vec_comparators.eg.go \
pkg/sql/colexec/window_peer_grouper.eg.go

execgen-exclusions = $(addprefix -not -path ,$(EXECGEN_TARGETS))

@@ -1521,6 +1523,7 @@ pkg/sql/colexec/proj_const_right_ops.eg.go: pkg/sql/colexec/proj_const_ops_tmpl.
pkg/sql/colexec/proj_non_const_ops.eg.go: pkg/sql/colexec/proj_non_const_ops_tmpl.go
pkg/sql/colexec/quicksort.eg.go: pkg/sql/colexec/quicksort_tmpl.go
pkg/sql/colexec/rank.eg.go: pkg/sql/colexec/rank_tmpl.go
pkg/sql/colexec/relative_rank.eg.go: pkg/sql/colexec/relative_rank_tmpl.go
pkg/sql/colexec/row_number.eg.go: pkg/sql/colexec/row_number_tmpl.go
pkg/sql/colexec/rowstovec.eg.go: pkg/sql/colexec/rowstovec_tmpl.go
pkg/sql/colexec/select_in.eg.go: pkg/sql/colexec/select_in_tmpl.go
@@ -1530,6 +1533,7 @@ pkg/sql/colexec/substring.eg.go: pkg/sql/colexec/substring_tmpl.go
pkg/sql/colexec/sum_agg.eg.go: pkg/sql/colexec/sum_agg_tmpl.go
pkg/sql/colexec/tuples_differ.eg.go: pkg/sql/colexec/tuples_differ_tmpl.go
pkg/sql/colexec/vec_comparators.eg.go: pkg/sql/colexec/vec_comparators_tmpl.go
pkg/sql/colexec/window_peer_grouper.eg.go: pkg/sql/colexec/window_peer_grouper_tmpl.go

$(EXECGEN_TARGETS): bin/execgen
execgen $@
2 changes: 2 additions & 0 deletions pkg/sql/colexec/.gitignore
@@ -26,6 +26,7 @@ proj_const_right_ops.eg.go
proj_non_const_ops.eg.go
quicksort.eg.go
rank.eg.go
relative_rank.eg.go
row_number.eg.go
rowstovec.eg.go
selection_ops.eg.go
@@ -35,3 +36,4 @@ substring.eg.go
sum_agg.eg.go
tuples_differ.eg.go
vec_comparators.eg.go
window_peer_grouper.eg.go
59 changes: 59 additions & 0 deletions pkg/sql/colexec/execgen/cmd/execgen/relative_rank_gen.go
@@ -0,0 +1,59 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"io"
"io/ioutil"
"strings"
"text/template"
)

type relativeRankTmplInfo struct {
IsPercentRank bool
IsCumeDist bool
HasPartition bool
String string
}

func genRelativeRankOps(wr io.Writer) error {
d, err := ioutil.ReadFile("pkg/sql/colexec/relative_rank_tmpl.go")
if err != nil {
return err
}

s := string(d)

s = strings.Replace(s, "_RELATIVE_RANK_STRING", "{{.String}}", -1)

computeNumPeersRe := makeFunctionRegex("_COMPUTE_NUM_PEERS", 0)
s = computeNumPeersRe.ReplaceAllString(s, `{{template "computeNumPeers"}}`)
computeCumeDistRe := makeFunctionRegex("_COMPUTE_CUME_DIST", 0)
s = computeCumeDistRe.ReplaceAllString(s, `{{template "computeCumeDist"}}`)

// Now, generate the op, from the template.
tmpl, err := template.New("relative_rank_op").Parse(s)
if err != nil {
return err
}

relativeRankTmplInfos := []relativeRankTmplInfo{
{IsPercentRank: true, HasPartition: false, String: "percentRankNoPartition"},
{IsPercentRank: true, HasPartition: true, String: "percentRankWithPartition"},
{IsCumeDist: true, HasPartition: false, String: "cumeDistNoPartition"},
{IsCumeDist: true, HasPartition: true, String: "cumeDistWithPartition"},
}
return tmpl.Execute(wr, relativeRankTmplInfos)
}

func init() {
registerGenerator(genRelativeRankOps, "relative_rank.eg.go")
}
52 changes: 52 additions & 0 deletions pkg/sql/colexec/execgen/cmd/execgen/window_peer_grouper_gen.go
@@ -0,0 +1,52 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"io"
"io/ioutil"
"strings"
"text/template"
)

type windowPeerGrouperTmplInfo struct {
AllPeers bool
HasPartition bool
String string
}

func genWindowPeerGrouperOps(wr io.Writer) error {
d, err := ioutil.ReadFile("pkg/sql/colexec/window_peer_grouper_tmpl.go")
if err != nil {
return err
}

s := string(d)
s = strings.Replace(s, "_PEER_GROUPER_STRING", "{{.String}}", -1)

// Now, generate the op, from the template.
tmpl, err := template.New("peer_grouper_op").Parse(s)
if err != nil {
return err
}

windowPeerGrouperTmplInfos := []windowPeerGrouperTmplInfo{
{AllPeers: false, HasPartition: false, String: "windowPeerGrouperNoPartition"},
{AllPeers: false, HasPartition: true, String: "windowPeerGrouperWithPartition"},
{AllPeers: true, HasPartition: false, String: "windowPeerGrouperAllPeersNoPartition"},
{AllPeers: true, HasPartition: true, String: "windowPeerGrouperAllPeersWithPartition"},
}
return tmpl.Execute(wr, windowPeerGrouperTmplInfos)
}

func init() {
registerGenerator(genWindowPeerGrouperOps, "window_peer_grouper.eg.go")
}
96 changes: 60 additions & 36 deletions pkg/sql/colexec/execplan.go
@@ -250,11 +250,7 @@ func isSupported(spec *execinfrapb.ProcessorSpec) (bool, error) {
}
}

switch *wf.Func.WindowFunc {
case execinfrapb.WindowerSpec_ROW_NUMBER:
case execinfrapb.WindowerSpec_RANK:
case execinfrapb.WindowerSpec_DENSE_RANK:
default:
if _, supported := SupportedWindowFns[*wf.Func.WindowFunc]; !supported {
return false, errors.Newf("window function %s is not supported", wf.String())
}
return true, nil
@@ -901,20 +897,17 @@ func NewColOperator(
if err != nil {
return result, err
}
tempPartitionColOffset, partitionColIdx := 0, -1
tempColOffset, partitionColIdx := uint32(0), columnOmitted
peersColIdx := columnOmitted
memMonitorsPrefix := "window-"
windowFn := *wf.Func.WindowFunc
if len(core.Windower.PartitionBy) > 0 {
// TODO(yuzefovich): add support for hashing partitioner (probably by
// leveraging hash routers once we can distribute). The decision about
// which kind of partitioner to use should come from the optimizer.
windowSortingPartitionerMemAccount := streamingMemAccount
if !useStreamingMemAccountForBuffering {
windowSortingPartitionerMemAccount = result.createBufferingMemAccount(
ctx, flowCtx, memMonitorsPrefix+"sorting-partitioner",
)
}
partitionColIdx = int(wf.OutputColIdx)
input, err = NewWindowSortingPartitioner(
NewAllocator(ctx, windowSortingPartitionerMemAccount), input, typs,
NewAllocator(ctx, streamingMemAccount), input, typs,
core.Windower.PartitionBy, wf.Ordering.Columns, int(wf.OutputColIdx),
func(input Operator, inputTypes []coltypes.T, orderingCols []execinfrapb.Ordering_Column) (Operator, error) {
return result.createDiskBackedSort(
@@ -924,7 +917,9 @@
&execinfrapb.PostProcessSpec{}, memMonitorsPrefix)
},
)
tempPartitionColOffset, partitionColIdx = 1, int(wf.OutputColIdx)
// Window partitioner will append a boolean column.
tempColOffset++
typs = append(typs, coltypes.Bool)
} else {
if len(wf.Ordering.Columns) > 0 {
input, err = result.createDiskBackedSort(
@@ -936,37 +931,66 @@
if err != nil {
return result, err
}

orderingCols := make([]uint32, len(wf.Ordering.Columns))
for i, col := range wf.Ordering.Columns {
orderingCols[i] = col.ColIdx
if windowFnNeedsPeersInfo(*wf.Func.WindowFunc) {
peersColIdx = int(wf.OutputColIdx + tempColOffset)
input, err = NewWindowPeerGrouper(
NewAllocator(ctx, streamingMemAccount),
input, typs, wf.Ordering.Columns,
partitionColIdx, peersColIdx,
)
// Window peer grouper will append a boolean column.
tempColOffset++
typs = append(typs, coltypes.Bool)
}
switch *wf.Func.WindowFunc {

switch windowFn {
case execinfrapb.WindowerSpec_ROW_NUMBER:
result.Op = NewRowNumberOperator(NewAllocator(ctx, streamingMemAccount), input, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx)
case execinfrapb.WindowerSpec_RANK:
result.Op, err = NewRankOperator(NewAllocator(ctx, streamingMemAccount), input, typs, false /* dense */, orderingCols, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx)
case execinfrapb.WindowerSpec_DENSE_RANK:
result.Op, err = NewRankOperator(NewAllocator(ctx, streamingMemAccount), input, typs, true /* dense */, orderingCols, int(wf.OutputColIdx)+tempPartitionColOffset, partitionColIdx)
result.Op = NewRowNumberOperator(
NewAllocator(ctx, streamingMemAccount), input, int(wf.OutputColIdx+tempColOffset), partitionColIdx,
)
case execinfrapb.WindowerSpec_RANK, execinfrapb.WindowerSpec_DENSE_RANK:
result.Op, err = NewRankOperator(
NewAllocator(ctx, streamingMemAccount), input, windowFn, wf.Ordering.Columns,
int(wf.OutputColIdx+tempColOffset), partitionColIdx, peersColIdx,
)
case execinfrapb.WindowerSpec_PERCENT_RANK, execinfrapb.WindowerSpec_CUME_DIST:
memAccount := streamingMemAccount
if !useStreamingMemAccountForBuffering {
memAccount = result.createBufferingMemAccount(ctx, flowCtx, "relative-rank")
}
result.Op, err = NewRelativeRankOperator(
NewAllocator(ctx, memAccount), input, typs, windowFn, wf.Ordering.Columns,
int(wf.OutputColIdx+tempColOffset), partitionColIdx, peersColIdx,
)
default:
return result, errors.AssertionFailedf("window function %s is not supported", wf.String())
}

if partitionColIdx != -1 {
// Window partitioner will append a temporary column to the batch which
// we want to project out.
projection := make([]uint32, 0, wf.OutputColIdx+1)
if tempColOffset > 0 {
// We want to project out temporary columns (which have indices in the
// range [wf.OutputColIdx, wf.OutputColIdx+tempColOffset)).
projection := make([]uint32, 0, wf.OutputColIdx+tempColOffset)
for i := uint32(0); i < wf.OutputColIdx; i++ {
projection = append(projection, i)
}
projection = append(projection, wf.OutputColIdx+1)
result.Op = NewSimpleProjectOp(result.Op, int(wf.OutputColIdx+1), projection)
projection = append(projection, wf.OutputColIdx+tempColOffset)
result.Op = NewSimpleProjectOp(result.Op, int(wf.OutputColIdx+tempColOffset), projection)
}

result.ColumnTypes = append(spec.Input[0].ColumnTypes, *types.Int)
// Window functions can run in auto mode because they are streaming
// operators and internally they use a sorter which can fall back to disk
// if needed.
// TODO(yuzefovich): currently disabled.
// result.CanRunInAutoMode = true
_, returnType, err := execinfrapb.GetWindowFunctionInfo(wf.Func, []types.T{}...)
if err != nil {
return result, err
}
result.ColumnTypes = append(spec.Input[0].ColumnTypes, *returnType)
if windowFn != execinfrapb.WindowerSpec_PERCENT_RANK &&
windowFn != execinfrapb.WindowerSpec_CUME_DIST {
// Window functions can run in auto mode because they are streaming
// operators (except for percent_rank and cume_dist) and internally
// they might use a sorter which can fall back to disk if needed.
result.CanRunInAutoMode = true
// TODO(yuzefovich): add spilling to disk for percent_rank and
// cume_dist.
}

default:
return result, errors.Newf("unsupported processor core %q", core)