executor: implement disk-based sort (Part 1) #13718

Merged: 14 commits, Dec 18, 2019
18 changes: 17 additions & 1 deletion executor/explain_test.go
@@ -163,14 +163,30 @@ func (s *testSuite1) TestMemoryUsageAfterClose(c *C) {
for i := 0; i < tk.Se.GetSessionVars().MaxChunkSize*5; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i))
}
SQLs := []string{"select v+abs(k) from t"}
SQLs := []string{"select v+abs(k) from t",
"select v from t order by v"}
for _, sql := range SQLs {
tk.MustQuery(sql)
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.BytesConsumed(), Equals, int64(0))
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.MaxConsumed(), Greater, int64(0))
}
}

func (s *testSuite1) TestDiskUsageAfterClose(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (v int, k int, key(k))")
for i := 0; i < tk.Se.GetSessionVars().MaxChunkSize*5; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i))
}
SQLs := []string{
"select v from t order by v"}
for _, sql := range SQLs {
tk.MustQuery(sql)
c.Assert(tk.Se.GetSessionVars().StmtCtx.DiskTracker.BytesConsumed(), Equals, int64(0))
}
}
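Both tests rely on the same tracker bookkeeping: Consume adds to the current usage (a negative value releases it), BytesConsumed reports the current total, and MaxConsumed the high-water mark, so a fully closed executor must read zero again. Below is a minimal sketch of that behavior; the tracker labels are illustrative and the NewTracker signature may differ between TiDB versions.

package main

import (
    "fmt"

    "github.com/pingcap/tidb/util/memory"
    "github.com/pingcap/tidb/util/stringutil"
)

func main() {
    // A bytesLimit of -1 means no quota is enforced on the tracker.
    root := memory.NewTracker(stringutil.StringerStr("root"), -1)
    sortTracker := memory.NewTracker(stringutil.StringerStr("sort"), -1)
    sortTracker.AttachTo(root)

    sortTracker.Consume(1024)         // e.g. row pointers or cached chunks
    fmt.Println(root.BytesConsumed()) // 1024
    fmt.Println(root.MaxConsumed())   // 1024

    sortTracker.Consume(-1024)        // Close() releases everything it charged
    fmt.Println(root.BytesConsumed()) // 0, which is what the tests assert
}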

func (s *testSuite2) TestExplainAnalyzeExecutionInfo(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
215 changes: 202 additions & 13 deletions executor/sort.go
@@ -18,11 +18,13 @@ import (
"context"
"fmt"
"sort"
"sync/atomic"

"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
)
@@ -49,12 +49,57 @@ type SortExec struct {
// rowPtrs stores the chunk index and row index for each row.
rowPtrs []chunk.RowPtr

-memTracker *memory.Tracker
memTracker *memory.Tracker
diskTracker *disk.Tracker

// rowChunksInDisk is the list of chunks that stores row values on disk.
rowChunksInDisk *chunk.ListInDisk
// rowPtrsInDisk stores the disk-chunk index and row index for each row.
rowPtrsInDisk []chunk.RowPtr
// partitionList is the list of on-disk chunk lists, one per partition.
partitionList []*chunk.ListInDisk
// partitionRowPtrs stores the disk-chunk index and row index for each row, one slice per partition.
partitionRowPtrs [][]chunk.RowPtr
// exceeded indicates that the memory quota was exceeded while adding a chunk, so rows should be spilled to disk.
exceeded uint32
// spilled indicates that rows have been spilled to disk.
spilled uint32
}
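Nothing in this diff sets exceeded; that is done by a callback registered on the memory tracker. The sketch below is a hypothesis of that trigger, assuming util/memory's ActionOnExceed interface; spillDiskAction and its wiring are illustrative, not code from this PR. The handshake itself is in the diff: fetchRowChunks checks exceeded after adding each chunk, spills the in-memory list, and records the fact in spilled.

// Hypothetical spill trigger (not part of this PR): the tracker calls
// Action once consumption passes the quota, and the action only flips
// the exceeded flag; the fetch loop does the actual spilling.
type spillDiskAction struct {
    e        *SortExec
    fallback memory.ActionOnExceed
}

// Action is invoked by the memory tracker when the quota is exceeded.
func (a *spillDiskAction) Action(t *memory.Tracker) {
    atomic.StoreUint32(&a.e.exceeded, 1)
}

// SetFallback keeps the previously registered action as a fallback.
func (a *spillDiskAction) SetFallback(fallback memory.ActionOnExceed) {
    a.fallback = fallback
}

// Wiring (also hypothetical):
//   e.memTracker.SetActionOnExceed(&spillDiskAction{e: e})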

// Close implements the Executor Close interface.
func (e *SortExec) Close() error {
if e.alreadySpilled() {
if e.rowChunksInDisk != nil {
if err := e.rowChunksInDisk.Close(); err != nil {
return err
}
}
for _, chunkInDisk := range e.partitionList {
if chunkInDisk != nil {
if err := chunkInDisk.Close(); err != nil {
return err
}
}
}
e.rowChunksInDisk = nil
e.partitionList = e.partitionList[:0]

e.memTracker.Consume(int64(-8 * cap(e.rowPtrsInDisk)))
e.rowPtrsInDisk = nil
for _, partitionPtrs := range e.partitionRowPtrs {
e.memTracker.Consume(int64(-8 * cap(partitionPtrs)))
}
e.partitionRowPtrs = nil
}
if e.rowChunks != nil {
e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil
}
e.memTracker.Consume(int64(-8 * cap(e.rowPtrs)))
e.rowPtrs = nil
e.memTracker = nil
e.diskTracker = nil
return e.children[0].Close()
}

@@ -67,7 +114,15 @@ func (e *SortExec) Open(ctx context.Context) error {
if e.memTracker == nil {
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaSort)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.diskTracker = memory.NewTracker(e.id, -1)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)
}
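// Reset the spill state so a re-opened executor starts on the in-memory path.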
e.exceeded = 0
e.spilled = 0
e.rowChunksInDisk = nil
e.rowPtrsInDisk = e.rowPtrsInDisk[:0]
e.partitionList = e.partitionList[:0]
e.partitionRowPtrs = e.partitionRowPtrs[:0]
return e.children[0].Open(ctx)
}

@@ -79,20 +134,65 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
-e.initPointers()
-e.initCompareFuncs()
-e.buildKeyColumns()
-sort.Slice(e.rowPtrs, e.keyColumnsLess)
-e.fetched = true
if e.alreadySpilled() {
err = e.prepareExternalSorting()
if err != nil {
return err
}
e.fetched = true
} else {
e.initPointers()
e.initCompareFuncs()
e.buildKeyColumns()
sort.Slice(e.rowPtrs, e.keyColumnsLess)
e.fetched = true
}
}
-for !req.IsFull() && e.Idx < len(e.rowPtrs) {
-rowPtr := e.rowPtrs[e.Idx]
-req.AppendRow(e.rowChunks.GetRow(rowPtr))
-e.Idx++

if e.alreadySpilled() {
for !req.IsFull() && e.Idx < len(e.partitionRowPtrs[0]) {
rowPtr := e.partitionRowPtrs[0][e.Idx]
row, err := e.partitionList[0].GetRow(rowPtr)
if err != nil {
return err
}
req.AppendRow(row)
e.Idx++
}
} else {
for !req.IsFull() && e.Idx < len(e.rowPtrs) {
rowPtr := e.rowPtrs[e.Idx]
req.AppendRow(e.rowChunks.GetRow(rowPtr))
e.Idx++
}
}
return nil
}

func (e *SortExec) prepareExternalSorting() (err error) {
e.initCompareFuncs()
e.buildKeyColumns()
e.rowPtrsInDisk = e.initPointersForListInDisk(e.rowChunksInDisk)
// Partition sort.
// There is only one partition for now; partitioning will be
// adjusted in the next PR.
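// Read the unsorted on-disk rows back into memory, sort their row
// pointers, then rewrite the rows to disk in sorted order so Next can
// stream this partition back sequentially.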
err = e.readPartition(e.rowChunksInDisk, e.rowPtrsInDisk)
if err != nil {
return err
}
e.initPointers()
sort.Slice(e.rowPtrs, e.keyColumnsLess)
listInDisk, err := e.spillToDiskByRowPtr()
if err != nil {
return err
}
e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil
e.partitionList = append(e.partitionList, listInDisk)
e.partitionRowPtrs = append(e.partitionRowPtrs, e.initPointersForListInDisk(listInDisk))
return err
}

func (e *SortExec) fetchRowChunks(ctx context.Context) error {
fields := retTypes(e)
e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
@@ -108,20 +208,53 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
if rowCount == 0 {
break
}
-e.rowChunks.Add(chk)
if e.alreadySpilled() {
// Append chk to the list on disk.
err := e.rowChunksInDisk.Add(chk)
if err != nil {
return err
}
} else {
e.rowChunks.Add(chk)
if atomic.LoadUint32(&e.exceeded) == 1 {
e.rowChunksInDisk, err = e.spillToDisk()
if err != nil {
return err
}
e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil // GC its internal chunks.
atomic.StoreUint32(&e.spilled, 1)
}
}
}
return nil
}

func (e *SortExec) initPointers() {
-e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
-e.memTracker.Consume(int64(8 * e.rowChunks.Len()))
if e.rowPtrs != nil {
e.memTracker.Consume(int64(-8 * cap(e.rowPtrs)))
e.rowPtrs = e.rowPtrs[:0]
} else {
e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
}
for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ {
rowChk := e.rowChunks.GetChunk(chkIdx)
for rowIdx := 0; rowIdx < rowChk.NumRows(); rowIdx++ {
e.rowPtrs = append(e.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
e.memTracker.Consume(int64(8 * cap(e.rowPtrs)))
}

func (e *SortExec) initPointersForListInDisk(listInDisk *chunk.ListInDisk) []chunk.RowPtr {
rowPtrsInDisk := make([]chunk.RowPtr, 0)
for chkIdx := 0; chkIdx < listInDisk.NumChunks(); chkIdx++ {
for rowIdx := 0; rowIdx < listInDisk.NumRowsOfChunk(chkIdx); rowIdx++ {
rowPtrsInDisk = append(rowPtrsInDisk, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
// Consume cap rather than len to stay consistent with the release in Close.
e.memTracker.Consume(int64(8 * cap(rowPtrsInDisk)))
return rowPtrsInDisk
}
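The 8 * factor the trackers charge per row pointer is the size of chunk.RowPtr: two uint32 fields. A quick self-contained check (the struct below mirrors chunk.RowPtr instead of importing it):

package main

import (
    "fmt"
    "unsafe"
)

// RowPtr mirrors chunk.RowPtr: a chunk index plus a row index.
type RowPtr struct {
    ChkIdx uint32
    RowIdx uint32
}

func main() {
    fmt.Println(unsafe.Sizeof(RowPtr{})) // 8 bytes per row pointer
}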

func (e *SortExec) initCompareFuncs() {
@@ -163,6 +296,62 @@ func (e *SortExec) keyColumnsLess(i, j int) bool {
return e.lessRow(rowI, rowJ)
}

func (e *SortExec) readPartition(listInDisk *chunk.ListInDisk, rowPtrs []chunk.RowPtr) error {
e.rowChunks = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
for _, rowPtr := range rowPtrs {
row, err := listInDisk.GetRow(rowPtr)
if err != nil {
return err
}
e.rowChunks.AppendRow(row)
}
return nil
}

// alreadySpilled indicates whether rows have been spilled to disk.
func (e *SortExec) alreadySpilled() bool { return e.rowChunksInDisk != nil }

// alreadySpilledSafe indicates whether rows have been spilled to disk. It is thread-safe.
func (e *SortExec) alreadySpilledSafe() bool { return atomic.LoadUint32(&e.spilled) == 1 }

func (e *SortExec) spillToDisk() (listInDisk *chunk.ListInDisk, err error) {
N := e.rowChunks.NumChunks()
rowChunksInDisk := chunk.NewListInDisk(e.retFieldTypes)
rowChunksInDisk.GetDiskTracker().AttachTo(e.diskTracker)
for i := 0; i < N; i++ {
chk := e.rowChunks.GetChunk(i)
err = rowChunksInDisk.Add(chk)
if err != nil {
return nil, err
}
}
return rowChunksInDisk, nil
}

func (e *SortExec) spillToDiskByRowPtr() (listInDisk *chunk.ListInDisk, err error) {
rowChunksInDisk := chunk.NewListInDisk(e.retFieldTypes)
rowChunksInDisk.GetDiskTracker().AttachTo(e.diskTracker)
chk := newFirstChunk(e)
for _, rowPtr := range e.rowPtrs {
chk.AppendRow(e.rowChunks.GetRow(rowPtr))
if chk.IsFull() {
err := rowChunksInDisk.Add(chk)
if err != nil {
return nil, err
}
chk = newFirstChunk(e)
}
}
if chk.NumRows() != 0 {
if err := rowChunksInDisk.Add(chk); err != nil {
return nil, err
}
}
return rowChunksInDisk, nil
}
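Both spill paths write through chunk.ListInDisk, which appends whole chunks to a temporary file and reads rows back by RowPtr; Close removes the backing file, which is why the disk tracker reads zero in TestDiskUsageAfterClose. A minimal round trip, using an illustrative single-column schema:

package main

import (
    "fmt"

    "github.com/pingcap/parser/mysql"
    "github.com/pingcap/tidb/types"
    "github.com/pingcap/tidb/util/chunk"
)

func main() {
    fieldTypes := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
    l := chunk.NewListInDisk(fieldTypes)
    defer l.Close() // deletes the temporary file backing the list

    chk := chunk.NewChunkWithCapacity(fieldTypes, 32)
    chk.AppendInt64(0, 42)
    if err := l.Add(chk); err != nil {
        panic(err)
    }

    row, err := l.GetRow(chunk.RowPtr{ChkIdx: 0, RowIdx: 0})
    if err != nil {
        panic(err)
    }
    fmt.Println(row.GetInt64(0)) // 42
}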

// TopNExec implements a Top-N algorithm and it is built from a SELECT statement with ORDER BY and LIMIT.
// Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage.
type TopNExec struct {