ddl: dynamically adjusting the concurrency and batch size of reorganization job #57468

Merged (24 commits) · Nov 21, 2024

Changes from 1 commit
start the adjustment goroutine after the pipeline starts executing
fzzf678 committed Nov 19, 2024
commit 64f9f4d9f0074c185b2423614c461fee7a300420
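
In short: this commit moves the dynamic worker-pool adjustment out of `runAddIndexInLocalIngestMode`. The inline goroutine is extracted into a new `adjustWorkerPoolSize` function, and `executeAndClosePipeline` gains `job` and `avgRowSize` parameters so it can start the adjustment goroutine itself, only after `pipe.Execute()` has succeeded. Call sites that should not tune the pools pass `nil, 0`.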
91 changes: 49 additions & 42 deletions pkg/ddl/backfilling.go
@@ -777,47 +777,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
         return err
     }
 
-    // Adjust worker pool size dynamically.
-    go func() {
-        opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
-        if opR == nil || opW == nil {
-            logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
-            return
-        }
-        reader, readerOk := opR.(*TableScanOperator)
-        writer, writerOk := opW.(*IndexIngestOperator)
-        if !readerOk || !writerOk {
-            logutil.DDLIngestLogger().Error(
-                "unexpected operator types, config can't be adjusted",
-                zap.Int64("jobID", job.ID),
-                zap.Bool("isReaderValid", readerOk),
-                zap.Bool("isWriterValid", writerOk),
-            )
-            return
-        }
-        ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
-        defer ticker.Stop()
-        for {
-            select {
-            case <-ctx.Done():
-                return
-            case <-ticker.C:
-                targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize)
-                currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
-                if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
-                    continue
-                }
-                reader.TuneWorkerPoolSize(int32(targetReaderCnt))
-                writer.TuneWorkerPoolSize(int32(targetWriterCnt))
-                logutil.DDLIngestLogger().Info("adjust ddl job config success",
-                    zap.Int64("jobID", job.ID),
-                    zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
-                    zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
-            }
-        }
-    }()
-
-    err = executeAndClosePipeline(opCtx, pipe)
+    err = executeAndClosePipeline(opCtx, pipe, reorgInfo.Job, avgRowSize)
     if err != nil {
         err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
         if err1 != nil {
@@ -834,11 +794,58 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
     return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
 }
 
-func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline) error {
+func adjustWorkerPoolSize(ctx context.Context, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) {
+    opR, opW := pipe.GetLocalIngestModeReaderAndWriter()
+    if opR == nil || opW == nil {
+        logutil.DDLIngestLogger().Error("failed to get local ingest mode reader or writer", zap.Int64("jobID", job.ID))
+        return
+    }
+    reader, readerOk := opR.(*TableScanOperator)
+    writer, writerOk := opW.(*IndexIngestOperator)
+    if !readerOk || !writerOk {
+        logutil.DDLIngestLogger().Error(
+            "unexpected operator types, config can't be adjusted",
+            zap.Int64("jobID", job.ID),
+            zap.Bool("isReaderValid", readerOk),
+            zap.Bool("isWriterValid", writerOk),
+        )
+        return
+    }
+    ticker := time.NewTicker(UpdateDDLJobReorgCfgInterval)
+    defer ticker.Stop()
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-ticker.C:
+            targetReaderCnt, targetWriterCnt := expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize)
+            currentReaderCnt, currentWriterCnt := reader.GetWorkerPoolSize(), writer.GetWorkerPoolSize()
+            if int32(targetReaderCnt) == currentReaderCnt && int32(targetWriterCnt) == currentWriterCnt {
+                continue
+            }
+            reader.TuneWorkerPoolSize(int32(targetReaderCnt))
+            writer.TuneWorkerPoolSize(int32(targetWriterCnt))
+            logutil.DDLIngestLogger().Info("adjust ddl job config success",
+                zap.Int64("jobID", job.ID),
+                zap.Int32("table scan operator count", reader.GetWorkerPoolSize()),
+                zap.Int32("index ingest operator count", writer.GetWorkerPoolSize()))
+        }
+    }
+}
+
+func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline, job *model.Job, avgRowSize int) error {
     err := pipe.Execute()
     if err != nil {
         return err
     }
+
+    // Adjust worker pool size dynamically.
+    if job != nil && avgRowSize != 0 {
+        go func() {
+            adjustWorkerPoolSize(ctx, pipe, job, avgRowSize)
+        }()
+    }
+
     err = pipe.Close()
     if opErr := ctx.OperatorErr(); opErr != nil {
         return opErr
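
Stepping back from the diff, `adjustWorkerPoolSize` is a general ticker-driven tuning loop: on each tick, recompute the target pool sizes, resize only when they differ from the current ones, and exit when the context is cancelled. Below is a minimal, self-contained sketch of that pattern; every name in it (`pool`, `tune`, `target`) is a hypothetical stand-in for illustration, not a TiDB API:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// pool is a hypothetical stand-in for an operator whose worker count
// can be read and resized at runtime (cf. GetWorkerPoolSize and
// TuneWorkerPoolSize on TableScanOperator/IndexIngestOperator).
type pool struct{ size atomic.Int32 }

func (p *pool) Size() int32  { return p.size.Load() }
func (p *pool) Tune(n int32) { p.size.Store(n) }

// tune periodically recomputes the target sizes and resizes the pools
// only when they differ from the current values, stopping when ctx is
// cancelled -- the same shape as adjustWorkerPoolSize above.
func tune(ctx context.Context, reader, writer *pool, target func() (int32, int32), every time.Duration) {
	ticker := time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			r, w := target()
			if r == reader.Size() && w == writer.Size() {
				continue // nothing changed this tick
			}
			reader.Tune(r)
			writer.Tune(w)
			fmt.Printf("tuned pools: reader=%d writer=%d\n", r, w)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	reader, writer := new(pool), new(pool)
	// Hypothetical replacement for expectedIngestWorkerCnt(concurrency, avgRowSize).
	target := func() (int32, int32) { return 4, 8 }
	tune(ctx, reader, writer, target, 100*time.Millisecond)
}
```

In the PR itself the target comes from `expectedIngestWorkerCnt(job.ReorgMeta.GetConcurrency(), avgRowSize)`, so changing the job's reorg concurrency while it runs is picked up on the next tick.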
4 changes: 2 additions & 2 deletions pkg/ddl/backfilling_read_index.go
@@ -114,14 +114,14 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
         if err != nil {
             return err
         }
-        return executeAndClosePipeline(opCtx, pipe)
+        return executeAndClosePipeline(opCtx, pipe, nil, 0)
     }
 
     pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
     if err != nil {
         return err
     }
-    err = executeAndClosePipeline(opCtx, pipe)
+    err = executeAndClosePipeline(opCtx, pipe, nil, 0)
     if err != nil {
         // For dist task local based ingest, checkpoint is unsupported.
         // If there is an error we should keep local sort dir clean.
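
Note the `nil, 0` arguments at both call sites above: `executeAndClosePipeline` starts the adjustment goroutine only when `job != nil && avgRowSize != 0`, so these distributed-task paths keep their fixed worker-pool sizes.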