Skip to content

Commit

Permalink
ddl: use a param to decide whether exec in distributed mode (#41549) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 17, 2023
1 parent b086085 commit f4ca082
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
8 changes: 4 additions & 4 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ type backfillCtx struct {
}

func newBackfillCtx(ctx *ddlCtx, id int, sessCtx sessionctx.Context, reorgTp model.ReorgType,
schemaName string, tbl table.Table) *backfillCtx {
if id == 0 {
schemaName string, tbl table.Table, isDistributed bool) *backfillCtx {
if isDistributed {
id = int(backfillContextID.Add(1))
}
return &backfillCtx{
Expand Down Expand Up @@ -912,7 +912,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
)
switch b.tp {
case typeAddIndexWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl)
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl, false)
idxWorker, err := newAddIndexWorker(b.decodeColMap, b.tbl, backfillCtx,
jc, job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
if err != nil {
Expand All @@ -925,7 +925,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
runner = newBackfillWorker(jc.ddlJobCtx, idxWorker)
worker = idxWorker
case typeAddIndexMergeTmpWorker:
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl)
backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl, false)
tmpIdxWorker := newMergeTempIndexWorker(backfillCtx, i, b.tbl, reorgInfo.currElement.ID, jc)
runner = newBackfillWorker(jc.ddlJobCtx, tmpIdxWorker)
worker = tmpIdxWorker
Expand Down
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &updateColumnWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t, false),
oldColInfo: oldCol,
newColInfo: newCol,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
Expand Down
2 changes: 1 addition & 1 deletion ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func newBackfillWorkerContext(d *ddl, schemaName string, tbl table.Table, worker
}

var bf backfiller
bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, bfMeta.ReorgTp, schemaName, tbl))
bf, err = bfFunc(newBackfillCtx(d.ddlCtx, 0, se, bfMeta.ReorgTp, schemaName, tbl, true))
if err != nil {
if canSkipError(jobID, len(bwCtx.backfillWorkers), err) {
err = nil
Expand Down
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,7 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
}
return &cleanUpIndexWorker{
baseIndexWorker: baseIndexWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t),
backfillCtx: newBackfillCtx(reorgInfo.d, id, sessCtx, reorgInfo.ReorgMeta.ReorgTp, reorgInfo.SchemaName, t, false),
indexes: indexes,
rowDecoder: rowDecoder,
defaultVals: make([]types.Datum, len(t.WritableCols())),
Expand Down

0 comments on commit f4ca082

Please sign in to comment.