Skip to content

Commit 50d888e

Browse files
authored
ddl: replace local ingest impl with backfill operators (#54149) (#54225)
close #53909, close #53910
1 parent 9d2494d commit 50d888e

19 files changed

+396
-676
lines changed

pkg/ddl/backfilling.go

+153-6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/pingcap/errors"
2727
"github.com/pingcap/failpoint"
28+
"github.com/pingcap/tidb/pkg/ddl/ingest"
2829
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
2930
"github.com/pingcap/tidb/pkg/ddl/logutil"
3031
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
@@ -619,6 +620,154 @@ func SetBackfillTaskChanSizeForTest(n int) {
619620
backfillTaskChanSize = n
620621
}
621622

623+
func (dc *ddlCtx) runAddIndexInLocalIngestMode(
624+
ctx context.Context,
625+
sessPool *sess.Pool,
626+
t table.PhysicalTable,
627+
reorgInfo *reorgInfo,
628+
) error {
629+
// TODO(tangenta): support adjust worker count dynamically.
630+
if err := dc.isReorgRunnable(reorgInfo.Job.ID, false); err != nil {
631+
return errors.Trace(err)
632+
}
633+
job := reorgInfo.Job
634+
opCtx := NewLocalOperatorCtx(ctx, job.ID)
635+
idxCnt := len(reorgInfo.elements)
636+
indexIDs := make([]int64, 0, idxCnt)
637+
indexInfos := make([]*model.IndexInfo, 0, idxCnt)
638+
uniques := make([]bool, 0, idxCnt)
639+
hasUnique := false
640+
for _, e := range reorgInfo.elements {
641+
indexIDs = append(indexIDs, e.ID)
642+
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, e.ID)
643+
if indexInfo == nil {
644+
logutil.DDLIngestLogger().Warn("index info not found",
645+
zap.Int64("jobID", job.ID),
646+
zap.Int64("tableID", t.Meta().ID),
647+
zap.Int64("indexID", e.ID))
648+
return errors.Errorf("index info not found: %d", e.ID)
649+
}
650+
indexInfos = append(indexInfos, indexInfo)
651+
uniques = append(uniques, indexInfo.Unique)
652+
hasUnique = hasUnique || indexInfo.Unique
653+
}
654+
655+
//nolint: forcetypeassert
656+
discovery := dc.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
657+
bcCtx, err := ingest.LitBackCtxMgr.Register(
658+
ctx, job.ID, hasUnique, dc.etcdCli, discovery, job.ReorgMeta.ResourceGroupName)
659+
if err != nil {
660+
return errors.Trace(err)
661+
}
662+
defer ingest.LitBackCtxMgr.Unregister(job.ID)
663+
engines, err := bcCtx.Register(indexIDs, uniques, t.Meta())
664+
if err != nil {
665+
logutil.DDLIngestLogger().Error("cannot register new engine",
666+
zap.Int64("jobID", job.ID),
667+
zap.Error(err),
668+
zap.Int64s("index IDs", indexIDs))
669+
return errors.Trace(err)
670+
}
671+
defer bcCtx.UnregisterEngines()
672+
sctx, err := sessPool.Get()
673+
if err != nil {
674+
return errors.Trace(err)
675+
}
676+
defer sessPool.Put(sctx)
677+
678+
cpMgr, err := ingest.NewCheckpointManager(
679+
ctx,
680+
bcCtx,
681+
sessPool,
682+
job.ID,
683+
indexIDs,
684+
bcCtx.GetLocalBackend().LocalStoreDir,
685+
dc.store.(kv.StorageWithPD).GetPDClient(),
686+
)
687+
if err != nil {
688+
logutil.DDLIngestLogger().Warn("create checkpoint manager failed",
689+
zap.Int64("jobID", job.ID),
690+
zap.Error(err))
691+
} else {
692+
defer cpMgr.Close()
693+
cpMgr.Reset(t.GetPhysicalID(), reorgInfo.StartKey, reorgInfo.EndKey)
694+
bcCtx.AttachCheckpointManager(cpMgr)
695+
}
696+
697+
reorgCtx := dc.getReorgCtx(reorgInfo.Job.ID)
698+
rowCntListener := &localRowCntListener{
699+
prevPhysicalRowCnt: reorgCtx.getRowCount(),
700+
reorgCtx: dc.getReorgCtx(reorgInfo.Job.ID),
701+
counter: metrics.BackfillTotalCounter.WithLabelValues(
702+
metrics.GenerateReorgLabel("add_idx_rate", job.SchemaName, job.TableName)),
703+
}
704+
705+
avgRowSize := estimateTableRowSize(ctx, dc.store, sctx.GetRestrictedSQLExecutor(), t)
706+
concurrency := int(variable.GetDDLReorgWorkerCounter())
707+
708+
pipe, err := NewAddIndexIngestPipeline(
709+
opCtx,
710+
dc.store,
711+
sessPool,
712+
bcCtx,
713+
engines,
714+
job.ID,
715+
t,
716+
indexInfos,
717+
reorgInfo.StartKey,
718+
reorgInfo.EndKey,
719+
job.ReorgMeta,
720+
avgRowSize,
721+
concurrency,
722+
cpMgr,
723+
rowCntListener,
724+
)
725+
if err != nil {
726+
return err
727+
}
728+
err = pipe.Execute()
729+
if err != nil {
730+
return err
731+
}
732+
err = pipe.Close()
733+
if opCtx.OperatorErr() != nil {
734+
return opCtx.OperatorErr()
735+
}
736+
if err != nil {
737+
return err
738+
}
739+
for i, isUK := range uniques {
740+
if isUK {
741+
err := bcCtx.CollectRemoteDuplicateRows(indexIDs[i], t)
742+
if err != nil {
743+
return err
744+
}
745+
}
746+
}
747+
return nil
748+
}
749+
750+
type localRowCntListener struct {
751+
EmptyRowCntListener
752+
reorgCtx *reorgCtx
753+
counter prometheus.Counter
754+
755+
// prevPhysicalRowCnt records the row count from previous physical tables (partitions).
756+
prevPhysicalRowCnt int64
757+
// curPhysicalRowCnt records the row count of current physical table.
758+
curPhysicalRowCnt int64
759+
}
760+
761+
func (s *localRowCntListener) Written(rowCnt int) {
762+
s.curPhysicalRowCnt += int64(rowCnt)
763+
s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + s.curPhysicalRowCnt)
764+
s.counter.Add(float64(rowCnt))
765+
}
766+
767+
func (s *localRowCntListener) SetTotal(total int) {
768+
s.reorgCtx.setRowCount(s.prevPhysicalRowCnt + int64(total))
769+
}
770+
622771
// writePhysicalTableRecord handles the "add index" or "modify/change column" reorganization state for a non-partitioned table or a partition.
623772
// For a partitioned table, it should be handled partition by partition.
624773
//
@@ -653,21 +802,19 @@ func (dc *ddlCtx) writePhysicalTableRecord(
653802
failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(reorgInfo.Job.ErrorCount)) + ", mock unknown type: ast.whenClause."))
654803
}
655804
})
805+
if bfWorkerType == typeAddIndexWorker && reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
806+
return dc.runAddIndexInLocalIngestMode(ctx, sessPool, t, reorgInfo)
807+
}
656808

657809
jc := reorgInfo.NewJobContext()
658810

659811
eg, egCtx := util.NewErrorGroupWithRecoverWithCtx(ctx)
660812

661-
scheduler, err := newBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
813+
scheduler, err := newTxnBackfillScheduler(egCtx, reorgInfo, sessPool, bfWorkerType, t, jc)
662814
if err != nil {
663815
return errors.Trace(err)
664816
}
665817
defer scheduler.close(true)
666-
if lit, ok := scheduler.(*ingestBackfillScheduler); ok {
667-
if lit.importStarted() {
668-
return nil
669-
}
670-
}
671818

672819
err = scheduler.setupWorkers()
673820
if err != nil {

0 commit comments

Comments
 (0)