Skip to content

Commit

Permalink
Revert "ddl: refactor add index worker for ingest mode (#42115)" (#42438)
Browse files Browse the repository at this point in the history

close #42436
  • Loading branch information
tangenta authored Mar 22, 2023
1 parent c5aa58c commit 42957c5
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 186 deletions.
72 changes: 27 additions & 45 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tidb/util/topsql"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -147,34 +146,31 @@ type backfillTaskContext struct {
type backfillCtx struct {
id int
*ddlCtx
sessCtx sessionctx.Context
schemaName string
table table.Table
batchCnt int
jobContext *JobContext
metricCounter prometheus.Counter
reorgTp model.ReorgType
sessCtx sessionctx.Context
schemaName string
table table.Table
batchCnt int
}

func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context,
schemaName string, tbl table.Table, jobCtx *JobContext, label string, isDistributed bool) *backfillCtx {
func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context, reorgTp model.ReorgType,
schemaName string, tbl table.Table, isDistributed bool) *backfillCtx {
if isDistributed {
id = int(backfillContextID.Add(1))
}
return &backfillCtx{
id: id,
ddlCtx: ctx,
sessCtx: sessCtx,
reorgTp: reorgTp,
schemaName: schemaName,
table: tbl,
batchCnt: int(variable.GetDDLReorgBatchSize()),
jobContext: jobCtx,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(
metrics.GenerateReorgLabel(label, schemaName, tbl.Meta().Name.String())),
}
}

type backfiller interface {
BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, err error)
BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error)
AddMetricInfo(float64)
GetTasks() ([]*BackfillJob, error)
UpdateTask(bfJob *BackfillJob) error
Expand Down Expand Up @@ -316,7 +312,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
}
for {
// Give job chance to be canceled, if we not check it here,
// if there is panic in bf.BackfillData we will never cancel the job.
// if there is panic in bf.BackfillDataInTxn we will never cancel the job.
// Because reorgRecordTask may run a long time,
// we should check whether this ddl job is still runnable.
err := d.isReorgRunnable(jobID, isDistReorg)
Expand All @@ -325,7 +321,7 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
return result
}

taskCtx, err := bf.BackfillData(handleRange)
taskCtx, err := bf.BackfillDataInTxn(handleRange)
if err != nil {
result.err = err
return result
Expand Down Expand Up @@ -387,13 +383,9 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,

func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) {
if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok {
switch w := w.backfiller.(type) {
case *addIndexTxnWorker:
if addIdxWorker, ok := w.backfiller.(*addIndexWorker); ok {
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
w.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
case *addIndexIngestWorker:
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
w.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
addIdxWorker.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
}
}
}
Expand Down Expand Up @@ -870,31 +862,21 @@ func (b *backfillScheduler) adjustWorkerSize() error {
)
switch b.tp {
case typeAddIndexWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "add_idx_rate", false)
if reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
idxWorker, err := newAddIndexIngestWorker(b.tbl, backfillCtx,
job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
if canSkipError(b.reorgInfo.ID, len(b.workers), err) {
continue
}
return err
}
idxWorker.copReqSenderPool = b.copReqSenderPool
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
} else {
idxWorker, err := newAddIndexTxnWorker(b.decodeColMap, b.tbl, backfillCtx,
job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
return err
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl, false)
idxWorker, err := newAddIndexWorker(b.decodeColMap, b.tbl, backfillCtx,
jc, job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
if canSkipError(b.reorgInfo.ID, len(b.workers), err) {
continue
}
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
return err
}
idxWorker.copReqSenderPool = b.copReqSenderPool
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, job.SchemaName, b.tbl, jc, "merge_tmp_idx_rate", false)
tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, b.tbl, reorgInfo.currElement.ID)
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl, false)
tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, i, b.tbl, reorgInfo.currElement.ID, jc)
runner = newBackfillWorker(jc.ddlJobCtx, tmpIdxWorker)
worker = tmpIdxWorker
case typeUpdateColumnWorker:
Expand Down Expand Up @@ -935,7 +917,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
}

