From 973b69807e6c7b3f498bf677e0a51783e3b212fc Mon Sep 17 00:00:00 2001 From: qw4990 Date: Tue, 12 Feb 2019 16:19:18 +0800 Subject: [PATCH] remove maxChunkSize from IsFull --- distsql/select_result.go | 4 ++-- distsql/stream.go | 4 ++-- util/chunk/recordbatch.go | 12 +++++++++--- util/chunk/recordbatch_test.go | 9 ++++++--- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/distsql/select_result.go b/distsql/select_result.go index 697c1aa7f4510..5080cba753b1a 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -124,7 +124,7 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { // NextBatch reads the data into batch. func (r *selectResult) NextBatch(ctx context.Context, batch *chunk.RecordBatch) error { batch.Reset() - for !batch.IsFull(r.ctx.GetSessionVars().MaxChunkSize) { + for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize { if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) { err := r.getSelectResp() if err != nil || r.selectResp == nil { @@ -178,7 +178,7 @@ func (r *selectResult) getSelectResp() error { func (r *selectResult) readRowsData(batch *chunk.RecordBatch) (err error) { rowsData := r.selectResp.Chunks[r.respChkIdx].RowsData decoder := codec.NewDecoder(batch.Chunk, r.ctx.GetSessionVars().Location()) - for !batch.IsFull(r.ctx.GetSessionVars().MaxChunkSize) && len(rowsData) > 0 { + for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize && len(rowsData) > 0 { for i := 0; i < r.rowLen; i++ { rowsData, err = decoder.DecodeOne(rowsData, i, r.fieldTypes[i]) if err != nil { diff --git a/distsql/stream.go b/distsql/stream.go index a8d9d1445a43f..a8a87a7738229 100644 --- a/distsql/stream.go +++ b/distsql/stream.go @@ -51,7 +51,7 @@ func (r *streamResult) Next(ctx context.Context, chk *chunk.Chunk) error { // NextBatch reads the data into batch. func (r *streamResult) NextBatch(ctx context.Context, batch *chunk.RecordBatch) error { batch.Reset() - for !batch.IsFull(r.ctx.GetSessionVars().MaxChunkSize) { + for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize { err := r.readDataIfNecessary(ctx) if err != nil { return errors.Trace(err) @@ -121,7 +121,7 @@ func (r *streamResult) readDataIfNecessary(ctx context.Context) error { func (r *streamResult) flushToBatch(batch *chunk.RecordBatch) (err error) { remainRowsData := r.curr.RowsData decoder := codec.NewDecoder(batch.Chunk, r.ctx.GetSessionVars().Location()) - for !batch.IsFull(r.ctx.GetSessionVars().MaxChunkSize) && len(remainRowsData) > 0 { + for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize && len(remainRowsData) > 0 { for i := 0; i < r.rowLen; i++ { remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i]) if err != nil { diff --git a/util/chunk/recordbatch.go b/util/chunk/recordbatch.go index c539e4dd83322..4c1a3520af642 100644 --- a/util/chunk/recordbatch.go +++ b/util/chunk/recordbatch.go @@ -46,7 +46,13 @@ func (rb *RecordBatch) RequiredRows() int { } // IsFull returns if this batch can be considered full. -func (rb *RecordBatch) IsFull(maxChunkSize int) bool { - numRows := rb.NumRows() - return numRows >= maxChunkSize || (rb.requiredRows != UnspecifiedNumRows && numRows >= rb.requiredRows) +// IsFull only takes requiredRows into account, the caller of this method should +// also consider maxChunkSize, then it should behave like: +// if !batch.IsFull() && batch.NumRows() < maxChunkSize { ... } +func (rb *RecordBatch) IsFull() bool { + if rb.requiredRows == UnspecifiedNumRows { + return false + } + + return rb.NumRows() >= rb.requiredRows } diff --git a/util/chunk/recordbatch_test.go b/util/chunk/recordbatch_test.go index 5e83d2f80f6d6..b2274ef54190a 100644 --- a/util/chunk/recordbatch_test.go +++ b/util/chunk/recordbatch_test.go @@ -41,15 +41,18 @@ func (s *testChunkSuite) TestRecordBatch(c *check.C) { batch.AppendInt64(0, 1) batch.AppendInt64(0, 1) c.Assert(batch.NumRows(), check.Equals, 4) - c.Assert(batch.IsFull(maxChunkSize), check.IsFalse) + c.Assert(batch.IsFull(), check.IsFalse) batch.AppendInt64(0, 1) c.Assert(batch.NumRows(), check.Equals, 5) - c.Assert(batch.IsFull(maxChunkSize), check.IsTrue) + c.Assert(batch.IsFull(), check.IsTrue) batch.AppendInt64(0, 1) batch.AppendInt64(0, 1) batch.AppendInt64(0, 1) c.Assert(batch.NumRows(), check.Equals, 8) - c.Assert(batch.IsFull(maxChunkSize), check.IsTrue) + c.Assert(batch.IsFull(), check.IsTrue) + + batch.SetRequiredRows(UnspecifiedNumRows) + c.Assert(batch.IsFull(), check.IsFalse) }