From 813fefe46903a98ef4dca8365bcd27bdab9d0b80 Mon Sep 17 00:00:00 2001 From: yu34po Date: Thu, 14 Feb 2019 16:14:39 +0800 Subject: [PATCH 01/21] add merge sort --- executor/builder.go | 2 +- executor/merge_sort.go | 340 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 341 insertions(+), 1 deletion(-) create mode 100644 executor/merge_sort.go diff --git a/executor/builder.go b/executor/builder.go index e84c26bae68da..3dae9aee8a6b4 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1134,7 +1134,7 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor { b.err = errors.Trace(b.err) return nil } - sortExec := SortExec{ + sortExec := MergeSortExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), ByItems: v.ByItems, schema: v.Schema(), diff --git a/executor/merge_sort.go b/executor/merge_sort.go new file mode 100644 index 0000000000000..f9573284adfeb --- /dev/null +++ b/executor/merge_sort.go @@ -0,0 +1,340 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "github.com/pingcap/tidb/util" + "sort" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/expression" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" +) + +// MergeSortExec represents sorting executor. +type MergeSortExec struct { + baseExecutor + + ByItems []*plannercore.ByItems + Idx int + fetched bool + schema *expression.Schema + + keyExprs []expression.Expression + keyTypes []*types.FieldType + // keyColumns is the column index of the by items. + keyColumns []int + // keyCmpFuncs is used to compare each ByItem. + keyCmpFuncs []chunk.CompareFunc + // keyChunks is used to store ByItems values when not all ByItems are column. + keyChunks *chunk.List + // rowChunks is the chunks to store row values. + rowChunks *chunk.List + // rowPointer store the chunk index and row index for each row. + workerRowPtrs []*[]chunk.RowPtr + + workerRowLen []int + workerRowIdx []int + + memTracker *memory.Tracker + + concurrency int + + allColumnExpr bool + + workerWg *sync.WaitGroup + + finishedCh chan struct{} +} + +type SortWorker struct { + MergeSortExec + chkIdx int + rowIdx int + len int + rowPtrs []chunk.RowPtr +} + +func (sw *SortWorker) run() { + //sw.memTracker.Consume(int64(8 * sw.rowChunks.Len())) + //log.Infof("chkIdx %d rowIdx %d workerlen %d", sw.chkIdx, sw.rowIdx, sw.len) + for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks(); chkIdx++ { + rowChk := sw.rowChunks.GetChunk(chkIdx) + for rowIdx := sw.rowIdx; rowIdx < rowChk.NumRows() && len(sw.rowPtrs) < sw.len; rowIdx++ { + sw.rowPtrs = append(sw.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) + } + } + + if sw.allColumnExpr { + sort.Slice(sw.rowPtrs, sw.keyColumnsLess) + } else { + sort.Slice(sw.rowPtrs, sw.keyChunksLess) + } + return +} + +// Close implements the Executor Close interface. 
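
The worker above sorts row *pointers* (chunk.RowPtr) rather than the rows themselves, so the row data never moves. A minimal standalone sketch of that pattern, using simplified stand-ins (rowPtr, [][]int) for chunk.RowPtr and chunk.List:

package main

import (
	"fmt"
	"sort"
)

// rowPtr mirrors chunk.RowPtr: it addresses a row by chunk and offset
// instead of copying the row.
type rowPtr struct{ chkIdx, rowIdx uint32 }

func main() {
	// Two "chunks" of rows; only the pointers are reordered.
	chunks := [][]int{{9, 3, 7}, {1, 8, 2}}
	var ptrs []rowPtr
	for c, chk := range chunks {
		for r := range chk {
			ptrs = append(ptrs, rowPtr{uint32(c), uint32(r)})
		}
	}
	sort.Slice(ptrs, func(i, j int) bool {
		return chunks[ptrs[i].chkIdx][ptrs[i].rowIdx] < chunks[ptrs[j].chkIdx][ptrs[j].rowIdx]
	})
	for _, p := range ptrs {
		fmt.Print(chunks[p.chkIdx][p.rowIdx], " ") // 1 2 3 7 8 9
	}
}
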
+func (e *MergeSortExec) Close() error { + e.memTracker.Detach() + e.memTracker = nil + return errors.Trace(e.children[0].Close()) +} + +// Open implements the Executor Open interface. +func (e *MergeSortExec) Open(ctx context.Context) error { + e.fetched = false + e.Idx = 0 + e.concurrency = 4 + e.workerRowIdx = make([]int, e.concurrency) + e.workerRowLen = make([]int, e.concurrency) + e.workerRowPtrs = make([]*[]chunk.RowPtr, e.concurrency) + // To avoid duplicated initialization for TopNExec. + if e.memTracker == nil { + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaSort) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + } + return errors.Trace(e.children[0].Open(ctx)) +} + +func (e *MergeSortExec) newSortWorker(chk, row, len int) *SortWorker { + return &SortWorker{ + MergeSortExec: *e, + chkIdx: chk, + rowIdx: row, + len: len, + rowPtrs: make([]chunk.RowPtr, 0, len), + } +} + +func (e *MergeSortExec) wait4WorkerSort(wg *sync.WaitGroup, finishedCh chan struct{}) { + wg.Wait() + close(finishedCh) +} + +// Next implements the Executor Next interface. +func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("sort.Next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + } + if e.runtimeStats != nil { + start := time.Now() + defer func() { e.runtimeStats.Record(time.Since(start), req.NumRows()) }() + } + req.Reset() + if !e.fetched { + err := e.fetchRowChunks(ctx) + if err != nil { + return errors.Trace(err) + } + + e.initCompareFuncs() + e.allColumnExpr = e.buildKeyColumns() + if !e.allColumnExpr { + e.buildKeyExprsAndTypes() + err := e.buildKeyChunks() + if err != nil { + return err + } + } + e.finishedCh = make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(int(e.concurrency)) + go e.wait4WorkerSort(wg, e.finishedCh) + avgLen := 0 + avgLen = e.rowChunks.Len() / e.concurrency + //log.Infof("row count %d avgLen %d", e.rowChunks.Len(), avgLen) + + for i := 0; i < e.concurrency; i++ { + chkIdx := avgLen * i / e.maxChunkSize + rowIdx := avgLen * i % e.maxChunkSize + if i == e.concurrency-1 { + avgLen = e.rowChunks.Len()%e.concurrency + avgLen + } + + sortWorker := e.newSortWorker(chkIdx, rowIdx, avgLen) + e.workerRowLen[i] = avgLen + e.workerRowIdx[i] = 0 + e.workerRowPtrs[i] = &sortWorker.rowPtrs + + go util.WithRecovery(func() { + defer wg.Done() + sortWorker.run() + }, nil) + } + + e.fetched = true + <-e.finishedCh + for i := 0; i < e.concurrency; i++ { + //log.Infof("worker %d row count %d",i, len(*e.workerRowPtrs[i])) + for j := 0; j < len(*e.workerRowPtrs[i]); j++ { + //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) + } + } + } + for req.NumRows() < e.maxChunkSize { + i := 0 + j := 0 + for; j < e.concurrency && e.workerRowIdx[j] >= e.workerRowLen[j];{ + j++ + } + //log.Infof("j %d", j) + if j >= e.concurrency { + break + } + minRowPtr := (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + + for i = j; i < e.concurrency; i++ { + if e.workerRowIdx[i] < e.workerRowLen[i] { + flag := false + if e.allColumnExpr { + keyRowI := e.rowChunks.GetRow(minRowPtr) + keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[j])[e.workerRowIdx[j]]) + flag = e.lessRow(keyRowI, keyRowJ) + } else { + keyRowI := e.keyChunks.GetRow(minRowPtr) + keyRowJ := e.keyChunks.GetRow((*e.workerRowPtrs[j])[e.workerRowIdx[j]]) + flag = e.lessRow(keyRowI, keyRowJ) + } + if flag { + minRowPtr = 
(*e.workerRowPtrs[j])[e.workerRowIdx[j]] + j = i + } + } + } + e.workerRowIdx[j]++ + //log.Infof("worker %d idx increase to %d", j, e.workerRowIdx[j]) + req.AppendRow(e.rowChunks.GetRow(minRowPtr)) + } + return nil +} + +func (e *MergeSortExec) fetchRowChunks(ctx context.Context) error { + fields := e.retTypes() + e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize) + e.rowChunks.GetMemTracker().AttachTo(e.memTracker) + e.rowChunks.GetMemTracker().SetLabel("rowChunks") + for { + chk := e.children[0].newFirstChunk() + err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk)) + if err != nil { + return errors.Trace(err) + } + rowCount := chk.NumRows() + if rowCount == 0 { + break + } + e.rowChunks.Add(chk) + } + return nil +} + +//func (e *MergeSortExec) initPointers() { +// e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len()) +// e.memTracker.Consume(int64(8 * e.rowChunks.Len())) +// for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ { +// rowChk := e.rowChunks.GetChunk(chkIdx) +// for rowIdx := 0; rowIdx < rowChk.NumRows(); rowIdx++ { +// e.rowPtrs = append(e.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) +// } +// } +//} + +func (e *MergeSortExec) initCompareFuncs() { + e.keyCmpFuncs = make([]chunk.CompareFunc, len(e.ByItems)) + for i := range e.ByItems { + keyType := e.ByItems[i].Expr.GetType() + e.keyCmpFuncs[i] = chunk.GetCompareFunc(keyType) + } +} + +func (e *MergeSortExec) buildKeyColumns() (allColumnExpr bool) { + e.keyColumns = make([]int, 0, len(e.ByItems)) + for _, by := range e.ByItems { + if col, ok := by.Expr.(*expression.Column); ok { + e.keyColumns = append(e.keyColumns, col.Index) + } else { + e.keyColumns = e.keyColumns[:0] + for i := range e.ByItems { + e.keyColumns = append(e.keyColumns, i) + } + return false + } + } + return true +} + +func (e *MergeSortExec) buildKeyExprsAndTypes() { + keyLen := len(e.ByItems) + e.keyTypes = make([]*types.FieldType, keyLen) + e.keyExprs = make([]expression.Expression, keyLen) + for keyColIdx := range e.ByItems { + e.keyExprs[keyColIdx] = e.ByItems[keyColIdx].Expr + e.keyTypes[keyColIdx] = e.ByItems[keyColIdx].Expr.GetType() + } +} + +func (e *MergeSortExec) buildKeyChunks() error { + e.keyChunks = chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize) + e.keyChunks.GetMemTracker().SetLabel("keyChunks") + e.keyChunks.GetMemTracker().AttachTo(e.memTracker) + + for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ { + keyChk := chunk.NewChunkWithCapacity(e.keyTypes, e.rowChunks.GetChunk(chkIdx).NumRows()) + childIter := chunk.NewIterator4Chunk(e.rowChunks.GetChunk(chkIdx)) + err := expression.VectorizedExecute(e.ctx, e.keyExprs, childIter, keyChk) + if err != nil { + return errors.Trace(err) + } + e.keyChunks.Add(keyChk) + } + return nil +} + +func (e *MergeSortExec) lessRow(rowI, rowJ chunk.Row) bool { + for i, colIdx := range e.keyColumns { + cmpFunc := e.keyCmpFuncs[i] + cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) + if e.ByItems[i].Desc { + cmp = -cmp + } + if cmp < 0 { + return true + } else if cmp > 0 { + return false + } + } + return false +} + +// keyColumnsLess is the less function for key columns. +func (e *SortWorker) keyColumnsLess(i, j int) bool { + rowI := e.rowChunks.GetRow(e.rowPtrs[i]) + rowJ := e.rowChunks.GetRow(e.rowPtrs[j]) + return e.lessRow(rowI, rowJ) +} + +// keyChunksLess is the less function for key chunk. 
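
Next merges the k sorted ranges by repeatedly scanning every worker's head row and emitting the minimum. The same idea over plain sorted int slices (names are illustrative; the real code compares chunk.Rows via lessRow):

package main

import "fmt"

// mergeK repeatedly scans the heads of k sorted slices and emits the
// minimum, mirroring the min-scan over workerRowIdx/workerRowLen above.
func mergeK(parts [][]int) []int {
	idx := make([]int, len(parts))
	var out []int
	for {
		min := -1
		for i, p := range parts {
			if idx[i] >= len(p) {
				continue // this worker's range is exhausted
			}
			if min == -1 || p[idx[i]] < parts[min][idx[min]] {
				min = i
			}
		}
		if min == -1 {
			return out // every range is exhausted
		}
		out = append(out, parts[min][idx[min]])
		idx[min]++
	}
}

func main() {
	fmt.Println(mergeK([][]int{{1, 4, 9}, {2, 3}, {5, 8}})) // [1 2 3 4 5 8 9]
}

A linear scan of the k heads keeps the code simple; for a large concurrency setting a heap (container/heap) would reduce each selection from O(k) to O(log k).
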
+func (e *SortWorker) keyChunksLess(i, j int) bool { + keyRowI := e.keyChunks.GetRow(e.rowPtrs[i]) + keyRowJ := e.keyChunks.GetRow(e.rowPtrs[j]) + return e.lessRow(keyRowI, keyRowJ) +} From ad39e85ffb2a316313eb27dd148ad275f5580712 Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 15 Feb 2019 14:12:43 +0800 Subject: [PATCH 02/21] fix index bug --- executor/merge_sort.go | 56 +++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index f9573284adfeb..f485c170d5ffd 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -73,15 +73,21 @@ type SortWorker struct { rowPtrs []chunk.RowPtr } -func (sw *SortWorker) run() { +func (sw *SortWorker) run(workerId int) { //sw.memTracker.Consume(int64(8 * sw.rowChunks.Len())) - //log.Infof("chkIdx %d rowIdx %d workerlen %d", sw.chkIdx, sw.rowIdx, sw.len) - for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks(); chkIdx++ { + for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks() && len(sw.rowPtrs) < sw.len; chkIdx++ { rowChk := sw.rowChunks.GetChunk(chkIdx) - for rowIdx := sw.rowIdx; rowIdx < rowChk.NumRows() && len(sw.rowPtrs) < sw.len; rowIdx++ { + rowIdx := 0 + if chkIdx == sw.chkIdx { + rowIdx = sw.rowIdx + } + //log.Infof("workerId %d chkIdx %d rowIdx %d rowChkNum %d rowPtrs %d", workerId,chkIdx, rowIdx, rowChk.NumRows(), len(sw.rowPtrs)) + for ; rowIdx < rowChk.NumRows() && len(sw.rowPtrs) < sw.len; rowIdx++ { sw.rowPtrs = append(sw.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) } + //log.Infof("workerId %d chkIdx %d rowPtrs %d", workerId,chkIdx, len(sw.rowPtrs)) } + //log.Infof("workerId %d chkIdx %d rowIdx %d workerlen %d, rowPtrsLen %d", workerId, sw.chkIdx, sw.rowIdx, sw.len, len(sw.rowPtrs)) if sw.allColumnExpr { sort.Slice(sw.rowPtrs, sw.keyColumnsLess) @@ -161,67 +167,73 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error go e.wait4WorkerSort(wg, e.finishedCh) avgLen := 0 avgLen = e.rowChunks.Len() / e.concurrency - //log.Infof("row count %d avgLen %d", e.rowChunks.Len(), avgLen) + //log.Infof("allcolumnExpr %v row count %d row chunk %d avgLen %d", e.allColumnExpr, e.rowChunks.Len(),e.rowChunks.NumChunks(), avgLen) for i := 0; i < e.concurrency; i++ { - chkIdx := avgLen * i / e.maxChunkSize - rowIdx := avgLen * i % e.maxChunkSize + chkIdx := (avgLen * i) / e.maxChunkSize + rowIdx := (avgLen * i) % e.maxChunkSize if i == e.concurrency-1 { avgLen = e.rowChunks.Len()%e.concurrency + avgLen } + //log.Infof("worker %d chunkIdx %d rowIdx %d rowLen %d maxChunkSize %d", i, chkIdx, rowIdx, avgLen, e.maxChunkSize) sortWorker := e.newSortWorker(chkIdx, rowIdx, avgLen) e.workerRowLen[i] = avgLen e.workerRowIdx[i] = 0 e.workerRowPtrs[i] = &sortWorker.rowPtrs - + workerId := i go util.WithRecovery(func() { defer wg.Done() - sortWorker.run() + sortWorker.run(workerId) }, nil) } e.fetched = true <-e.finishedCh for i := 0; i < e.concurrency; i++ { - //log.Infof("worker %d row count %d",i, len(*e.workerRowPtrs[i])) - for j := 0; j < len(*e.workerRowPtrs[i]); j++ { - //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) - } + //log.Infof("worker %d row count %d row len %d",i, len(*e.workerRowPtrs[i]), e.workerRowLen[i]) + // for j := 0; j < len(*e.workerRowPtrs[i]); j++ { + // //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) + // } } } for req.NumRows() < e.maxChunkSize { - i := 0 j := 0 for; j < e.concurrency && e.workerRowIdx[j] >= e.workerRowLen[j];{ j++ 
} - //log.Infof("j %d", j) if j >= e.concurrency { break } - minRowPtr := (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + //log.Infof("start worker %d ptr len %d idx %d len %d",j , len(*e.workerRowPtrs[j]), e.workerRowIdx[j], e.workerRowLen[j] ) - for i = j; i < e.concurrency; i++ { + minRowPtr := (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + //log.Infof("%v", e.rowChunks.GetRow(minRowPtr)) + for i := j + 1; i < e.concurrency ; i++ { if e.workerRowIdx[i] < e.workerRowLen[i] { flag := false if e.allColumnExpr { keyRowI := e.rowChunks.GetRow(minRowPtr) - keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[j])[e.workerRowIdx[j]]) + //log.Infof("compare worker %d ptr len %d idx %d len %d",i , len(*e.workerRowPtrs[i]), e.workerRowIdx[i], e.workerRowLen[i] ) + //if e.workerRowIdx[i] == 8850 { + // log.Infof("compare worker %d reach 8874", i) + // break + //} + keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[i])[e.workerRowIdx[i]]) flag = e.lessRow(keyRowI, keyRowJ) } else { keyRowI := e.keyChunks.GetRow(minRowPtr) - keyRowJ := e.keyChunks.GetRow((*e.workerRowPtrs[j])[e.workerRowIdx[j]]) + keyRowJ := e.keyChunks.GetRow((*e.workerRowPtrs[i])[e.workerRowIdx[i]]) flag = e.lessRow(keyRowI, keyRowJ) } - if flag { - minRowPtr = (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + if !flag { + minRowPtr = (*e.workerRowPtrs[i])[e.workerRowIdx[i]] j = i } } } + //log.Infof("worker %d idx %d append rowPtr %v", j, e.workerRowIdx[j], (*e.workerRowPtrs[j])[e.workerRowIdx[j]]) e.workerRowIdx[j]++ - //log.Infof("worker %d idx increase to %d", j, e.workerRowIdx[j]) req.AppendRow(e.rowChunks.GetRow(minRowPtr)) } return nil From b0fefa6e96cc42bea45fffa8959e06fbdac1591e Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 15 Feb 2019 14:56:51 +0800 Subject: [PATCH 03/21] add log --- executor/merge_sort.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index f485c170d5ffd..b74a08bd99d48 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -27,6 +27,8 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/memory" + log "github.com/sirupsen/logrus" + ) // MergeSortExec represents sorting executor. 
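
The crux of this fix: when scanning for the minimum, the candidate head row must be read from worker i (the loop variable), not worker j, and the comparison sense is inverted to !flag so the running minimum actually moves. A toy before/after reproduction, with plain ints standing in for rows:

package main

import "fmt"

func pickMin(heads []int, fixed bool) int {
	min := 0
	for i := 1; i < len(heads); i++ {
		candidate := i
		if !fixed {
			candidate = 0 // the pre-fix bug: re-reads the first worker's head
		}
		if heads[candidate] < heads[min] {
			min = candidate
		}
	}
	return min
}

func main() {
	heads := []int{7, 2, 5}
	fmt.Println(pickMin(heads, false)) // 0 — buggy: worker 1's smaller head is never examined
	fmt.Println(pickMin(heads, true))  // 1 — fixed: worker 1 holds the minimum
}
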
@@ -167,7 +169,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error go e.wait4WorkerSort(wg, e.finishedCh) avgLen := 0 avgLen = e.rowChunks.Len() / e.concurrency - //log.Infof("allcolumnExpr %v row count %d row chunk %d avgLen %d", e.allColumnExpr, e.rowChunks.Len(),e.rowChunks.NumChunks(), avgLen) + log.Infof("allcolumnExpr %v row count %d row chunk %d avgLen %d", e.allColumnExpr, e.rowChunks.Len(),e.rowChunks.NumChunks(), avgLen) for i := 0; i < e.concurrency; i++ { chkIdx := (avgLen * i) / e.maxChunkSize @@ -191,7 +193,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error e.fetched = true <-e.finishedCh for i := 0; i < e.concurrency; i++ { - //log.Infof("worker %d row count %d row len %d",i, len(*e.workerRowPtrs[i]), e.workerRowLen[i]) + log.Infof("worker %d row count %d row len %d",i, len(*e.workerRowPtrs[i]), e.workerRowLen[i]) // for j := 0; j < len(*e.workerRowPtrs[i]); j++ { // //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) // } From f4ab45dd28452984a86f9be5d511ab0b67b646ef Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 15 Feb 2019 15:03:41 +0800 Subject: [PATCH 04/21] add concurrency --- executor/merge_sort.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index b74a08bd99d48..0b1a2fe26fcbd 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -110,7 +110,7 @@ func (e *MergeSortExec) Close() error { func (e *MergeSortExec) Open(ctx context.Context) error { e.fetched = false e.Idx = 0 - e.concurrency = 4 + e.concurrency = 8 e.workerRowIdx = make([]int, e.concurrency) e.workerRowLen = make([]int, e.concurrency) e.workerRowPtrs = make([]*[]chunk.RowPtr, e.concurrency) From 507f658a44b92c82a1505f05719d0404c06463df Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 22 Feb 2019 17:13:55 +0800 Subject: [PATCH 05/21] fix unit test --- executor/executor.go | 1 + executor/merge_sort.go | 117 ++++++++++++++----------------- sessionctx/variable/session.go | 4 ++ sessionctx/variable/tidb_vars.go | 1 + 4 files changed, 58 insertions(+), 65 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index de47b45adf8ff..5e841afc80bad 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -71,6 +71,7 @@ var ( _ Executor = &HashJoinExec{} _ Executor = &IndexLookUpExecutor{} _ Executor = &MergeJoinExec{} + _ Executor = &MergeSortExec{} ) type baseExecutor struct { diff --git a/executor/merge_sort.go b/executor/merge_sort.go index 0b1a2fe26fcbd..49bd75c69bf7b 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -27,8 +27,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/memory" - log "github.com/sirupsen/logrus" - ) // MergeSortExec represents sorting executor. 
@@ -36,7 +34,6 @@ type MergeSortExec struct { baseExecutor ByItems []*plannercore.ByItems - Idx int fetched bool schema *expression.Schema @@ -56,15 +53,9 @@ type MergeSortExec struct { workerRowLen []int workerRowIdx []int - memTracker *memory.Tracker - - concurrency int - + memTracker *memory.Tracker + concurrency int allColumnExpr bool - - workerWg *sync.WaitGroup - - finishedCh chan struct{} } type SortWorker struct { @@ -75,7 +66,7 @@ type SortWorker struct { rowPtrs []chunk.RowPtr } -func (sw *SortWorker) run(workerId int) { +func (sw *SortWorker) run() { //sw.memTracker.Consume(int64(8 * sw.rowChunks.Len())) for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks() && len(sw.rowPtrs) < sw.len; chkIdx++ { rowChk := sw.rowChunks.GetChunk(chkIdx) @@ -83,14 +74,10 @@ func (sw *SortWorker) run(workerId int) { if chkIdx == sw.chkIdx { rowIdx = sw.rowIdx } - //log.Infof("workerId %d chkIdx %d rowIdx %d rowChkNum %d rowPtrs %d", workerId,chkIdx, rowIdx, rowChk.NumRows(), len(sw.rowPtrs)) for ; rowIdx < rowChk.NumRows() && len(sw.rowPtrs) < sw.len; rowIdx++ { sw.rowPtrs = append(sw.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) } - //log.Infof("workerId %d chkIdx %d rowPtrs %d", workerId,chkIdx, len(sw.rowPtrs)) } - //log.Infof("workerId %d chkIdx %d rowIdx %d workerlen %d, rowPtrsLen %d", workerId, sw.chkIdx, sw.rowIdx, sw.len, len(sw.rowPtrs)) - if sw.allColumnExpr { sort.Slice(sw.rowPtrs, sw.keyColumnsLess) } else { @@ -109,8 +96,8 @@ func (e *MergeSortExec) Close() error { // Open implements the Executor Open interface. func (e *MergeSortExec) Open(ctx context.Context) error { e.fetched = false - e.Idx = 0 - e.concurrency = 8 + e.concurrency = e.ctx.GetSessionVars().MergeSortConcurrency + e.workerRowIdx = make([]int, e.concurrency) e.workerRowLen = make([]int, e.concurrency) e.workerRowPtrs = make([]*[]chunk.RowPtr, e.concurrency) @@ -122,14 +109,18 @@ func (e *MergeSortExec) Open(ctx context.Context) error { return errors.Trace(e.children[0].Open(ctx)) } -func (e *MergeSortExec) newSortWorker(chk, row, len int) *SortWorker { - return &SortWorker{ +func (e *MergeSortExec) newSortWorker(workerId, chk, row, len int) *SortWorker { + sw := &SortWorker{ MergeSortExec: *e, chkIdx: chk, rowIdx: row, len: len, rowPtrs: make([]chunk.RowPtr, 0, len), } + e.workerRowLen[workerId] = len + e.workerRowIdx[workerId] = 0 + e.workerRowPtrs[workerId] = &sw.rowPtrs + return sw } func (e *MergeSortExec) wait4WorkerSort(wg *sync.WaitGroup, finishedCh chan struct{}) { @@ -137,6 +128,32 @@ func (e *MergeSortExec) wait4WorkerSort(wg *sync.WaitGroup, finishedCh chan stru close(finishedCh) } +//sortWorkerIndex calc the chunk index and row index with every worker start to sort, first column of swIdx is chunk idx, second columm of swIdx is row idx +func (e *MergeSortExec) sortWorkerIndex(workerRowsCount int) [][]int { + chkIdx := 0 + rowIdx := 0 + swIdx := make([][]int, e.concurrency) + swIdx[0] = []int{0, 0} + for i := 1; i < e.concurrency; i++ { + count := 0 + swIdx[i] = []int{0, 0} + for j := chkIdx; j < e.rowChunks.NumChunks(); j++ { + curChk := e.rowChunks.GetChunk(j) + count += curChk.NumRows() + if j == chkIdx { + count -= rowIdx + } + if count > workerRowsCount { + rowIdx = curChk.NumRows() - (count - workerRowsCount) + chkIdx = j + swIdx[i] = []int{chkIdx, rowIdx} + break + } + } + } + return swIdx +} + // Next implements the Executor Next interface. 
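
sortWorkerIndex, introduced in this patch, walks the chunk list once and converts each worker's global starting row (i * workerRowsCount) into a (chunk index, row index) pair. The same computation over a plain slice of per-chunk row counts (types are illustrative):

package main

import "fmt"

// startPositions returns the (chunk, row) offset at which each of k
// workers begins, given the row count of every chunk.
func startPositions(chunkRows []int, k int) [][2]int {
	total := 0
	for _, n := range chunkRows {
		total += n
	}
	per := total / k
	pos := make([][2]int, k)
	chk, row := 0, 0
	for i := 1; i < k; i++ {
		remaining := per
		for remaining > 0 {
			avail := chunkRows[chk] - row
			if avail > remaining {
				row += remaining
				remaining = 0
			} else {
				remaining -= avail
				chk, row = chk+1, 0
			}
		}
		pos[i] = [2]int{chk, row}
	}
	return pos
}

func main() {
	// 3 chunks of 5, 2 and 9 rows split across 4 workers of 4 rows each
	// (the last worker later absorbs the remainder).
	fmt.Println(startPositions([]int{5, 2, 9}, 4)) // [[0 0] [0 4] [2 1] [2 5]]
}
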
func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { @@ -163,64 +180,46 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error return err } } - e.finishedCh = make(chan struct{}) + finishedCh := make(chan struct{}) + + workerRowsCount := e.rowChunks.Len() / e.concurrency + workerIdx := e.sortWorkerIndex(workerRowsCount) + wg := &sync.WaitGroup{} wg.Add(int(e.concurrency)) - go e.wait4WorkerSort(wg, e.finishedCh) - avgLen := 0 - avgLen = e.rowChunks.Len() / e.concurrency - log.Infof("allcolumnExpr %v row count %d row chunk %d avgLen %d", e.allColumnExpr, e.rowChunks.Len(),e.rowChunks.NumChunks(), avgLen) + go e.wait4WorkerSort(wg, finishedCh) for i := 0; i < e.concurrency; i++ { - chkIdx := (avgLen * i) / e.maxChunkSize - rowIdx := (avgLen * i) % e.maxChunkSize + workerId := i + // last worker must complete the rest of rows if i == e.concurrency-1 { - avgLen = e.rowChunks.Len()%e.concurrency + avgLen + workerRowsCount += e.rowChunks.Len() % e.concurrency } - //log.Infof("worker %d chunkIdx %d rowIdx %d rowLen %d maxChunkSize %d", i, chkIdx, rowIdx, avgLen, e.maxChunkSize) - - sortWorker := e.newSortWorker(chkIdx, rowIdx, avgLen) - e.workerRowLen[i] = avgLen - e.workerRowIdx[i] = 0 - e.workerRowPtrs[i] = &sortWorker.rowPtrs - workerId := i + sw := e.newSortWorker(workerId, workerIdx[i][0], workerIdx[i][1], workerRowsCount) go util.WithRecovery(func() { defer wg.Done() - sortWorker.run(workerId) + sw.run() }, nil) } + <-finishedCh e.fetched = true - <-e.finishedCh - for i := 0; i < e.concurrency; i++ { - log.Infof("worker %d row count %d row len %d",i, len(*e.workerRowPtrs[i]), e.workerRowLen[i]) - // for j := 0; j < len(*e.workerRowPtrs[i]); j++ { - // //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) - // } - } } + for req.NumRows() < e.maxChunkSize { j := 0 - for; j < e.concurrency && e.workerRowIdx[j] >= e.workerRowLen[j];{ + for j < e.concurrency && e.workerRowIdx[j] >= e.workerRowLen[j] { j++ } if j >= e.concurrency { break } - //log.Infof("start worker %d ptr len %d idx %d len %d",j , len(*e.workerRowPtrs[j]), e.workerRowIdx[j], e.workerRowLen[j] ) - minRowPtr := (*e.workerRowPtrs[j])[e.workerRowIdx[j]] - //log.Infof("%v", e.rowChunks.GetRow(minRowPtr)) - for i := j + 1; i < e.concurrency ; i++ { + for i := j + 1; i < e.concurrency; i++ { if e.workerRowIdx[i] < e.workerRowLen[i] { flag := false if e.allColumnExpr { keyRowI := e.rowChunks.GetRow(minRowPtr) - //log.Infof("compare worker %d ptr len %d idx %d len %d",i , len(*e.workerRowPtrs[i]), e.workerRowIdx[i], e.workerRowLen[i] ) - //if e.workerRowIdx[i] == 8850 { - // log.Infof("compare worker %d reach 8874", i) - // break - //} keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[i])[e.workerRowIdx[i]]) flag = e.lessRow(keyRowI, keyRowJ) } else { @@ -234,7 +233,6 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error } } } - //log.Infof("worker %d idx %d append rowPtr %v", j, e.workerRowIdx[j], (*e.workerRowPtrs[j])[e.workerRowIdx[j]]) e.workerRowIdx[j]++ req.AppendRow(e.rowChunks.GetRow(minRowPtr)) } @@ -261,17 +259,6 @@ func (e *MergeSortExec) fetchRowChunks(ctx context.Context) error { return nil } -//func (e *MergeSortExec) initPointers() { -// e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len()) -// e.memTracker.Consume(int64(8 * e.rowChunks.Len())) -// for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ { -// rowChk := 
e.rowChunks.GetChunk(chkIdx) -// for rowIdx := 0; rowIdx < rowChk.NumRows(); rowIdx++ { -// e.rowPtrs = append(e.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) -// } -// } -//} - func (e *MergeSortExec) initCompareFuncs() { e.keyCmpFuncs = make([]chunk.CompareFunc, len(e.ByItems)) for i := range e.ByItems { diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 66ba563f0d422..00e88d70782c2 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -367,6 +367,7 @@ func NewSessionVars() *SessionVars { DistSQLScanConcurrency: DefDistSQLScanConcurrency, HashAggPartialConcurrency: DefTiDBHashAggPartialConcurrency, HashAggFinalConcurrency: DefTiDBHashAggFinalConcurrency, + MergeSortConcurrency: DefTiDBMergeSortConcurrency, } vars.MemQuota = MemQuota{ MemQuotaQuery: config.GetGlobalConfig().MemQuotaQuery, @@ -759,6 +760,9 @@ type Concurrency struct { // IndexSerialScanConcurrency is the number of concurrent index serial scan worker. IndexSerialScanConcurrency int + + //MergeSort is the number of concurrent sort worker + MergeSortConcurrency int } // MemQuota defines memory quota values. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 684e2883741f4..5432452fa45c8 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -278,6 +278,7 @@ const ( DefTiDBForcePriority = mysql.NoPriority DefTiDBUseRadixJoin = false DefEnableWindowFunction = false + DefTiDBMergeSortConcurrency = 8 ) // Process global variables. From 9c6c9590f54b1ec953bb885dc913536406b6e0a9 Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 22 Feb 2019 17:26:28 +0800 Subject: [PATCH 06/21] fix lint test --- executor/merge_sort.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index 49bd75c69bf7b..653c70617f39a 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -57,7 +57,7 @@ type MergeSortExec struct { concurrency int allColumnExpr bool } - +// SortWorker represents worker routine to process sort type SortWorker struct { MergeSortExec chkIdx int @@ -109,7 +109,7 @@ func (e *MergeSortExec) Open(ctx context.Context) error { return errors.Trace(e.children[0].Open(ctx)) } -func (e *MergeSortExec) newSortWorker(workerId, chk, row, len int) *SortWorker { +func (e *MergeSortExec) newSortWorker(workerID, chk, row, len int) *SortWorker { sw := &SortWorker{ MergeSortExec: *e, chkIdx: chk, @@ -117,9 +117,9 @@ func (e *MergeSortExec) newSortWorker(workerId, chk, row, len int) *SortWorker { len: len, rowPtrs: make([]chunk.RowPtr, 0, len), } - e.workerRowLen[workerId] = len - e.workerRowIdx[workerId] = 0 - e.workerRowPtrs[workerId] = &sw.rowPtrs + e.workerRowLen[workerID] = len + e.workerRowIdx[workerID] = 0 + e.workerRowPtrs[workerID] = &sw.rowPtrs return sw } @@ -190,12 +190,11 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error go e.wait4WorkerSort(wg, finishedCh) for i := 0; i < e.concurrency; i++ { - workerId := i // last worker must complete the rest of rows if i == e.concurrency-1 { workerRowsCount += e.rowChunks.Len() % e.concurrency } - sw := e.newSortWorker(workerId, workerIdx[i][0], workerIdx[i][1], workerRowsCount) + sw := e.newSortWorker(i, workerIdx[i][0], workerIdx[i][1], workerRowsCount) go util.WithRecovery(func() { defer wg.Done() sw.run() @@ -327,15 +326,15 @@ func (e *MergeSortExec) lessRow(rowI, rowJ chunk.Row) bool { } // keyColumnsLess is 
the less function for key columns. -func (e *SortWorker) keyColumnsLess(i, j int) bool { - rowI := e.rowChunks.GetRow(e.rowPtrs[i]) - rowJ := e.rowChunks.GetRow(e.rowPtrs[j]) - return e.lessRow(rowI, rowJ) +func (sw *SortWorker) keyColumnsLess(i, j int) bool { + rowI := sw.rowChunks.GetRow(sw.rowPtrs[i]) + rowJ := sw.rowChunks.GetRow(sw.rowPtrs[j]) + return sw.lessRow(rowI, rowJ) } // keyChunksLess is the less function for key chunk. -func (e *SortWorker) keyChunksLess(i, j int) bool { - keyRowI := e.keyChunks.GetRow(e.rowPtrs[i]) - keyRowJ := e.keyChunks.GetRow(e.rowPtrs[j]) - return e.lessRow(keyRowI, keyRowJ) +func (sw *SortWorker) keyChunksLess(i, j int) bool { + keyRowI := sw.keyChunks.GetRow(sw.rowPtrs[i]) + keyRowJ := sw.keyChunks.GetRow(sw.rowPtrs[j]) + return sw.lessRow(keyRowI, keyRowJ) } From a1caa20c84033755851a60d2f561de818fa21f16 Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 22 Feb 2019 17:30:43 +0800 Subject: [PATCH 07/21] fix fmt --- executor/merge_sort.go | 1 + 1 file changed, 1 insertion(+) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index 653c70617f39a..2bff3cae463fa 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -57,6 +57,7 @@ type MergeSortExec struct { concurrency int allColumnExpr bool } + // SortWorker represents worker routine to process sort type SortWorker struct { MergeSortExec From 8cb76c320fb30dd85ec5a657d4994059fba60aae Mon Sep 17 00:00:00 2001 From: yu34po Date: Mon, 25 Feb 2019 15:55:09 +0800 Subject: [PATCH 08/21] add uni test and fix some spell errors --- executor/merge_sort.go | 22 +++++++++++----------- sessionctx/variable/varsutil_test.go | 1 + 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index 2bff3cae463fa..115f423e99efa 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -1,4 +1,4 @@ -// Copyright 2017 PingCAP, Inc. +// Copyright 2019 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ package executor import ( "context" - "github.com/pingcap/tidb/util" "sort" "sync" "time" @@ -25,6 +24,7 @@ import ( "github.com/pingcap/tidb/expression" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/memory" ) @@ -58,8 +58,8 @@ type MergeSortExec struct { allColumnExpr bool } -// SortWorker represents worker routine to process sort -type SortWorker struct { +// sortWorker represents worker routine to process sort. +type sortWorker struct { MergeSortExec chkIdx int rowIdx int @@ -67,7 +67,7 @@ type SortWorker struct { rowPtrs []chunk.RowPtr } -func (sw *SortWorker) run() { +func (sw *sortWorker) run() { //sw.memTracker.Consume(int64(8 * sw.rowChunks.Len())) for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks() && len(sw.rowPtrs) < sw.len; chkIdx++ { rowChk := sw.rowChunks.GetChunk(chkIdx) @@ -91,7 +91,7 @@ func (sw *SortWorker) run() { func (e *MergeSortExec) Close() error { e.memTracker.Detach() e.memTracker = nil - return errors.Trace(e.children[0].Close()) + return e.children[0].Close() } // Open implements the Executor Open interface. 
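
wait4WorkerSort, untouched by this lint pass, exists to turn a sync.WaitGroup into a channel so that Next can block on <-finishedCh. The pattern in miniature:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	finished := make(chan struct{})
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Println("worker", id, "sorted its range")
		}(i)
	}
	// Same role as wait4WorkerSort: fan the WaitGroup into a channel.
	go func() {
		wg.Wait()
		close(finished)
	}()
	<-finished // Next blocks here until every sort worker is done
}
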
@@ -110,8 +110,8 @@ func (e *MergeSortExec) Open(ctx context.Context) error { return errors.Trace(e.children[0].Open(ctx)) } -func (e *MergeSortExec) newSortWorker(workerID, chk, row, len int) *SortWorker { - sw := &SortWorker{ +func (e *MergeSortExec) newsortWorker(workerID, chk, row, len int) *sortWorker { + sw := &sortWorker{ MergeSortExec: *e, chkIdx: chk, rowIdx: row, @@ -195,7 +195,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error if i == e.concurrency-1 { workerRowsCount += e.rowChunks.Len() % e.concurrency } - sw := e.newSortWorker(i, workerIdx[i][0], workerIdx[i][1], workerRowsCount) + sw := e.newsortWorker(i, workerIdx[i][0], workerIdx[i][1], workerRowsCount) go util.WithRecovery(func() { defer wg.Done() sw.run() @@ -327,14 +327,14 @@ func (e *MergeSortExec) lessRow(rowI, rowJ chunk.Row) bool { } // keyColumnsLess is the less function for key columns. -func (sw *SortWorker) keyColumnsLess(i, j int) bool { +func (sw *sortWorker) keyColumnsLess(i, j int) bool { rowI := sw.rowChunks.GetRow(sw.rowPtrs[i]) rowJ := sw.rowChunks.GetRow(sw.rowPtrs[j]) return sw.lessRow(rowI, rowJ) } // keyChunksLess is the less function for key chunk. -func (sw *SortWorker) keyChunksLess(i, j int) bool { +func (sw *sortWorker) keyChunksLess(i, j int) bool { keyRowI := sw.keyChunks.GetRow(sw.rowPtrs[i]) keyRowJ := sw.keyChunks.GetRow(sw.rowPtrs[j]) return sw.lessRow(keyRowI, keyRowJ) diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index b54dd8d392b96..394acca9303a8 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -61,6 +61,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) { c.Assert(vars.IndexLookupSize, Equals, DefIndexLookupSize) c.Assert(vars.IndexLookupConcurrency, Equals, DefIndexLookupConcurrency) c.Assert(vars.IndexSerialScanConcurrency, Equals, DefIndexSerialScanConcurrency) + //c.Assert(vars.MergeSortConcurrency, Equals, DefTiDBMergeSortConcurrency) c.Assert(vars.IndexLookupJoinConcurrency, Equals, DefIndexLookupJoinConcurrency) c.Assert(vars.HashJoinConcurrency, Equals, DefTiDBHashJoinConcurrency) c.Assert(vars.ProjectionConcurrency, Equals, int64(DefTiDBProjectionConcurrency)) From 8144e5d33d06472fb794511ea76bf18e0259d161 Mon Sep 17 00:00:00 2001 From: yu34po Date: Mon, 25 Feb 2019 15:56:39 +0800 Subject: [PATCH 09/21] fix spell --- executor/merge_sort.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index 115f423e99efa..5439714e6a0f0 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -191,7 +191,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error go e.wait4WorkerSort(wg, finishedCh) for i := 0; i < e.concurrency; i++ { - // last worker must complete the rest of rows + // Last worker must complete the rest of rows. 
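
The comment fixed below marks the remainder handling: integer division leaves rowChunks.Len() % concurrency rows unassigned, and the last worker absorbs them. The arithmetic in isolation:

package main

import "fmt"

func main() {
	rows, workers := 1002, 8
	per := rows / workers // 125
	counts := make([]int, workers)
	for i := range counts {
		counts[i] = per
	}
	counts[workers-1] += rows % workers // last worker takes the remainder
	fmt.Println(counts)                 // [125 125 125 125 125 125 125 127]
}
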
if i == e.concurrency-1 { workerRowsCount += e.rowChunks.Len() % e.concurrency } From 063a9a2b61f0e85996da262c35e4d8742cb3857c Mon Sep 17 00:00:00 2001 From: yu34po Date: Thu, 14 Feb 2019 16:14:39 +0800 Subject: [PATCH 10/21] add merge sort --- executor/builder.go | 2 +- executor/merge_sort.go | 340 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 341 insertions(+), 1 deletion(-) create mode 100644 executor/merge_sort.go diff --git a/executor/builder.go b/executor/builder.go index c956ea46343a9..439609ca8b8d6 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1141,7 +1141,7 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor { b.err = errors.Trace(b.err) return nil } - sortExec := SortExec{ + sortExec := MergeSortExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec), ByItems: v.ByItems, schema: v.Schema(), diff --git a/executor/merge_sort.go b/executor/merge_sort.go new file mode 100644 index 0000000000000..f9573284adfeb --- /dev/null +++ b/executor/merge_sort.go @@ -0,0 +1,340 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "github.com/pingcap/tidb/util" + "sort" + "sync" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/expression" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" +) + +// MergeSortExec represents sorting executor. +type MergeSortExec struct { + baseExecutor + + ByItems []*plannercore.ByItems + Idx int + fetched bool + schema *expression.Schema + + keyExprs []expression.Expression + keyTypes []*types.FieldType + // keyColumns is the column index of the by items. + keyColumns []int + // keyCmpFuncs is used to compare each ByItem. + keyCmpFuncs []chunk.CompareFunc + // keyChunks is used to store ByItems values when not all ByItems are column. + keyChunks *chunk.List + // rowChunks is the chunks to store row values. + rowChunks *chunk.List + // rowPointer store the chunk index and row index for each row. 
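
workerRowPtrs keeps one pointer slice per worker; each chunk.RowPtr costs 8 bytes (two uint32 fields), which is what the commented-out Consume(8 * n) accounting earlier in this series assumes. A mirror type makes the constant checkable (illustrative):

package main

import (
	"fmt"
	"unsafe"
)

// rowPtr mirrors chunk.RowPtr's layout.
type rowPtr struct {
	ChkIdx uint32
	RowIdx uint32
}

func main() {
	fmt.Println(unsafe.Sizeof(rowPtr{})) // 8 — hence Consume(8 * rowCount)
}
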
+ workerRowPtrs []*[]chunk.RowPtr + + workerRowLen []int + workerRowIdx []int + + memTracker *memory.Tracker + + concurrency int + + allColumnExpr bool + + workerWg *sync.WaitGroup + + finishedCh chan struct{} +} + +type SortWorker struct { + MergeSortExec + chkIdx int + rowIdx int + len int + rowPtrs []chunk.RowPtr +} + +func (sw *SortWorker) run() { + //sw.memTracker.Consume(int64(8 * sw.rowChunks.Len())) + //log.Infof("chkIdx %d rowIdx %d workerlen %d", sw.chkIdx, sw.rowIdx, sw.len) + for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks(); chkIdx++ { + rowChk := sw.rowChunks.GetChunk(chkIdx) + for rowIdx := sw.rowIdx; rowIdx < rowChk.NumRows() && len(sw.rowPtrs) < sw.len; rowIdx++ { + sw.rowPtrs = append(sw.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) + } + } + + if sw.allColumnExpr { + sort.Slice(sw.rowPtrs, sw.keyColumnsLess) + } else { + sort.Slice(sw.rowPtrs, sw.keyChunksLess) + } + return +} + +// Close implements the Executor Close interface. +func (e *MergeSortExec) Close() error { + e.memTracker.Detach() + e.memTracker = nil + return errors.Trace(e.children[0].Close()) +} + +// Open implements the Executor Open interface. +func (e *MergeSortExec) Open(ctx context.Context) error { + e.fetched = false + e.Idx = 0 + e.concurrency = 4 + e.workerRowIdx = make([]int, e.concurrency) + e.workerRowLen = make([]int, e.concurrency) + e.workerRowPtrs = make([]*[]chunk.RowPtr, e.concurrency) + // To avoid duplicated initialization for TopNExec. + if e.memTracker == nil { + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaSort) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + } + return errors.Trace(e.children[0].Open(ctx)) +} + +func (e *MergeSortExec) newSortWorker(chk, row, len int) *SortWorker { + return &SortWorker{ + MergeSortExec: *e, + chkIdx: chk, + rowIdx: row, + len: len, + rowPtrs: make([]chunk.RowPtr, 0, len), + } +} + +func (e *MergeSortExec) wait4WorkerSort(wg *sync.WaitGroup, finishedCh chan struct{}) { + wg.Wait() + close(finishedCh) +} + +// Next implements the Executor Next interface. 
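
In the goroutine-launch loop inside Next, later patches in this series copy the loop counter (workerId := i) before capturing it in a closure. That copy is load-bearing in Go versions before 1.22, where all iterations of a for loop share one loop variable; a minimal illustration:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		workerID := i // per-iteration copy, as the patch does with workerId := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("worker", workerID) // without the copy, pre-1.22 Go could print repeated ids
		}()
	}
	wg.Wait()
}
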
+func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("sort.Next", opentracing.ChildOf(span.Context())) + defer span1.Finish() + } + if e.runtimeStats != nil { + start := time.Now() + defer func() { e.runtimeStats.Record(time.Since(start), req.NumRows()) }() + } + req.Reset() + if !e.fetched { + err := e.fetchRowChunks(ctx) + if err != nil { + return errors.Trace(err) + } + + e.initCompareFuncs() + e.allColumnExpr = e.buildKeyColumns() + if !e.allColumnExpr { + e.buildKeyExprsAndTypes() + err := e.buildKeyChunks() + if err != nil { + return err + } + } + e.finishedCh = make(chan struct{}) + wg := &sync.WaitGroup{} + wg.Add(int(e.concurrency)) + go e.wait4WorkerSort(wg, e.finishedCh) + avgLen := 0 + avgLen = e.rowChunks.Len() / e.concurrency + //log.Infof("row count %d avgLen %d", e.rowChunks.Len(), avgLen) + + for i := 0; i < e.concurrency; i++ { + chkIdx := avgLen * i / e.maxChunkSize + rowIdx := avgLen * i % e.maxChunkSize + if i == e.concurrency-1 { + avgLen = e.rowChunks.Len()%e.concurrency + avgLen + } + + sortWorker := e.newSortWorker(chkIdx, rowIdx, avgLen) + e.workerRowLen[i] = avgLen + e.workerRowIdx[i] = 0 + e.workerRowPtrs[i] = &sortWorker.rowPtrs + + go util.WithRecovery(func() { + defer wg.Done() + sortWorker.run() + }, nil) + } + + e.fetched = true + <-e.finishedCh + for i := 0; i < e.concurrency; i++ { + //log.Infof("worker %d row count %d",i, len(*e.workerRowPtrs[i])) + for j := 0; j < len(*e.workerRowPtrs[i]); j++ { + //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) + } + } + } + for req.NumRows() < e.maxChunkSize { + i := 0 + j := 0 + for; j < e.concurrency && e.workerRowIdx[j] >= e.workerRowLen[j];{ + j++ + } + //log.Infof("j %d", j) + if j >= e.concurrency { + break + } + minRowPtr := (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + + for i = j; i < e.concurrency; i++ { + if e.workerRowIdx[i] < e.workerRowLen[i] { + flag := false + if e.allColumnExpr { + keyRowI := e.rowChunks.GetRow(minRowPtr) + keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[j])[e.workerRowIdx[j]]) + flag = e.lessRow(keyRowI, keyRowJ) + } else { + keyRowI := e.keyChunks.GetRow(minRowPtr) + keyRowJ := e.keyChunks.GetRow((*e.workerRowPtrs[j])[e.workerRowIdx[j]]) + flag = e.lessRow(keyRowI, keyRowJ) + } + if flag { + minRowPtr = (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + j = i + } + } + } + e.workerRowIdx[j]++ + //log.Infof("worker %d idx increase to %d", j, e.workerRowIdx[j]) + req.AppendRow(e.rowChunks.GetRow(minRowPtr)) + } + return nil +} + +func (e *MergeSortExec) fetchRowChunks(ctx context.Context) error { + fields := e.retTypes() + e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize) + e.rowChunks.GetMemTracker().AttachTo(e.memTracker) + e.rowChunks.GetMemTracker().SetLabel("rowChunks") + for { + chk := e.children[0].newFirstChunk() + err := e.children[0].Next(ctx, chunk.NewRecordBatch(chk)) + if err != nil { + return errors.Trace(err) + } + rowCount := chk.NumRows() + if rowCount == 0 { + break + } + e.rowChunks.Add(chk) + } + return nil +} + +//func (e *MergeSortExec) initPointers() { +// e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len()) +// e.memTracker.Consume(int64(8 * e.rowChunks.Len())) +// for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ { +// rowChk := e.rowChunks.GetChunk(chkIdx) +// for rowIdx := 0; rowIdx < rowChk.NumRows(); rowIdx++ { +// e.rowPtrs = append(e.rowPtrs, chunk.RowPtr{ChkIdx: 
uint32(chkIdx), RowIdx: uint32(rowIdx)}) +// } +// } +//} + +func (e *MergeSortExec) initCompareFuncs() { + e.keyCmpFuncs = make([]chunk.CompareFunc, len(e.ByItems)) + for i := range e.ByItems { + keyType := e.ByItems[i].Expr.GetType() + e.keyCmpFuncs[i] = chunk.GetCompareFunc(keyType) + } +} + +func (e *MergeSortExec) buildKeyColumns() (allColumnExpr bool) { + e.keyColumns = make([]int, 0, len(e.ByItems)) + for _, by := range e.ByItems { + if col, ok := by.Expr.(*expression.Column); ok { + e.keyColumns = append(e.keyColumns, col.Index) + } else { + e.keyColumns = e.keyColumns[:0] + for i := range e.ByItems { + e.keyColumns = append(e.keyColumns, i) + } + return false + } + } + return true +} + +func (e *MergeSortExec) buildKeyExprsAndTypes() { + keyLen := len(e.ByItems) + e.keyTypes = make([]*types.FieldType, keyLen) + e.keyExprs = make([]expression.Expression, keyLen) + for keyColIdx := range e.ByItems { + e.keyExprs[keyColIdx] = e.ByItems[keyColIdx].Expr + e.keyTypes[keyColIdx] = e.ByItems[keyColIdx].Expr.GetType() + } +} + +func (e *MergeSortExec) buildKeyChunks() error { + e.keyChunks = chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize) + e.keyChunks.GetMemTracker().SetLabel("keyChunks") + e.keyChunks.GetMemTracker().AttachTo(e.memTracker) + + for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ { + keyChk := chunk.NewChunkWithCapacity(e.keyTypes, e.rowChunks.GetChunk(chkIdx).NumRows()) + childIter := chunk.NewIterator4Chunk(e.rowChunks.GetChunk(chkIdx)) + err := expression.VectorizedExecute(e.ctx, e.keyExprs, childIter, keyChk) + if err != nil { + return errors.Trace(err) + } + e.keyChunks.Add(keyChk) + } + return nil +} + +func (e *MergeSortExec) lessRow(rowI, rowJ chunk.Row) bool { + for i, colIdx := range e.keyColumns { + cmpFunc := e.keyCmpFuncs[i] + cmp := cmpFunc(rowI, colIdx, rowJ, colIdx) + if e.ByItems[i].Desc { + cmp = -cmp + } + if cmp < 0 { + return true + } else if cmp > 0 { + return false + } + } + return false +} + +// keyColumnsLess is the less function for key columns. +func (e *SortWorker) keyColumnsLess(i, j int) bool { + rowI := e.rowChunks.GetRow(e.rowPtrs[i]) + rowJ := e.rowChunks.GetRow(e.rowPtrs[j]) + return e.lessRow(rowI, rowJ) +} + +// keyChunksLess is the less function for key chunk. 
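
lessRow compares the two rows column by column, negating the comparator's result for DESC items and falling through to the next key on ties. The same shape over int rows, with an illustrative byItem type:

package main

import "fmt"

type byItem struct {
	col  int
	desc bool
}

func lessRow(a, b []int, keys []byItem) bool {
	for _, k := range keys {
		cmp := 0
		if a[k.col] < b[k.col] {
			cmp = -1
		} else if a[k.col] > b[k.col] {
			cmp = 1
		}
		if k.desc {
			cmp = -cmp // DESC flips the ordering for this key only
		}
		if cmp != 0 {
			return cmp < 0
		}
	}
	return false // all key columns equal
}

func main() {
	keys := []byItem{{col: 0, desc: false}, {col: 1, desc: true}}
	fmt.Println(lessRow([]int{1, 5}, []int{1, 9}, keys)) // false: col 1 is DESC, so 9 sorts first
}
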
+func (e *SortWorker) keyChunksLess(i, j int) bool { + keyRowI := e.keyChunks.GetRow(e.rowPtrs[i]) + keyRowJ := e.keyChunks.GetRow(e.rowPtrs[j]) + return e.lessRow(keyRowI, keyRowJ) +} From 850d01f0b428cb72ccebaa172fa102a7628ea86e Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 15 Feb 2019 14:12:43 +0800 Subject: [PATCH 11/21] fix index bug --- executor/merge_sort.go | 56 +++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index f9573284adfeb..f485c170d5ffd 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -73,15 +73,21 @@ type SortWorker struct { rowPtrs []chunk.RowPtr } -func (sw *SortWorker) run() { +func (sw *SortWorker) run(workerId int) { //sw.memTracker.Consume(int64(8 * sw.rowChunks.Len())) - //log.Infof("chkIdx %d rowIdx %d workerlen %d", sw.chkIdx, sw.rowIdx, sw.len) - for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks(); chkIdx++ { + for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks() && len(sw.rowPtrs) < sw.len; chkIdx++ { rowChk := sw.rowChunks.GetChunk(chkIdx) - for rowIdx := sw.rowIdx; rowIdx < rowChk.NumRows() && len(sw.rowPtrs) < sw.len; rowIdx++ { + rowIdx := 0 + if chkIdx == sw.chkIdx { + rowIdx = sw.rowIdx + } + //log.Infof("workerId %d chkIdx %d rowIdx %d rowChkNum %d rowPtrs %d", workerId,chkIdx, rowIdx, rowChk.NumRows(), len(sw.rowPtrs)) + for ; rowIdx < rowChk.NumRows() && len(sw.rowPtrs) < sw.len; rowIdx++ { sw.rowPtrs = append(sw.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) } + //log.Infof("workerId %d chkIdx %d rowPtrs %d", workerId,chkIdx, len(sw.rowPtrs)) } + //log.Infof("workerId %d chkIdx %d rowIdx %d workerlen %d, rowPtrsLen %d", workerId, sw.chkIdx, sw.rowIdx, sw.len, len(sw.rowPtrs)) if sw.allColumnExpr { sort.Slice(sw.rowPtrs, sw.keyColumnsLess) @@ -161,67 +167,73 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error go e.wait4WorkerSort(wg, e.finishedCh) avgLen := 0 avgLen = e.rowChunks.Len() / e.concurrency - //log.Infof("row count %d avgLen %d", e.rowChunks.Len(), avgLen) + //log.Infof("allcolumnExpr %v row count %d row chunk %d avgLen %d", e.allColumnExpr, e.rowChunks.Len(),e.rowChunks.NumChunks(), avgLen) for i := 0; i < e.concurrency; i++ { - chkIdx := avgLen * i / e.maxChunkSize - rowIdx := avgLen * i % e.maxChunkSize + chkIdx := (avgLen * i) / e.maxChunkSize + rowIdx := (avgLen * i) % e.maxChunkSize if i == e.concurrency-1 { avgLen = e.rowChunks.Len()%e.concurrency + avgLen } + //log.Infof("worker %d chunkIdx %d rowIdx %d rowLen %d maxChunkSize %d", i, chkIdx, rowIdx, avgLen, e.maxChunkSize) sortWorker := e.newSortWorker(chkIdx, rowIdx, avgLen) e.workerRowLen[i] = avgLen e.workerRowIdx[i] = 0 e.workerRowPtrs[i] = &sortWorker.rowPtrs - + workerId := i go util.WithRecovery(func() { defer wg.Done() - sortWorker.run() + sortWorker.run(workerId) }, nil) } e.fetched = true <-e.finishedCh for i := 0; i < e.concurrency; i++ { - //log.Infof("worker %d row count %d",i, len(*e.workerRowPtrs[i])) - for j := 0; j < len(*e.workerRowPtrs[i]); j++ { - //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) - } + //log.Infof("worker %d row count %d row len %d",i, len(*e.workerRowPtrs[i]), e.workerRowLen[i]) + // for j := 0; j < len(*e.workerRowPtrs[i]); j++ { + // //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) + // } } } for req.NumRows() < e.maxChunkSize { - i := 0 j := 0 for; j < e.concurrency && e.workerRowIdx[j] >= e.workerRowLen[j];{ j++ 
} - //log.Infof("j %d", j) if j >= e.concurrency { break } - minRowPtr := (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + //log.Infof("start worker %d ptr len %d idx %d len %d",j , len(*e.workerRowPtrs[j]), e.workerRowIdx[j], e.workerRowLen[j] ) - for i = j; i < e.concurrency; i++ { + minRowPtr := (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + //log.Infof("%v", e.rowChunks.GetRow(minRowPtr)) + for i := j + 1; i < e.concurrency ; i++ { if e.workerRowIdx[i] < e.workerRowLen[i] { flag := false if e.allColumnExpr { keyRowI := e.rowChunks.GetRow(minRowPtr) - keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[j])[e.workerRowIdx[j]]) + //log.Infof("compare worker %d ptr len %d idx %d len %d",i , len(*e.workerRowPtrs[i]), e.workerRowIdx[i], e.workerRowLen[i] ) + //if e.workerRowIdx[i] == 8850 { + // log.Infof("compare worker %d reach 8874", i) + // break + //} + keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[i])[e.workerRowIdx[i]]) flag = e.lessRow(keyRowI, keyRowJ) } else { keyRowI := e.keyChunks.GetRow(minRowPtr) - keyRowJ := e.keyChunks.GetRow((*e.workerRowPtrs[j])[e.workerRowIdx[j]]) + keyRowJ := e.keyChunks.GetRow((*e.workerRowPtrs[i])[e.workerRowIdx[i]]) flag = e.lessRow(keyRowI, keyRowJ) } - if flag { - minRowPtr = (*e.workerRowPtrs[j])[e.workerRowIdx[j]] + if !flag { + minRowPtr = (*e.workerRowPtrs[i])[e.workerRowIdx[i]] j = i } } } + //log.Infof("worker %d idx %d append rowPtr %v", j, e.workerRowIdx[j], (*e.workerRowPtrs[j])[e.workerRowIdx[j]]) e.workerRowIdx[j]++ - //log.Infof("worker %d idx increase to %d", j, e.workerRowIdx[j]) req.AppendRow(e.rowChunks.GetRow(minRowPtr)) } return nil From d65836f8eca8fad2581d2f0f36ac3779a61dcb84 Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 15 Feb 2019 14:56:51 +0800 Subject: [PATCH 12/21] add log --- executor/merge_sort.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index f485c170d5ffd..b74a08bd99d48 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -27,6 +27,8 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/memory" + log "github.com/sirupsen/logrus" + ) // MergeSortExec represents sorting executor. 
@@ -167,7 +169,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error go e.wait4WorkerSort(wg, e.finishedCh) avgLen := 0 avgLen = e.rowChunks.Len() / e.concurrency - //log.Infof("allcolumnExpr %v row count %d row chunk %d avgLen %d", e.allColumnExpr, e.rowChunks.Len(),e.rowChunks.NumChunks(), avgLen) + log.Infof("allcolumnExpr %v row count %d row chunk %d avgLen %d", e.allColumnExpr, e.rowChunks.Len(),e.rowChunks.NumChunks(), avgLen) for i := 0; i < e.concurrency; i++ { chkIdx := (avgLen * i) / e.maxChunkSize @@ -191,7 +193,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error e.fetched = true <-e.finishedCh for i := 0; i < e.concurrency; i++ { - //log.Infof("worker %d row count %d row len %d",i, len(*e.workerRowPtrs[i]), e.workerRowLen[i]) + log.Infof("worker %d row count %d row len %d",i, len(*e.workerRowPtrs[i]), e.workerRowLen[i]) // for j := 0; j < len(*e.workerRowPtrs[i]); j++ { // //log.Infof("worker %d row %d ptr %v",i, j, (*e.workerRowPtrs[i])[j]) // } From ee09e17ed5cf85bd3485d9a186780f5db1a41df3 Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 15 Feb 2019 15:03:41 +0800 Subject: [PATCH 13/21] add concurrency --- executor/merge_sort.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/merge_sort.go b/executor/merge_sort.go index b74a08bd99d48..0b1a2fe26fcbd 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -110,7 +110,7 @@ func (e *MergeSortExec) Close() error { func (e *MergeSortExec) Open(ctx context.Context) error { e.fetched = false e.Idx = 0 - e.concurrency = 4 + e.concurrency = 8 e.workerRowIdx = make([]int, e.concurrency) e.workerRowLen = make([]int, e.concurrency) e.workerRowPtrs = make([]*[]chunk.RowPtr, e.concurrency) From bbb9cccd686ed941d9d9a8805e35ee32086ea0a6 Mon Sep 17 00:00:00 2001 From: yu34po Date: Fri, 22 Feb 2019 17:13:55 +0800 Subject: [PATCH 14/21] fix unit test --- executor/executor.go | 1 + executor/merge_sort.go | 117 ++++++++++++++----------------- sessionctx/variable/session.go | 4 ++ sessionctx/variable/tidb_vars.go | 1 + 4 files changed, 58 insertions(+), 65 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index 0b35afb995a2a..10a8b6c5f982d 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -71,6 +71,7 @@ var ( _ Executor = &HashJoinExec{} _ Executor = &IndexLookUpExecutor{} _ Executor = &MergeJoinExec{} + _ Executor = &MergeSortExec{} ) type baseExecutor struct { diff --git a/executor/merge_sort.go b/executor/merge_sort.go index 0b1a2fe26fcbd..49bd75c69bf7b 100644 --- a/executor/merge_sort.go +++ b/executor/merge_sort.go @@ -27,8 +27,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/memory" - log "github.com/sirupsen/logrus" - ) // MergeSortExec represents sorting executor. 
@@ -36,7 +34,6 @@ type MergeSortExec struct { baseExecutor ByItems []*plannercore.ByItems - Idx int fetched bool schema *expression.Schema @@ -56,15 +53,9 @@ type MergeSortExec struct { workerRowLen []int workerRowIdx []int - memTracker *memory.Tracker - - concurrency int - + memTracker *memory.Tracker + concurrency int allColumnExpr bool - - workerWg *sync.WaitGroup - - finishedCh chan struct{} } type SortWorker struct { @@ -75,7 +66,7 @@ type SortWorker struct { rowPtrs []chunk.RowPtr } -func (sw *SortWorker) run(workerId int) { +func (sw *SortWorker) run() { //sw.memTracker.Consume(int64(8 * sw.rowChunks.Len())) for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks() && len(sw.rowPtrs) < sw.len; chkIdx++ { rowChk := sw.rowChunks.GetChunk(chkIdx) @@ -83,14 +74,10 @@ func (sw *SortWorker) run(workerId int) { if chkIdx == sw.chkIdx { rowIdx = sw.rowIdx } - //log.Infof("workerId %d chkIdx %d rowIdx %d rowChkNum %d rowPtrs %d", workerId,chkIdx, rowIdx, rowChk.NumRows(), len(sw.rowPtrs)) for ; rowIdx < rowChk.NumRows() && len(sw.rowPtrs) < sw.len; rowIdx++ { sw.rowPtrs = append(sw.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) } - //log.Infof("workerId %d chkIdx %d rowPtrs %d", workerId,chkIdx, len(sw.rowPtrs)) } - //log.Infof("workerId %d chkIdx %d rowIdx %d workerlen %d, rowPtrsLen %d", workerId, sw.chkIdx, sw.rowIdx, sw.len, len(sw.rowPtrs)) - if sw.allColumnExpr { sort.Slice(sw.rowPtrs, sw.keyColumnsLess) } else { @@ -109,8 +96,8 @@ func (e *MergeSortExec) Close() error { // Open implements the Executor Open interface. func (e *MergeSortExec) Open(ctx context.Context) error { e.fetched = false - e.Idx = 0 - e.concurrency = 8 + e.concurrency = e.ctx.GetSessionVars().MergeSortConcurrency + e.workerRowIdx = make([]int, e.concurrency) e.workerRowLen = make([]int, e.concurrency) e.workerRowPtrs = make([]*[]chunk.RowPtr, e.concurrency) @@ -122,14 +109,18 @@ func (e *MergeSortExec) Open(ctx context.Context) error { return errors.Trace(e.children[0].Open(ctx)) } -func (e *MergeSortExec) newSortWorker(chk, row, len int) *SortWorker { - return &SortWorker{ +func (e *MergeSortExec) newSortWorker(workerId, chk, row, len int) *SortWorker { + sw := &SortWorker{ MergeSortExec: *e, chkIdx: chk, rowIdx: row, len: len, rowPtrs: make([]chunk.RowPtr, 0, len), } + e.workerRowLen[workerId] = len + e.workerRowIdx[workerId] = 0 + e.workerRowPtrs[workerId] = &sw.rowPtrs + return sw } func (e *MergeSortExec) wait4WorkerSort(wg *sync.WaitGroup, finishedCh chan struct{}) { @@ -137,6 +128,32 @@ func (e *MergeSortExec) wait4WorkerSort(wg *sync.WaitGroup, finishedCh chan stru close(finishedCh) } +//sortWorkerIndex calc the chunk index and row index with every worker start to sort, first column of swIdx is chunk idx, second columm of swIdx is row idx +func (e *MergeSortExec) sortWorkerIndex(workerRowsCount int) [][]int { + chkIdx := 0 + rowIdx := 0 + swIdx := make([][]int, e.concurrency) + swIdx[0] = []int{0, 0} + for i := 1; i < e.concurrency; i++ { + count := 0 + swIdx[i] = []int{0, 0} + for j := chkIdx; j < e.rowChunks.NumChunks(); j++ { + curChk := e.rowChunks.GetChunk(j) + count += curChk.NumRows() + if j == chkIdx { + count -= rowIdx + } + if count > workerRowsCount { + rowIdx = curChk.NumRows() - (count - workerRowsCount) + chkIdx = j + swIdx[i] = []int{chkIdx, rowIdx} + break + } + } + } + return swIdx +} + // Next implements the Executor Next interface. 
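
In this revision Open reads the fan-out from SessionVars.MergeSortConcurrency (default DefTiDBMergeSortConcurrency = 8). The diff shows no lower bound on the value, so a defensive clamp such as the following is an assumption for illustration, not part of the patch:

package main

import "fmt"

// effectiveConcurrency is illustrative only: it guards against a zero or
// negative session value, and against having fewer rows than workers,
// either of which would yield empty partitions in sortWorkerIndex.
func effectiveConcurrency(configured, totalRows int) int {
	c := configured
	if c < 1 {
		c = 1
	}
	if totalRows > 0 && c > totalRows {
		c = totalRows
	}
	return c
}

func main() {
	fmt.Println(effectiveConcurrency(8, 1000)) // 8
	fmt.Println(effectiveConcurrency(0, 1000)) // 1
	fmt.Println(effectiveConcurrency(8, 3))    // 3
}
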
From fa0d7e566c1a73e49d9535911a0b004526129aa6 Mon Sep 17 00:00:00 2001
From: yu34po
Date: Fri, 22 Feb 2019 17:26:28 +0800
Subject: [PATCH 15/21] fix lint test

---
 executor/merge_sort.go | 29 ++++++++++++++---------------
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git a/executor/merge_sort.go b/executor/merge_sort.go
index 49bd75c69bf7b..653c70617f39a 100644
--- a/executor/merge_sort.go
+++ b/executor/merge_sort.go
@@ -57,7 +57,7 @@ type MergeSortExec struct {
 	concurrency   int
 	allColumnExpr bool
 }
-
+// SortWorker represents a worker routine that sorts a partition of the rows
 type SortWorker struct {
 	MergeSortExec
 	chkIdx  int
@@ -109,7 +109,7 @@ func (e *MergeSortExec) Open(ctx context.Context) error {
 	return errors.Trace(e.children[0].Open(ctx))
 }

-func (e *MergeSortExec) newSortWorker(workerId, chk, row, len int) *SortWorker {
+func (e *MergeSortExec) newSortWorker(workerID, chk, row, len int) *SortWorker {
 	sw := &SortWorker{
 		MergeSortExec: *e,
 		chkIdx:        chk,
@@ -117,9 +117,9 @@ func (e *MergeSortExec) newSortWorker(workerId, chk, row, len int) *SortWorker {
 		len:     len,
 		rowPtrs: make([]chunk.RowPtr, 0, len),
 	}
-	e.workerRowLen[workerId] = len
-	e.workerRowIdx[workerId] = 0
-	e.workerRowPtrs[workerId] = &sw.rowPtrs
+	e.workerRowLen[workerID] = len
+	e.workerRowIdx[workerID] = 0
+	e.workerRowPtrs[workerID] = &sw.rowPtrs
 	return sw
 }

@@ -190,12 +190,11 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 		go e.wait4WorkerSort(wg, finishedCh)

 		for i := 0; i < e.concurrency; i++ {
-			workerId := i
 			// last worker must complete the rest of rows
 			if i == e.concurrency-1 {
 				workerRowsCount += e.rowChunks.Len() % e.concurrency
 			}
-			sw := e.newSortWorker(workerId, workerIdx[i][0], workerIdx[i][1], workerRowsCount)
+			sw := e.newSortWorker(i, workerIdx[i][0], workerIdx[i][1], workerRowsCount)
 			go util.WithRecovery(func() {
 				defer wg.Done()
 				sw.run()
@@ -327,15 +326,15 @@ func (e *MergeSortExec) lessRow(rowI, rowJ chunk.Row) bool {
 }

 // keyColumnsLess is the less function for key columns.
-func (e *SortWorker) keyColumnsLess(i, j int) bool {
-	rowI := e.rowChunks.GetRow(e.rowPtrs[i])
-	rowJ := e.rowChunks.GetRow(e.rowPtrs[j])
-	return e.lessRow(rowI, rowJ)
+func (sw *SortWorker) keyColumnsLess(i, j int) bool {
+	rowI := sw.rowChunks.GetRow(sw.rowPtrs[i])
+	rowJ := sw.rowChunks.GetRow(sw.rowPtrs[j])
+	return sw.lessRow(rowI, rowJ)
 }

 // keyChunksLess is the less function for key chunk.
-func (e *SortWorker) keyChunksLess(i, j int) bool {
-	keyRowI := e.keyChunks.GetRow(e.rowPtrs[i])
-	keyRowJ := e.keyChunks.GetRow(e.rowPtrs[j])
-	return e.lessRow(keyRowI, keyRowJ)
+func (sw *SortWorker) keyChunksLess(i, j int) bool {
+	keyRowI := sw.keyChunks.GetRow(sw.rowPtrs[i])
+	keyRowJ := sw.keyChunks.GetRow(sw.rowPtrs[j])
+	return sw.lessRow(keyRowI, keyRowJ)
 }
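The renamed less functions keep the idiom of handing a bound method straight to sort.Slice, as in sort.Slice(sw.rowPtrs, sw.keyColumnsLess). A minimal sketch of the same pattern with assumed types (rowPtr and worker are stand-ins; only the shape matches the executor): the sort swaps cheap (chunk, row) handles instead of moving whole rows.

package main

import (
	"fmt"
	"sort"
)

// rowPtr mimics chunk.RowPtr: a small (chunk, row) handle.
type rowPtr struct{ chk, row uint32 }

type worker struct {
	keys    [][]int // keys[chk][row], assumed sample data
	rowPtrs []rowPtr
}

// keyColumnsLess has exactly the func(i, j int) bool shape sort.Slice
// expects, so the bound method can be passed as the less function.
func (w *worker) keyColumnsLess(i, j int) bool {
	a, b := w.rowPtrs[i], w.rowPtrs[j]
	return w.keys[a.chk][a.row] < w.keys[b.chk][b.row]
}

func main() {
	w := &worker{
		keys:    [][]int{{9, 1}, {4, 7}},
		rowPtrs: []rowPtr{{0, 0}, {0, 1}, {1, 0}, {1, 1}},
	}
	sort.Slice(w.rowPtrs, w.keyColumnsLess)
	fmt.Println(w.rowPtrs) // [{0 1} {1 0} {1 1} {0 0}]: keys 1, 4, 7, 9
}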
From 54cc33bc5abed2c74afa343d7ba460581cc810ab Mon Sep 17 00:00:00 2001
From: yu34po
Date: Fri, 22 Feb 2019 17:30:43 +0800
Subject: [PATCH 16/21] fix fmt

---
 executor/merge_sort.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/executor/merge_sort.go b/executor/merge_sort.go
index 653c70617f39a..2bff3cae463fa 100644
--- a/executor/merge_sort.go
+++ b/executor/merge_sort.go
@@ -57,6 +57,7 @@ type MergeSortExec struct {
 	concurrency   int
 	allColumnExpr bool
 }
+
 // SortWorker represents a worker routine that sorts a partition of the rows
 type SortWorker struct {
From be1acd43263886be2ed444dbd56178e21e12ba8f Mon Sep 17 00:00:00 2001
From: yu34po
Date: Mon, 25 Feb 2019 15:55:09 +0800
Subject: [PATCH 17/21] add unit test and fix some spell errors

---
 executor/merge_sort.go               | 22 +++++++++++-----------
 sessionctx/variable/varsutil_test.go |  1 +
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/executor/merge_sort.go b/executor/merge_sort.go
index 2bff3cae463fa..115f423e99efa 100644
--- a/executor/merge_sort.go
+++ b/executor/merge_sort.go
@@ -1,4 +1,4 @@
-// Copyright 2017 PingCAP, Inc.
+// Copyright 2019 PingCAP, Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,7 +15,6 @@ package executor

 import (
 	"context"
-	"github.com/pingcap/tidb/util"
 	"sort"
 	"sync"
 	"time"
@@ -25,6 +24,7 @@ import (
 	"github.com/pingcap/tidb/expression"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/memory"
 )
@@ -58,8 +58,8 @@ type MergeSortExec struct {
 	allColumnExpr bool
 }

-// SortWorker represents a worker routine that sorts a partition of the rows
-type SortWorker struct {
+// sortWorker represents a worker routine that sorts a partition of the rows.
+type sortWorker struct {
 	MergeSortExec
 	chkIdx  int
 	rowIdx  int
@@ -67,7 +67,7 @@ type SortWorker struct {
 	rowPtrs []chunk.RowPtr
 }

-func (sw *SortWorker) run() {
+func (sw *sortWorker) run() {
 	//sw.memTracker.Consume(int64(8 * sw.rowChunks.Len()))
 	for chkIdx := sw.chkIdx; chkIdx < sw.rowChunks.NumChunks() && len(sw.rowPtrs) < sw.len; chkIdx++ {
 		rowChk := sw.rowChunks.GetChunk(chkIdx)
@@ -91,7 +91,7 @@ func (sw *sortWorker) run() {
 func (e *MergeSortExec) Close() error {
 	e.memTracker.Detach()
 	e.memTracker = nil
-	return errors.Trace(e.children[0].Close())
+	return e.children[0].Close()
 }

 // Open implements the Executor Open interface.
@@ -110,8 +110,8 @@ func (e *MergeSortExec) Open(ctx context.Context) error {
 	return errors.Trace(e.children[0].Open(ctx))
 }

-func (e *MergeSortExec) newSortWorker(workerID, chk, row, len int) *SortWorker {
-	sw := &SortWorker{
+func (e *MergeSortExec) newsortWorker(workerID, chk, row, len int) *sortWorker {
+	sw := &sortWorker{
 		MergeSortExec: *e,
 		chkIdx:        chk,
 		rowIdx:        row,
@@ -195,7 +195,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 			if i == e.concurrency-1 {
 				workerRowsCount += e.rowChunks.Len() % e.concurrency
 			}
-			sw := e.newSortWorker(i, workerIdx[i][0], workerIdx[i][1], workerRowsCount)
+			sw := e.newsortWorker(i, workerIdx[i][0], workerIdx[i][1], workerRowsCount)
 			go util.WithRecovery(func() {
 				defer wg.Done()
 				sw.run()
@@ -327,14 +327,14 @@ func (e *MergeSortExec) lessRow(rowI, rowJ chunk.Row) bool {
 }

 // keyColumnsLess is the less function for key columns.
-func (sw *SortWorker) keyColumnsLess(i, j int) bool {
+func (sw *sortWorker) keyColumnsLess(i, j int) bool {
 	rowI := sw.rowChunks.GetRow(sw.rowPtrs[i])
 	rowJ := sw.rowChunks.GetRow(sw.rowPtrs[j])
 	return sw.lessRow(rowI, rowJ)
 }

 // keyChunksLess is the less function for key chunk.
-func (sw *SortWorker) keyChunksLess(i, j int) bool {
+func (sw *sortWorker) keyChunksLess(i, j int) bool {
 	keyRowI := sw.keyChunks.GetRow(sw.rowPtrs[i])
 	keyRowJ := sw.keyChunks.GetRow(sw.rowPtrs[j])
 	return sw.lessRow(keyRowI, keyRowJ)
 }
diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go
index 580d2506488c8..8fe9e799ba1fb 100644
--- a/sessionctx/variable/varsutil_test.go
+++ b/sessionctx/variable/varsutil_test.go
@@ -61,6 +61,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
 	c.Assert(vars.IndexLookupSize, Equals, DefIndexLookupSize)
 	c.Assert(vars.IndexLookupConcurrency, Equals, DefIndexLookupConcurrency)
 	c.Assert(vars.IndexSerialScanConcurrency, Equals, DefIndexSerialScanConcurrency)
+	//c.Assert(vars.MergeSortConcurrency, Equals, DefTiDBMergeSortConcurrency)
 	c.Assert(vars.IndexLookupJoinConcurrency, Equals, DefIndexLookupJoinConcurrency)
 	c.Assert(vars.HashJoinConcurrency, Equals, DefTiDBHashJoinConcurrency)
 	c.Assert(vars.ProjectionConcurrency, Equals, int64(DefTiDBProjectionConcurrency))
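The new MergeSortConcurrency assertion lands commented out. Assuming the variable is initialized in NewSessionVars as in patch 14, the live check would be the sketch below, written in the gocheck style the suite already uses (the test name is hypothetical, and the function is assumed to sit in the same test file, so imports and types come from the surrounding package):

// Sketch only: the uncommented form of the assertion added above.
func (s *testVarsutilSuite) TestMergeSortConcurrencyDefault(c *C) {
	vars := NewSessionVars()
	c.Assert(vars.MergeSortConcurrency, Equals, DefTiDBMergeSortConcurrency)
}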
From 3f6452e58989d4aa7851bea8dd7aec87dfb259fd Mon Sep 17 00:00:00 2001
From: yu34po
Date: Mon, 25 Feb 2019 15:56:39 +0800
Subject: [PATCH 18/21] fix spell

---
 executor/merge_sort.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/executor/merge_sort.go b/executor/merge_sort.go
index 115f423e99efa..5439714e6a0f0 100644
--- a/executor/merge_sort.go
+++ b/executor/merge_sort.go
@@ -191,7 +191,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 		go e.wait4WorkerSort(wg, finishedCh)

 		for i := 0; i < e.concurrency; i++ {
-			// last worker must complete the rest of rows
+			// Last worker must complete the rest of rows.
 			if i == e.concurrency-1 {
 				workerRowsCount += e.rowChunks.Len() % e.concurrency
 			}
From 9e761c89189ec2d51653becc7b11e10d766aef91 Mon Sep 17 00:00:00 2001
From: yu34po
Date: Wed, 27 Feb 2019 16:29:10 +0800
Subject: [PATCH 19/21] del finishedCh

---
 executor/merge_sort.go | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/executor/merge_sort.go b/executor/merge_sort.go
index 5439714e6a0f0..52d4cac14c45c 100644
--- a/executor/merge_sort.go
+++ b/executor/merge_sort.go
@@ -124,11 +124,6 @@ func (e *MergeSortExec) newsortWorker(workerID, chk, row, len int) *sortWorker {
 	return sw
 }

-func (e *MergeSortExec) wait4WorkerSort(wg *sync.WaitGroup, finishedCh chan struct{}) {
-	wg.Wait()
-	close(finishedCh)
-}
-
 // sortWorkerIndex calculates the chunk index and row index at which each worker starts to sort; the first column of swIdx is the chunk index, the second is the row index.
 func (e *MergeSortExec) sortWorkerIndex(workerRowsCount int) [][]int {
@@ -181,14 +176,12 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 				return err
 			}
 		}
-		finishedCh := make(chan struct{})

 		workerRowsCount := e.rowChunks.Len() / e.concurrency
 		workerIdx := e.sortWorkerIndex(workerRowsCount)

 		wg := &sync.WaitGroup{}
 		wg.Add(int(e.concurrency))
-		go e.wait4WorkerSort(wg, finishedCh)

 		for i := 0; i < e.concurrency; i++ {
 			// Last worker must complete the rest of rows.
@@ -202,11 +195,11 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 			}, nil)
 		}

-		<-finishedCh
+		wg.Wait()
 		e.fetched = true
 	}

-	for req.NumRows() < e.maxChunkSize {
+	for !req.IsFull() {
 		j := 0
 		for j < e.concurrency && e.workerRowIdx[j] >= e.workerRowLen[j] {
 			j++
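After this patch the consumer side of Next is a plain k-way merge: find the first worker with rows remaining, scan the rest for a smaller head row, emit the minimum, advance that worker's cursor. The same selection over plain int runs (assumed data; a heap would cost O(log k) per row, but the executor keeps the linear scan since concurrency is small):

package main

import "fmt"

func main() {
	// Each worker produced a sorted run; merge them like Next does.
	runs := [][]int{{1, 4, 9}, {2, 3, 8}, {5, 6, 7}}
	idx := make([]int, len(runs)) // per-worker cursor, like workerRowIdx
	var merged []int

	for {
		// First worker that still has rows, like the j-scan in Next.
		j := 0
		for j < len(runs) && idx[j] >= len(runs[j]) {
			j++
		}
		if j >= len(runs) {
			break // every run is exhausted
		}
		// Linear scan of the remaining workers for a smaller head row.
		for i := j + 1; i < len(runs); i++ {
			if idx[i] < len(runs[i]) && runs[i][idx[i]] < runs[j][idx[j]] {
				j = i
			}
		}
		merged = append(merged, runs[j][idx[j]])
		idx[j]++
	}
	fmt.Println(merged) // [1 2 3 4 5 6 7 8 9]
}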
From d98f32ab6e886ea5fec21345e3e7af9a4743a194 Mon Sep 17 00:00:00 2001
From: yu34po
Date: Thu, 28 Feb 2019 11:57:40 +0800
Subject: [PATCH 20/21] del key chunk expr

---
 executor/merge_sort.go | 70 +++++++-----------------------------------
 1 file changed, 11 insertions(+), 59 deletions(-)

diff --git a/executor/merge_sort.go b/executor/merge_sort.go
index 52d4cac14c45c..b5a53af79068a 100644
--- a/executor/merge_sort.go
+++ b/executor/merge_sort.go
@@ -53,9 +53,8 @@ type MergeSortExec struct {
 	workerRowLen []int
 	workerRowIdx []int

-	memTracker    *memory.Tracker
-	concurrency   int
-	allColumnExpr bool
+	memTracker  *memory.Tracker
+	concurrency int
 }

 // sortWorker represents a worker routine that sorts a partition of the rows.
@@ -79,11 +78,8 @@ func (sw *sortWorker) run() {
 			sw.rowPtrs = append(sw.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
 		}
 	}
-	if sw.allColumnExpr {
-		sort.Slice(sw.rowPtrs, sw.keyColumnsLess)
-	} else {
-		sort.Slice(sw.rowPtrs, sw.keyChunksLess)
-	}
+	sort.Slice(sw.rowPtrs, sw.keyColumnsLess)
+
 	return
 }

@@ -168,14 +164,7 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 		}

 		e.initCompareFuncs()
-		e.allColumnExpr = e.buildKeyColumns()
-		if !e.allColumnExpr {
-			e.buildKeyExprsAndTypes()
-			err := e.buildKeyChunks()
-			if err != nil {
-				return err
-			}
-		}
+		e.buildKeyColumns()

 		workerRowsCount := e.rowChunks.Len() / e.concurrency
 		workerIdx := e.sortWorkerIndex(workerRowsCount)
@@ -211,15 +200,9 @@ func (e *MergeSortExec) Next(ctx context.Context, req *chunk.RecordBatch) error
 		for i := j + 1; i < e.concurrency; i++ {
 			if e.workerRowIdx[i] < e.workerRowLen[i] {
 				flag := false
-				if e.allColumnExpr {
-					keyRowI := e.rowChunks.GetRow(minRowPtr)
-					keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[i])[e.workerRowIdx[i]])
-					flag = e.lessRow(keyRowI, keyRowJ)
-				} else {
-					keyRowI := e.keyChunks.GetRow(minRowPtr)
-					keyRowJ := e.keyChunks.GetRow((*e.workerRowPtrs[i])[e.workerRowIdx[i]])
-					flag = e.lessRow(keyRowI, keyRowJ)
-				}
+				keyRowI := e.rowChunks.GetRow(minRowPtr)
+				keyRowJ := e.rowChunks.GetRow((*e.workerRowPtrs[i])[e.workerRowIdx[i]])
+				flag = e.lessRow(keyRowI, keyRowJ)
 				if !flag {
 					minRowPtr = (*e.workerRowPtrs[i])[e.workerRowIdx[i]]
 					j = i
@@ -260,20 +243,12 @@ func (e *MergeSortExec) initCompareFuncs() {
 	}
 }

-func (e *MergeSortExec) buildKeyColumns() (allColumnExpr bool) {
+func (e *MergeSortExec) buildKeyColumns() {
 	e.keyColumns = make([]int, 0, len(e.ByItems))
 	for _, by := range e.ByItems {
-		if col, ok := by.Expr.(*expression.Column); ok {
-			e.keyColumns = append(e.keyColumns, col.Index)
-		} else {
-			e.keyColumns = e.keyColumns[:0]
-			for i := range e.ByItems {
-				e.keyColumns = append(e.keyColumns, i)
-			}
-			return false
-		}
+		col := by.Expr.(*expression.Column)
+		e.keyColumns = append(e.keyColumns, col.Index)
 	}
-	return true
 }

 func (e *MergeSortExec) buildKeyExprsAndTypes() {
@@ -286,23 +261,6 @@ func (e *MergeSortExec) buildKeyExprsAndTypes() {
 	}
 }

-func (e *MergeSortExec) buildKeyChunks() error {
-	e.keyChunks = chunk.NewList(e.keyTypes, e.initCap, e.maxChunkSize)
-	e.keyChunks.GetMemTracker().SetLabel("keyChunks")
-	e.keyChunks.GetMemTracker().AttachTo(e.memTracker)
-
-	for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ {
-		keyChk := chunk.NewChunkWithCapacity(e.keyTypes, e.rowChunks.GetChunk(chkIdx).NumRows())
-		childIter := chunk.NewIterator4Chunk(e.rowChunks.GetChunk(chkIdx))
-		err := expression.VectorizedExecute(e.ctx, e.keyExprs, childIter, keyChk)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		e.keyChunks.Add(keyChk)
-	}
-	return nil
-}
-
 func (e *MergeSortExec) lessRow(rowI, rowJ chunk.Row) bool {
 	for i, colIdx := range e.keyColumns {
 		cmpFunc := e.keyCmpFuncs[i]
@@ -326,9 +284,3 @@ func (sw *sortWorker) keyColumnsLess(i, j int) bool {
 	rowI := sw.rowChunks.GetRow(sw.rowPtrs[i])
 	rowJ := sw.rowChunks.GetRow(sw.rowPtrs[j])
 	return sw.lessRow(rowI, rowJ)
 }
-
-// keyChunksLess is the less function for key chunk.
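With the key-chunk path removed, every ByItem is assumed to be a bare column, and ordering reduces to lessRow's comparator chain: return on the first key that differs, fall through to the next key on a tie. A sketch of that shape over int keys (the desc flags stand in for ByItems[i].Desc; the real code dispatches through keyCmpFuncs, so this is an assumed simplification):

package main

import "fmt"

func lessRow(rowI, rowJ []int, desc []bool) bool {
	for k := range rowI {
		cmp := rowI[k] - rowJ[k] // stand-in for keyCmpFuncs[k]
		if desc[k] {
			cmp = -cmp
		}
		if cmp < 0 {
			return true
		}
		if cmp > 0 {
			return false
		}
		// Tie on this key: fall through to the next ByItem.
	}
	return false
}

func main() {
	desc := []bool{false, true} // ORDER BY col0 ASC, col1 DESC
	fmt.Println(lessRow([]int{1, 5}, []int{1, 3}, desc)) // true: col0 ties, col1 DESC decides
	fmt.Println(lessRow([]int{2, 0}, []int{1, 9}, desc)) // false: col0 decides
}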
-func (sw *sortWorker) keyChunksLess(i, j int) bool {
-	keyRowI := sw.keyChunks.GetRow(sw.rowPtrs[i])
-	keyRowJ := sw.keyChunks.GetRow(sw.rowPtrs[j])
-	return sw.lessRow(keyRowI, keyRowJ)
-}

From 2a1b2bc09b66f11fbd0a92178d47205dd042f683 Mon Sep 17 00:00:00 2001
From: yu34po
Date: Thu, 28 Feb 2019 13:35:36 +0800
Subject: [PATCH 21/21] delete key column

---
 executor/merge_sort.go | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/executor/merge_sort.go b/executor/merge_sort.go
index b5a53af79068a..79332a89a4863 100644
--- a/executor/merge_sort.go
+++ b/executor/merge_sort.go
@@ -43,8 +43,6 @@ type MergeSortExec struct {
 	keyColumns []int
 	// keyCmpFuncs is used to compare each ByItem.
 	keyCmpFuncs []chunk.CompareFunc
-	// keyChunks is used to store ByItems values when not all ByItems are column.
-	keyChunks *chunk.List
 	// rowChunks is the chunks to store row values.
 	rowChunks *chunk.List
 	// rowPointer store the chunk index and row index for each row.
 	workerRowPtrs []*[]chunk.RowPtr
@@ -283,4 +281,3 @@ func (sw *sortWorker) keyColumnsLess(i, j int) bool {
 	rowJ := sw.rowChunks.GetRow(sw.rowPtrs[j])
 	return sw.lessRow(rowI, rowJ)
 }
-
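Taken together, the series leaves MergeSortExec with a three-phase shape: partition the buffered rows, sort each partition in its own goroutine joined by a WaitGroup, then stream a k-way merge of the sorted runs. A compact end-to-end sketch of that flow using only the standard library (plain ints and a simplified even partitioning; this is an illustration of the pattern, not the executor itself):

package main

import (
	"fmt"
	"sort"
	"sync"
)

func main() {
	rows := []int{9, 3, 7, 1, 8, 2, 6, 4, 5, 0}
	concurrency := 2

	// 1. Partition: each worker takes a contiguous slice of the rows.
	per := len(rows) / concurrency
	runs := make([][]int, concurrency)
	for i := range runs {
		lo, hi := i*per, (i+1)*per
		if i == concurrency-1 {
			hi = len(rows) // last worker takes the remainder
		}
		runs[i] = rows[lo:hi]
	}

	// 2. Sort every partition concurrently, joining on a WaitGroup
	//    (the series ends with wg.Wait instead of a finished channel).
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for _, run := range runs {
		go func(r []int) {
			defer wg.Done()
			sort.Ints(r)
		}(run)
	}
	wg.Wait()

	// 3. K-way merge of the sorted runs, smallest head row first.
	idx := make([]int, concurrency)
	var out []int
	for len(out) < len(rows) {
		j := -1
		for i, r := range runs {
			if idx[i] < len(r) && (j < 0 || r[idx[i]] < runs[j][idx[j]]) {
				j = i
			}
		}
		out = append(out, runs[j][idx[j]])
		idx[j]++
	}
	fmt.Println(out) // [0 1 2 3 4 5 6 7 8 9]
}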