Skip to content

Commit

Permalink
executor: support row framed window functions (#9358)
Browse files Browse the repository at this point in the history
  • Loading branch information
alivxxx authored Feb 21, 2019
1 parent a5188a8 commit 042e410
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 26 deletions.
24 changes: 16 additions & 8 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1905,14 +1905,22 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec
aggDesc := aggregation.NewAggFuncDesc(b.ctx, v.WindowFuncDesc.Name, v.WindowFuncDesc.Args, false)
resultColIdx := len(v.Schema().Columns) - 1
agg := aggfuncs.Build(b.ctx, aggDesc, resultColIdx)
if agg == nil {
b.err = errors.Trace(errors.New("window evaluator only support aggregation functions without frame now"))
return nil
var processor windowProcessor
if v.Frame == nil {
processor = &aggWindowProcessor{
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
}
} else {
processor = &rowFrameWindowProcessor{
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
start: v.Frame.Start,
end: v.Frame.End,
}
}
e := &WindowExec{baseExecutor: base,
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems),
return &WindowExec{baseExecutor: base,
processor: processor,
groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems),
}
return e
}
167 changes: 149 additions & 18 deletions executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ import (
"context"
"time"

"github.com/cznic/mathutil"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

// WindowExec is the executor for window functions. Note that it only supports aggregation without frame clause now.
// WindowExec is the executor for window functions.
type WindowExec struct {
baseExecutor

Expand All @@ -32,12 +36,11 @@ type WindowExec struct {
inputRow chunk.Row
groupRows []chunk.Row
childResults []*chunk.Chunk
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
executed bool
meetNewGroup bool
remainingRowsInGroup int64
remainingRowsInGroup int
remainingRowsInChunk int
processor windowProcessor
}

// Close implements the Executor Close interface.
Expand Down Expand Up @@ -93,6 +96,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return errors.Trace(err)
}
}
e.remainingRowsInGroup++
e.groupRows = append(e.groupRows, e.inputRow)
if e.meetNewGroup {
e.inputRow = e.inputIter.Next()
Expand All @@ -102,16 +106,14 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return nil
}

func (e *WindowExec) consumeGroupRows() error {
func (e *WindowExec) consumeGroupRows() (err error) {
if len(e.groupRows) == 0 {
return nil
}
err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult)
e.groupRows, err = e.processor.consumeGroupRows(e.ctx, e.groupRows)
if err != nil {
return errors.Trace(err)
}
e.remainingRowsInGroup += int64(len(e.groupRows))
e.groupRows = e.groupRows[:0]
return nil
}

Expand Down Expand Up @@ -145,19 +147,18 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk
}

// appendResult2Chunk appends result of the window function to the result chunk.
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error {
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) (err error) {
e.copyChk(chk)
for e.remainingRowsInGroup > 0 && e.remainingRowsInChunk > 0 {
// TODO: We can extend the agg func interface to avoid the `for` loop here.
err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk)
if err != nil {
return err
}
e.remainingRowsInGroup--
e.remainingRowsInChunk--
remained := mathutil.Min(e.remainingRowsInChunk, e.remainingRowsInGroup)
e.groupRows, err = e.processor.appendResult2Chunk(e.ctx, e.groupRows, chk, remained)
if err != nil {
return err
}
e.remainingRowsInGroup -= remained
e.remainingRowsInChunk -= remained
if e.remainingRowsInGroup == 0 {
e.windowFunc.ResetPartialResult(e.partialResult)
e.processor.resetPartialResult()
e.groupRows = e.groupRows[:0]
}
return nil
}
Expand All @@ -174,3 +175,133 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) {
chk.MakeRefTo(i, childResult, col.Index)
}
}

// windowProcessor is the interface for processing different kinds of windows.
type windowProcessor interface {
// consumeGroupRows updates the result for an window function using the input rows
// which belong to the same partition.
consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error)
// appendResult2Chunk appends the final results to chunk.
// It is called when there are no more rows in current partition.
appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error)
// resetPartialResult resets the partial result to the original state for a specific window function.
resetPartialResult()
}

type aggWindowProcessor struct {
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
}

func (p *aggWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error) {
err := p.windowFunc.UpdatePartialResult(ctx, rows, p.partialResult)
rows = rows[:0]
return rows, err
}

func (p *aggWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error) {
for remained > 0 {
// TODO: We can extend the agg func interface to avoid the `for` loop here.
err := p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk)
if err != nil {
return rows, err
}
remained--
}
return rows, nil
}

func (p *aggWindowProcessor) resetPartialResult() {
p.windowFunc.ResetPartialResult(p.partialResult)
}

type rowFrameWindowProcessor struct {
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
start *core.FrameBound
end *core.FrameBound
curRowIdx uint64
}

func (p *rowFrameWindowProcessor) getStartOffset(numRows uint64) uint64 {
if p.start.UnBounded {
return 0
}
switch p.start.Type {
case ast.Preceding:
if p.curRowIdx >= p.start.Num {
return p.curRowIdx - p.start.Num
}
return 0
case ast.Following:
offset := p.curRowIdx + p.start.Num
if offset >= numRows {
return numRows
}
return offset
case ast.CurrentRow:
return p.curRowIdx
}
// It will never reach here.
return 0
}

func (p *rowFrameWindowProcessor) getEndOffset(numRows uint64) uint64 {
if p.end.UnBounded {
return numRows
}
switch p.end.Type {
case ast.Preceding:
if p.curRowIdx >= p.end.Num {
return p.curRowIdx - p.end.Num + 1
}
return 0
case ast.Following:
offset := p.curRowIdx + p.end.Num
if offset >= numRows {
return numRows
}
return offset + 1
case ast.CurrentRow:
return p.curRowIdx + 1
}
// It will never reach here.
return 0
}

func (p *rowFrameWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error) {
return rows, nil
}

// TODO: We can optimize it using sliding window algorithm.
func (p *rowFrameWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error) {
numRows := uint64(len(rows))
for remained > 0 {
start := p.getStartOffset(numRows)
end := p.getEndOffset(numRows)
p.curRowIdx++
remained--
if start >= end {
err := p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk)
if err != nil {
return nil, err
}
continue
}
err := p.windowFunc.UpdatePartialResult(ctx, rows[start:end], p.partialResult)
if err != nil {
return nil, err
}
err = p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk)
if err != nil {
return nil, err
}
p.windowFunc.ResetPartialResult(p.partialResult)
}
return rows, nil
}

func (p *rowFrameWindowProcessor) resetPartialResult() {
p.windowFunc.ResetPartialResult(p.partialResult)
p.curRowIdx = 0
}
7 changes: 7 additions & 0 deletions executor/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ func (s *testSuite2) TestWindowFunctions(c *C) {
result.Check(testkit.Rows("1 1", "4 2", "2 3"))
result = tk.MustQuery("select a, row_number() over(partition by a) from t")
result.Check(testkit.Rows("1 1", "2 1", "4 1"))

result = tk.MustQuery("select a, sum(a) over(rows between unbounded preceding and 1 following) from t")
result.Check(testkit.Rows("1 5", "4 7", "2 7"))
result = tk.MustQuery("select a, sum(a) over(rows between 1 preceding and 1 following) from t")
result.Check(testkit.Rows("1 5", "4 7", "2 6"))
result = tk.MustQuery("select a, sum(a) over(rows between unbounded preceding and 1 preceding) from t")
result.Check(testkit.Rows("1 <nil>", "4 1", "2 5"))
}

0 comments on commit 042e410

Please sign in to comment.