Skip to content

Commit

Permalink
ddl: address comments and tiny update
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Feb 8, 2023
1 parent 4b69f6c commit e07d255
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 36 deletions.
49 changes: 25 additions & 24 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -252,17 +253,15 @@ type backfillResult struct {
type reorgBackfillTask struct {
bfJob *BackfillJob
physicalTable table.PhysicalTable
index table.Index

// TODO: Remove the following fields after remove the function of run.
id int
physicalTableID int64
startKey kv.Key
endKey kv.Key
endInclude bool
jobID int64
sqlQuery string
priority int
id int
startKey kv.Key
endKey kv.Key
endInclude bool
jobID int64
sqlQuery string
priority int
}

func (r *reorgBackfillTask) getJobID() int64 {
Expand All @@ -281,7 +280,7 @@ func (r *reorgBackfillTask) excludedEndKey() kv.Key {
}

func (r *reorgBackfillTask) String() string {
physicalID := strconv.FormatInt(r.physicalTableID, 10)
physicalID := strconv.FormatInt(r.physicalTable.GetPhysicalID(), 10)
startKey := hex.EncodeToString(r.startKey)
endKey := hex.EncodeToString(r.endKey)
rangeStr := "taskID_" + strconv.Itoa(r.id) + "_physicalTableID_" + physicalID + "_" + "[" + startKey + "," + endKey
Expand Down Expand Up @@ -369,6 +368,9 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
rc := d.getReorgCtx(jobID)

isDistReorg := task.bfJob != nil
if isDistReorg {
w.initPartitionIndexInfo(task)
}
for {
// Give job chance to be canceled, if we not check it here,
// if there is panic in bf.BackfillDataInTxn we will never cancel the job.
Expand Down Expand Up @@ -440,10 +442,11 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
return result
}

func (w *backfillWorker) updatePartitionIndexInfo(task *reorgBackfillTask) {
if _, ok := w.GetCtx().table.(table.PartitionedTable); ok {
func (w *backfillWorker) initPartitionIndexInfo(task *reorgBackfillTask) {
if pt, ok := w.GetCtx().table.(table.PartitionedTable); ok {
if addIdxWorker, ok := w.backfiller.(*addIndexWorker); ok {
addIdxWorker.index = task.index
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, task.bfJob.EleID)
addIdxWorker.index = tables.NewIndex(task.bfJob.PhysicalTableID, pt.Meta(), indexInfo)
}
}
}
Expand All @@ -469,7 +472,6 @@ func (w *backfillWorker) runTask(task *reorgBackfillTask) (result *backfillResul
time.Sleep(100 * time.Millisecond)
})

w.updatePartitionIndexInfo(task)
// Change the batch size dynamically.
w.GetCtx().batchCnt = int(variable.GetDDLReorgBatchSize())
result = w.handleBackfillTask(w.GetCtx().ddlCtx, task, w.backfiller)
Expand Down Expand Up @@ -678,7 +680,7 @@ func (dc *ddlCtx) sendTasksAndWait(scheduler *backfillScheduler, totalAddedCount
return nil
}

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, pID int64, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange, batch int) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, batch)
var prefix kv.Key
if reorgInfo.mergingTmpIdx {
Expand Down Expand Up @@ -710,13 +712,12 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, pID int64, kvRanges []kv
}

task := &reorgBackfillTask{
id: i,
jobID: job.ID,
physicalTableID: pID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
id: i,
jobID: job.ID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
startKey: startKey,
endKey: endKey,
// If the boundaries overlap, we should ignore the preceding endKey.
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1}
batchTasks = append(batchTasks, task)
Expand All @@ -731,7 +732,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, pID int64, kvRanges []kv
// handleRangeTasks sends tasks to workers, and returns remaining kvRanges that is not handled.
func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
totalAddedCount *int64, kvRanges []kv.KeyRange) ([]kv.KeyRange, error) {
batchTasks := getBatchTasks(t, scheduler.reorgInfo, scheduler.reorgInfo.PhysicalTableID, kvRanges, backfillTaskChanSize)
batchTasks := getBatchTasks(t, scheduler.reorgInfo, kvRanges, backfillTaskChanSize)
if len(batchTasks) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -1171,7 +1172,7 @@ func (dc *ddlCtx) splitTableToBackfillJobs(sess *session, reorgInfo *reorgInfo,
if err != nil {
return errors.Trace(err)
}
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, pTblMeta.PhyTblID, kvRanges, batchSize)
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, batchSize)
if len(batchTasks) == 0 {
break
}
Expand Down
3 changes: 3 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ var JobNeedGCForTest = jobNeedGC
// NewSession is only used for test.
var NewSession = newSession

