From c54335aa55eebcc820a2013f725616daa09dcafa Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 1 Sep 2022 09:40:02 +0800 Subject: [PATCH 1/6] ddl: update some function arguments --- ddl/backfilling.go | 36 ++++++++++++++++++------------------ ddl/column.go | 2 +- ddl/index.go | 4 ++-- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 72a3135c2ff76..40d8e928fa476 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -365,7 +365,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey return ranges, nil } -func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int, +func waitTaskResults(workers []*backfillWorker, taskCnt int, totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) { var ( addedCount int64 @@ -398,7 +398,7 @@ func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int, // handleReorgTasks sends tasks to workers, and waits for all the running workers to return results, // there are taskCnt running workers. -func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error { +func (dc *ddlCtx) handleReorgTasks(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error { for i, task := range batchTasks { workers[i].taskCh <- task } @@ -406,15 +406,15 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, startKey := batchTasks[0].startKey taskCnt := len(batchTasks) startTime := time.Now() - nextKey, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startKey) + nextKey, taskAddedCount, err := waitTaskResults(workers, taskCnt, totalAddedCount, startKey) elapsedTime := time.Since(startTime) if err == nil { - err = w.isReorgRunnable(reorgInfo.Job) + err = dc.isReorgRunnable(reorgInfo.Job) } if err != nil { // Update the reorg handle that has been processed. - err1 := reorgInfo.UpdateReorgMeta(nextKey, w.sessPool) + err1 := reorgInfo.UpdateReorgMeta(nextKey, sessPool) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds()) logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed", zap.ByteString("elementType", reorgInfo.currElement.TypeKey), @@ -430,7 +430,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, } // nextHandle will be updated periodically in runReorgJob, so no need to update it here. - w.getReorgCtx(reorgInfo.Job).setNextKey(nextKey) + dc.getReorgCtx(reorgInfo.Job).setNextKey(nextKey) metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds()) logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch", zap.ByteString("elementType", reorgInfo.currElement.TypeKey), @@ -471,7 +471,7 @@ func tryDecodeToHandleString(key kv.Key) string { } // sendRangeTaskToWorkers sends tasks to workers, and returns remaining kvRanges that is not handled. 
-func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo, +func (dc *ddlCtx) sendRangeTaskToWorkers(sessPool *sessionPool, t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo, totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) { batchTasks := make([]*reorgBackfillTask, 0, len(workers)) physicalTableID := reorgInfo.PhysicalTableID @@ -506,7 +506,7 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker } // Wait tasks finish. - err := w.handleReorgTasks(reorgInfo, totalAddedCount, workers, batchTasks) + err := dc.handleReorgTasks(sessPool, reorgInfo, totalAddedCount, workers, batchTasks) if err != nil { return nil, errors.Trace(err) } @@ -529,15 +529,15 @@ var ( TestCheckReorgTimeout = int32(0) ) -func loadDDLReorgVars(w *worker) error { +func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error { // Get sessionctx from context resource pool. - var ctx sessionctx.Context - ctx, err := w.sessPool.get() + var sCtx sessionctx.Context + sCtx, err := sessPool.get() if err != nil { return errors.Trace(err) } - defer w.sessPool.put(ctx) - return ddlutil.LoadDDLReorgVars(w.ctx, ctx) + defer sessPool.put(sCtx) + return ddlutil.LoadDDLReorgVars(ctx, sCtx) } func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) { @@ -585,7 +585,7 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error { // // The above operations are completed in a transaction. // Finally, update the concurrent processing of the total number of rows, and store the completed handle value. -func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { +func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { job := reorgInfo.Job totalAddedCount := job.GetRowCount() @@ -596,7 +596,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba return errors.Trace(err) } - if err := w.isReorgRunnable(reorgInfo.Job); err != nil { + if err := dc.isReorgRunnable(reorgInfo.Job); err != nil { return errors.Trace(err) } if startKey == nil && endKey == nil { @@ -616,7 +616,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba defer func() { closeBackfillWorkers(backfillWorkers) }() - jc := w.jobContext(job) + jc := dc.jobContext(job) for { kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey) @@ -625,7 +625,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba } // For dynamic adjust backfill worker number. 
- if err := loadDDLReorgVars(w); err != nil { + if err := loadDDLReorgVars(dc.ctx, sessPool); err != nil { logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err)) } workerCnt = variable.GetDDLReorgWorkerCounter() @@ -706,7 +706,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba zap.Int("regionCnt", len(kvRanges)), zap.String("startHandle", tryDecodeToHandleString(startKey)), zap.String("endHandle", tryDecodeToHandleString(endKey))) - remains, err := w.sendRangeTaskToWorkers(t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges) + remains, err := dc.sendRangeTaskToWorkers(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges) if err != nil { return errors.Trace(err) } diff --git a/ddl/column.go b/ddl/column.go index e3ccf6ed9b8da..20ab23273d414 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1004,7 +1004,7 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t, typeUpdateColumnWorker, reorgInfo) + return w.writePhysicalTableRecord(t, w.sessPooltypeUpdateColumnWorker, reorgInfo) } // TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started. diff --git a/ddl/index.go b/ddl/index.go index 85b860ac4e5d4..a391f26a793d8 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1344,7 +1344,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to add table index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t, typeAddIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(t, w.sessPooltypeAddIndexWorker, reorgInfo) } // addTableIndex handles the add index reorganization state for a table. @@ -1547,7 +1547,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t // cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition. func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t, typeCleanUpIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(t, w.sessPooltypeCleanUpIndexWorker, reorgInfo) } // cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions. 
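
In PATCH 1/6 the backfill entry points move from *worker to *ddlCtx, so the session pool is now threaded through as an explicit *sessionPool argument instead of being reached via w.sessPool inside the callee. The refactored loadDDLReorgVars keeps the usual get/defer-put discipline on that pool; a minimal sketch of the pattern it follows (withSessionCtx is a hypothetical helper for illustration only, not part of the patch):

    // withSessionCtx borrows a session context from the pool, runs fn with it,
    // and always returns the context to the pool afterwards.
    func withSessionCtx(sessPool *sessionPool, fn func(sessionctx.Context) error) error {
        sCtx, err := sessPool.get()
        if err != nil {
            return errors.Trace(err)
        }
        defer sessPool.put(sCtx)
        return fn(sCtx)
    }
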
From 06297b9e140dd5f1ede5bac23be2999d101c6150 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 1 Sep 2022 09:52:43 +0800 Subject: [PATCH 2/6] ddl: update functions position --- ddl/backfilling.go | 94 +++++++++++++++++++++++----------------------- 1 file changed, 47 insertions(+), 47 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 40d8e928fa476..19b947267bfce 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -143,6 +143,31 @@ type backfillTaskContext struct { warningsCount map[errors.ErrorID]int64 } +type reorgBackfillTask struct { + physicalTableID int64 + startKey kv.Key + endKey kv.Key + endInclude bool +} + +func (r *reorgBackfillTask) String() string { + physicalID := strconv.FormatInt(r.physicalTableID, 10) + startKey := tryDecodeToHandleString(r.startKey) + endKey := tryDecodeToHandleString(r.endKey) + rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey + if r.endInclude { + return rangeStr + "]" + } + return rangeStr + ")" +} + +// mergeBackfillCtxToResult merge partial result in taskCtx into result. +func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) { + result.nextKey = taskCtx.nextKey + result.addedCount += taskCtx.addedCount + result.scanCount += taskCtx.scanCount +} + type backfillWorker struct { id int reorgInfo *reorgInfo @@ -181,53 +206,6 @@ func closeBackfillWorkers(workers []*backfillWorker) { } } -type reorgBackfillTask struct { - physicalTableID int64 - startKey kv.Key - endKey kv.Key - endInclude bool -} - -func (r *reorgBackfillTask) String() string { - physicalID := strconv.FormatInt(r.physicalTableID, 10) - startKey := tryDecodeToHandleString(r.startKey) - endKey := tryDecodeToHandleString(r.endKey) - rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey - if r.endInclude { - return rangeStr + "]" - } - return rangeStr + ")" -} - -func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) { - if threshold == 0 { - threshold = atomic.LoadUint32(&variable.DDLSlowOprThreshold) - } - - if elapsed >= time.Duration(threshold)*time.Millisecond { - logutil.BgLogger().Info("[ddl] slow operations", zap.Duration("takeTimes", elapsed), zap.String("msg", slowMsg)) - } -} - -// mergeBackfillCtxToResult merge partial result in taskCtx into result. -func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) { - result.nextKey = taskCtx.nextKey - result.addedCount += taskCtx.addedCount - result.scanCount += taskCtx.scanCount -} - -func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) { - for _, warn := range partWarnings { - if _, ok := totalWarningsCount[warn.ID()]; ok { - totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()] - } else { - totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()] - totalWarnings[warn.ID()] = warn - } - } - return totalWarnings, totalWarningsCount -} - // handleBackfillTask backfills range [task.startHandle, task.endHandle) handle's index to table. 
func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult { handleRange := *task @@ -805,3 +783,25 @@ func getRangeEndKey(ctx *JobContext, store kv.Storage, priority int, t table.Tab return it.Key(), nil } + +func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) { + for _, warn := range partWarnings { + if _, ok := totalWarningsCount[warn.ID()]; ok { + totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()] + } else { + totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()] + totalWarnings[warn.ID()] = warn + } + } + return totalWarnings, totalWarningsCount +} + +func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) { + if threshold == 0 { + threshold = atomic.LoadUint32(&variable.DDLSlowOprThreshold) + } + + if elapsed >= time.Duration(threshold)*time.Millisecond { + logutil.BgLogger().Info("[ddl] slow operations", zap.Duration("takeTimes", elapsed), zap.String("msg", slowMsg)) + } +} From 5c631d07fde37c93310b4dfa48f4997b8f8b2814 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 1 Sep 2022 11:41:06 +0800 Subject: [PATCH 3/6] ddl: add backfilWorkerPool --- ddl/column.go | 2 +- ddl/ddl_workerpool.go | 67 +++++++++++++++++++++++++++++++++++++- ddl/ddl_workerpool_test.go | 47 ++++++++++++++++++++++++++ ddl/index.go | 4 +-- 4 files changed, 116 insertions(+), 4 deletions(-) create mode 100644 ddl/ddl_workerpool_test.go diff --git a/ddl/column.go b/ddl/column.go index 20ab23273d414..d12724c76e42f 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1004,7 +1004,7 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t, w.sessPooltypeUpdateColumnWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo) } // TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started. diff --git a/ddl/ddl_workerpool.go b/ddl/ddl_workerpool.go index 2d931bf7823e7..d21501f3c6ba5 100644 --- a/ddl/ddl_workerpool.go +++ b/ddl/ddl_workerpool.go @@ -83,7 +83,72 @@ func (wp *workerPool) close() { wp.resPool.Close() } -// tp return the type of worker pool. +// tp return the type of backfill worker pool. func (wp *workerPool) tp() jobType { return wp.t } + +// backfilWorkerPool is used to new backfill worker. +type backfilWorkerPool struct { + exit atomic.Bool + resPool *pools.ResourcePool +} + +func newBackfillWorkerPool(resPool *pools.ResourcePool) *backfilWorkerPool { + return &backfilWorkerPool{ + exit: *atomic.NewBool(false), + resPool: resPool, + } +} + +// setCapacity changes the capacity of the pool. +// A setCapacity of 0 is equivalent to closing the backfilWorkerPool. +func (bwp *backfilWorkerPool) setCapacity(cap int) error { + return bwp.resPool.SetCapacity(cap) +} + +// get gets backfilWorkerPool from context resource pool. +// Please remember to call put after you finished using backfilWorkerPool. 
+func (bwp *backfilWorkerPool) get() (*backfillWorker, error) { + if bwp.resPool == nil { + return nil, nil + } + + if bwp.exit.Load() { + return nil, errors.Errorf("backfill worker pool is closed") + } + + // no need to protect bwp.resPool + resource, err := bwp.resPool.TryGet() + if err != nil { + return nil, errors.Trace(err) + } + if resource == nil { + return nil, nil + } + + worker := resource.(*backfillWorker) + return worker, nil +} + +// put returns workerPool to context resource pool. +func (bwp *backfilWorkerPool) put(wk *backfillWorker) { + if bwp.resPool == nil || bwp.exit.Load() { + return + } + + // No need to protect bwp.resPool, even the bwp.resPool is closed, the ctx still need to + // put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing. + bwp.resPool.Put(wk) +} + +// close clean up the backfilWorkerPool. +func (bwp *backfilWorkerPool) close() { + // Prevent closing resPool twice. + if bwp.resPool == nil || bwp.exit.Load() { + return + } + bwp.exit.Store(true) + logutil.BgLogger().Info("[ddl] closing workerPool") + bwp.resPool.Close() +} diff --git a/ddl/ddl_workerpool_test.go b/ddl/ddl_workerpool_test.go new file mode 100644 index 0000000000000..4d188962cff78 --- /dev/null +++ b/ddl/ddl_workerpool_test.go @@ -0,0 +1,47 @@ +package ddl + +import ( + "testing" + + "github.com/ngaut/pools" + "github.com/pingcap/tidb/parser/model" + "github.com/stretchr/testify/require" +) + +func TestBackfillWorkerPool(t *testing.T) { + reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}} + f := func() func() (pools.Resource, error) { + return func() (pools.Resource, error) { + wk := newBackfillWorker(nil, 1, nil, reorgInfo) + return wk, nil + } + } + pool := newBackfillWorkerPool(pools.NewResourcePool(f(), 1, 2, 0)) + bwp, err := pool.get() + require.NoError(t, err) + require.Equal(t, 1, bwp.id) + // test it to reach the capacity + bwp1, err := pool.get() + require.NoError(t, err) + require.Nil(t, bwp1) + + // test setCapacity + err = pool.setCapacity(2) + require.NoError(t, err) + bwp1, err = pool.get() + require.NoError(t, err) + require.Equal(t, 1, bwp1.id) + pool.put(bwp) + pool.put(bwp1) + + // test close + pool.close() + pool.close() + require.Equal(t, true, pool.exit.Load()) + pool.put(bwp1) + + bwp, err = pool.get() + require.Error(t, err) + require.Equal(t, "backfill worker pool is closed", err.Error()) + require.Nil(t, bwp) +} diff --git a/ddl/index.go b/ddl/index.go index a391f26a793d8..3350c16a83c2c 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1344,7 +1344,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to add table index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t, w.sessPooltypeAddIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sessPool, t, typeAddIndexWorker, reorgInfo) } // addTableIndex handles the add index reorganization state for a table. @@ -1547,7 +1547,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t // cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition. 
func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t, w.sessPooltypeCleanUpIndexWorker, reorgInfo) + return w.writePhysicalTableRecord(w.sessPool, t, typeCleanUpIndexWorker, reorgInfo) } // cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions. From bae73a29a81fd834a9ab0424677bdfeba7bbaf05 Mon Sep 17 00:00:00 2001 From: Lynn Date: Thu, 1 Sep 2022 13:40:48 +0800 Subject: [PATCH 4/6] ddl: rename function names --- ddl/backfilling.go | 12 ++++++------ ddl/ddl_workerpool_test.go | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 19b947267bfce..a0dea2aa03277 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -374,9 +374,9 @@ func waitTaskResults(workers []*backfillWorker, taskCnt int, return nextKey, addedCount, errors.Trace(firstErr) } -// handleReorgTasks sends tasks to workers, and waits for all the running workers to return results, +// deliveryTasksAndResults sends tasks to workers, and waits for all the running workers to return results, // there are taskCnt running workers. -func (dc *ddlCtx) handleReorgTasks(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error { +func (dc *ddlCtx) deliveryTasksAndResults(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error { for i, task := range batchTasks { workers[i].taskCh <- task } @@ -448,8 +448,8 @@ func tryDecodeToHandleString(key kv.Key) string { return handle.String() } -// sendRangeTaskToWorkers sends tasks to workers, and returns remaining kvRanges that is not handled. -func (dc *ddlCtx) sendRangeTaskToWorkers(sessPool *sessionPool, t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo, +// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled. +func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo, totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) { batchTasks := make([]*reorgBackfillTask, 0, len(workers)) physicalTableID := reorgInfo.PhysicalTableID @@ -484,7 +484,7 @@ func (dc *ddlCtx) sendRangeTaskToWorkers(sessPool *sessionPool, t table.Table, w } // Wait tasks finish. 
- err := dc.handleReorgTasks(sessPool, reorgInfo, totalAddedCount, workers, batchTasks) + err := dc.deliveryTasksAndResults(sessPool, reorgInfo, totalAddedCount, workers, batchTasks) if err != nil { return nil, errors.Trace(err) } @@ -684,7 +684,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic zap.Int("regionCnt", len(kvRanges)), zap.String("startHandle", tryDecodeToHandleString(startKey)), zap.String("endHandle", tryDecodeToHandleString(endKey))) - remains, err := dc.sendRangeTaskToWorkers(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges) + remains, err := dc.handleRangeTasks(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges) if err != nil { return errors.Trace(err) } diff --git a/ddl/ddl_workerpool_test.go b/ddl/ddl_workerpool_test.go index 4d188962cff78..e83605fe6a0ba 100644 --- a/ddl/ddl_workerpool_test.go +++ b/ddl/ddl_workerpool_test.go @@ -1,3 +1,17 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package ddl import ( From 8f5985ed6498f39cbc4bf4a9898700cc0956feb0 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 6 Sep 2022 10:27:42 +0800 Subject: [PATCH 5/6] ddl: address comments --- ddl/backfilling.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index a0dea2aa03277..6a442b13d6ee3 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -374,9 +374,9 @@ func waitTaskResults(workers []*backfillWorker, taskCnt int, return nextKey, addedCount, errors.Trace(firstErr) } -// deliveryTasksAndResults sends tasks to workers, and waits for all the running workers to return results, +// sendTasksAndWait sends tasks to workers, and waits for all the running workers to return results, // there are taskCnt running workers. -func (dc *ddlCtx) deliveryTasksAndResults(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error { +func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error { for i, task := range batchTasks { workers[i].taskCh <- task } @@ -484,7 +484,7 @@ func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers } // Wait tasks finish. - err := dc.deliveryTasksAndResults(sessPool, reorgInfo, totalAddedCount, workers, batchTasks) + err := dc.sendTasksAndWait(sessPool, reorgInfo, totalAddedCount, workers, batchTasks) if err != nil { return nil, errors.Trace(err) } @@ -509,7 +509,6 @@ var ( func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error { // Get sessionctx from context resource pool. 
- var sCtx sessionctx.Context sCtx, err := sessPool.get() if err != nil { return errors.Trace(err) From 825798c1baee47d6c72aeb9d78d7b50649b90075 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 6 Sep 2022 13:38:33 +0800 Subject: [PATCH 6/6] ddl: tiny update --- ddl/ddl_workerpool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/ddl_workerpool.go b/ddl/ddl_workerpool.go index b174827e4b1be..de709b6faeb3b 100644 --- a/ddl/ddl_workerpool.go +++ b/ddl/ddl_workerpool.go @@ -103,8 +103,8 @@ func newBackfillWorkerPool(resPool *pools.ResourcePool) *backfilWorkerPool { // setCapacity changes the capacity of the pool. // A setCapacity of 0 is equivalent to closing the backfilWorkerPool. -func (bwp *backfilWorkerPool) setCapacity(cap int) error { - return bwp.resPool.SetCapacity(cap) +func (bwp *backfilWorkerPool) setCapacity(capacity int) error { + return bwp.resPool.SetCapacity(capacity) } // get gets backfilWorkerPool from context resource pool.
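
Taken together, the series leaves writePhysicalTableRecord as a *ddlCtx method that receives the session pool explicitly, and adds a backfilWorkerPool wrapper around pools.ResourcePool. A minimal usage sketch of that pool, mirroring ddl_workerpool_test.go from PATCH 3/6; the worker constructor arguments and pool sizes are illustrative rather than taken from a production call site:

    reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}}
    factory := func() (pools.Resource, error) {
        // Each pool slot is backed by one backfillWorker.
        return newBackfillWorker(nil, 1, nil, reorgInfo), nil
    }
    pool := newBackfillWorkerPool(pools.NewResourcePool(factory, 1, 2, 0))
    defer pool.close()

    wk, err := pool.get()
    if err != nil {
        return err // get reports an error once the pool has been closed
    }
    if wk == nil {
        return nil // TryGet found no free worker within the current capacity
    }
    defer pool.put(wk)
    // send *reorgBackfillTask values to wk.taskCh here
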