Skip to content

Commit

Permalink
address some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 committed Dec 17, 2019
1 parent 6561f68 commit f881662
Showing 1 changed file with 47 additions and 19 deletions.
66 changes: 47 additions & 19 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"container/heap"
"context"
"fmt"
"github.com/pingcap/tidb/util/disk"
"sort"
"sync/atomic"

Expand Down Expand Up @@ -50,7 +51,8 @@ type SortExec struct {
// rowPointer store the chunk index and row index for each row.
rowPtrs []chunk.RowPtr

memTracker *memory.Tracker
memTracker *memory.Tracker
diskTracker *disk.Tracker

// rowChunksInDisk is the chunks to store row values in disk.
rowChunksInDisk *chunk.ListInDisk
Expand All @@ -73,26 +75,38 @@ type SortExec struct {

// Close implements the Executor Close interface.
func (e *SortExec) Close() error {
if e.rowChunksInDisk != nil {
if err := e.rowChunksInDisk.Close(); err != nil {
return err
if e.alreadySpilled() {
if e.rowChunksInDisk != nil {
if err := e.rowChunksInDisk.Close(); err != nil {
return err
}
}
}
for _, chunkInDisk := range e.partitionList {
if chunkInDisk != nil {
if err := chunkInDisk.Close(); err != nil {
for _, chunkInDisk := range e.partitionList {
if chunkInDisk != nil {
if err := chunkInDisk.Close(); err != nil {
return err
}
}
}
if e.finalChunksInDisk != nil {
if err := e.finalChunksInDisk.Close(); err != nil {
return err
}
}
}
if e.finalChunksInDisk != nil {
if err := e.finalChunksInDisk.Close(); err != nil {
return err
e.rowChunksInDisk = nil
e.partitionList = e.partitionList[:0]
e.finalChunksInDisk = nil

e.memTracker.Consume(int64(-8 * cap(e.rowPtrsInDisk)))
e.rowPtrsInDisk = nil
for _, partitionPtrs := range e.partitionRowPtrs {
e.memTracker.Consume(int64(-8 * cap(partitionPtrs)))
}
e.partitionRowPtrs = nil
e.memTracker.Consume(int64(-8 * cap(e.finalRowPtrs)))
e.finalRowPtrs = nil
}
e.rowChunksInDisk = nil
e.partitionList = e.partitionList[:0]
e.finalChunksInDisk = nil
e.memTracker.Consume(int64(-8 * cap(e.rowPtrs)))
e.memTracker = nil
return e.children[0].Close()
}
Expand All @@ -106,6 +120,8 @@ func (e *SortExec) Open(ctx context.Context) error {
if e.memTracker == nil {
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaSort)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.diskTracker = memory.NewTracker(e.id, -1)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)
}
e.exceeded = 0
e.spilled = 0
Expand Down Expand Up @@ -178,6 +194,8 @@ func (e *SortExec) externalSorting() (err error) {
if err != nil {
return err
}
e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil
e.partitionList = append(e.partitionList, listInDisk)
e.partitionRowPtrs = append(e.partitionRowPtrs, e.initPointersForListInDisk(listInDisk))
// merge sort
Expand All @@ -189,7 +207,8 @@ func (e *SortExec) externalSorting() (err error) {
}
e.initPointers()
e.finalChunksInDisk, err = e.spillToDiskByRowPtr()

e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil
e.finalRowPtrs = e.initPointersForListInDisk(e.finalChunksInDisk)
return err
}
Expand Down Expand Up @@ -224,8 +243,8 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
if err != nil {
return err
}
e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil // GC its internal chunks.
e.memTracker.Consume(-e.memTracker.BytesConsumed())
atomic.StoreUint32(&e.spilled, 1)
}
}
Expand All @@ -234,14 +253,19 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
}

func (e *SortExec) initPointers() {
e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
e.memTracker.Consume(int64(8 * e.rowChunks.Len()))
if e.rowPtrs != nil {
e.memTracker.Consume(int64(-8 * cap(e.rowPtrs)))
e.rowPtrs = e.rowPtrs[:0]
} else {
e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
}
for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ {
rowChk := e.rowChunks.GetChunk(chkIdx)
for rowIdx := 0; rowIdx < rowChk.NumRows(); rowIdx++ {
e.rowPtrs = append(e.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
e.memTracker.Consume(int64(8 * cap(e.rowPtrs)))
}

func (e *SortExec) initPointersForListInDisk(disk *chunk.ListInDisk) []chunk.RowPtr {
Expand Down Expand Up @@ -296,6 +320,8 @@ func (e *SortExec) keyColumnsLess(i, j int) bool {

func (e *SortExec) readPartition(disk *chunk.ListInDisk, rowPtrs []chunk.RowPtr) error {
e.rowChunks = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
for _, rowPtr := range rowPtrs {
rowPtr, err := disk.GetRow(rowPtr)
if err != nil {
Expand All @@ -315,6 +341,7 @@ func (e *SortExec) alreadySpilledSafe() bool { return atomic.LoadUint32(&e.spill
func (e *SortExec) spillToDisk() (disk *chunk.ListInDisk, err error) {
N := e.rowChunks.NumChunks()
rowChunksInDisk := chunk.NewListInDisk(e.retFieldTypes)
rowChunksInDisk.GetDiskTracker().AttachTo(e.diskTracker)
for i := 0; i < N; i++ {
chk := e.rowChunks.GetChunk(i)
err = rowChunksInDisk.Add(chk)
Expand All @@ -327,6 +354,7 @@ func (e *SortExec) spillToDisk() (disk *chunk.ListInDisk, err error) {

func (e *SortExec) spillToDiskByRowPtr() (disk *chunk.ListInDisk, err error) {
rowChunksInDisk := chunk.NewListInDisk(e.retFieldTypes)
rowChunksInDisk.GetDiskTracker().AttachTo(e.diskTracker)
chk := newFirstChunk(e)
for _, rowPtr := range e.rowPtrs {
chk.AppendRow(e.rowChunks.GetRow(rowPtr))
Expand Down

0 comments on commit f881662

Please sign in to comment.