diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go
index 6e691a227ebb8..e3b6f302bab74 100644
--- a/executor/index_merge_reader.go
+++ b/executor/index_merge_reader.go
@@ -149,7 +149,7 @@ type indexMergeTableTask struct {
 	// parTblIdx are only used in indexMergeProcessWorker.fetchLoopIntersection.
 	parTblIdx int
 
-	// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit.
+	// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderBy.
 	partialPlanID int
 }
 
@@ -295,9 +295,12 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
 		util.WithRecovery(
 			func() {
 				if e.isIntersection {
+					if e.pushedLimit != nil || e.keepOrder {
+						panic("Not support intersection with pushedLimit or keepOrder = true")
+					}
 					idxMergeProcessWorker.fetchLoopIntersection(ctx, fetch, workCh, e.resultCh, e.finished)
-				} else if e.pushedLimit != nil && len(e.byItems) != 0 {
-					idxMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit(ctx, fetch, workCh, e.resultCh, e.finished)
+				} else if len(e.byItems) != 0 {
+					idxMergeProcessWorker.fetchLoopUnionWithOrderBy(ctx, fetch, workCh, e.resultCh, e.finished)
 				} else {
 					idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished)
 				}
@@ -896,7 +899,9 @@ type rowIdx struct {
 }
 
 type handleHeap struct {
+	// requiredCnt == 0 means need all handles
 	requiredCnt uint64
+	tracker     *memory.Tracker
 	taskMap     map[int][]*indexMergeTableTask
 
 	idx         []rowIdx
@@ -933,23 +938,34 @@ func (h handleHeap) Swap(i, j int) {
 func (h *handleHeap) Push(x interface{}) {
 	idx := x.(rowIdx)
 	h.idx = append(h.idx, idx)
+	if h.tracker != nil {
+		h.tracker.Consume(int64(unsafe.Sizeof(h.idx)))
+	}
 }
 
 func (h *handleHeap) Pop() interface{} {
 	idxRet := h.idx[len(h.idx)-1]
 	h.idx = h.idx[:len(h.idx)-1]
+	if h.tracker != nil {
+		h.tracker.Consume(-int64(unsafe.Sizeof(h.idx)))
+	}
 	return idxRet
 }
 
-func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask) *handleHeap {
+func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask, memTracker *memory.Tracker) *handleHeap {
 	compareFuncs := make([]chunk.CompareFunc, 0, len(w.indexMerge.byItems))
 	for _, item := range w.indexMerge.byItems {
 		keyType := item.Expr.GetType()
 		compareFuncs = append(compareFuncs, chunk.GetCompareFunc(keyType))
 	}
-	requiredCnt := w.indexMerge.pushedLimit.Count + w.indexMerge.pushedLimit.Offset
+
+	requiredCnt := uint64(0)
+	if w.indexMerge.pushedLimit != nil {
+		requiredCnt = mathutil.Max(requiredCnt, w.indexMerge.pushedLimit.Count+w.indexMerge.pushedLimit.Offset)
+	}
 	return &handleHeap{
 		requiredCnt: requiredCnt,
+		tracker:     memTracker,
 		taskMap:     taskMap,
 		idx:         make([]rowIdx, 0, requiredCnt),
 		compareFunc: compareFuncs,
@@ -976,7 +992,7 @@ func (w *indexMergeProcessWorker) pruneTableWorkerTaskIdxRows(task *indexMergeTa
 	}
 }
 
-func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
+func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderBy(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
 	workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
 	memTracker := memory.NewTracker(w.indexMerge.id, -1)
 	memTracker.AttachTo(w.indexMerge.memTracker)
@@ -993,8 +1009,7 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
 	distinctHandles := kv.NewHandleMap()
 	taskMap := make(map[int][]*indexMergeTableTask)
 	uselessMap := make(map[int]struct{})
-	taskHeap := w.NewHandleHeap(taskMap)
-	memTracker.Consume(int64(taskHeap.requiredCnt) * int64(unsafe.Sizeof(rowIdx{0, 0, 0})))
+	taskHeap := w.NewHandleHeap(taskMap, memTracker)
 
 	for task := range fetchCh {
 		select {
@@ -1010,7 +1025,7 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
 			continue
 		}
 		if _, ok := taskMap[task.partialPlanID]; !ok {
-			taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0)
+			taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0, 1)
 		}
 		w.pruneTableWorkerTaskIdxRows(task)
 		taskMap[task.partialPlanID] = append(taskMap[task.partialPlanID], task)
@@ -1018,7 +1033,7 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
 			if _, ok := distinctHandles.Get(h); !ok {
 				distinctHandles.Set(h, true)
 				heap.Push(taskHeap, rowIdx{task.partialPlanID, len(taskMap[task.partialPlanID]) - 1, i})
-				if taskHeap.Len() > int(taskHeap.requiredCnt) {
+				if int(taskHeap.requiredCnt) != 0 && taskHeap.Len() > int(taskHeap.requiredCnt) {
 					top := heap.Pop(taskHeap).(rowIdx)
 					if top.partialID == task.partialPlanID && top.taskID == len(taskMap[task.partialPlanID])-1 && top.rowID == i {
 						uselessMap[task.partialPlanID] = struct{}{}
@@ -1040,7 +1055,10 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
 		}
 	}
 
-	needCount := mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
+	needCount := taskHeap.Len()
+	if w.indexMerge.pushedLimit != nil {
+		needCount = mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
+	}
 	if needCount == 0 {
 		return
 	}
@@ -1098,10 +1116,17 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
 	defer close(workCh)
 	failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)
 
+	var pushedLimit *plannercore.PushedDownLimit
+	if w.indexMerge.pushedLimit != nil {
+		pushedLimit = w.indexMerge.pushedLimit.Clone()
+	}
 	distinctHandles := make(map[int64]*kv.HandleMap)
 	for {
 		var ok bool
 		var task *indexMergeTableTask
+		if pushedLimit != nil && pushedLimit.Count == 0 {
+			return
+		}
 		select {
 		case <-ctx.Done():
 			return
@@ -1148,6 +1173,22 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
 		if len(fhs) == 0 {
 			continue
 		}
+		if pushedLimit != nil {
+			fhsLen := uint64(len(fhs))
+			if fhsLen > pushedLimit.Offset {
+				fhs = fhs[pushedLimit.Offset:]
+				pushedLimit.Offset = 0
+			} else {
+				pushedLimit.Offset -= fhsLen
+				continue
+			}
+
+			fhsLen = uint64(len(fhs))
+			if fhsLen > pushedLimit.Count {
+				fhs = fhs[:pushedLimit.Count]
+			}
+			pushedLimit.Count -= mathutil.Min(pushedLimit.Count, fhsLen)
+		}
 		task = &indexMergeTableTask{
 			lookupTableTask: lookupTableTask{
 				handles: fhs,
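
The new limit handling in fetchLoopUnion consumes the pushed-down Offset first and then the Count across successive batches of fetched handles, and stops the fetch loop once Count reaches zero. The standalone sketch below reproduces only that trimming logic under simplified assumptions: a local pushedDownLimit struct and plain int64 handles stand in for the executor's real types, and plain comparisons replace mathutil.Min/Max. It is illustrative, not code from this patch.

package main

import "fmt"

// pushedDownLimit is an illustrative stand-in for the planner's pushed-down
// limit: Offset rows are skipped, then at most Count rows are kept.
type pushedDownLimit struct {
	Offset uint64
	Count  uint64
}

// applyLimit trims one batch of handles against the remaining offset/count,
// mutating limit so that later batches see whatever is left. It returns nil
// when the whole batch is still consumed by the offset (the fetch loop would
// just continue in that case).
func applyLimit(limit *pushedDownLimit, handles []int64) []int64 {
	batch := uint64(len(handles))
	if batch <= limit.Offset {
		// Entire batch is skipped by the remaining offset.
		limit.Offset -= batch
		return nil
	}
	handles = handles[limit.Offset:]
	limit.Offset = 0

	batch = uint64(len(handles))
	if batch > limit.Count {
		handles = handles[:limit.Count]
	}
	// Equivalent to pushedLimit.Count -= mathutil.Min(pushedLimit.Count, batch).
	if batch < limit.Count {
		limit.Count -= batch
	} else {
		limit.Count = 0
	}
	return handles
}

func main() {
	// OFFSET 3, LIMIT 4 spread over four batches of fetched handles.
	limit := &pushedDownLimit{Offset: 3, Count: 4}
	for _, batch := range [][]int64{{1, 2}, {3, 4, 5}, {6, 7, 8}, {9}} {
		if limit.Count == 0 {
			break // mirrors the early return once the limit is exhausted
		}
		fmt.Println(applyLimit(limit, batch))
	}
	// Prints: [] then [4 5] then [6 7]; the last batch is never inspected.
}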