Skip to content

Commit

Permalink
executor: fix incorrect chunk full judgement in parallel sort spill (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhangxian1008 authored Jul 11, 2024
1 parent 4b557dd commit a490882
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
13 changes: 11 additions & 2 deletions pkg/executor/sortexec/parallel_sort_spill_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ func newParallelSortSpillHelper(sortExec *SortExec, fieldTypes []*types.FieldTyp
errOutputChan: errOutputChan,
finishCh: finishCh,
fieldTypes: fieldTypes,
tmpSpillChunk: chunk.NewChunkFromPoolWithCapacity(fieldTypes, spillChunkSize),
}
}

func (p *parallelSortSpillHelper) close() {
for _, inDisk := range p.sortedRowsInDisk {
inDisk.Close()
}
p.tmpSpillChunk.Destroy(spillChunkSize, p.fieldTypes)

if p.tmpSpillChunk != nil {
p.tmpSpillChunk.Destroy(spillChunkSize, p.fieldTypes)
}
}

func (p *parallelSortSpillHelper) isNotSpilledNoLock() bool {
Expand Down Expand Up @@ -171,8 +173,15 @@ func (p *parallelSortSpillHelper) spillTmpSpillChunk(inDisk *chunk.DataInDiskByC
return nil
}

func (p *parallelSortSpillHelper) initForSpill() {
if p.tmpSpillChunk == nil {
p.tmpSpillChunk = chunk.NewChunkFromPoolWithCapacity(p.fieldTypes, spillChunkSize)
}
}

func (p *parallelSortSpillHelper) spillImpl(merger *multiWayMerger) error {
logutil.BgLogger().Info(spillInfo, zap.Int64("consumed", p.bytesConsumed.Load()), zap.Int64("quota", p.bytesLimit.Load()))
p.initForSpill()
p.tmpSpillChunk.Reset()
inDisk := chunk.NewDataInDiskByChunks(p.fieldTypes)
inDisk.GetDiskTracker().AttachTo(p.sortExec.diskTracker)
Expand Down
1 change: 1 addition & 0 deletions pkg/util/chunk/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func NewPool(initCap int) *Pool {
func (p *Pool) GetChunk(fields []*types.FieldType) *Chunk {
chk := new(Chunk)
chk.capacity = p.initCap
chk.requiredRows = p.initCap
chk.columns = make([]*Column, len(fields))
for i, f := range fields {
switch elemLen := getFixedLen(f); elemLen {
Expand Down

0 comments on commit a490882

Please sign in to comment.