Skip to content

Commit

Permalink
remove maxChunkSize from IsFull
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 committed Feb 12, 2019
1 parent 4ccfa41 commit 973b698
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 10 deletions.
4 changes: 2 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
// NextBatch reads the data into batch.
func (r *selectResult) NextBatch(ctx context.Context, batch *chunk.RecordBatch) error {
batch.Reset()
for !batch.IsFull(r.ctx.GetSessionVars().MaxChunkSize) {
for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
err := r.getSelectResp()
if err != nil || r.selectResp == nil {
Expand Down Expand Up @@ -178,7 +178,7 @@ func (r *selectResult) getSelectResp() error {
func (r *selectResult) readRowsData(batch *chunk.RecordBatch) (err error) {
rowsData := r.selectResp.Chunks[r.respChkIdx].RowsData
decoder := codec.NewDecoder(batch.Chunk, r.ctx.GetSessionVars().Location())
for !batch.IsFull(r.ctx.GetSessionVars().MaxChunkSize) && len(rowsData) > 0 {
for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize && len(rowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
rowsData, err = decoder.DecodeOne(rowsData, i, r.fieldTypes[i])
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *streamResult) Next(ctx context.Context, chk *chunk.Chunk) error {
// NextBatch reads the data into batch.
func (r *streamResult) NextBatch(ctx context.Context, batch *chunk.RecordBatch) error {
batch.Reset()
for !batch.IsFull(r.ctx.GetSessionVars().MaxChunkSize) {
for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
err := r.readDataIfNecessary(ctx)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -121,7 +121,7 @@ func (r *streamResult) readDataIfNecessary(ctx context.Context) error {
func (r *streamResult) flushToBatch(batch *chunk.RecordBatch) (err error) {
remainRowsData := r.curr.RowsData
decoder := codec.NewDecoder(batch.Chunk, r.ctx.GetSessionVars().Location())
for !batch.IsFull(r.ctx.GetSessionVars().MaxChunkSize) && len(remainRowsData) > 0 {
for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize && len(remainRowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i])
if err != nil {
Expand Down
12 changes: 9 additions & 3 deletions util/chunk/recordbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ func (rb *RecordBatch) RequiredRows() int {
}

// IsFull returns if this batch can be considered full.
func (rb *RecordBatch) IsFull(maxChunkSize int) bool {
numRows := rb.NumRows()
return numRows >= maxChunkSize || (rb.requiredRows != UnspecifiedNumRows && numRows >= rb.requiredRows)
// IsFull only takes requiredRows into account, the caller of this method should
// also consider maxChunkSize, then it should behave like:
// if !batch.IsFull() && batch.NumRows() < maxChunkSize { ... }
func (rb *RecordBatch) IsFull() bool {
if rb.requiredRows == UnspecifiedNumRows {
return false
}

return rb.NumRows() >= rb.requiredRows
}
9 changes: 6 additions & 3 deletions util/chunk/recordbatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,18 @@ func (s *testChunkSuite) TestRecordBatch(c *check.C) {
batch.AppendInt64(0, 1)
batch.AppendInt64(0, 1)
c.Assert(batch.NumRows(), check.Equals, 4)
c.Assert(batch.IsFull(maxChunkSize), check.IsFalse)
c.Assert(batch.IsFull(), check.IsFalse)

batch.AppendInt64(0, 1)
c.Assert(batch.NumRows(), check.Equals, 5)
c.Assert(batch.IsFull(maxChunkSize), check.IsTrue)
c.Assert(batch.IsFull(), check.IsTrue)

batch.AppendInt64(0, 1)
batch.AppendInt64(0, 1)
batch.AppendInt64(0, 1)
c.Assert(batch.NumRows(), check.Equals, 8)
c.Assert(batch.IsFull(maxChunkSize), check.IsTrue)
c.Assert(batch.IsFull(), check.IsTrue)

batch.SetRequiredRows(UnspecifiedNumRows)
c.Assert(batch.IsFull(), check.IsFalse)
}

0 comments on commit 973b698

Please sign in to comment.