executor: run idxlookup workers in a pool #58033

Merged · 6 commits · Dec 13, 2024
145 changes: 69 additions & 76 deletions pkg/executor/distsql.go
@@ -21,6 +21,7 @@ import (
"runtime/trace"
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -76,6 +77,7 @@ var LookupTableTaskChannelSize int32 = 50
// lookupTableTask is created from a partial result of an index request which
// contains the handles in those index keys.
type lookupTableTask struct {
id int
handles []kv.Handle
rowIdx []int // rowIdx represents the handle index for every row. Only used when keep order.
rows []chunk.Row
@@ -503,6 +505,8 @@ type IndexLookUpExecutor struct {

// cancelFunc is called when close the executor
cancelFunc context.CancelFunc
workerCtx context.Context
pool *workerPool

// If dummy flag is set, this is not a real IndexLookUpReader, it just provides the KV ranges for UnionScan.
// Used by the temporary table, cached table.
@@ -635,15 +639,16 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {
}

func (e *IndexLookUpExecutor) startWorkers(ctx context.Context, initBatchSize int) error {
// indexWorker will write to workCh and tableWorker will read from workCh,
// indexWorker will submit lookup-table tasks (processed by tableWorker) to the pool,
// so fetching index and getting table data can run concurrently.
ctx, cancel := context.WithCancel(ctx)
e.cancelFunc = cancel
workCh := make(chan *lookupTableTask, 1)
if err := e.startIndexWorker(ctx, workCh, initBatchSize); err != nil {
e.workerCtx, e.cancelFunc = context.WithCancel(ctx)
e.pool = &workerPool{
TolerablePendingTasks: 1,
MaxWorkers: int32(max(1, e.indexLookupConcurrency)),
}
if err := e.startIndexWorker(ctx, initBatchSize); err != nil {
return err
}
e.startTableWorker(ctx, workCh)
e.workerStarted = true
return nil
}
@@ -703,8 +708,8 @@ func (e *IndexLookUpExecutor) getRetTpsForIndexReader() []*types.FieldType {
return tps
}

// startIndexWorker launch a background goroutine to fetch handles, send the results to workCh.
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<- *lookupTableTask, initBatchSize int) error {
// startIndexWorker launches a background goroutine to fetch handles and submit lookup-table tasks to the pool.
func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, initBatchSize int) error {
if e.RuntimeStats() != nil {
collExec := true
e.dagPB.CollectExecutionSummaries = &collExec
@@ -725,11 +730,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
tps := e.getRetTpsForIndexReader()
idxID := e.getIndexPlanRootID()
e.idxWorkerWg.Add(1)
go func() {
defer trace.StartRegion(ctx, "IndexLookUpIndexWorker").End()
e.pool.submit(func() {
defer trace.StartRegion(ctx, "IndexLookUpIndexTask").End()
worker := &indexWorker{
idxLookup: e,
workCh: workCh,
finished: e.finished,
resultCh: e.resultCh,
keepOrder: e.keepOrder,
@@ -805,10 +809,9 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
}
}
close(workCh)
close(e.resultCh)
e.idxWorkerWg.Done()
}()
})
return nil
}

@@ -841,32 +844,6 @@ func CalculateBatchSize(estRows, initBatchSize, maxBatchSize int) int {
return batchSize
}

// startTableWorker launches some background goroutines which pick tasks from workCh and execute the task.
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.indexLookupConcurrency
e.tblWorkerWg.Add(lookupConcurrencyLimit)
for i := 0; i < lookupConcurrencyLimit; i++ {
workerID := i
worker := &tableWorker{
idxLookup: e,
workCh: workCh,
finished: e.finished,
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
checkIndexValue: e.checkIndexValue,
memTracker: memory.NewTracker(workerID, -1),
}
worker.memTracker.AttachTo(e.memTracker)
ctx1, cancel := context.WithCancel(ctx)
go func() {
defer trace.StartRegion(ctx1, "IndexLookUpTableWorker").End()
worker.pickAndExecTask(ctx1)
cancel()
e.tblWorkerWg.Done()
}()
}
}

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (*TableReaderExecutor, error) {
table := e.table
if e.partitionTableMode && task.partitionTable != nil {
@@ -1029,7 +1006,6 @@ func (e *IndexLookUpExecutor) getTableRootPlanID() int {
// indexWorker is used by IndexLookUpExecutor to maintain index lookup background goroutines.
type indexWorker struct {
idxLookup *IndexLookUpExecutor
workCh chan<- *lookupTableTask
finished <-chan struct{}
resultCh chan<- *lookupTableTask
keepOrder bool
@@ -1056,7 +1032,7 @@ func (w *indexWorker) syncErr(err error) {
}

// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
// The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
// The tasks are submitted to the pool to be processed by tableWorker, and are sent to e.resultCh
// at the same time to keep data ordered.
func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.SelectResult) (err error) {
defer func() {
@@ -1076,6 +1052,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.stmtRuntimeStatsColl.GetBasicRuntimeStats(idxID, true)
}
}
taskID := 0
for i := 0; i < len(results); {
result := results[i]
if w.PushedLimit != nil && w.scannedKeys >= w.PushedLimit.Count+w.PushedLimit.Offset {
@@ -1093,6 +1070,8 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select
continue
}
task := w.buildTableTask(handles, retChunk)
task.id = taskID
taskID++
finishBuild := time.Now()
if w.idxLookup.partitionTableMode {
task.partitionTable = w.idxLookup.prunedPartitions[i]
@@ -1102,7 +1081,18 @@ func (w *indexWorker) fetchHandles(ctx context.Context, results []distsql.Select
return nil
case <-w.finished:
return nil
case w.workCh <- task:
default:
e := w.idxLookup
e.tblWorkerWg.Add(1)
e.pool.submit(func() {
defer e.tblWorkerWg.Done()
select {
case <-e.finished:
return
default:
execTableTask(e, task)
}
})
w.resultCh <- task
}
if w.idxLookup.stats != nil {
@@ -1223,10 +1213,46 @@ func (w *indexWorker) buildTableTask(handles []kv.Handle, retChk *chunk.Chunk) *
return task
}

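// execTableTask runs a single lookupTableTask on a pooled worker: it builds a one-off tableWorker,
// executes the task, and delivers the result (or a recovered panic) on task.doneCh.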
func execTableTask(e *IndexLookUpExecutor, task *lookupTableTask) {
var (
ctx = e.workerCtx
region *trace.Region
)
if trace.IsEnabled() {
region = trace.StartRegion(ctx, "IndexLookUpTableTask"+strconv.Itoa(task.id))
}
defer func() {
if r := recover(); r != nil {
logutil.Logger(ctx).Error("TableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
err := util.GetRecoverError(r)
task.doneCh <- err
}
if region != nil {
region.End()
}
}()
tracker := memory.NewTracker(task.id, -1)
tracker.AttachTo(e.memTracker)
w := &tableWorker{
idxLookup: e,
finished: e.finished,
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
checkIndexValue: e.checkIndexValue,
memTracker: tracker,
}
startTime := time.Now()
err := w.executeTask(ctx, task)
if e.stats != nil {
atomic.AddInt64(&e.stats.TableRowScan, int64(time.Since(startTime)))
atomic.AddInt64(&e.stats.TableTaskNum, 1)
}
task.doneCh <- err
}

// tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines.
type tableWorker struct {
idxLookup *IndexLookUpExecutor
workCh <-chan *lookupTableTask
finished <-chan struct{}
keepOrder bool
handleIdx []int
@@ -1238,39 +1264,6 @@ type tableWorker struct {
*checkIndexValue
}

// pickAndExecTask picks tasks from workCh, and execute them.
func (w *tableWorker) pickAndExecTask(ctx context.Context) {
var task *lookupTableTask
var ok bool
defer func() {
if r := recover(); r != nil {
logutil.Logger(ctx).Error("tableWorker in IndexLookUpExecutor panicked", zap.Any("recover", r), zap.Stack("stack"))
err := util.GetRecoverError(r)
task.doneCh <- err
}
}()
for {
// Don't check ctx.Done() on purpose. If background worker get the signal and all
// exit immediately, session's goroutine doesn't know this and still calling Next(),
// it may block reading task.doneCh forever.
select {
case task, ok = <-w.workCh:
if !ok {
return
}
case <-w.finished:
return
}
startTime := time.Now()
err := w.executeTask(ctx, task)
if w.idxLookup.stats != nil {
atomic.AddInt64(&w.idxLookup.stats.TableRowScan, int64(time.Since(startTime)))
atomic.AddInt64(&w.idxLookup.stats.TableTaskNum, 1)
}
task.doneCh <- err
}
}

func (e *IndexLookUpExecutor) getHandle(row chunk.Row, handleIdx []int,
isCommonHandle bool, tp getHandleType) (handle kv.Handle, err error) {
if isCommonHandle {
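The result ordering survives the move from dedicated table workers to a pool because ordering was never carried by the workers: fetchHandles still publishes tasks into resultCh in index order, and the consuming side blocks on each task's doneCh before reading its rows; the pool only decides when each task's rows get filled in. The following self-contained miniature of that pattern uses generic names and toy data, not the PR's types, to make the synchronization explicit:

```go
package main

import (
	"fmt"
	"sync"
)

// task mirrors lookupTableTask's synchronization: doneCh reports completion.
type task struct {
	id     int
	rows   []int
	doneCh chan error
}

func main() {
	resultCh := make(chan *task, 8)
	var wg sync.WaitGroup

	// Producer, like fetchHandles: publish tasks on resultCh in index order,
	// while handing the row-filling work to concurrently running workers.
	go func() {
		for i := 0; i < 8; i++ {
			t := &task{id: i, doneCh: make(chan error, 1)}
			wg.Add(1)
			go func() { // stands in for e.pool.submit(...) + execTableTask
				defer wg.Done()
				t.rows = []int{t.id * 10} // pretend table lookup
				t.doneCh <- nil
			}()
			resultCh <- t
		}
		close(resultCh)
	}()

	// Consumer, like the executor's Next path: take tasks in submission
	// order and block on each task's doneCh before touching its rows.
	for t := range resultCh {
		if err := <-t.doneCh; err != nil {
			panic(err)
		}
		fmt.Println(t.id, t.rows)
	}
	wg.Wait()
}
```

The ids print in order 0 through 7 even when the per-task goroutines finish out of order, which is why execTableTask can run on any pooled worker without reordering results.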
69 changes: 69 additions & 0 deletions pkg/executor/utils.go
@@ -16,6 +16,8 @@ package executor

import (
"strings"
"sync"
"sync/atomic"

"github.com/pingcap/tidb/pkg/extension"
"github.com/pingcap/tidb/pkg/parser/ast"
@@ -119,3 +121,70 @@ func encodePassword(u *ast.UserSpec, authPlugin *extension.AuthPlugin) (string,
}
return u.EncodedPassword()
}

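// taskPool recycles workerTask nodes so that submit does not allocate a new node on every call.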
var taskPool = sync.Pool{
New: func() any { return &workerTask{} },
}

type workerTask struct {
f func()
next *workerTask
}

type workerPool struct {
lock sync.Mutex
head *workerTask
tail *workerTask

tasks atomic.Int32
workers atomic.Int32

// TolerablePendingTasks is the number of pending tasks that can be tolerated in the queue: as long as
// at least one worker is running, no new goroutine is spawned until the queue grows past this number.
TolerablePendingTasks int32
// MaxWorkers is the maximum number of workers that the pool can spawn.
MaxWorkers int32
}

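// submit enqueues f without blocking. It spawns an extra worker goroutine when none is running, or
// when the pool is below MaxWorkers and the queue has grown past TolerablePendingTasks.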
func (p *workerPool) submit(f func()) {
task := taskPool.Get().(*workerTask)
task.f, task.next = f, nil
p.lock.Lock()
if p.head == nil {
p.head = task
} else {
p.tail.next = task
}
p.tail = task
p.lock.Unlock()
tasks := p.tasks.Add(1)

if workers := p.workers.Load(); workers == 0 || (workers < p.MaxWorkers && tasks > p.TolerablePendingTasks) {
p.workers.Add(1)
go p.run()
}
}

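// run is the worker loop: it keeps popping tasks until the queue is empty, then exits, which lets
// the pool scale back to zero goroutines when there is no work.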
func (p *workerPool) run() {
for {
var task *workerTask

p.lock.Lock()
if p.head != nil {
task, p.head = p.head, p.head.next
if p.head == nil {
p.tail = nil
}
}
p.lock.Unlock()

if task == nil {
p.workers.Add(-1)
return
}
p.tasks.Add(-1)

task.f()
taskPool.Put(task)
}
}
Loading