ddl: dynamically adjusting the concurrency and batch size of reorganization job #57468

Merged · 24 commits · Nov 21, 2024
Changes from 18 commits
101 changes: 87 additions & 14 deletions pkg/ddl/backfilling.go
@@ -194,7 +194,7 @@ func newBackfillCtx(id int, rInfo *reorgInfo,
id = int(backfillContextID.Add(1))
}

batchCnt := rInfo.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
batchCnt := rInfo.ReorgMeta.GetBatchSize()
return &backfillCtx{
id: id,
ddlCtx: rInfo.jobCtx.oldDDLCtx,
@@ -431,8 +431,14 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
})

// Change the batch size dynamically.
newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
w.GetCtx().batchCnt = newBatchCnt
currentBatchCnt := w.GetCtx().batchCnt
targetBatchSize := job.ReorgMeta.GetBatchSize()
if targetBatchSize != currentBatchCnt {
w.GetCtx().batchCnt = targetBatchSize
logger.Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int("current batch size", w.GetCtx().batchCnt))
}
result := w.handleBackfillTask(d, task, bf)
w.sendResult(result)

@@ -701,7 +707,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(

//nolint: forcetypeassert
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
importConc := job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
importConc := job.ReorgMeta.GetConcurrency()
bcCtx, err := ingest.LitBackCtxMgr.Register(
ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName, importConc, job.RealStartTS)
if err != nil {
@@ -770,7 +776,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe)
err = executeAndClosePipeline(opCtx, pipe, reorgInfo.Job, avgRowSize)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
@@ -787,11 +793,58 @@
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline) error {
func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) {
opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
if opR == nil || opW == nil {
logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
return
}
reader, readerOk := opR.(*TableScanOperator)
writer, writerOk := opW.(*IndexIngestOperator)
if !readerOk || !writerOk {
logutil.DDLIngestLogger().Error(
"unexpected operator types, config can't be adjusted",
zap.Int64("jobID", job.ID),
zap.Bool("isReaderValid", readerOk),
zap.Bool("isWriterValid", writerOk),
)
return
}
ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize)
currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
continue
}
reader.TuneWorkerPoolSize(int32(targetReaderCnt))
writer.TuneWorkerPoolSize(int32(targetWriterCnt))
logutil.DDLIngestLogger().Info("adjust ddl job config success",
zap.Int64("jobID", job.ID),
zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
}
}
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error {
err := pipe.Execute()
if err != nil {
return err
}

// Adjust worker pool size dynamically.
if job != nil {
go func() {
adjustWorkerPoolSize(ctx, pipe, job, avgRowSize)
}()
}

err = pipe.Close()
if opErr := ctx.OperatorErr(); opErr != nil {
return opErr
@@ -825,6 +878,9 @@ func (s *localRowCntListener) SetTotal(total int) {
s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + int64(total))
}

// UpdateDDLJobReorgCfgInterval is the interval to check and update reorg configuration.
const UpdateDDLJobReorgCfgInterval = 2 * time.Second

// writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition.
// For a partitioned table, it should be handled partition by partition.
//
@@ -929,14 +985,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
// We try to adjust the worker size regularly to reduce
// the overhead of loading the DDL related global variables.
err2 = scheduler.adjustWorkerSize()
if err2 != nil {
logutil.DDLLogger().Warn("cannot adjust backfill worker size",
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
failpoint.InjectCall("afterUpdateReorgMeta")
}
}
@@ -978,6 +1026,31 @@ func (dc *ddlCtx) writePhysicalTableRecord(
return nil
})

// Periodically adjust the backfill worker count in a separate goroutine.
go func() {
ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
currentWorkerCnt := scheduler.currentWorkerSize()
targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrency()
if currentWorkerCnt != targetWorkerCnt {
err := scheduler.adjustWorkerSize()
if err != nil {
logutil.DDLLogger().Error("adjust ddl job config failed",
zap.Error(err))
} else {
logutil.DDLLogger().Info("adjust ddl job config success",
zap.Int("current worker count", scheduler.currentWorkerSize()))
}
}
}
}
}()

return eg.Wait()
}

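Both additions in this file follow the same ticker-driven reconcile shape: every UpdateDDLJobReorgCfgInterval the goroutine compares the currently applied setting with the target read from job.ReorgMeta, applies the target only when they differ, and exits when the surrounding context is cancelled. A minimal, self-contained sketch of that pattern is below; reconcileLoop, target, and apply are illustrative names, not TiDB APIs.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// reconcileLoop periodically compares the applied value against a target and
// applies the target when they differ, stopping when ctx is cancelled. It has
// the same shape as adjustWorkerPoolSize and the worker-count goroutine above;
// all names here are illustrative, not TiDB APIs.
func reconcileLoop(ctx context.Context, interval time.Duration, target *atomic.Int64, apply func(int64)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	applied := int64(-1) // force the first apply
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			want := target.Load()
			if want == applied {
				continue // nothing changed since the last tick
			}
			apply(want)
			applied = want
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	var target atomic.Int64
	target.Store(4)
	go func() {
		time.Sleep(time.Second)
		target.Store(8) // e.g. the user raised the concurrency mid-job
	}()

	reconcileLoop(ctx, 200*time.Millisecond, &target, func(v int64) {
		fmt.Println("applied new worker count:", v)
	})
}
```

Because each tick is only an atomic load compared against the last applied value, a 2-second polling interval stays cheap even for long-running reorg jobs.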
3 changes: 1 addition & 2 deletions pkg/ddl/backfilling_dist_executor.go
@@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/pkg/lightning/backend/external"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
@@ -153,7 +152,7 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
ddlObj.etcdCli,
discovery,
job.ReorgMeta.ResourceGroupName,
job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())),
job.ReorgMeta.GetConcurrency(),
job.RealStartTS,
)
}
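The only change here is dropping the GetConcurrencyOrDefault fallback (and the now-unused variable import), which implies job.ReorgMeta itself carries a concrete, mutable concurrency value instead of consulting the tidb_ddl_reorg_worker_cnt variable on every read. A purely hypothetical sketch of what such an atomics-backed accessor pair could look like (type, field, and constructor names invented for illustration):

```go
// Hypothetical sketch only: the real model.DDLReorgMeta in TiDB may store and
// guard these settings differently.
package reorgmeta

import "sync/atomic"

// Meta holds per-job reorg settings that can be read and updated concurrently.
type Meta struct {
	concurrency atomic.Int64
	batchSize   atomic.Int64
}

// New seeds the settings once, e.g. from the defaults that the removed
// ...OrDefault call sites used to read on every access.
func New(defaultConc, defaultBatch int) *Meta {
	m := &Meta{}
	m.concurrency.Store(int64(defaultConc))
	m.batchSize.Store(int64(defaultBatch))
	return m
}

func (m *Meta) GetConcurrency() int  { return int(m.concurrency.Load()) }
func (m *Meta) SetConcurrency(n int) { m.concurrency.Store(int64(n)) }
func (m *Meta) GetBatchSize() int    { return int(m.batchSize.Load()) }
func (m *Meta) SetBatchSize(n int)   { m.batchSize.Store(int64(n)) }
```

With this shape, whatever tunes a running job only has to call the setter; the polling loops above pick the new value up on their next tick.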
58 changes: 35 additions & 23 deletions pkg/ddl/backfilling_operators.go
@@ -20,6 +20,7 @@ import (
"fmt"
"path"
"strconv"
"sync"
"sync/atomic"
"time"

@@ -170,11 +171,16 @@ func NewAddIndexIngestPipeline(
if err != nil {
return nil, err
}
srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
srcChkPool := createChunkPool(copCtx, reorgMeta)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
rm := reorgMeta
if rm.IsDistReorg {
// Currently, only the batch size of local ingest mode can be adjusted
rm = nil
}

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.GetBatchSize(), rm)
ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool,
tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener)
sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener)
@@ -221,7 +227,7 @@ func NewWriteIndexToExternalStoragePipeline(
if err != nil {
return nil, err
}
srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
srcChkPool := createChunkPool(copCtx, reorgMeta)
readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

backend, err := storage.ParseBackend(extStoreURI, nil)
@@ -239,7 +245,7 @@
})

srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.GetBatchSize(), nil)
writeOp := NewWriteExternalStoreOperator(
ctx, copCtx, sessPool, jobID, subtaskID,
tbl, indexes, extStore, srcChkPool, writerCnt,
@@ -264,14 +270,13 @@
), nil
}

func createChunkPool(copCtx copr.CopContext, hintConc, hintBatchSize int) chan *chunk.Chunk {
poolSize := ingest.CopReadChunkPoolSize(hintConc)
batchSize := ingest.CopReadBatchSize(hintBatchSize)
srcChkPool := make(chan *chunk.Chunk, poolSize)
for i := 0; i < poolSize; i++ {
srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize)
func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *sync.Pool {
return &sync.Pool{
New: func() any {
return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes,
reorgMeta.GetBatchSize())
},
}
return srcChkPool
}

// TableScanTask contains the start key and the end key of a region.
@@ -478,10 +483,11 @@ func NewTableScanOperator(
ctx *OperatorCtx,
sessPool opSessPool,
copCtx copr.CopContext,
srcChkPool chan *chunk.Chunk,
srcChkPool *sync.Pool,
concurrency int,
cpMgr *ingest.CheckpointManager,
hintBatchSize int,
reorgMeta *model.DDLReorgMeta,
) *TableScanOperator {
totalCount := new(atomic.Int64)
pool := workerpool.NewWorkerPool(
@@ -498,6 +504,7 @@
cpMgr: cpMgr,
hintBatchSize: hintBatchSize,
totalCount: totalCount,
reorgMeta: reorgMeta,
}
})
return &TableScanOperator{
@@ -518,9 +525,10 @@ type tableScanWorker struct {
copCtx copr.CopContext
sessPool opSessPool
se *session.Session
srcChkPool chan *chunk.Chunk
srcChkPool *sync.Pool

cpMgr *ingest.CheckpointManager
reorgMeta *model.DDLReorgMeta
hintBatchSize int
totalCount *atomic.Int64
}
@@ -588,17 +596,21 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
}

func (w *tableScanWorker) getChunk() *chunk.Chunk {
chk := <-w.srcChkPool
newCap := ingest.CopReadBatchSize(w.hintBatchSize)
if chk.Capacity() != newCap {
chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
targetCap := ingest.CopReadBatchSize(w.hintBatchSize)
if w.reorgMeta != nil {
targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSize())
}
chk := w.srcChkPool.Get().(*chunk.Chunk)
if chk.Capacity() != targetCap {
chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, targetCap)
logutil.Logger(w.ctx).Info("adjust ddl job config success", zap.Int("current batch size", chk.Capacity()))
}
chk.Reset()
return chk
}

func (w *tableScanWorker) recycleChunk(chk *chunk.Chunk) {
w.srcChkPool <- chk
w.srcChkPool.Put(chk)
}

// WriteExternalStoreOperator writes index records to external storage.
@@ -618,7 +630,7 @@ func NewWriteExternalStoreOperator(
tbl table.PhysicalTable,
indexes []table.Index,
store storage.ExternalStorage,
srcChunkPool chan *chunk.Chunk,
srcChunkPool *sync.Pool,
concurrency int,
onClose external.OnCloseFunc,
memoryQuota uint64,
@@ -704,7 +716,7 @@ func NewIndexIngestOperator(
tbl table.PhysicalTable,
indexes []table.Index,
engines []ingest.Engine,
srcChunkPool chan *chunk.Chunk,
srcChunkPool *sync.Pool,
concurrency int,
reorgMeta *model.DDLReorgMeta,
cpMgr *ingest.CheckpointManager,
@@ -765,7 +777,7 @@ type indexIngestExternalWorker struct {
func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
defer func() {
if ck.Chunk != nil {
w.srcChunkPool <- ck.Chunk
w.srcChunkPool.Put(ck.Chunk)
}
}()
rs, err := w.indexIngestBaseWorker.HandleTask(ck)
@@ -787,7 +799,7 @@ type indexIngestLocalWorker struct {
func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
defer func() {
if ck.Chunk != nil {
w.srcChunkPool <- ck.Chunk
w.srcChunkPool.Put(ck.Chunk)
}
}()
rs, err := w.indexIngestBaseWorker.HandleTask(ck)
Expand Down Expand Up @@ -827,7 +839,7 @@ type indexIngestBaseWorker struct {
restore func(sessionctx.Context)

writers []ingest.Writer
srcChunkPool chan *chunk.Chunk
srcChunkPool *sync.Pool
// only available in global sort
totalCount *atomic.Int64
}
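The chunk pool also changes shape: a fixed-capacity channel of pre-allocated chunks is replaced by a sync.Pool whose New function reads the current batch size, and getChunk re-allocates whenever a pooled chunk no longer matches the target capacity, so chunks sized for an old batch size are simply dropped and reclaimed by the GC. A self-contained stand-in for that pattern, using plain int slices instead of chunk.Chunk (all names invented):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// batchSize stands in for the value behind ReorgMeta.GetBatchSize.
var batchSize atomic.Int64

// newPool mirrors createChunkPool: New reads the batch size current at
// allocation time, so freshly created buffers track the latest setting.
func newPool() *sync.Pool {
	return &sync.Pool{
		New: func() any {
			buf := make([]int, 0, int(batchSize.Load()))
			return &buf
		},
	}
}

// getBuf mirrors tableScanWorker.getChunk: if a pooled buffer was built for an
// older batch size, drop it and allocate one with the new capacity.
func getBuf(pool *sync.Pool) *[]int {
	target := int(batchSize.Load())
	buf := pool.Get().(*[]int)
	if cap(*buf) != target {
		fresh := make([]int, 0, target)
		buf = &fresh
	}
	*buf = (*buf)[:0] // reset, like chk.Reset()
	return buf
}

func main() {
	batchSize.Store(256)
	pool := newPool()
	b := getBuf(pool)
	fmt.Println("cap:", cap(*b)) // 256
	pool.Put(b)

	batchSize.Store(1024) // batch size raised while the job is running
	b = getBuf(pool)
	fmt.Println("cap:", cap(*b)) // 1024
}
```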
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_read_index.go
@@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe)
return executeAndClosePipeline(opCtx, pipe, nil, 0)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
if err != nil {
return err
}
err = executeAndClosePipeline(opCtx, pipe)
err = executeAndClosePipeline(opCtx, pipe, nil, 0)
if err != nil {
// For dist task local based ingest, checkpoint is unsupported.
// If there is an error we should keep local sort dir clean.
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_scheduler.go
@@ -87,7 +87,7 @@ func newTxnBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *ses
if err != nil {
return nil, err
}
workerCnt := info.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
workerCnt := info.ReorgMeta.GetConcurrency()
return &txnBackfillScheduler{
ctx: ctx,
reorgInfo: info,
@@ -247,7 +247,7 @@ func restoreSessCtx(sessCtx sessionctx.Context) func(sessCtx sessionctx.Context)
}

func (b *txnBackfillScheduler) expectedWorkerSize() (size int) {
workerCnt := b.reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
workerCnt := b.reorgInfo.ReorgMeta.GetConcurrency()
return min(workerCnt, maxBackfillWorkerSize)
}

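For the txn backfill path the dynamically updated concurrency still passes through expectedWorkerSize, so a value raised at runtime is clamped at maxBackfillWorkerSize. A toy illustration of the clamp (the cap value is invented; the real constant is not shown in this diff):

```go
package main

import "fmt"

// maxWorkers stands in for maxBackfillWorkerSize; 16 is just for illustration.
const maxWorkers = 16

// expectedWorkerSize mirrors the clamp in txnBackfillScheduler.expectedWorkerSize.
func expectedWorkerSize(requested int) int {
	return min(requested, maxWorkers)
}

func main() {
	fmt.Println(expectedWorkerSize(4))  // 4: below the cap, used as-is
	fmt.Println(expectedWorkerSize(64)) // 16: clamped to maxWorkers
}
```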