// GetJobWithoutPartition is only used for test.
const GetJobWithoutPartition = getJobWithoutPartition

// GetDDLCtx returns ddlCtx for test.
func GetDDLCtx(d DDL) *ddlCtx {
return d.(*ddl).ddlCtx
Expand Down
16 changes: 5 additions & 11 deletions ddl/dist_backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/gpool/spmc"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

const GetJobWithoutPartition = -1
const getJobWithoutPartition = -1

type backfillWorkerContext struct {
currID int
Expand Down Expand Up @@ -132,7 +131,7 @@ func runBackfillJobs(d *ddl, sess *session, ingestBackendCtx *ingest.BackendCont
runningPID := int64(0)
// If txn-merge we needn't to claim the backfill job through the partition table
if ingestBackendCtx == nil {
runningPID = GetJobWithoutPartition
runningPID = getJobWithoutPartition
}
proFunc := func() ([]*reorgBackfillTask, error) {
// TODO: After BackfillJob replaces reorgBackfillTask, use backfiller's GetTasks instead of it.
Expand Down Expand Up @@ -218,20 +217,15 @@ func (bwm *backfilWorkerManager) close(d *ddl) error {
// backfillJob2Task builds reorg task.
func (dc *ddlCtx) backfillJob2Task(t table.Table, bfJob *BackfillJob) (*reorgBackfillTask, error) {
pt := t.(table.PhysicalTable)
var idx table.Index
if tbl, ok := t.(table.PartitionedTable); ok {
pt = tbl.GetPartition(bfJob.PhysicalTableID)
if pt == nil {
return nil, dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", bfJob.PhysicalTableID, t.Meta().ID)
}
indexInfo := model.FindIndexInfoByID(pt.Meta().Indices, bfJob.EleID)
idx = tables.NewIndex(bfJob.PhysicalTableID, pt.Meta(), indexInfo)
}
return &reorgBackfillTask{
bfJob: bfJob,
index: idx,
physicalTable: pt,
physicalTableID: bfJob.PhysicalTableID,
bfJob: bfJob,
physicalTable: pt,
// TODO: Remove these fields after remove the old logic.
sqlQuery: bfJob.Meta.Query,
startKey: bfJob.StartKey,
Expand Down Expand Up @@ -259,7 +253,7 @@ func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, run
}
}

if *runningPID != GetJobWithoutPartition {
if *runningPID != getJobWithoutPartition {
*runningPID = bJobs[0].PhysicalTableID
}
tasks := make([]*reorgBackfillTask, 0, len(bJobs))
Expand Down
2 changes: 1 addition & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ func GetAndMarkBackfillJobsForOneEle(s *session, batch int, jobID int64, uuid st

getJobsSQL := fmt.Sprintf("(exec_ID = '' or exec_lease < '%v') and ddl_job_id = %d order by ddl_job_id, ele_id, ele_key limit %d",
leaseStr, jobID, batch)
if pTblID != GetJobWithoutPartition {
if pTblID != getJobWithoutPartition {
if pTblID == 0 {
rows, err := s.execute(context.Background(),
fmt.Sprintf("select ddl_physical_tid from mysql.%s group by ddl_job_id, ele_id, ele_key, ddl_physical_tid having sum(length(exec_id)) = 0 or max(exec_lease) < '%s' order by ddl_job_id, ele_key, ele_id, ddl_physical_tid limit 1",
Expand Down

0 comments on commit e07d255

Please sign in to comment.