func (b *backfillScheduler) initCopReqSenderPool() {
if b.tp != typeAddIndexWorker || b.reorgInfo.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge ||
if b.tp != typeAddIndexWorker || b.reorgInfo.Job.ReorgMeta.ReorgTp != model.ReorgTypeLitMerge ||
b.copReqSenderPool != nil || len(b.workers) > 0 {
return
}
Expand Down Expand Up @@ -1023,7 +1005,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic
defer scheduler.Close()

var ingestBeCtx *ingest.BackendContext
if bfWorkerType == typeAddIndexWorker && reorgInfo.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
if bfWorkerType == typeAddIndexWorker && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
if bc, ok := ingest.LitBackCtxMgr.Load(job.ID); ok {
ingestBeCtx = bc
} else {
Expand Down
29 changes: 18 additions & 11 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1174,14 +1175,18 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error

type updateColumnWorker struct {
*backfillCtx
oldColInfo *model.ColumnInfo
newColInfo *model.ColumnInfo
oldColInfo *model.ColumnInfo
newColInfo *model.ColumnInfo
metricCounter prometheus.Counter

// The following attributes are used to reduce memory allocation.
rowRecords []*rowRecord
rowDecoder *decoder.RowDecoder

rowMap map[int64]types.Datum

// For SQL Mode and warnings.
jobContext *JobContext
}

func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
Expand All @@ -1200,11 +1205,13 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &updateColumnWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.SchemaName, t, jc, "update_col_rate", false),
oldColInfo: oldCol,
newColInfo: newCol,
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t, false),
oldColInfo: oldCol,
newColInfo: newCol,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
jobContext: jc,
}
}

Expand Down Expand Up @@ -1257,7 +1264,7 @@ func (w *updateColumnWorker) fetchRowColVals(txn kv.Transaction, taskRange reorg
taskDone := false
var lastAccessedHandle kv.Key
oprStartTime := startTime
err := iterateSnapshotKeys(w.jobContext, w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(),
err := iterateSnapshotKeys(w.GetCtx().jobContext(taskRange.getJobID()), w.sessCtx.GetStore(), taskRange.priority, taskRange.physicalTable.RecordPrefix(),
txn.StartTS(), taskRange.startKey, taskRange.endKey, func(handle kv.Handle, recordKey kv.Key, rawRow []byte) (bool, error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterateSnapshotKeys in updateColumnWorker fetchRowColVals", 0)
Expand Down Expand Up @@ -1392,8 +1399,8 @@ func (w *updateColumnWorker) cleanRowMap() {
}
}

// BackfillData will backfill the table record in a transaction. A lock corresponds to a rowKey if the value of rowKey is changed.
func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
// BackfillDataInTxn will backfill the table record in a transaction. A lock corresponds to a rowKey if the value of rowKey is changed.
func (w *updateColumnWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
Expand Down Expand Up @@ -1437,7 +1444,7 @@ func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCt

return nil
})
logSlowOperations(time.Since(oprStartTime), "BackfillData", 3000)
logSlowOperations(time.Since(oprStartTime), "BackfillDataInTxn", 3000)

return
}
Expand Down
4 changes: 2 additions & 2 deletions ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, worker
}

var bf backfiller
bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, schemaName, tbl, d.jobContext(jobID), "add_idx_rate", true))
bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, bfMeta.ReorgTp, schemaName, tbl, true))
if err != nil {
if canSkipError(jobID, len(bwCtx.backfillWorkers), err) {
err = nil
Expand Down Expand Up @@ -199,7 +199,7 @@ func runBackfillJobs(d *ddl, sess *session, ingestBackendCtx *ingest.BackendCont

workerCnt := int(variable.GetDDLReorgWorkerCounter())
// TODO: Different worker using different newBackfillerFunc.
workerCtx, err := newAddIndexWorkerContext(d, dbInfo.Name, tbl, workerCnt, bJob)
workerCtx, err := newAddIndexWorkerContext(d, dbInfo.Name, tbl, workerCnt, bJob, jobCtx)
if err != nil || workerCtx == nil {
logutil.BgLogger().Info("[ddl] new adding index worker context failed", zap.Reflect("workerCtx", workerCtx), zap.Error(err))
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit 42957c5

Please sign in to comment.