
Commit 51e8310

ddl: dynamically adjusting the concurrency and batch size of reorganization job (#57468) (#57607)
ref #57229
1 parent 99a2df5 commit 51e8310

14 files changed: +294 −74 lines changed

pkg/ddl/backfilling.go

+86 −12

@@ -431,8 +431,14 @@ func (w *backfillWorker) run(d *ddlCtx, bf backfiller, job *model.Job) {
 		})

 		// Change the batch size dynamically.
-		newBatchCnt := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
-		w.GetCtx().batchCnt = newBatchCnt
+		currentBatchCnt := w.GetCtx().batchCnt
+		targetBatchSize := job.ReorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize()))
+		if targetBatchSize != currentBatchCnt {
+			w.GetCtx().batchCnt = targetBatchSize
+			logger.Info("adjust ddl job config success",
+				zap.Int64("jobID", job.ID),
+				zap.Int("current batch size", w.GetCtx().batchCnt))
+		}
 		result := w.handleBackfillTask(d, task, bf)
 		w.sendResult(result)

@@ -770,7 +776,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
 	if err != nil {
 		return err
 	}
-	err = executeAndClosePipeline(opCtx, pipe)
+	err = executeAndClosePipeline(opCtx, pipe, job, avgRowSize)
 	if err != nil {
 		err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
 		if err1 != nil {

@@ -787,11 +793,59 @@
 	return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
 }

-func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline) error {
+func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) {
+	opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
+	if opR == nil || opW == nil {
+		logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
+		return
+	}
+	reader, readerOk := opR.(*TableScanOperator)
+	writer, writerOk := opW.(*IndexIngestOperator)
+	if !readerOk || !writerOk {
+		logutil.DDLIngestLogger().Error(
+			"unexpected operator types, config can't be adjusted",
+			zap.Int64("jobID", job.ID),
+			zap.Bool("isReaderValid", readerOk),
+			zap.Bool("isWriterValid", writerOk),
+		)
+		return
+	}
+	ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(
+				job.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter())), avgRowSize)
+			currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
+			if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
+				continue
+			}
+			reader.TuneWorkerPoolSize(int32(targetReaderCnt))
+			writer.TuneWorkerPoolSize(int32(targetWriterCnt))
+			logutil.DDLIngestLogger().Info("adjust ddl job config success",
+				zap.Int64("jobID", job.ID),
+				zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
+				zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
+		}
+	}
+}
+
+func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error {
 	err := pipe.Execute()
 	if err != nil {
 		return err
 	}
+
+	// Adjust worker pool size dynamically.
+	if job != nil {
+		go func() {
+			adjustWorkerPoolSize(ctx, pipe, job, avgRowSize)
+		}()
+	}
+
 	err = pipe.Close()
 	if opErr := ctx.OperatorErr(); opErr != nil {
 		return opErr

@@ -825,6 +879,9 @@ func (s *localRowCntListener) SetTotal(total int) {
 	s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + int64(total))
 }

+// UpdateDDLJobReorgCfgInterval is the interval to check and update reorg configuration.
+const UpdateDDLJobReorgCfgInterval = 2 * time.Second
+
 // writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition.
 // For a partitioned table, it should be handled partition by partition.
 //

@@ -929,14 +986,6 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 				zap.Int64("job ID", reorgInfo.ID),
 				zap.Error(err2))
 		}
-		// We try to adjust the worker size regularly to reduce
-		// the overhead of loading the DDL related global variables.
-		err2 = scheduler.adjustWorkerSize()
-		if err2 != nil {
-			logutil.DDLLogger().Warn("cannot adjust backfill worker size",
-				zap.Int64("job ID", reorgInfo.ID),
-				zap.Error(err2))
-		}
 		failpoint.InjectCall("afterUpdateReorgMeta")
 	}
 }

@@ -978,6 +1027,31 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 		return nil
 	})

+	// update the worker cnt goroutine
+	go func() {
+		ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				currentWorkerCnt := scheduler.currentWorkerSize()
+				targetWorkerCnt := reorgInfo.ReorgMeta.GetConcurrencyOrDefault(int(variable.GetDDLReorgWorkerCounter()))
+				if currentWorkerCnt != targetWorkerCnt {
+					err := scheduler.adjustWorkerSize()
+					if err != nil {
+						logutil.DDLLogger().Error("adjust ddl job config failed",
+							zap.Error(err))
+					} else {
+						logutil.DDLLogger().Info("adjust ddl job config success",
+							zap.Int("current worker count", scheduler.currentWorkerSize()))
+					}
+				}
+			}
+		}
+	}()
+
 	return eg.Wait()
 }

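Both goroutines introduced above follow the same pattern: wake on a ticker, read the target value from the job's ReorgMeta (falling back to the system variable), compare it with the live value, and tune and log only on change. Below is a minimal, self-contained sketch of that compare-then-tune loop; the tunable type and pollAndTune are hypothetical scaffolding for illustration, not TiDB APIs.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// tunable stands in for anything with a live, retunable size, such as the
// backfill worker pool or TableScanOperator in the diff above (hypothetical).
type tunable struct{ size atomic.Int32 }

func (t *tunable) Get() int32   { return t.size.Load() }
func (t *tunable) Tune(n int32) { t.size.Store(n) }

// pollAndTune mirrors the shape of adjustWorkerPoolSize: tick at a fixed
// interval, exit when the job context is done, and retune only on change.
func pollAndTune(ctx context.Context, t *tunable, target func() int32, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			want := target()
			if want == t.Get() {
				continue // unchanged; avoid redundant tuning and log noise
			}
			t.Tune(want)
			fmt.Printf("adjusted worker count to %d\n", want)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	var pool tunable
	pool.Tune(4)
	var desired atomic.Int32
	desired.Store(4)
	go pollAndTune(ctx, &pool, desired.Load, 500*time.Millisecond)
	time.Sleep(time.Second)
	desired.Store(8) // e.g. the user raised the job's concurrency
	<-ctx.Done()
}

Polling rather than pushing notifications keeps the operators decoupled from wherever ReorgMeta gets updated, at the cost of up to one UpdateDDLJobReorgCfgInterval (two seconds) of lag before a change takes effect.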
pkg/ddl/backfilling_operators.go

+38 −23

@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"path"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -41,6 +42,7 @@ import (
 	"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
 	"github.com/pingcap/tidb/pkg/resourcemanager/util"
 	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/sessionctx/variable"
 	"github.com/pingcap/tidb/pkg/table"
 	"github.com/pingcap/tidb/pkg/table/tables"
 	"github.com/pingcap/tidb/pkg/tablecodec"

@@ -170,11 +172,17 @@ func NewAddIndexIngestPipeline(
 	if err != nil {
 		return nil, err
 	}
-	srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
+	srcChkPool := createChunkPool(copCtx, reorgMeta)
 	readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)
+	rm := reorgMeta
+	if rm.IsDistReorg {
+		// Currently, only the batch size of local ingest mode can be adjusted
+		rm = nil
+	}

 	srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, cpMgr)
-	scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr, reorgMeta.BatchSize)
+	scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, cpMgr,
+		reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), rm)
 	ingestOp := NewIndexIngestOperator(ctx, copCtx, backendCtx, sessPool,
 		tbl, indexes, engines, srcChkPool, writerCnt, reorgMeta, cpMgr, rowCntListener)
 	sinkOp := newIndexWriteResultSink(ctx, backendCtx, tbl, indexes, cpMgr, rowCntListener)

@@ -221,7 +229,7 @@ func NewWriteIndexToExternalStoragePipeline(
 	if err != nil {
 		return nil, err
 	}
-	srcChkPool := createChunkPool(copCtx, concurrency, reorgMeta.BatchSize)
+	srcChkPool := createChunkPool(copCtx, reorgMeta)
 	readerCnt, writerCnt := expectedIngestWorkerCnt(concurrency, avgRowSize)

 	backend, err := storage.ParseBackend(extStoreURI, nil)

@@ -239,7 +247,8 @@
 	})

 	srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey, nil)
-	scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil, reorgMeta.BatchSize)
+	scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt, nil,
+		reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())), nil)
 	writeOp := NewWriteExternalStoreOperator(
 		ctx, copCtx, sessPool, jobID, subtaskID,
 		tbl, indexes, extStore, srcChkPool, writerCnt,

@@ -264,14 +273,13 @@
 	), nil
 }

-func createChunkPool(copCtx copr.CopContext, hintConc, hintBatchSize int) chan *chunk.Chunk {
-	poolSize := ingest.CopReadChunkPoolSize(hintConc)
-	batchSize := ingest.CopReadBatchSize(hintBatchSize)
-	srcChkPool := make(chan *chunk.Chunk, poolSize)
-	for i := 0; i < poolSize; i++ {
-		srcChkPool <- chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes, batchSize)
+func createChunkPool(copCtx copr.CopContext, reorgMeta *model.DDLReorgMeta) *sync.Pool {
+	return &sync.Pool{
+		New: func() any {
+			return chunk.NewChunkWithCapacity(copCtx.GetBase().FieldTypes,
+				reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())))
+		},
 	}
-	return srcChkPool
 }

 // TableScanTask contains the start key and the end key of a region.

@@ -478,10 +486,11 @@ func NewTableScanOperator(
 	ctx *OperatorCtx,
 	sessPool opSessPool,
 	copCtx copr.CopContext,
-	srcChkPool chan *chunk.Chunk,
+	srcChkPool *sync.Pool,
 	concurrency int,
 	cpMgr *ingest.CheckpointManager,
 	hintBatchSize int,
+	reorgMeta *model.DDLReorgMeta,
 ) *TableScanOperator {
 	totalCount := new(atomic.Int64)
 	pool := workerpool.NewWorkerPool(

@@ -498,6 +507,7 @@
 			cpMgr:         cpMgr,
 			hintBatchSize: hintBatchSize,
 			totalCount:    totalCount,
+			reorgMeta:     reorgMeta,
 		}
 	})
 	return &TableScanOperator{

@@ -518,9 +528,10 @@ type tableScanWorker struct {
 	copCtx     copr.CopContext
 	sessPool   opSessPool
 	se         *session.Session
-	srcChkPool chan *chunk.Chunk
+	srcChkPool *sync.Pool

 	cpMgr         *ingest.CheckpointManager
+	reorgMeta     *model.DDLReorgMeta
 	hintBatchSize int
 	totalCount    *atomic.Int64
 }

@@ -588,17 +599,21 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
 }

 func (w *tableScanWorker) getChunk() *chunk.Chunk {
-	chk := <-w.srcChkPool
-	newCap := ingest.CopReadBatchSize(w.hintBatchSize)
-	if chk.Capacity() != newCap {
-		chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, newCap)
+	targetCap := ingest.CopReadBatchSize(w.hintBatchSize)
+	if w.reorgMeta != nil {
+		targetCap = ingest.CopReadBatchSize(w.reorgMeta.GetBatchSizeOrDefault(int(variable.GetDDLReorgBatchSize())))
+	}
+	chk := w.srcChkPool.Get().(*chunk.Chunk)
+	if chk.Capacity() != targetCap {
+		chk = chunk.NewChunkWithCapacity(w.copCtx.GetBase().FieldTypes, targetCap)
+		logutil.Logger(w.ctx).Info("adjust ddl job config success", zap.Int("current batch size", chk.Capacity()))
 	}
 	chk.Reset()
 	return chk
 }

 func (w *tableScanWorker) recycleChunk(chk *chunk.Chunk) {
-	w.srcChkPool <- chk
+	w.srcChkPool.Put(chk)
 }

 // WriteExternalStoreOperator writes index records to external storage.

@@ -618,7 +633,7 @@ func NewWriteExternalStoreOperator(
 	tbl table.PhysicalTable,
 	indexes []table.Index,
 	store storage.ExternalStorage,
-	srcChunkPool chan *chunk.Chunk,
+	srcChunkPool *sync.Pool,
 	concurrency int,
 	onClose external.OnCloseFunc,
 	memoryQuota uint64,

@@ -704,7 +719,7 @@ func NewIndexIngestOperator(
 	tbl table.PhysicalTable,
 	indexes []table.Index,
 	engines []ingest.Engine,
-	srcChunkPool chan *chunk.Chunk,
+	srcChunkPool *sync.Pool,
 	concurrency int,
 	reorgMeta *model.DDLReorgMeta,
 	cpMgr *ingest.CheckpointManager,

@@ -765,7 +780,7 @@ type indexIngestExternalWorker struct {
 func (w *indexIngestExternalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
 	defer func() {
 		if ck.Chunk != nil {
-			w.srcChunkPool <- ck.Chunk
+			w.srcChunkPool.Put(ck.Chunk)
 		}
 	}()
 	rs, err := w.indexIngestBaseWorker.HandleTask(ck)

@@ -787,7 +802,7 @@ type indexIngestLocalWorker struct {
 func (w *indexIngestLocalWorker) HandleTask(ck IndexRecordChunk, send func(IndexWriteResult)) {
 	defer func() {
 		if ck.Chunk != nil {
-			w.srcChunkPool <- ck.Chunk
+			w.srcChunkPool.Put(ck.Chunk)
 		}
 	}()
 	rs, err := w.indexIngestBaseWorker.HandleTask(ck)

@@ -827,7 +842,7 @@ type indexIngestBaseWorker struct {
 	restore func(sessionctx.Context)

 	writers      []ingest.Writer
-	srcChunkPool chan *chunk.Chunk
+	srcChunkPool *sync.Pool
 	// only available in global sort
 	totalCount *atomic.Int64
 }

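The move from a fixed-capacity chan *chunk.Chunk to a *sync.Pool is what makes the batch size adjustable mid-job: the channel pool fixed both the chunk count and each chunk's capacity at construction time, whereas sync.Pool allocates lazily, so a chunk whose capacity no longer matches the current target can simply be dropped for the garbage collector and replaced, as getChunk now does. A self-contained sketch of that recycle-or-replace pattern, with a hypothetical buffer type standing in for chunk.Chunk:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// targetCap is the dynamically adjustable batch size; in the real code this
// comes from reorgMeta.GetBatchSizeOrDefault on every getChunk call.
var targetCap atomic.Int64

// buffer is a hypothetical stand-in for chunk.Chunk.
type buffer struct{ data []byte }

func (b *buffer) Capacity() int { return cap(b.data) }

var pool = sync.Pool{
	New: func() any {
		// Like createChunkPool: allocate lazily at the *current* target size.
		return &buffer{data: make([]byte, 0, targetCap.Load())}
	},
}

// getBuffer mirrors tableScanWorker.getChunk: take one from the pool, and if
// its capacity no longer matches the target, abandon it and allocate afresh.
func getBuffer() *buffer {
	want := int(targetCap.Load())
	b := pool.Get().(*buffer)
	if b.Capacity() != want {
		b = &buffer{data: make([]byte, 0, want)} // the stale one is left for GC
	}
	return b
}

func main() {
	targetCap.Store(32)
	b := getBuffer()
	fmt.Println(b.Capacity()) // 32
	pool.Put(b)

	targetCap.Store(64) // batch size raised while the job is running
	fmt.Println(getBuffer().Capacity()) // 64
}

One trade-off inherited with sync.Pool: unlike the sized channel, it no longer bounds how many chunks can be live at once, and idle chunks may be reclaimed on GC; the capacity check keeps chunk size, not chunk count, consistent.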
pkg/ddl/backfilling_read_index.go

+2 −2

@@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
 		if err != nil {
 			return err
 		}
-		return executeAndClosePipeline(opCtx, pipe)
+		return executeAndClosePipeline(opCtx, pipe, nil, 0)
 	}

 	pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
 	if err != nil {
 		return err
 	}
-	err = executeAndClosePipeline(opCtx, pipe)
+	err = executeAndClosePipeline(opCtx, pipe, nil, 0)
 	if err != nil {
 		// For dist task local based ingest, checkpoint is unsupported.
 		// If there is an error we should keep local sort dir clean.

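Both call sites above belong to the distributed (dist-task) execution path, which is why they pass nil for the job and 0 for avgRowSize: executeAndClosePipeline only starts the tuning goroutine when the job is non-nil, so, per the comment added in NewAddIndexIngestPipeline, dynamic adjustment currently covers the local ingest path only.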
pkg/ddl/backfilling_test.go

+34 −0

@@ -16,9 +16,11 @@ package ddl
 
 import (
 	"bytes"
+	"context"
 	"testing"
 	"time"
 
+	"github.com/pingcap/tidb/pkg/ddl/copr"
 	"github.com/pingcap/tidb/pkg/ddl/ingest"
 	distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
 	"github.com/pingcap/tidb/pkg/errctx"

@@ -485,3 +487,35 @@ func TestValidateAndFillRanges(t *testing.T) {
 	err = validateAndFillRanges(ranges, []byte("b"), []byte("f"))
 	require.Error(t, err)
 }
+
+func TestTuneTableScanWorkerBatchSize(t *testing.T) {
+	reorgMeta := &model.DDLReorgMeta{
+		Concurrency: 4,
+		BatchSize:   32,
+	}
+	copCtx := &copr.CopContextSingleIndex{
+		CopContextBase: &copr.CopContextBase{
+			FieldTypes: []*types.FieldType{},
+		},
+	}
+	opCtx, cancel := NewDistTaskOperatorCtx(context.Background(), 1, 1)
+	w := tableScanWorker{
+		copCtx:        copCtx,
+		ctx:           opCtx,
+		srcChkPool:    createChunkPool(copCtx, reorgMeta),
+		hintBatchSize: 32,
+		reorgMeta:     reorgMeta,
+	}
+	for i := 0; i < 10; i++ {
+		chk := w.getChunk()
+		require.Equal(t, 32, chk.Capacity())
+		w.srcChkPool.Put(chk)
+	}
+	reorgMeta.SetBatchSize(64)
+	for i := 0; i < 10; i++ {
+		chk := w.getChunk()
+		require.Equal(t, 64, chk.Capacity())
+		w.srcChkPool.Put(chk)
+	}
+	cancel()
+}

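The test flips the batch size through reorgMeta.SetBatchSize and expects getChunk to honor it on the very next call. That works because the GetBatchSizeOrDefault and GetConcurrencyOrDefault accessors read the live ReorgMeta value and fall back to the corresponding system variable when the field is unset. The following is a guess at the accessor's shape (the real definition lives alongside model.DDLReorgMeta and may differ in detail):

package main

import (
	"fmt"
	"sync/atomic"
)

// ReorgSetting is a hypothetical reconstruction of how DDLReorgMeta's
// BatchSize and Concurrency behave: atomically updatable while the job
// runs, with the zero value meaning "not set, use the system-variable
// default" (tidb_ddl_reorg_batch_size / tidb_ddl_reorg_worker_cnt).
type ReorgSetting struct{ v atomic.Int64 }

func (s *ReorgSetting) Set(n int) { s.v.Store(int64(n)) }

func (s *ReorgSetting) GetOrDefault(def int) int {
	if n := s.v.Load(); n > 0 {
		return int(n)
	}
	return def
}

func main() {
	var batch ReorgSetting
	fmt.Println(batch.GetOrDefault(256)) // unset: falls back to 256
	batch.Set(64)                        // e.g. adjusted while the job runs
	fmt.Println(batch.GetOrDefault(256)) // 64
}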