
ddl: decoupling of backfill worker and reorg worker codes, add backfill workerPool (#37538)
zimulala authored Sep 6, 2022
1 parent 70684d3 commit 737cd0e
Showing 5 changed files with 176 additions and 72 deletions.
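
The heart of the decoupling is that the reorg/backfill helpers no longer hang off *worker and its captured session pool; they move onto *ddlCtx and take the pool as an explicit argument. A minimal sketch of that shape, written outside the ddl package with stand-in types (sessionPool, ddlCtx, and reorgTask here are illustrative placeholders, not the commit's definitions); the real signatures appear in the ddl/backfilling.go diff below:

```go
package main

import "fmt"

// Stand-ins for the ddl package's sessionPool and ddlCtx; reorgTask is
// purely illustrative.
type sessionPool struct{}
type ddlCtx struct{}
type reorgTask struct{ id int }

// Before the commit, the equivalent helper was a method on *worker, which
// owned the session pool. Passing the pool explicitly lets anything that
// holds a ddlCtx drive a backfill.
func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, tasks []reorgTask) error {
	for _, t := range tasks {
		fmt.Printf("dispatching task %d with pool %p\n", t.id, sessPool)
	}
	return nil
}

func main() {
	dc := &ddlCtx{}
	pool := &sessionPool{}
	if err := dc.sendTasksAndWait(pool, []reorgTask{{id: 1}, {id: 2}}); err != nil {
		fmt.Println("backfill failed:", err)
	}
}
```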
133 changes: 66 additions & 67 deletions ddl/backfilling.go
@@ -143,6 +143,31 @@ type backfillTaskContext struct {
warningsCount map[errors.ErrorID]int64
}

type reorgBackfillTask struct {
physicalTableID int64
startKey kv.Key
endKey kv.Key
endInclude bool
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := tryDecodeToHandleString(r.startKey)
endKey := tryDecodeToHandleString(r.endKey)
rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
if r.endInclude {
return rangeStr + "]"
}
return rangeStr + ")"
}

// mergeBackfillCtxToResult merge partial result in taskCtx into result.
func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) {
result.nextKey = taskCtx.nextKey
result.addedCount += taskCtx.addedCount
result.scanCount += taskCtx.scanCount
}

type backfillWorker struct {
id int
reorgInfo *reorgInfo
@@ -181,53 +206,6 @@ func closeBackfillWorkers(workers []*backfillWorker) {
}
}

type reorgBackfillTask struct {
physicalTableID int64
startKey kv.Key
endKey kv.Key
endInclude bool
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
startKey := tryDecodeToHandleString(r.startKey)
endKey := tryDecodeToHandleString(r.endKey)
rangeStr := "physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
if r.endInclude {
return rangeStr + "]"
}
return rangeStr + ")"
}

func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
if threshold == 0 {
threshold = atomic.LoadUint32(&variable.DDLSlowOprThreshold)
}

if elapsed >= time.Duration(threshold)*time.Millisecond {
logutil.BgLogger().Info("[ddl] slow operations", zap.Duration("takeTimes", elapsed), zap.String("msg", slowMsg))
}
}

// mergeBackfillCtxToResult merge partial result in taskCtx into result.
func mergeBackfillCtxToResult(taskCtx *backfillTaskContext, result *backfillResult) {
result.nextKey = taskCtx.nextKey
result.addedCount += taskCtx.addedCount
result.scanCount += taskCtx.scanCount
}

func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
for _, warn := range partWarnings {
if _, ok := totalWarningsCount[warn.ID()]; ok {
totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
} else {
totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
totalWarnings[warn.ID()] = warn
}
}
return totalWarnings, totalWarningsCount
}

// handleBackfillTask backfills range [task.startHandle, task.endHandle) handle's index to table.
func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask, bf backfiller) *backfillResult {
handleRange := *task
@@ -365,7 +343,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
return ranges, nil
}

func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
func waitTaskResults(workers []*backfillWorker, taskCnt int,
totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) {
var (
addedCount int64
@@ -396,25 +374,25 @@ func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
return nextKey, addedCount, errors.Trace(firstErr)
}

// handleReorgTasks sends tasks to workers, and waits for all the running workers to return results,
// sendTasksAndWait sends tasks to workers, and waits for all the running workers to return results,
// there are taskCnt running workers.
func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error {
func (dc *ddlCtx) sendTasksAndWait(sessPool *sessionPool, reorgInfo *reorgInfo, totalAddedCount *int64, workers []*backfillWorker, batchTasks []*reorgBackfillTask) error {
for i, task := range batchTasks {
workers[i].taskCh <- task
}

startKey := batchTasks[0].startKey
taskCnt := len(batchTasks)
startTime := time.Now()
nextKey, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
nextKey, taskAddedCount, err := waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
elapsedTime := time.Since(startTime)
if err == nil {
err = w.isReorgRunnable(reorgInfo.Job)
err = dc.isReorgRunnable(reorgInfo.Job)
}

if err != nil {
// Update the reorg handle that has been processed.
err1 := reorgInfo.UpdateReorgMeta(nextKey, w.sessPool)
err1 := reorgInfo.UpdateReorgMeta(nextKey, sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -430,7 +408,7 @@ }
}

// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
w.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
dc.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -470,8 +448,8 @@ func tryDecodeToHandleString(key kv.Key) string {
return handle.String()
}

// sendRangeTaskToWorkers sends tasks to workers, and returns remaining kvRanges that is not handled.
func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo,
// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers []*backfillWorker, reorgInfo *reorgInfo,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := make([]*reorgBackfillTask, 0, len(workers))
physicalTableID := reorgInfo.PhysicalTableID
@@ -506,7 +484,7 @@ func (w *worker) sendRangeTaskToWorkers(t table.Table, workers []*backfillWorker
}

// Wait tasks finish.
err := w.handleReorgTasks(reorgInfo, totalAddedCount, workers, batchTasks)
err := dc.sendTasksAndWait(sessPool, reorgInfo, totalAddedCount, workers, batchTasks)
if err != nil {
return nil, errors.Trace(err)
}
@@ -529,15 +507,14 @@ var (
TestCheckReorgTimeout = int32(0)
)

func loadDDLReorgVars(w *worker) error {
func loadDDLReorgVars(ctx context.Context, sessPool *sessionPool) error {
// Get sessionctx from context resource pool.
var ctx sessionctx.Context
ctx, err := w.sessPool.get()
sCtx, err := sessPool.get()
if err != nil {
return errors.Trace(err)
}
defer w.sessPool.put(ctx)
return ddlutil.LoadDDLReorgVars(w.ctx, ctx)
defer sessPool.put(sCtx)
return ddlutil.LoadDDLReorgVars(ctx, sCtx)
}

func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
@@ -585,7 +562,7 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

@@ -596,7 +573,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
return errors.Trace(err)
}

if err := w.isReorgRunnable(reorgInfo.Job); err != nil {
if err := dc.isReorgRunnable(reorgInfo.Job); err != nil {
return errors.Trace(err)
}
if startKey == nil && endKey == nil {
@@ -616,7 +593,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
defer func() {
closeBackfillWorkers(backfillWorkers)
}()
jc := w.jobContext(job)
jc := dc.jobContext(job)

for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey)
@@ -625,7 +602,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
}

// For dynamic adjust backfill worker number.
if err := loadDDLReorgVars(w); err != nil {
if err := loadDDLReorgVars(dc.ctx, sessPool); err != nil {
logutil.BgLogger().Error("[ddl] load DDL reorganization variable failed", zap.Error(err))
}
workerCnt = variable.GetDDLReorgWorkerCounter()
@@ -706,7 +683,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
zap.Int("regionCnt", len(kvRanges)),
zap.String("startHandle", tryDecodeToHandleString(startKey)),
zap.String("endHandle", tryDecodeToHandleString(endKey)))
remains, err := w.sendRangeTaskToWorkers(t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
remains, err := dc.handleRangeTasks(sessPool, t, backfillWorkers, reorgInfo, &totalAddedCount, kvRanges)
if err != nil {
return errors.Trace(err)
}
@@ -805,3 +782,25 @@ func getRangeEndKey(ctx *JobContext, store kv.Storage, priority int, t table.Tab

return it.Key(), nil
}

func mergeWarningsAndWarningsCount(partWarnings, totalWarnings map[errors.ErrorID]*terror.Error, partWarningsCount, totalWarningsCount map[errors.ErrorID]int64) (map[errors.ErrorID]*terror.Error, map[errors.ErrorID]int64) {
for _, warn := range partWarnings {
if _, ok := totalWarningsCount[warn.ID()]; ok {
totalWarningsCount[warn.ID()] += partWarningsCount[warn.ID()]
} else {
totalWarningsCount[warn.ID()] = partWarningsCount[warn.ID()]
totalWarnings[warn.ID()] = warn
}
}
return totalWarnings, totalWarningsCount
}

func logSlowOperations(elapsed time.Duration, slowMsg string, threshold uint32) {
if threshold == 0 {
threshold = atomic.LoadUint32(&variable.DDLSlowOprThreshold)
}

if elapsed >= time.Duration(threshold)*time.Millisecond {
logutil.BgLogger().Info("[ddl] slow operations", zap.Duration("takeTimes", elapsed), zap.String("msg", slowMsg))
}
}
2 changes: 1 addition & 1 deletion ddl/column.go
@@ -1004,7 +1004,7 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf

func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(t, typeUpdateColumnWorker, reorgInfo)
return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo)
}

// TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
67 changes: 66 additions & 1 deletion ddl/ddl_workerpool.go
@@ -83,7 +83,72 @@ func (wp *workerPool) close() {
wp.resPool.Close()
}

// tp return the type of worker pool.
// tp return the type of backfill worker pool.
func (wp *workerPool) tp() jobType {
return wp.t
}

// backfilWorkerPool is used to new backfill worker.
type backfilWorkerPool struct {
exit atomic.Bool
resPool *pools.ResourcePool
}

func newBackfillWorkerPool(resPool *pools.ResourcePool) *backfilWorkerPool {
return &backfilWorkerPool{
exit: *atomic.NewBool(false),
resPool: resPool,
}
}

// setCapacity changes the capacity of the pool.
// A setCapacity of 0 is equivalent to closing the backfilWorkerPool.
func (bwp *backfilWorkerPool) setCapacity(capacity int) error {
return bwp.resPool.SetCapacity(capacity)
}

// get gets backfilWorkerPool from context resource pool.
// Please remember to call put after you finished using backfilWorkerPool.
func (bwp *backfilWorkerPool) get() (*backfillWorker, error) {
if bwp.resPool == nil {
return nil, nil
}

if bwp.exit.Load() {
return nil, errors.Errorf("backfill worker pool is closed")
}

// no need to protect bwp.resPool
resource, err := bwp.resPool.TryGet()
if err != nil {
return nil, errors.Trace(err)
}
if resource == nil {
return nil, nil
}

worker := resource.(*backfillWorker)
return worker, nil
}

// put returns workerPool to context resource pool.
func (bwp *backfilWorkerPool) put(wk *backfillWorker) {
if bwp.resPool == nil || bwp.exit.Load() {
return
}

// No need to protect bwp.resPool, even the bwp.resPool is closed, the ctx still need to
// put into resPool, because when resPool is closing, it will wait all the ctx returns, then resPool finish closing.
bwp.resPool.Put(wk)
}

// close clean up the backfilWorkerPool.
func (bwp *backfilWorkerPool) close() {
// Prevent closing resPool twice.
if bwp.resPool == nil || bwp.exit.Load() {
return
}
bwp.exit.Store(true)
logutil.BgLogger().Info("[ddl] closing workerPool")
bwp.resPool.Close()
}
42 changes: 41 additions & 1 deletion ddl/ddl_workerpool_test.go
@@ -18,9 +18,11 @@ import (
"testing"

"github.com/ngaut/pools"
"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

func TestBackfillWorkerPool(t *testing.T) {
func TestDDLWorkerPool(t *testing.T) {
f := func() func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newWorker(nil, addIdxWorker, nil, nil, nil, true)
@@ -31,3 +33,41 @@ func TestBackfillWorkerPool(t *testing.T) {
pool.close()
pool.put(nil)
}

func TestBackfillWorkerPool(t *testing.T) {
reorgInfo := &reorgInfo{Job: &model.Job{ID: 1}}
f := func() func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newBackfillWorker(nil, 1, nil, reorgInfo)
return wk, nil
}
}
pool := newBackfillWorkerPool(pools.NewResourcePool(f(), 1, 2, 0))
bwp, err := pool.get()
require.NoError(t, err)
require.Equal(t, 1, bwp.id)
// test it to reach the capacity
bwp1, err := pool.get()
require.NoError(t, err)
require.Nil(t, bwp1)

// test setCapacity
err = pool.setCapacity(2)
require.NoError(t, err)
bwp1, err = pool.get()
require.NoError(t, err)
require.Equal(t, 1, bwp1.id)
pool.put(bwp)
pool.put(bwp1)

// test close
pool.close()
pool.close()
require.Equal(t, true, pool.exit.Load())
pool.put(bwp1)

bwp, err = pool.get()
require.Error(t, err)
require.Equal(t, "backfill worker pool is closed", err.Error())
require.Nil(t, bwp)
}