Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 committed Jun 5, 2023
1 parent 732b483 commit b53821d
Showing 1 changed file with 52 additions and 11 deletions.
63 changes: 52 additions & 11 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ type indexMergeTableTask struct {
// parTblIdx are only used in indexMergeProcessWorker.fetchLoopIntersection.
parTblIdx int

// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit.
// partialPlanID are only used for indexMergeProcessWorker.fetchLoopUnionWithOrderBy.
partialPlanID int
}

Expand Down Expand Up @@ -295,9 +295,12 @@ func (e *IndexMergeReaderExecutor) startIndexMergeProcessWorker(ctx context.Cont
util.WithRecovery(
func() {
if e.isIntersection {
if e.pushedLimit != nil || e.keepOrder {
panic("Not support intersection with pushedLimit or keepOrder = true")
}
idxMergeProcessWorker.fetchLoopIntersection(ctx, fetch, workCh, e.resultCh, e.finished)
} else if e.pushedLimit != nil && len(e.byItems) != 0 {
idxMergeProcessWorker.fetchLoopUnionWithOrderByAndPushedLimit(ctx, fetch, workCh, e.resultCh, e.finished)
} else if len(e.byItems) != 0 {
idxMergeProcessWorker.fetchLoopUnionWithOrderBy(ctx, fetch, workCh, e.resultCh, e.finished)
} else {
idxMergeProcessWorker.fetchLoopUnion(ctx, fetch, workCh, e.resultCh, e.finished)
}
Expand Down Expand Up @@ -896,7 +899,9 @@ type rowIdx struct {
}

type handleHeap struct {
// requiredCnt == 0 means need all handles
requiredCnt uint64
tracker *memory.Tracker
taskMap map[int][]*indexMergeTableTask

idx []rowIdx
Expand Down Expand Up @@ -933,23 +938,34 @@ func (h handleHeap) Swap(i, j int) {
func (h *handleHeap) Push(x interface{}) {
idx := x.(rowIdx)
h.idx = append(h.idx, idx)
if h.tracker != nil {
h.tracker.Consume(int64(unsafe.Sizeof(h.idx)))
}
}

func (h *handleHeap) Pop() interface{} {
idxRet := h.idx[len(h.idx)-1]
h.idx = h.idx[:len(h.idx)-1]
if h.tracker != nil {
h.tracker.Consume(-int64(unsafe.Sizeof(h.idx)))
}
return idxRet
}

func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask) *handleHeap {
func (w *indexMergeProcessWorker) NewHandleHeap(taskMap map[int][]*indexMergeTableTask, memTracker *memory.Tracker) *handleHeap {
compareFuncs := make([]chunk.CompareFunc, 0, len(w.indexMerge.byItems))
for _, item := range w.indexMerge.byItems {
keyType := item.Expr.GetType()
compareFuncs = append(compareFuncs, chunk.GetCompareFunc(keyType))
}
requiredCnt := w.indexMerge.pushedLimit.Count + w.indexMerge.pushedLimit.Offset

requiredCnt := uint64(0)
if w.indexMerge.pushedLimit != nil {
requiredCnt = mathutil.Max(requiredCnt, w.indexMerge.pushedLimit.Count+w.indexMerge.pushedLimit.Offset)
}
return &handleHeap{
requiredCnt: requiredCnt,
tracker: memTracker,
taskMap: taskMap,
idx: make([]rowIdx, 0, requiredCnt),
compareFunc: compareFuncs,
Expand All @@ -976,7 +992,7 @@ func (w *indexMergeProcessWorker) pruneTableWorkerTaskIdxRows(task *indexMergeTa
}
}

func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderBy(ctx context.Context, fetchCh <-chan *indexMergeTableTask,
workCh chan<- *indexMergeTableTask, resultCh chan<- *indexMergeTableTask, finished <-chan struct{}) {
memTracker := memory.NewTracker(w.indexMerge.id, -1)
memTracker.AttachTo(w.indexMerge.memTracker)
Expand All @@ -993,8 +1009,7 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
distinctHandles := kv.NewHandleMap()
taskMap := make(map[int][]*indexMergeTableTask)
uselessMap := make(map[int]struct{})
taskHeap := w.NewHandleHeap(taskMap)
memTracker.Consume(int64(taskHeap.requiredCnt) * int64(unsafe.Sizeof(rowIdx{0, 0, 0})))
taskHeap := w.NewHandleHeap(taskMap, memTracker)

for task := range fetchCh {
select {
Expand All @@ -1010,15 +1025,15 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
continue
}
if _, ok := taskMap[task.partialPlanID]; !ok {
taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0)
taskMap[task.partialPlanID] = make([]*indexMergeTableTask, 0, 1)
}
w.pruneTableWorkerTaskIdxRows(task)
taskMap[task.partialPlanID] = append(taskMap[task.partialPlanID], task)
for i, h := range task.handles {
if _, ok := distinctHandles.Get(h); !ok {
distinctHandles.Set(h, true)
heap.Push(taskHeap, rowIdx{task.partialPlanID, len(taskMap[task.partialPlanID]) - 1, i})
if taskHeap.Len() > int(taskHeap.requiredCnt) {
if int(taskHeap.requiredCnt) != 0 && taskHeap.Len() > int(taskHeap.requiredCnt) {
top := heap.Pop(taskHeap).(rowIdx)
if top.partialID == task.partialPlanID && top.taskID == len(taskMap[task.partialPlanID])-1 && top.rowID == i {
uselessMap[task.partialPlanID] = struct{}{}
Expand All @@ -1040,7 +1055,10 @@ func (w *indexMergeProcessWorker) fetchLoopUnionWithOrderByAndPushedLimit(ctx co
}
}

needCount := mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
needCount := taskHeap.Len()
if w.indexMerge.pushedLimit != nil {
needCount = mathutil.Max(0, taskHeap.Len()-int(w.indexMerge.pushedLimit.Offset))
}
if needCount == 0 {
return
}
Expand Down Expand Up @@ -1098,10 +1116,17 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
defer close(workCh)
failpoint.Inject("testIndexMergePanicProcessWorkerUnion", nil)

var pushedLimit *plannercore.PushedDownLimit
if w.indexMerge.pushedLimit != nil {
pushedLimit = w.indexMerge.pushedLimit.Clone()
}
distinctHandles := make(map[int64]*kv.HandleMap)
for {
var ok bool
var task *indexMergeTableTask
if pushedLimit != nil && pushedLimit.Count == 0 {
return
}
select {
case <-ctx.Done():
return
Expand Down Expand Up @@ -1148,6 +1173,22 @@ func (w *indexMergeProcessWorker) fetchLoopUnion(ctx context.Context, fetchCh <-
if len(fhs) == 0 {
continue
}
if pushedLimit != nil {
fhsLen := uint64(len(fhs))
if fhsLen > pushedLimit.Offset {
fhs = fhs[pushedLimit.Offset:]
pushedLimit.Offset = 0
} else {
pushedLimit.Offset -= fhsLen
continue
}

fhsLen = uint64(len(fhs))
if fhsLen > pushedLimit.Count {
fhs = fhs[:pushedLimit.Count]
}
pushedLimit.Count -= mathutil.Min(pushedLimit.Count, fhsLen)
}
task = &indexMergeTableTask{
lookupTableTask: lookupTableTask{
handles: fhs,
Expand Down

0 comments on commit b53821d

Please sign in to comment.