executor: run idxlookup workers in a pool (pingcap#58033)
zyguan authored Dec 13, 2024
1 parent a3fa29e commit 2610521
Showing 3 changed files with 232 additions and 76 deletions.
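At a glance, the change replaces the old fan-out — a fixed set of table-worker goroutines draining a shared workCh — with an on-demand worker pool: the index worker submits each lookup-table task to the pool as it is built, and still sends the task to resultCh so results stay in index order. A minimal sketch of the two patterns (hypothetical helper names, not code from this commit):

// Before: a fixed number of long-lived workers drain a shared channel.
func startFixedWorkers(workCh <-chan func(), n int) {
	for i := 0; i < n; i++ {
		go func() {
			for task := range workCh {
				task()
			}
		}()
	}
}

// After: each task is handed to a pool that spawns goroutines lazily up to
// a cap and lets them exit once the queue drains.
func submitAll(pool *workerPool, tasks []func()) {
	for _, task := range tasks {
		pool.submit(task)
	}
}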
145 changes: 69 additions & 76 deletions pkg/executor/distsql.go
@@ -21,6 +21,7 @@ import (
"runtime/trace"
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -76,6 +77,7 @@ var LookupTableTaskChannelSize int32 = 50
// lookupTableTask is created from a partial result of an index request which
// contains the handles in those index keys.
type lookupTableTask struct {
id int
handles []kv.Handle
rowIdx []int // rowIdx represents the handle index for every row. Only used when keep order.
rows []chunk.Row
@@ -503,6 +505,8 @@ type IndexLookUpExecutor struct {

// cancelFunc is called when close the executor
cancelFunc context.CancelFunc
workerCtx context.Context
pool *workerPool

// If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan.
// Used by the temporary table, cached table.
@@ -635,15 +639,16 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {
}

func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error {
// indexWorker will write to workCh and tableWorker will read from workCh,
// indexWorker will submit lookup-table tasks (processed by tableWorker) to the pool,
// so fetching index and getting table data can run concurrently.
ctx, cancel := context.WithCancel(ctx)
e.cancelFunc = cancel
workCh := make(chan *lookupTableTask, 1)
if err := e.startIndexWorker(ctx, workCh, initBatchSize); err != nil {
e.workerCtx, e.cancelFunc = context.WithCancel(ctx)
e.pool = &workerPool{
TolerablePendingTasks: 1,
MaxWorkers: int32(max(1, e.indexLookupConcurrency)),
}
if err := e.startIndexWorker(ctx, initBatchSize); err != nil {
return err
}
e.startTableWorker(ctx, workCh)
e.workerStarted = true
return nil
}
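Both the index task (submitted in startIndexWorker below) and every lookup-table task share this single pool, so background goroutines are capped at max(1, indexLookupConcurrency) rather than the old one-index-worker-plus-N-table-workers layout. The spawn rule itself lives in workerPool.submit (pkg/executor/utils.go, further down in this diff); restated here as an illustrative helper, not part of the commit:

// shouldSpawn mirrors the condition in workerPool.submit: start a goroutine
// when the pool is idle, or when pending tasks exceed the tolerable backlog
// and the worker cap has not been reached.
func shouldSpawn(workers, maxWorkers, pending, tolerable int32) bool {
	return workers == 0 || (workers < maxWorkers && pending > tolerable)
}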
@@ -703,8 +708,8 @@ func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType {
return tps
}

// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh.
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<- *lookupTableTask, initBatchSize int) error {
// startIndexWorker launches a background goroutine to fetch handles and submit lookup-table tasks to the pool.
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, initBatchSize int) error {
if e.RuntimeStats() != nil {
collExec := true
e.dagPB.CollectExecutionSummaries = &collExec
@@ -725,11 +730,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
tps := e.getRetTpsForIndexReader()
idxID := e.getIndexPlanRootID()
e.idxWorkerWg.Add(1)
go func() {
defer trace.StartRegion(ctx, "IndexLookUpIndexWorker").End()
e.pool.submit(func() {
defer trace.StartRegion(ctx, "IndexLookUpIndexTask").End()
worker := &indexWorker{
idxLookup: e,
workCh: workCh,
finished: e.finished,
resultCh: e.resultCh,
keepOrder: e.keepOrder,
@@ -805,10 +809,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
}
}
close(workCh)
close(e.resultCh)
e.idxWorkerWg.Done()
}()
})
return nil
}

@@ -841,32 +844,6 @@ func CalculateBatchSize(estRows, initBatchSize, maxBatchSize int) int {
return batchSize
}

// startTableWorker launches some background goroutines which pick tasks from workCh and execute the task.
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.indexLookupConcurrency
e.tblWorkerWg.Add(lookupConcurrencyLimit)
for i := 0; i < lookupConcurrencyLimit; i++ {
workerID := i
worker := &tableWorker{
idxLookup: e,
workCh: workCh,
finished: e.finished,
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
checkIndexValue: e.checkIndexValue,
memTracker: memory.NewTracker(workerID, -1),
}
worker.memTracker.AttachTo(e.memTracker)
ctx1, cancel := context.WithCancel(ctx)
go func() {
defer trace.StartRegion(ctx1, "IndexLookUpTableWorker").End()
worker.pickAndExecTask(ctx1)
cancel()
e.tblWorkerWg.Done()
}()
}
}

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (*TableReaderExecutor, error) {
table := e.table
if e.partitionTableMode && task.partitionTable != nil {
@@ -1029,7 +1006,6 @@ func (e *IndexLookUpExecutor) getTableRootPlanID() int {
// indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines.
type indexWorker struct {
idxLookup *IndexLookUpExecutor
workCh chan<- *lookupTableTask
finished <-chan struct{}
resultCh chan<- *lookupTableTask
keepOrder bool
@@ -1056,7 +1032,7 @@ func (w *indexWorker) syncErr(err error) {
}

// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
// The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
// The tasks are submitted to the pool to be processed by tableWorker, and are sent to e.resultCh
// at the same time to keep data ordered.
func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.SelectResult) (err error) {
defer func() {
Expand All @@ -1076,6 +1052,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID, true)
}
}
taskID := 0
for i := 0; i < len(results); {
result := results[i]
if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset {
@@ -1093,6 +1070,8 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select
continue
}
task := w.buildTableTask(handles, retChunk)
task.id = taskID
taskID++
finishBuild := time.Now()
if w.idxLookup.partitionTableMode {
task.partitionTable = w.idxLookup.prunedPartitions[i]
@@ -1102,7 +1081,18 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select
return nil
case <-w.finished:
return nil
case w.workCh <- task:
default:
e := w.idxLookup
e.tblWorkerWg.Add(1)
e.pool.submit(func() {
defer e.tblWorkerWg.Done()
select {
case <-e.finished:
return
default:
execTableTask(e, task)
}
})
w.resultCh <- task
}
if w.idxLookup.stats != nil {
@@ -1225,10 +1215,46 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) *
return task
}

func execTableTask(e *IndexLookUpExecutor, task *lookupTableTask) {
var (
ctx = e.workerCtx
region *trace.Region
)
if trace.IsEnabled() {
region = trace.StartRegion(ctx, "IndexLookUpTableTask"+strconv.Itoa(task.id))
}
defer func() {
if r := recover(); r != nil {
logutil.Logger(ctx).Error("TableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
err := util.GetRecoverError(r)
task.doneCh <- err
}
if region != nil {
region.End()
}
}()
tracker := memory.NewTracker(task.id, -1)
tracker.AttachTo(e.memTracker)
w := &tableWorker{
idxLookup: e,
finished: e.finished,
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
checkIndexValue: e.checkIndexValue,
memTracker: tracker,
}
startTime := time.Now()
err := w.executeTask(ctx, task)
if e.stats != nil {
atomic.AddInt64(&e.stats.TableRowScan, int64(time.Since(startTime)))
atomic.AddInt64(&e.stats.TableTaskNum, 1)
}
task.doneCh <- err
}

// tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines.
type tableWorker struct {
idxLookup *IndexLookUpExecutor
workCh <-chan *lookupTableTask
finished <-chan struct{}
keepOrder bool
handleIdx []int
@@ -1240,39 +1266,6 @@ type tableWorker struct {
*checkIndexValue
}

// pickAndExecTask picks tasks from workCh, and execute them.
func (w *tableWorker) pickAndExecTask(ctx context.Context) {
var task *lookupTableTask
var ok bool
defer func() {
if r := recover(); r != nil {
logutil.Logger(ctx).Error("tableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
err := util.GetRecoverError(r)
task.doneCh <- err
}
}()
for {
// Don't check ctx.Done() on purpose. If background worker get the signal and all
// exit immediately, session's goroutine doesn't know this and still calling Next(),
// it may block reading task.doneCh forever.
select {
case task, ok = <-w.workCh:
if !ok {
return
}
case <-w.finished:
return
}
startTime := time.Now()
err := w.executeTask(ctx, task)
if w.idxLookup.stats != nil {
atomic.AddInt64(&w.idxLookup.stats.TableRowScan, int64(time.Since(startTime)))
atomic.AddInt64(&w.idxLookup.stats.TableTaskNum, 1)
}
task.doneCh <- err
}
}

func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int,
isCommonHandle bool, tp getHandleType) (handle kv.Handle, err error) {
if isCommonHandle {
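Two details keep the pool version semantically equivalent to the old channel version. First, fetchHandles sends every task to resultCh in build order (the pool submission happens in the select's default branch and never blocks; backpressure comes from the buffered resultCh), so the consumer still sees tasks in index order even though pool workers may finish them out of order. Second, execTableTask sends exactly one value on task.doneCh on both the success and the panic path, so a reader blocked on doneCh is never stranded. A rough sketch of that consumer-side contract (hypothetical; the actual Next path is outside this diff):

// drainInOrder models the consumer: tasks arrive on resultCh in the order the
// index worker built them; <-task.doneCh blocks until execTableTask has
// delivered its single success-or-error value.
func drainInOrder(resultCh <-chan *lookupTableTask) error {
	for task := range resultCh {
		if err := <-task.doneCh; err != nil {
			return err
		}
		// task.rows now holds the fetched table rows for this batch.
	}
	return nil
}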
69 changes: 69 additions & 0 deletions pkg/executor/utils.go
@@ -16,6 +16,8 @@ package executor

import (
"strings"
"sync"
"sync/atomic"

"github.com/pingcap/tidb/pkg/extension"
"github.com/pingcap/tidb/pkg/parser/ast"
@@ -119,3 +121,70 @@ func encodePassword(u *ast.UserSpec, authPlugin *extension.AuthPlugin) (string,
}
return u.EncodedPassword()
}

var taskPool = sync.Pool{
New: func() any { return &workerTask{} },
}

type workerTask struct {
f func()
next *workerTask
}

type workerPool struct {
lock sync.Mutex
head *workerTask
tail *workerTask

tasks atomic.Int32
workers atomic.Int32

// TolerablePendingTasks is the number of tasks that can be tolerated in the queue, that is, the pool won't spawn a
// new goroutine if the number of tasks is less than this number.
TolerablePendingTasks int32
// MaxWorkers is the maximum number of workers that the pool can spawn.
MaxWorkers int32
}

func (p *workerPool) submit(f func()) {
task := taskPool.Get().(*workerTask)
task.f, task.next = f, nil
p.lock.Lock()
if p.head == nil {
p.head = task
} else {
p.tail.next = task
}
p.tail = task
p.lock.Unlock()
tasks := p.tasks.Add(1)

if workers := p.workers.Load(); workers == 0 || (workers < p.MaxWorkers && tasks > p.TolerablePendingTasks) {
p.workers.Add(1)
go p.run()
}
}

func (p *workerPool) run() {
for {
var task *workerTask

p.lock.Lock()
if p.head != nil {
task, p.head = p.head, p.head.next
if p.head == nil {
p.tail = nil
}
}
p.lock.Unlock()

if task == nil {
p.workers.Add(-1)
return
}
p.tasks.Add(-1)

task.f()
taskPool.Put(task)
}
}
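The pool is deliberately small: an intrusive singly linked queue guarded by a mutex, task nodes recycled through a sync.Pool to avoid a per-submit allocation, and workers that return as soon as the queue is empty, so an idle pool holds no goroutines. A minimal usage sketch under those semantics (illustrative only, same package assumed):

// runBatch submits a batch of closures and waits for them all to finish.
// At most MaxWorkers goroutines run concurrently; they are spawned lazily
// by submit and exit once the queue drains.
func runBatch(tasks []func()) {
	pool := &workerPool{TolerablePendingTasks: 1, MaxWorkers: 4}
	var wg sync.WaitGroup
	for _, f := range tasks {
		f := f // capture loop variable (pre-Go 1.22)
		wg.Add(1)
		pool.submit(func() {
			defer wg.Done()
			f()
		})
	}
	wg.Wait()
}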