Skip to content

Commit

Permalink
executor: fix wrong partition boundary for window functions (#11637) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SunRunAway authored and ngaut committed Aug 22, 2019
1 parent 2fcfc73 commit bcd1d44
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 13 deletions.
12 changes: 3 additions & 9 deletions executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error {
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.meetNewGroup && e.remainingRowsInGroup > 0 {
if (e.executed || e.meetNewGroup) && e.remainingRowsInGroup > 0 {
err := e.appendResult2Chunk(chk)
if err != nil {
return err
Expand All @@ -88,22 +88,16 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
if err != nil {
return errors.Trace(err)
}
if e.meetNewGroup {
if e.meetNewGroup && e.remainingRowsInGroup > 0 {
err := e.consumeGroupRows()
if err != nil {
return errors.Trace(err)
}
err = e.appendResult2Chunk(chk)
if err != nil {
return errors.Trace(err)
}
return err
}
e.remainingRowsInGroup++
e.groupRows = append(e.groupRows, e.inputRow)
if e.meetNewGroup {
e.inputRow = e.inputIter.Next()
return nil
}
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions executor/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ func (s *testSuite4) TestWindowFunctions(c *C) {
result.Check(testkit.Rows("1 3", "2 3", "3 6", "4 6"))
result = tk.MustQuery("select row_number() over w, sum(b) over w from t window w as (rows between 1 preceding and 1 following)")
result.Check(testkit.Rows("1 3", "2 4", "3 5", "4 3"))

tk.Se.GetSessionVars().MaxChunkSize = 1
result = tk.MustQuery("select a, row_number() over (partition by a) from t")
result.Check(testkit.Rows("1 1", "1 2", "2 1", "2 2"))
}

func (s *testSuite4) TestWindowFunctionsIssue11614(c *C) {
Expand Down
6 changes: 3 additions & 3 deletions util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk {

// renewWithCapacity creates a new Chunk based on an existing Chunk with capacity. The newly
// created Chunk has the same data schema with the old Chunk.
func renewWithCapacity(chk *Chunk, cap int) *Chunk {
func renewWithCapacity(chk *Chunk, cap, maxChunkSize int) *Chunk {
newChk := new(Chunk)
if chk.columns == nil {
return newChk
}
newChk.columns = renewColumns(chk.columns, cap)
newChk.numVirtualRows = 0
newChk.capacity = cap
newChk.requiredRows = cap
newChk.requiredRows = maxChunkSize
return newChk
}

Expand All @@ -98,7 +98,7 @@ func renewWithCapacity(chk *Chunk, cap int) *Chunk {
// maxChunkSize: the limit for the max number of rows.
func Renew(chk *Chunk, maxChunkSize int) *Chunk {
newCap := reCalcCapacity(chk, maxChunkSize)
return renewWithCapacity(chk, newCap)
return renewWithCapacity(chk, newCap, maxChunkSize)
}

// renewColumns creates the columns of a Chunk. The capacity of the newly
Expand Down
2 changes: 1 addition & 1 deletion util/chunk/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (r Row) IsNull(colIdx int) bool {

// CopyConstruct creates a new row and copies this row's data into it.
func (r Row) CopyConstruct() Row {
newChk := renewWithCapacity(r.c, 1)
newChk := renewWithCapacity(r.c, 1, 1)
newChk.AppendRow(r)
return newChk.GetRow(0)
}

0 comments on commit bcd1d44

Please sign in to comment.