diff --git a/executor/window.go b/executor/window.go index d853726b2b807..49f5efea9cf20 100644 --- a/executor/window.go +++ b/executor/window.go @@ -62,7 +62,7 @@ func (e *WindowExec) Next(ctx context.Context, chk *chunk.Chunk) error { defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }() } chk.Reset() - if e.meetNewGroup && e.remainingRowsInGroup > 0 { + if (e.executed || e.meetNewGroup) && e.remainingRowsInGroup > 0 { err := e.appendResult2Chunk(chk) if err != nil { return err @@ -88,22 +88,16 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro if err != nil { return errors.Trace(err) } - if e.meetNewGroup { + if e.meetNewGroup && e.remainingRowsInGroup > 0 { err := e.consumeGroupRows() if err != nil { return errors.Trace(err) } err = e.appendResult2Chunk(chk) - if err != nil { - return errors.Trace(err) - } + return err } e.remainingRowsInGroup++ e.groupRows = append(e.groupRows, e.inputRow) - if e.meetNewGroup { - e.inputRow = e.inputIter.Next() - return nil - } } return nil } diff --git a/executor/window_test.go b/executor/window_test.go index cfca8c62d4796..736be720cbbad 100644 --- a/executor/window_test.go +++ b/executor/window_test.go @@ -172,6 +172,10 @@ func (s *testSuite4) TestWindowFunctions(c *C) { result.Check(testkit.Rows("1 3", "2 3", "3 6", "4 6")) result = tk.MustQuery("select row_number() over w, sum(b) over w from t window w as (rows between 1 preceding and 1 following)") result.Check(testkit.Rows("1 3", "2 4", "3 5", "4 3")) + + tk.Se.GetSessionVars().MaxChunkSize = 1 + result = tk.MustQuery("select a, row_number() over (partition by a) from t") + result.Check(testkit.Rows("1 1", "1 2", "2 1", "2 2")) } func (s *testSuite4) TestWindowFunctionsIssue11614(c *C) { diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index 26d45b72978b2..86c6613cc2071 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -79,7 +79,7 @@ func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk { // renewWithCapacity creates a new Chunk based on an existing Chunk with capacity. The newly // created Chunk has the same data schema with the old Chunk. -func renewWithCapacity(chk *Chunk, cap int) *Chunk { +func renewWithCapacity(chk *Chunk, cap, maxChunkSize int) *Chunk { newChk := new(Chunk) if chk.columns == nil { return newChk @@ -87,7 +87,7 @@ func renewWithCapacity(chk *Chunk, cap int) *Chunk { newChk.columns = renewColumns(chk.columns, cap) newChk.numVirtualRows = 0 newChk.capacity = cap - newChk.requiredRows = cap + newChk.requiredRows = maxChunkSize return newChk } @@ -98,7 +98,7 @@ func renewWithCapacity(chk *Chunk, cap int) *Chunk { // maxChunkSize: the limit for the max number of rows. func Renew(chk *Chunk, maxChunkSize int) *Chunk { newCap := reCalcCapacity(chk, maxChunkSize) - return renewWithCapacity(chk, newCap) + return renewWithCapacity(chk, newCap, maxChunkSize) } // renewColumns creates the columns of a Chunk. The capacity of the newly diff --git a/util/chunk/row.go b/util/chunk/row.go index 91a65b04a1e43..ee0bbdd2b4dae 100644 --- a/util/chunk/row.go +++ b/util/chunk/row.go @@ -225,7 +225,7 @@ func (r Row) IsNull(colIdx int) bool { // CopyConstruct creates a new row and copies this row's data into it. func (r Row) CopyConstruct() Row { - newChk := renewWithCapacity(r.c, 1) + newChk := renewWithCapacity(r.c, 1, 1) newChk.AppendRow(r) return newChk.GetRow(0) }