Skip to content

Commit

Permalink
colexec: implement lag and lead window functions in vectorized engine
Browse files Browse the repository at this point in the history
`lag` and `lead` are window functions that return an expression
evaluated at the row `offset` rows before (`lag`) or after (`lead`)
the current row, defaulting to `default` if no such row exists in the
partition.

This patch provides vectorized implementations of the `lag` and `lead`
window functions. The logic to buffer each partition into a queue is
now separate from the calculation of the window function itself, so
that `lag`, `lead`, `ntile`, and others can share common logic. As
before, each batch is emitted as soon as its output column has been
entirely filled.

See cockroachdb#37035

Release note (sql change): the vectorized engine now supports the lag
and lead window functions.
  • Loading branch information
DrewKimball committed May 27, 2021
1 parent 408deef commit 656d50e
Show file tree
Hide file tree
Showing 17 changed files with 5,367 additions and 56 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,8 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/colexecsel/default_cmp_sel_ops.eg.go \
pkg/sql/colexec/colexecsel/selection_ops.eg.go \
pkg/sql/colexec/colexecsel/sel_like_ops.eg.go \
pkg/sql/colexec/colexecwindow/lag.eg.go \
pkg/sql/colexec/colexecwindow/lead.eg.go \
pkg/sql/colexec/colexecwindow/ntile.eg.go \
pkg/sql/colexec/colexecwindow/rank.eg.go \
pkg/sql/colexec/colexecwindow/relative_rank.eg.go \
Expand Down
1 change: 1 addition & 0 deletions pkg/col/coldata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_test(
deps = [
"//pkg/col/coldatatestutils",
"//pkg/sql/colconv",
"//pkg/sql/randgen",
"//pkg/sql/types",
"//pkg/testutils/buildutil",
"//pkg/util/leaktest",
Expand Down
7 changes: 5 additions & 2 deletions pkg/col/coldata/bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestToArrowSerializationFormat(t *testing.T) {
rng, _ := randutil.NewPseudoRand()
nullChance := 0.2
maxStringLength := 10
numElements := rng.Intn(BatchSize())
numElements := 1 + rng.Intn(BatchSize())

b := NewBytes(numElements)
for i := 0; i < numElements; i++ {
Expand All @@ -485,7 +485,10 @@ func TestToArrowSerializationFormat(t *testing.T) {
b.UpdateOffsetsToBeNonDecreasing(numElements)

startIdx := rng.Intn(numElements)
endIdx := 1 + startIdx + rng.Intn(numElements-startIdx-1)
endIdx := startIdx + rng.Intn(numElements-startIdx)
if endIdx == startIdx {
endIdx++
}
wind := b.Window(startIdx, endIdx)

data, offsets := wind.ToArrowSerializationFormat(wind.Len())
Expand Down
61 changes: 50 additions & 11 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,8 +1191,9 @@ func NewColOperator(
argIdxs := make([]int, len(wf.ArgsIdxs))
for i, idx := range wf.ArgsIdxs {
// Retrieve the type of each argument and perform any necessary casting.
expectedType := colexecwindow.GetWindowFnArgType(*wf.Func.WindowFunc, i)
if !expectedType.Identical(typs[idx]) {
needsCast, expectedType := colexecwindow.WindowFnArgNeedsCast(
*wf.Func.WindowFunc, typs[idx], i)
if needsCast {
// We must cast to the expected argument type.
castIdx := len(typs)
input, err = colexecbase.GetCastOperator(
Expand Down Expand Up @@ -1272,10 +1273,8 @@ func NewColOperator(
// making sure that we stay within the memory limit, and
// they will fall back to disk if necessary.
opName := opNamePrefix + "relative-rank"
unlimitedAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, opName, spec.ProcessorID), factory,
)
diskAcc := result.createDiskAccount(ctx, flowCtx, opName, spec.ProcessorID)
unlimitedAllocator, diskAcc := result.getDiskBackedWindowFnFields(
ctx, opName, flowCtx, spec.ProcessorID, factory)
result.Root, err = colexecwindow.NewRelativeRankOperator(
unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx), args.DiskQueueCfg,
args.FDSemaphore, input, typs, windowFn, wf.Ordering.Columns,
Expand All @@ -1293,13 +1292,37 @@ func NewColOperator(
// making sure that we stay within the memory limit, and
// they will fall back to disk if necessary.
opName := opNamePrefix + "ntile"
unlimitedAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, opName, spec.ProcessorID), factory,
)
diskAcc := result.createDiskAccount(ctx, flowCtx, opName, spec.ProcessorID)
unlimitedAllocator, diskAcc := result.getDiskBackedWindowFnFields(
ctx, opName, flowCtx, spec.ProcessorID, factory)
result.Root = colexecwindow.NewNTileOperator(
unlimitedAllocator, execinfra.GetWorkMemLimit(flowCtx), args.DiskQueueCfg,
args.FDSemaphore, input, typs, outputIdx, partitionColIdx, argIdxs[0], diskAcc,
args.FDSemaphore, diskAcc, input, typs, outputIdx, partitionColIdx, argIdxs[0],
)
case execinfrapb.WindowerSpec_LAG:
opName := opNamePrefix + "lag"
unlimitedAllocator, diskAcc := result.getDiskBackedWindowFnFields(
ctx, opName, flowCtx, spec.ProcessorID, factory)
// Lag operators need an extra allocator.
bufferAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, opName, spec.ProcessorID), factory,
)
result.Root, err = colexecwindow.NewLagOperator(
unlimitedAllocator, bufferAllocator, execinfra.GetWorkMemLimit(flowCtx),
args.DiskQueueCfg, args.FDSemaphore, diskAcc, input, typs,
outputIdx, partitionColIdx, argIdxs[0], argIdxs[1], argIdxs[2],
)
case execinfrapb.WindowerSpec_LEAD:
opName := opNamePrefix + "lead"
unlimitedAllocator, diskAcc := result.getDiskBackedWindowFnFields(
ctx, opName, flowCtx, spec.ProcessorID, factory)
// Lead operators need an extra allocator.
bufferAllocator := colmem.NewAllocator(
ctx, result.createBufferingUnlimitedMemAccount(ctx, flowCtx, opName, spec.ProcessorID), factory,
)
result.Root, err = colexecwindow.NewLeadOperator(
unlimitedAllocator, bufferAllocator, execinfra.GetWorkMemLimit(flowCtx),
args.DiskQueueCfg, args.FDSemaphore, diskAcc, input, typs,
outputIdx, partitionColIdx, argIdxs[0], argIdxs[1], argIdxs[2],
)
default:
return r, errors.AssertionFailedf("window function %s is not supported", wf.String())
Expand Down Expand Up @@ -1649,6 +1672,22 @@ func (r opResult) updateWithPostProcessResult(ppr postProcessResult) {
copy(r.ColumnTypes, ppr.ColumnTypes)
}

// getDiskBackedWindowFnFields builds the two resources that every
// disk-backed buffering window operator is constructed with: an allocator
// wrapping the memory account returned by createBufferingUnlimitedMemAccount,
// and the disk account returned by createDiskAccount. Both accounts are
// requested for the given opName and processorID.
func (r opResult) getDiskBackedWindowFnFields(
	ctx context.Context,
	opName string,
	flowCtx *execinfra.FlowCtx,
	processorID int32,
	factory coldata.ColumnFactory,
) (*colmem.Allocator, *mon.BoundAccount) {
	memAcc := r.createBufferingUnlimitedMemAccount(ctx, flowCtx, opName, processorID)
	// Calls in a return statement run left to right, so the allocator is
	// still created before the disk account is requested.
	return colmem.NewAllocator(ctx, memAcc, factory),
		r.createDiskAccount(ctx, flowCtx, opName, processorID)
}

// planFilterExpr creates all operators to implement filter expression.
func planFilterExpr(
ctx context.Context,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/colexecutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"cancel_checker.go",
"deselector.go",
"operator.go",
"spilling_buffer.go",
"spilling_queue.go",
"utils.go",
],
Expand Down Expand Up @@ -37,6 +38,7 @@ go_test(
"dep_test.go",
"deselector_test.go",
"main_test.go",
"spilling_buffer_test.go",
"spilling_queue_test.go",
],
embed = [":colexecutils"],
Expand All @@ -51,6 +53,7 @@ go_test(
"//pkg/sql/colexecop",
"//pkg/sql/colmem",
"//pkg/sql/execinfra",
"//pkg/sql/randgen",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils/buildutil",
Expand Down
Loading

0 comments on commit 656d50e

Please sign in to comment.