ddl: decoupling of backfill worker and reorg worker codes, add backfill workerPool #37538

Merged: 11 commits, Sep 6, 2022
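
In short: the backfill entry points move off the DDL worker onto ddlCtx, with the session pool passed in explicitly, so the backfill code no longer drags in reorg-worker state. A minimal sketch of the resulting shape, using simplified stand-in types rather than the real TiDB definitions (only the receiver change and the explicit sessPool argument are the point here):

package main

import "fmt"

// Simplified stand-ins for the TiDB types; not the real definitions.
type sessionPool struct{ name string }

type ddlCtx struct{}

// After the refactor, backfill entry points hang off ddlCtx and take the
// session pool explicitly, so they no longer depend on the DDL *worker.
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, table string) error {
	fmt.Printf("backfilling %s using %s\n", table, sessPool.name)
	return nil
}

// worker keeps its own pool and simply forwards it, as ddl/column.go now does.
type worker struct {
	ddlCtx
	sessPool *sessionPool
}

func (w *worker) updatePhysicalTableRow(table string) error {
	return w.writePhysicalTableRecord(w.sessPool, table)
}

func main() {
	w := &worker{sessPool: &sessionPool{name: "ddl session pool"}}
	_ = w.updatePhysicalTableRow("t1")
}

The real writePhysicalTableRecord keeps its full signature (backfillWorkerType, reorgInfo, and so on), as the diff below shows.
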
133 changes: 66 additions & 67 deletions ddl/backfilling.go
@@ -143,6 +143,31 @@ type backfillTaskContext struct {
warningsCount map[errors.ErrorID]int64
}

type reorgBackfillTask struct {
physicalTableID int64
startKey kv.Key
endKey kv.Key
endInclude bool
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := tryDecodeToHandleString(r.startKey)
endKey := tryDecodeToHandleString(r.endKey)
rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
if r.endInclude {
return rangeStr + "]"
}
return rangeStr + ")"
}
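
As a reference for reading the logs, String renders a task as physicalTableID_<id>_[start,end) or physicalTableID_<id>_[start,end]. A standalone sketch of the same formatting, with plain strings standing in for kv.Key and tryDecodeToHandleString:

package main

import (
	"fmt"
	"strconv"
)

// Simplified copy of reorgBackfillTask; the real struct holds kv.Key values
// and renders them with tryDecodeToHandleString.
type task struct {
	physicalTableID int64
	startKey        string
	endKey          string
	endInclude      bool
}

func (r *task) String() string {
	physicalID := strconv.FormatInt(r.physicalTableID, 10)
	rangeStr := "physicalTableID_" + physicalID + "_[" + r.startKey + "," + r.endKey
	if r.endInclude {
		return rangeStr + "]"
	}
	return rangeStr + ")"
}

func main() {
	// Prints: physicalTableID_68_[1,100)
	fmt.Println((&task{physicalTableID: 68, startKey: "1", endKey: "100"}).String())
}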

// mergeBackfillCtxToResult merges the partial result in taskCtx into result.
func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) {
result.nextKey = taskCtx.nextKey
result.addedCount += taskCtx.addedCount
result.scanCount += taskCtx.scanCount
}

type backfillWorker struct {
id int
reorgInfo *reorgInfo
@@ -181,53 +206,6 @@ func closeBackfillWorkers(workers []*backfillWorker) {
}
}

type reorgBackfillTask struct {
physicalTableID int64
startKey kv.Key
endKey kv.Key
endInclude bool
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := tryDecodeToHandleString(r.startKey)
endKey := tryDecodeToHandleString(r.endKey)
rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
if r.endInclude {
return rangeStr + "]"
}
return rangeStr + ")"
}

func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
if threshold == 0 {
threshold = atomic.LoadUint32(&variable.DDLSlowOprThreshold)
}

if elapsed >= time.Duration(threshold)*time.Millisecond {
logutil.BgLogger().Info("[ddl] slow operations", zap.Duration("takeTimes", elapsed), zap.String("msg", slowMsg))
}
}

// mergeBackfillCtxToResult merges the partial result in taskCtx into result.
func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) {
result.nextKey = taskCtx.nextKey
result.addedCount += taskCtx.addedCount
result.scanCount += taskCtx.scanCount
}

func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
for _, warn := range partWarnings {
if _, ok := totalWarningsCount[warn.ID()]; ok {
totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
} else {
totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
totalWarnings[warn.ID()] = warn
}
}
return totalWarnings, totalWarningsCount
}

// handleBackfillTask backfills the range [task.startKey, task.endKey) of the physical table.
func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult {
handleRange := *task
@@ -365,7 +343,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
return ranges, nil
}

func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
func waitTaskResults(workers []*backfillWorker, taskCnt int,
totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) {
var (
addedCount int64
@@ -396,25 +374,25 @@ func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
return nextKey, addedCount, errors.Trace(firstErr)
}

// handleReorgTasks sends tasks to workers, and waits for all the running workers to return results,
// sendTasksAndWait sends tasks to workers and waits for all the running workers to return results;
// there are taskCnt running workers.
func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error {
func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error {
for i, task := range batchTasks {
workers[i].taskCh <- task
}

startKey := batchTasks[0].startKey
taskCnt := len(batchTasks)
startTime := time.Now()
nextKey, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
nextKey, taskAddedCount, err := waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
elapsedTime := time.Since(startTime)
if err == nil {
err = w.isReorgRunnable(reorgInfo.Job)
err = dc.isReorgRunnable(reorgInfo.Job)
}

if err != nil {
// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey, w.sessPool)
err1 := reorgInfo.UpdateReorgMeta(nextKey, sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -430,7 +408,7 @@ }
}

// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
w.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
dc.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
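
The dispatch above follows a plain fan-out/fan-in shape: at most one task goes to each worker's channel, and waitTaskResults then collects exactly one result per sent task. A self-contained sketch of that pattern, with simplified types rather than the actual worker loop:

package main

import "fmt"

type task struct{ id int }

type result struct {
	id  int
	err error
}

type bfWorker struct {
	taskCh   chan *task
	resultCh chan *result
}

// run consumes tasks and reports one result per task, like backfillWorker.run.
func (w *bfWorker) run() {
	for t := range w.taskCh {
		w.resultCh <- &result{id: t.id} // the real worker calls handleBackfillTask here
	}
}

func main() {
	workers := make([]*bfWorker, 3)
	for i := range workers {
		workers[i] = &bfWorker{taskCh: make(chan *task, 1), resultCh: make(chan *result, 1)}
		go workers[i].run()
	}
	tasks := []*task{{id: 1}, {id: 2}, {id: 3}}
	// Fan out: at most one task per worker, as in sendTasksAndWait.
	for i, t := range tasks {
		workers[i].taskCh <- t
	}
	// Fan in: wait for exactly len(tasks) results, as in waitTaskResults.
	for i := 0; i < len(tasks); i++ {
		r := <-workers[i].resultCh
		if r.err == nil {
			fmt.Println("task", r.id, "done")
		}
	}
}
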
@@ -470,8 +448,8 @@ func tryDecodeToHandleString(key kv.Key) string {
return handle.String()
}

// sendRangeTaskToWorkers sends tasks to workers, and returns the remaining kvRanges that are not handled.
func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo,
// handleRangeTasks sends tasks to workers, and returns the remaining kvRanges that are not handled.
func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := make([]*reorgBackfillTask, 0, len(workers))
physicalTableID := reorgInfo.PhysicalTableID
@@ -506,7 +484,7 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker
}

// Wait tasks finish.
err := w.handleReorgTasks(reorgInfo, totalAddedCount, workers, batchTasks)
err := dc.sendTasksAndWait(sessPool, reorgInfo, totalAddedCount, workers, batchTasks)
if err != nil {
return nil, errors.Trace(err)
}
@@ -529,15 +507,14 @@ var (
TestCheckReorgTimeout = int32(0)
)

func loadDDLReorgVars(w *worker) error {
func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
sCtx, err := sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(ctx)
return ddlutil.LoadDDLReorgVars(w.ctx, ctx)
defer sessPool.put(sCtx)
return ddlutil.LoadDDLReorgVars(ctx, sCtx)
}

func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
@@ -585,7 +562,7 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

Expand All @@ -596,7 +573,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
return errors.Trace(err)
}

if err := w.isReorgRunnable(reorgInfo.Job); err != nil {
if err := dc.isReorgRunnable(reorgInfo.Job); err != nil {
return errors.Trace(err)
}
if startKey == nil && endKey == nil {
Expand All @@ -616,7 +593,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
defer func() {
closeBackfillWorkers(backfillWorkers)
}()
jc := w.jobContext(job)
jc := dc.jobContext(job)

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
Expand All @@ -625,7 +602,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
}

// For dynamic adjust backfill worker number.
if err := loadDDLReorgVars(w); err != nil {
if err := loadDDLReorgVars(dc.ctx, sessPool); err != nil {
logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err))
}
workerCnt = variable.GetDDLReorgWorkerCounter()
Expand Down Expand Up @@ -706,7 +683,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
zap.Int("regionCnt", len(kvRanges)),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("endHandle", tryDecodeToHandleString(endKey)))
remains, err := w.sendRangeTaskToWorkers(t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
remains, err := dc.handleRangeTasks(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}
@@ -805,3 +782,25 @@ func getRangeEndKey(ctx *JobContext, store kv.Storage, priority int, t table.Tab

return it.Key(), nil
}

func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
for _, warn := range partWarnings {
if _, ok := totalWarningsCount[warn.ID()]; ok {
totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
} else {
totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
totalWarnings[warn.ID()] = warn
}
}
return totalWarnings, totalWarningsCount
}
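
The merge keeps one representative error value per ErrorID and sums the per-ID counts. A simplified, self-contained sketch of the same bookkeeping, with plain strings standing in for errors.ErrorID and *terror.Error:

package main

import "fmt"

// mergeCounts mirrors mergeWarningsAndWarningsCount with string IDs:
// already-seen IDs only have their counts summed; new IDs also record
// the representative warning value.
func mergeCounts(partWarnings map[string]string, partCounts map[string]int64,
	totalWarnings map[string]string, totalCounts map[string]int64) {
	for id, warn := range partWarnings {
		if _, ok := totalCounts[id]; ok {
			totalCounts[id] += partCounts[id]
		} else {
			totalCounts[id] = partCounts[id]
			totalWarnings[id] = warn
		}
	}
}

func main() {
	totalWarnings := map[string]string{"1062": "Duplicate entry"}
	totalCounts := map[string]int64{"1062": 3}
	mergeCounts(
		map[string]string{"1062": "Duplicate entry", "8029": "Invalid year"},
		map[string]int64{"1062": 2, "8029": 1},
		totalWarnings, totalCounts)
	fmt.Println(totalCounts) // map[1062:5 8029:1]
}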

func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
if threshold == 0 {
threshold = atomic.LoadUint32(&variable.DDLSlowOprThreshold)
}

if elapsed >= time.Duration(threshold)*time.Millisecond {
logutil.BgLogger().Info("[ddl] slow operations", zap.Duration("takeTimes", elapsed), zap.String("msg", slowMsg))
}
}
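
A typical caller times a unit of work and lets logSlowOperations decide whether to report it; a threshold of 0 means "fall back to the DDLSlowOprThreshold system variable". A hypothetical in-package caller, for illustration only:

// Hypothetical caller inside the ddl package: doBatch is a stand-in for
// the timed work; 0 means "use the DDLSlowOprThreshold system variable".
func runTimedBatch(doBatch func()) {
	start := time.Now()
	doBatch()
	logSlowOperations(time.Since(start), "[ddl] handle backfill batch", 0)
}
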
2 changes: 1 addition & 1 deletion ddl/column.go
@@ -1004,7 +1004,7 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf

func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(t, typeUpdateColumnWorker, reorgInfo)
return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo)
}

// TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
67 changes: 66 additions & 1 deletion ddl/ddl_workerpool.go
@@ -83,7 +83,72 @@ func (wp *workerPool) close() {
wp.resPool.Close()
}

// tp returns the type of worker pool.
// tp returns the type of backfill worker pool.
func (wp *workerPool) tp() jobType {
return wp.t
}

// backfilWorkerPool is used to create new backfill workers.
type backfilWorkerPool struct {
exit atomic.Bool
resPool *pools.ResourcePool
}

func newBackfillWorkerPool(resPool *pools.ResourcePool) *backfilWorkerPool {
return &backfilWorkerPool{
exit: *atomic.NewBool(false),
resPool: resPool,
}
}

// setCapacity changes the capacity of the pool.
// A setCapacity of 0 is equivalent to closing the backfilWorkerPool.
func (bwp *backfilWorkerPool) setCapacity(cap int) error {
return bwp.resPool.SetCapacity(cap)
}

// get gets a backfillWorker from the resource pool.
// Please remember to call put after you finish using the worker.
func (bwp *backfilWorkerPool) get() (*backfillWorker, error) {
if bwp.resPool == nil {
return nil, nil
}

if bwp.exit.Load() {
return nil, errors.Errorf("backfill worker pool is closed")
}

// no need to protect bwp.resPool
resource, err := bwp.resPool.TryGet()
if err != nil {
return nil, errors.Trace(err)
}
if resource == nil {
return nil, nil
}

worker := resource.(*backfillWorker)
return worker, nil
}

// put returns the backfill worker to the resource pool.
func (bwp *backfilWorkerPool) put(wk *backfillWorker) {
if bwp.resPool == nil || bwp.exit.Load() {
return
}

// No need to protect bwp.resPool: even if bwp.resPool is closed, the worker still needs to be
// put back into resPool, because a closing resPool waits for all resources to be returned before it finishes closing.
bwp.resPool.Put(wk)
}

// close cleans up the backfilWorkerPool.
func (bwp *backfilWorkerPool) close() {
// Prevent closing resPool twice.
if bwp.resPool == nil || bwp.exit.Load() {
return
}
bwp.exit.Store(true)
logutil.BgLogger().Info("[ddl] closing workerPool")
bwp.resPool.Close()
}
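
Taken together, the intended lifecycle is get, work, put, and finally close, after which get fails. A hypothetical in-package usage sketch, with error and nil-worker handling following what the methods above imply:

// Hypothetical in-package usage; pool construction mirrors the test below.
func useBackfillPool(pool *backfilWorkerPool) error {
	wk, err := pool.get()
	if err != nil {
		return err // the pool is already closed
	}
	if wk == nil {
		return nil // at capacity right now; the caller may retry later
	}
	defer pool.put(wk) // always hand the worker back
	// ... dispatch reorg tasks to wk ...
	return nil
}
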
42 changes: 41 additions & 1 deletion ddl/ddl_workerpool_test.go
@@ -18,9 +18,11 @@ import (
"testing"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

func TestBackfillWorkerPool(t *testing.T) {
func TestDDLWorkerPool(t *testing.T) {
f := func() func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newWorker(nil, addIdxWorker, nil, nil, nil, true)
@@ -31,3 +33,41 @@ func TestBackfillWorkerPool(t *testing.T) {
pool.close()
pool.put(nil)
}

func TestBackfillWorkerPool(t *testing.T) {
reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}}
f := func() func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newBackfillWorker(nil, 1, nil, reorgInfo)
return wk, nil
}
}
pool := newBackfillWorkerPool(pools.NewResourcePool(f(), 1, 2, 0))
bwp, err := pool.get()
require.NoError(t, err)
require.Equal(t, 1, bwp.id)
// the pool has now reached its capacity, so get returns a nil worker
bwp1, err := pool.get()
require.NoError(t, err)
require.Nil(t, bwp1)

// test setCapacity
err = pool.setCapacity(2)
require.NoError(t, err)
bwp1, err = pool.get()
require.NoError(t, err)
require.Equal(t, 1, bwp1.id)
pool.put(bwp)
pool.put(bwp1)

// test close
pool.close()
pool.close()
require.Equal(t, true, pool.exit.Load())
pool.put(bwp1)

bwp, err = pool.get()
require.Error(t, err)
require.Equal(t, "backfill worker pool is closed", err.Error())
require.Nil(t, bwp)
}