diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 72a3135c2ff76..6a442b13d6ee3 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -143,6 +143,31 @@ type backfillTaskContext struct {
 	warningsCount map[errors.ErrorID]int64
 }
 
+type reorgBackfillTask struct {
+	physicalTableID int64
+	startKey        kv.Key
+	endKey          kv.Key
+	endInclude      bool
+}
+
+func (r *reorgBackfillTask) String() string {
+	physicalID := strconv.FormatInt(r.physicalTableID, 10)
+	startKey := tryDecodeToHandleString(r.startKey)
+	endKey := tryDecodeToHandleString(r.endKey)
+	rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
+	if r.endInclude {
+		return rangeStr + "]"
+	}
+	return rangeStr + ")"
+}
+
+// mergeBackfillCtxToResult merge partial result in taskCtx into result.
+func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) {
+	result.nextKey = taskCtx.nextKey
+	result.addedCount += taskCtx.addedCount
+	result.scanCount += taskCtx.scanCount
+}
+
 type backfillWorker struct {
 	id        int
 	reorgInfo *reorgInfo
@@ -181,53 +206,6 @@ func closeBackfillWorkers(workers []*backfillWorker) {
 	}
 }
 
-type reorgBackfillTask struct {
-	physicalTableID int64
-	startKey        kv.Key
-	endKey          kv.Key
-	endInclude      bool
-}
-
-func (r *reorgBackfillTask) String() string {
-	physicalID := strconv.FormatInt(r.physicalTableID, 10)
-	startKey := tryDecodeToHandleString(r.startKey)
-	endKey := tryDecodeToHandleString(r.endKey)
-	rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
-	if r.endInclude {
-		return rangeStr + "]"
-	}
-	return rangeStr + ")"
-}
-
-func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
-	if threshold == 0 {
-		threshold = atomic.LoadUint32(&variable.DDLSlowOprThreshold)
-	}
-
-	if elapsed >= time.Duration(threshold)*time.Millisecond {
-		logutil.BgLogger().Info("[ddl] slow operations", zap.Duration("takeTimes", elapsed), zap.String("msg", slowMsg))
-	}
-}
-
-// mergeBackfillCtxToResult merge partial result in taskCtx into result.
-func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) {
-	result.nextKey = taskCtx.nextKey
-	result.addedCount += taskCtx.addedCount
-	result.scanCount += taskCtx.scanCount
-}
-
-func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
-	for _, warn := range partWarnings {
-		if _, ok := totalWarningsCount[warn.ID()]; ok {
-			totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
-		} else {
-			totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
-			totalWarnings[warn.ID()] = warn
-		}
-	}
-	return totalWarnings, totalWarningsCount
-}
-
 // handleBackfillTask backfills range [task.startHandle, task.endHandle) handle's index to table.
 func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult {
 	handleRange := *task
@@ -365,7 +343,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
 	return ranges, nil
 }
 
-func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
+func waitTaskResults(workers []*backfillWorker, taskCnt int,
 	totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) {
 	var (
 		addedCount int64
@@ -396,9 +374,9 @@ func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
 	return nextKey, addedCount, errors.Trace(firstErr)
 }
 
-// handleReorgTasks sends tasks to workers, and waits for all the running workers to return results,
+// sendTasksAndWait sends tasks to workers, and waits for all the running workers to return results,
 // there are taskCnt running workers.
-func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error {
+func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error {
 	for i, task := range batchTasks {
 		workers[i].taskCh <- task
 	}
@@ -406,15 +384,15 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
 	startKey := batchTasks[0].startKey
 	taskCnt := len(batchTasks)
 	startTime := time.Now()
-	nextKey, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
+	nextKey, taskAddedCount, err := waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
 	elapsedTime := time.Since(startTime)
 	if err == nil {
-		err = w.isReorgRunnable(reorgInfo.Job)
+		err = dc.isReorgRunnable(reorgInfo.Job)
 	}
 
 	if err != nil {
 		// Update the reorg handle that has been processed.
-		err1 := reorgInfo.UpdateReorgMeta(nextKey, w.sessPool)
+		err1 := reorgInfo.UpdateReorgMeta(nextKey, sessPool)
 		metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
 		logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
 			zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -430,7 +408,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
 	}
 
 	// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
-	w.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
+	dc.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
 	metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
 	logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
 		zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -470,8 +448,8 @@ func tryDecodeToHandleString(key kv.Key) string {
 	return handle.String()
 }
 
-// sendRangeTaskToWorkers sends tasks to workers, and returns remaining kvRanges that is not handled.
-func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo,
+// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
+func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo,
 	totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
 	batchTasks := make([]*reorgBackfillTask, 0, len(workers))
 	physicalTableID := reorgInfo.PhysicalTableID
@@ -506,7 +484,7 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker
 	}
 
 	// Wait tasks finish.
-	err := w.handleReorgTasks(reorgInfo, totalAddedCount, workers, batchTasks)
+	err := dc.sendTasksAndWait(sessPool, reorgInfo, totalAddedCount, workers, batchTasks)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -529,15 +507,14 @@ var (
 	TestCheckReorgTimeout = int32(0)
 )
 
-func loadDDLReorgVars(w *worker) error {
+func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error {
 	// Get sessionctx from context resource pool.
-	var ctx sessionctx.Context
-	ctx, err := w.sessPool.get()
+	sCtx, err := sessPool.get()
 	if err != nil {
 		return errors.Trace(err)
 	}
-	defer w.sessPool.put(ctx)
-	return ddlutil.LoadDDLReorgVars(w.ctx, ctx)
+	defer sessPool.put(sCtx)
+	return ddlutil.LoadDDLReorgVars(ctx, sCtx)
 }
 
 func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
@@ -585,7 +562,7 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
 //
 // The above operations are completed in a transaction.
 // Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
-func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
+func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
 	job := reorgInfo.Job
 	totalAddedCount := job.GetRowCount()
 
@@ -596,7 +573,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
 		return errors.Trace(err)
 	}
 
-	if err := w.isReorgRunnable(reorgInfo.Job); err != nil {
+	if err := dc.isReorgRunnable(reorgInfo.Job); err != nil {
 		return errors.Trace(err)
 	}
 	if startKey == nil && endKey == nil {
@@ -616,7 +593,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
 	defer func() {
 		closeBackfillWorkers(backfillWorkers)
 	}()
-	jc := w.jobContext(job)
+	jc := dc.jobContext(job)
 
 	for {
 		kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
@@ -625,7 +602,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
 		}
 
 		// For dynamic adjust backfill worker number.
-		if err := loadDDLReorgVars(w); err != nil {
+		if err := loadDDLReorgVars(dc.ctx, sessPool); err != nil {
 			logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err))
 		}
 		workerCnt = variable.GetDDLReorgWorkerCounter()
@@ -706,7 +683,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
 			zap.Int("regionCnt", len(kvRanges)),
 			zap.String("startHandle", tryDecodeToHandleString(startKey)),
 			zap.String("endHandle", tryDecodeToHandleString(endKey)))
-		remains, err := w.sendRangeTaskToWorkers(t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
+		remains, err := dc.handleRangeTasks(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -805,3 +782,25 @@ func getRangeEndKey(ctx *JobContext, store kv.Storage, priority int, t table.Tab
 
 	return it.Key(), nil
 }
+
+func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
+	for _, warn := range partWarnings {
+		if _, ok := totalWarningsCount[warn.ID()]; ok {
+			totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
+		} else {
+			totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
+			totalWarnings[warn.ID()] = warn
+		}
+	}
+	return totalWarnings, totalWarningsCount
+}
+
+func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
+	if threshold == 0 {
+		threshold = atomic.LoadUint32(&variable.DDLSlowOprThreshold)
+	}
+
+	if elapsed >= time.Duration(threshold)*time.Millisecond {
+		logutil.BgLogger().Info("[ddl] slow operations", zap.Duration("takeTimes", elapsed), zap.String("msg", slowMsg))
+	}
+}
diff --git a/ddl/column.go b/ddl/column.go
index 79ac451c73380..ac5506d357998 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -1004,7 +1004,7 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf
 func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
 	logutil.BgLogger().Info("[ddl] start to update table row",
 		zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
-	return w.writePhysicalTableRecord(t, typeUpdateColumnWorker, reorgInfo)
+	return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo)
 }
 
 // TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
diff --git a/ddl/ddl_workerpool.go b/ddl/ddl_workerpool.go
index c7b8a029f0095..de709b6faeb3b 100644
--- a/ddl/ddl_workerpool.go
+++ b/ddl/ddl_workerpool.go
@@ -83,7 +83,72 @@ func (wp *workerPool) close() {
 	wp.resPool.Close()
 }
 
-// tp return the type of worker pool.
+// tp return the type of backfill worker pool.
 func (wp *workerPool) tp() jobType {
 	return wp.t
 }
+
+// backfilWorkerPool is used to new backfill worker.
+type backfilWorkerPool struct {
+	exit    atomic.Bool
+	resPool *pools.ResourcePool
+}
+
+func newBackfillWorkerPool(resPool *pools.ResourcePool) *backfilWorkerPool {
+	return &backfilWorkerPool{
+		exit:    *atomic.NewBool(false),
+		resPool: resPool,
+	}
+}
+
+// setCapacity changes the capacity of the pool.
+// A setCapacity of 0 is equivalent to closing the backfilWorkerPool.
+func (bwp *backfilWorkerPool) setCapacity(capacity int) error {
+	return bwp.resPool.SetCapacity(capacity)
+}
+
+// get gets backfilWorkerPool from context resource pool.
+// Please remember to call put after you finished using backfilWorkerPool.
+func (bwp *backfilWorkerPool) get() (*backfillWorker, error) {
+	if bwp.resPool == nil {
+		return nil, nil
+	}
+
+	if bwp.exit.Load() {
+		return nil, errors.Errorf("backfill worker pool is closed")
+	}
+
+	// no need to protect bwp.resPool
+	resource, err := bwp.resPool.TryGet()
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if resource == nil {
+		return nil, nil
+	}
+
+	worker := resource.(*backfillWorker)
+	return worker, nil
+}
+
+// put returns workerPool to context resource pool.
+func (bwp *backfilWorkerPool) put(wk *backfillWorker) {
+	if bwp.resPool == nil || bwp.exit.Load() {
+		return
+	}
+
+	// No need to protect bwp.resPool, even the bwp.resPool is closed, the ctx still need to
+	// put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing.
+	bwp.resPool.Put(wk)
+}
+
+// close clean up the backfilWorkerPool.
+func (bwp *backfilWorkerPool) close() {
+	// Prevent closing resPool twice.
+	if bwp.resPool == nil || bwp.exit.Load() {
+		return
+	}
+	bwp.exit.Store(true)
+	logutil.BgLogger().Info("[ddl] closing workerPool")
+	bwp.resPool.Close()
+}
diff --git a/ddl/ddl_workerpool_test.go b/ddl/ddl_workerpool_test.go
index 4a89a4dfba3d2..2c9f8553e18cf 100644
--- a/ddl/ddl_workerpool_test.go
+++ b/ddl/ddl_workerpool_test.go
@@ -18,9 +18,11 @@ import (
 	"testing"
 
 	"github.com/ngaut/pools"
+	"github.com/pingcap/tidb/parser/model"
+	"github.com/stretchr/testify/require"
 )
 
-func TestBackfillWorkerPool(t *testing.T) {
+func TestDDLWorkerPool(t *testing.T) {
 	f := func() func() (pools.Resource, error) {
 		return func() (pools.Resource, error) {
 			wk := newWorker(nil, addIdxWorker, nil, nil, nil, true)
@@ -31,3 +33,41 @@ func TestBackfillWorkerPool(t *testing.T) {
 	pool.close()
 	pool.put(nil)
 }
+
+func TestBackfillWorkerPool(t *testing.T) {
+	reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}}
+	f := func() func() (pools.Resource, error) {
+		return func() (pools.Resource, error) {
+			wk := newBackfillWorker(nil, 1, nil, reorgInfo)
+			return wk, nil
+		}
+	}
+	pool := newBackfillWorkerPool(pools.NewResourcePool(f(), 1, 2, 0))
+	bwp, err := pool.get()
+	require.NoError(t, err)
+	require.Equal(t, 1, bwp.id)
+	// test it to reach the capacity
+	bwp1, err := pool.get()
+	require.NoError(t, err)
+	require.Nil(t, bwp1)
+
+	// test setCapacity
+	err = pool.setCapacity(2)
+	require.NoError(t, err)
+	bwp1, err = pool.get()
+	require.NoError(t, err)
+	require.Equal(t, 1, bwp1.id)
+	pool.put(bwp)
+	pool.put(bwp1)
+
+	// test close
+	pool.close()
+	pool.close()
+	require.Equal(t, true, pool.exit.Load())
+	pool.put(bwp1)
+
+	bwp, err = pool.get()
+	require.Error(t, err)
+	require.Equal(t, "backfill worker pool is closed", err.Error())
+	require.Nil(t, bwp)
+}
diff --git a/ddl/index.go b/ddl/index.go
index 85b860ac4e5d4..3350c16a83c2c 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -1344,7 +1344,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
 func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
 	logutil.BgLogger().Info("[ddl] start to add table index",
 		zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
-	return w.writePhysicalTableRecord(t, typeAddIndexWorker, reorgInfo)
+	return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexWorker, reorgInfo)
 }
 
 // addTableIndex handles the add index reorganization state for a table.
@@ -1547,7 +1547,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
 // cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition.
 func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
 	logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
-	return w.writePhysicalTableRecord(t, typeCleanUpIndexWorker, reorgInfo)
+	return w.writePhysicalTableRecord(w.sessPool, t, typeCleanUpIndexWorker, reorgInfo)
 }
 
 // cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions.
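
For context, a minimal usage sketch of the backfilWorkerPool API added in ddl/ddl_workerpool.go above, mirroring the get/put/close flow exercised by TestBackfillWorkerPool. The withBackfillWorker helper is illustrative only; it is not part of this patch and is assumed to live in package ddl next to the pool:

// withBackfillWorker is a hypothetical helper (not in this patch) showing the
// intended lifecycle of backfilWorkerPool: borrow a worker, use it, return it.
func withBackfillWorker(bwp *backfilWorkerPool, fn func(*backfillWorker) error) error {
	wk, err := bwp.get()
	if err != nil {
		// get fails with "backfill worker pool is closed" once close() has been called.
		return errors.Trace(err)
	}
	if wk == nil {
		// get returns (nil, nil) when the pool is at capacity; callers can retry
		// later or raise the limit via setCapacity.
		return nil
	}
	// Always hand the worker back so a later close() can drain the resource pool.
	defer bwp.put(wk)
	return fn(wk)
}

Keeping the get/put pairing in one place follows the same discipline the refactored loadDDLReorgVars applies to sessPool.get/put in backfilling.go.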